_site/cover/pgmp_mm_common.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_mm_common).
17
18
19 -export([actions/3]).
20 -export([data/3]).
21 -export([field_names/1]).
22 -export([handle_event/4]).
23 -export([terminate/3]).
24 -import(pgmp_codec, [demarshal/1]).
25 -import(pgmp_codec, [size_inclusive/1]).
26 -import(pgmp_statem, [nei/1]).
27 -include_lib("kernel/include/logger.hrl").
28
29
%% @doc Return the `field_name' value of each entry in `Types', in order.
%%
%% The generator pattern doubles as a filter: an entry that is not a map
%% containing a `field_name' key is silently skipped.
field_names(Types) ->
    [FieldName || #{field_name := FieldName} <- Types].
32
33
%% @doc Prepare the state data for a newly received call.
%%
%% Stores the caller reference under `from', the call arguments under
%% `args', and resets `replies' to the empty list. Any existing values
%% under those keys are overwritten; every other key in `Data' is kept.
data({call, From}, #{args := Args}, Data) ->
    PerCall = #{from => From, args => Args, replies => []},
    maps:merge(Data, PerCall).
36
37
%% @doc Build the ordered action list for a call.
%%
%% The list starts with the span-start event, followed by the command
%% event itself, followed by whatever `post_actions/3' returns for the
%% requested action (possibly nothing).
actions({call, _} = Call, Arg, Data) ->
    SpanStart = span_start(Call, Arg, Data),
    Command = command(Call, Arg, Data),
    [SpanStart, Command | post_actions(Call, Arg, Data)].
41
42
%% Wrap the requested action and its arguments as `{Action, Args}' and
%% pass that to `nei/1' (imported from pgmp_statem).
command(_, #{action := Action, args := Args}, _) ->
    Event = {Action, Args},
    nei(Event).
45
46
%% Pass `{span_start, Action}' to `nei/1'; the matching `handle_event/4'
%% clause records the span start time and emits the start telemetry.
span_start(_, #{action := Action}, _) ->
    nei({span_start, Action}).
49
50
%% Actions to append after the command itself: a single `nei(flush)'
%% when the action is parse, describe, bind or execute, otherwise none.
post_actions(_, #{action := Action}, _) ->
    case lists:member(Action, [parse, describe, bind, execute]) of
        true ->
            [nei(flush)];

        false ->
            []
    end.
56
57
%% @doc Termination callback.
%%
%% When the data carries a `config' map with both `scope' and `group',
%% the calling process leaves that `pg' group and the result of
%% `pg:leave/3' is returned. In every other case the result is `ok'.
terminate(_Reason, _State, Data) ->
    case Data of
        #{config := #{scope := Scope, group := Group}} ->
            pg:leave(Scope, Group, self());

        _ ->
            ok
    end.
63
64
%% @doc Event handling shared between the message-manager state machines.
%%
%% Clauses are tried top to bottom, so their order is significant:
%% several later clauses are deliberate fall-backs for earlier, more
%% specific ones (for example the three `telemetry' clauses and the
%% three `enter' clauses).
%%
%% `nei/1', `demarshal/1' and `size_inclusive/1' are imported at the top
%% of the module (from pgmp_statem and pgmp_codec).

%% The types server has reported `ready': flip `types_ready' to true.
handle_event(internal,
             {response, #{label := pgmp_types, reply := ready}},
             _,
             #{types_ready := false} = Data) ->
    {keep_state, Data#{types_ready := true}};

%% A tagged message handed over by a caller: decode it (asserting that
%% nothing is left over), acknowledge the caller with `ok', and
%% re-inject the decoded message as a `recv' event.
handle_event({call, From}, {recv, {Tag, _} = Tagged}, _, _) ->
    {Decoded, <<>>} = demarshal(Tagged),
    {keep_state_and_data,
     [{reply, From, ok}, nei({recv, {Tag, Decoded}})]};

%% Any info message is checked against the outstanding request
%% collection. A reply is re-injected as a labelled `response' event;
%% an error reply stops the state machine with the reported reason.
%% NOTE(review): `no_request' and `no_reply' results are not matched
%% here, so such a message raises `case_clause' -- confirm that is the
%% intended let-it-crash behaviour.
handle_event(info, Msg, _, #{requests := Pending} = Data) ->
    case gen_statem:check_response(Msg, Pending, true) of
        {{reply, Reply}, Label, Remaining} ->
            {keep_state,
             Data#{requests := Remaining},
             nei({response, #{label => Label, reply => Reply}})};

        {{error, {Reason, _}}, _, Remaining} ->
            {stop, Reason, Data#{requests := Remaining}}
    end;

%% An `ok' response labelled pgmp_socket needs no further action.
handle_event(internal,
             {response, #{label := pgmp_socket, reply := ok}},
             _,
             _) ->
    keep_state_and_data;

%% An `ok' response labelled pgmp_connection needs no further action.
handle_event(internal,
             {response, #{label := pgmp_connection, reply := ok}},
             _,
             _) ->
    keep_state_and_data;

%% Hand `Message' to the socket, keep the updated request collection,
%% and emit a `send' telemetry event carrying the message.
handle_event(internal,
             {send = EventName, Message},
             _,
             #{requests := Requests, socket := Socket} = Data) ->
    Outstanding = pgmp_socket:send(
                    #{server_ref => Socket,
                      data => Message,
                      requests => Requests}),
    {keep_state,
     Data#{requests := Outstanding},
     nei({telemetry, EventName, #{message => Message}})};

%% Ask the socket to upgrade using `Config', keep the updated request
%% collection, and emit an `upgrade' telemetry event.
handle_event(internal,
             {upgrade = EventName, Config},
             _,
             #{requests := Requests, socket := Socket} = Data) ->
    Outstanding = pgmp_socket:upgrade(
                    #{server_ref => Socket,
                      config => Config,
                      label => EventName,
                      requests => Requests}),
    {keep_state,
     Data#{requests := Outstanding},
     nei({telemetry, EventName, #{config => Config}})};

%% Remember the latest value of a reported parameter.
handle_event(internal,
             {recv, {parameter_status, {K, V}}},
             _,
             #{parameters := Parameters} = Data) ->
    {keep_state, Data#{parameters := maps:put(K, V, Parameters)}};

%% Queue the sending of an "X" message with an empty body.
handle_event(internal, terminate, _, _) ->
    {keep_state_and_data,
     nei({send, ["X", size_inclusive([])]})};

%% A notice received in the `query' or `execute' state is processed
%% like any other reply belonging to the current call.
handle_event(internal,
             {recv, {notice_response, _} = Notice},
             State,
             _) when State =:= query; State =:= execute ->
    {keep_state_and_data, nei({process, Notice})};

%% Error and notice replies are mapped by pgmp_error_notice_fields
%% before being pushed onto the reply accumulator.
handle_event(internal,
             {process, {Tag, _} = Reply},
             _,
             #{from := _, replies := Replies} = Data)
  when Tag =:= error_response; Tag =:= notice_response ->
    Mapped = pgmp_error_notice_fields:map(Reply),
    {keep_state, Data#{replies := [Mapped | Replies]}};

%% Every other reply is pushed onto the accumulator unchanged (newest
%% first).
handle_event(internal,
             {process, Reply},
             _,
             #{from := _, replies := Replies} = Data) ->
    {keep_state, Data#{replies := [Reply | Replies]}};

%% Issue a `when_ready' request to the types server and keep the
%% updated request collection.
handle_event(internal,
             types_when_ready,
             _,
             #{requests := Requests, config := Config} = Data) ->
    Outstanding = pgmp_types:when_ready(
                    #{requests => Requests,
                      server_ref => pgmp_types:server_ref(Config)}),
    {keep_state, Data#{requests := Outstanding}};

%% Entering `{ready_for_query, State}' with a `group' in the config:
%% report the state to the connection server and trim the data down
%% to its long-lived keys.
handle_event(enter,
             _,
             {ready_for_query, State},
             #{requests := Requests,
               config := #{group := _} = Config} = Data) ->
    Outstanding = pgmp_connection:ready_for_query(
                    #{state => State,
                      server_ref => pgmp_connection:server_ref(Config),
                      requests => Requests}),
    {keep_state, long_lived(Data#{requests => Outstanding})};

%% Entering `{ready_for_query, _}' otherwise: only trim the data.
handle_event(enter, _, {ready_for_query, _}, Data) ->
    {keep_state, long_lived(Data)};

%% Entering any other state requires no action.
handle_event(enter, _, _, _) ->
    keep_state_and_data;

%% Open a telemetry span: record the start time and the call arguments
%% (when present) in the data, then emit the `[Action, start]' event.
handle_event(internal, {span_start, Action}, _, Data) ->
    StartTime = erlang:monotonic_time(),
    Metadata = maps:with([args], Data),
    Span = #{start_time => StartTime,
             measurements => #{},
             metadata => Metadata},
    {keep_state,
     Data#{span => Span},
     nei({telemetry,
          [Action, start],
          #{monotonic_time => StartTime},
          Metadata})};

%% Close the open span: drop it from the data and emit the
%% `[Action, stop]' event. Measurements stored in the span take
%% precedence over the computed duration and stop time on key clashes.
handle_event(internal,
             {span_stop, Action},
             _,
             #{span := #{start_time := StartTime,
                         measurements := Measurements,
                         metadata := Metadata}} = Data) ->
    StopTime = erlang:monotonic_time(),
    Timing = #{duration => StopTime - StartTime,
               monotonic_time => StopTime},
    {keep_state,
     maps:remove(span, Data),
     nei({telemetry,
          [Action, stop],
          maps:merge(Timing, Measurements),
          Metadata})};

%% A span stop without an open span is ignored.
handle_event(internal, {span_stop, _}, _, _) ->
    keep_state_and_data;

%% Telemetry without metadata: retry with empty metadata.
handle_event(internal,
             {telemetry, EventName, Measurements},
             _,
             _) ->
    {keep_state_and_data,
     nei({telemetry, EventName, Measurements, #{}})};

%% Telemetry with a bare atom as event name: retry with the name
%% wrapped in a list.
handle_event(internal,
             {telemetry, EventName, Measurements, Metadata},
             _,
             _) when is_atom(EventName) ->
    {keep_state_and_data,
     nei({telemetry, [EventName], Measurements, Metadata})};

%% Fully formed telemetry: execute it under the `[pgmp, mm]' prefix.
%% The metadata is the `socket' entry of the data (when present)
%% merged with the metadata after `args/2' has reshaped it.
handle_event(internal,
             {telemetry, EventName, Measurements, Metadata},
             _,
             Data) ->
    ok = telemetry:execute(
           [pgmp, mm | EventName],
           Measurements,
           maps:merge(
             maps:with([socket], Data),
             args(EventName, Metadata))),
    keep_state_and_data;

%% An error response that no earlier clause handled: log a warning,
%% emit an `error' telemetry event, and move to `limbo' with a state
%% timeout derived from the configured back-off.
handle_event(internal, {recv, {error_response, _} = TM}, _, Data) ->
    {Tag, Message} = pgmp_error_notice_fields:map(TM),
    ?LOG_WARNING(#{tag => Tag, message => Message}),
    Telemetry = nei({telemetry,
                     error,
                     #{count => 1},
                     maps:merge(
                       #{event => Tag},
                       maps:with([code, message, severity], Message))}),
    Delay = timer:seconds(
              backoff:rand_increment(
                pgmp_config:backoff(rand_increment))),
    {next_state,
     limbo,
     Data,
     [Telemetry,
      {state_timeout,
       Delay,
       {backoff, #{action => Tag, reason => Message}}}]};

%% Remove the cached parameter and row descriptions stored under the
%% empty (`<<>>') name.
handle_event(internal, gc_unnamed_portal, _, #{cache := Cache}) ->
    lists:foreach(
      fun (Kind) ->
              ets:delete(Cache, {Kind, <<>>})
      end,
      [parameter_description, row_description]),
    keep_state_and_data;

%% The back-off timer has fired while in `limbo': stop.
handle_event(state_timeout, {backoff, _}, limbo, _) ->
    stop.


%% Keep only the long-lived keys of the state data. Every other key
%% (for instance the `from', `args', `replies' and `span' entries added
%% while serving a call) is discarded.
long_lived(Data) ->
    maps:with(
      [backend,
       cache,
       config,
       parameters,
       requests,
       socket,
       supervisor,
       types_ready],
      Data).
285
286
%% Reshape the positional `args' list in telemetry metadata into a map
%% with named keys, chosen by the first element of the event name.
%%
%% Only bind, execute, parse and query are reshaped, and only when the
%% argument list has the expected number of elements. In every other
%% case (unknown action, unexpected arity, no `args' key, empty event
%% name) the metadata is returned unchanged.
args([Action | _], #{args := Args} = Metadata) ->
    case named_args(Action, Args) of
        {ok, Named} ->
            Metadata#{args := Named};

        error ->
            Metadata
    end;

args(_, Metadata) ->
    Metadata.


%% Name the positional arguments of a known action, or return `error'
%% when the action or the shape of its argument list is not recognised.
named_args(bind, [Statement, Portal, Values, ParameterFormat, ResultFormat]) ->
    {ok,
     #{statement => Statement,
       portal => Portal,
       values => Values,
       format => #{parameter => ParameterFormat,
                   result => ResultFormat}}};

named_args(execute, [Portal, MaxRows]) ->
    {ok, #{portal => Portal, max_rows => MaxRows}};

named_args(parse, [Name, SQL]) ->
    {ok, #{name => Name, sql => SQL}};

named_args(query, [SQL]) ->
    {ok, #{sql => SQL}};

named_args(_, _) ->
    error.
Line Hits Source