_site/cover/pgmp_mm_equery.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_mm_equery).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -export([terminate/3]).
22 -import(pgmp_codec, [marshal/2]).
23 -import(pgmp_codec, [size_inclusive/1]).
24 -import(pgmp_data_row, [decode/3]).
25 -import(pgmp_mm_common, [actions/3]).
26 -import(pgmp_mm_common, [data/3]).
27 -import(pgmp_mm_common, [field_names/1]).
28 -import(pgmp_statem, [nei/1]).
29 -include_lib("kernel/include/logger.hrl").
30
31
32 %% https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
33
34
35 callback_mode() ->
36 130 [handle_event_function, state_enter].
37
38
39 terminate(Reason, State, Data) ->
40 47 pgmp_mm_common:terminate(Reason, State, Data).
41
42 handle_event({call, _} = Call,
43 {request, #{action := query = Action} = Arg},
44 unsynchronized,
45 Data) ->
46 78 {next_state,
47 Action,
48 data(Call, Arg, Data),
49 [nei(gc_unnamed_portal), pop_callback_module | actions(Call, Arg, Data)]};
50
51 handle_event({call, From},
52 {request, #{action := sync}},
53 unsynchronized,
54 _) ->
55 2 {keep_state_and_data, [{reply, From, ok}, nei(sync)]};
56
57 handle_event({call, _} = Call,
58 {request, #{action := Action} = Arg},
59 unsynchronized,
60 Data)
61 when Action == parse;
62 Action == describe;
63 Action == bind;
64 Action == execute ->
65 11582 ?LOG_DEBUG(#{call => Call, arg => Arg, data => Data}),
66 11582 {next_state,
67 Action,
68 data(Call, Arg, Data),
69 actions(Call, Arg, Data)};
70
71 handle_event({call, From},
72 {request, #{action := parameters}},
73 _,
74 #{parameters := Parameters}) ->
75
:-(
{keep_state_and_data, {reply, From, Parameters}};
76
77 handle_event({call, _}, {request, _}, _, _) ->
78 7753 {keep_state_and_data, postpone};
79
80 handle_event(internal, flush, _, _) ->
81 19462 {keep_state_and_data,
82 nei({send, ["H", size_inclusive([])]})};
83
84 handle_event(internal, sync, _, _) ->
85 12 {keep_state_and_data,
86 [nei({send, ["S", size_inclusive([])]}), nei(gc_unnamed_portal)]};
87
88 handle_event(internal, {describe, [What, Name]}, _, _) ->
89 7755 {keep_state_and_data,
90 nei({send, ["D", size_inclusive([What, marshal(string, Name)])]})};
91
92 handle_event(internal, {describe_statement, Statement}, unsynchronized, Data) ->
93 3801 Args = [$S, Statement],
94 3801 {next_state,
95 describe_statement,
96 Data#{args => Args},
97 [nei({describe, Args}), nei(flush)]};
98
99 handle_event(internal, {describe_portal, Portal}, unsynchronized, Data) ->
100 3949 Args = [$P, Portal],
101 3949 {next_state,
102 describe_portal,
103 Data#{args => Args},
104 [nei({describe, Args}), nei(flush)]};
105
106 handle_event(internal, {parse = EventName, [Name, SQL]}, _, _) ->
107 3803 ?LOG_DEBUG(#{name => Name, sql => SQL}),
108 3803 {keep_state_and_data,
109 [nei({telemetry,
110 EventName,
111 #{count => 1},
112 #{args => #{name => Name, sql => SQL}}}),
113 nei({send,
114 ["P",
115 size_inclusive(
116 [marshal(string, Name),
117 marshal(string, SQL),
118 marshal(int16, 0)])]})]};
119
120 handle_event(internal,
121 {bind,
122 [Statement,
123 Portal,
124 Values,
125 ParameterFormat,
126 ResultFormat]},
127 bind = Action,
128 #{config := Config,
129 cache := Cache,
130 parameters := Parameters} = Data) ->
131 3953 try
132 3953 case ets:lookup(Cache, {parameter_description, Statement}) of
133 [] ->
134 %% force an error response from PostgreSQL, as the
135 %% parameter description is not cached, this is either
136 %% because no statement has been parsed, or a previous
137 %% error has been ignored.
138 %%
139 2 {keep_state_and_data,
140 nei({send,
141 ["B",
142 size_inclusive(
143 [marshal(string, Portal),
144 marshal(string, Statement),
145 marshal(int16, 0),
146 marshal(int16, length(Values)),
147 marshal(int16, 1),
148 marshal(int16, format(ResultFormat))])]})};
149
150 [{_, Types}]
151 when length(Types) == length(Values),
152 length(Types) == 0 ->
153 9 {keep_state_and_data,
154 nei({send,
155 ["B",
156 size_inclusive(
157 [marshal(string, Portal),
158 marshal(string, Statement),
159 marshal(int16, 0),
160 marshal(int16, length(Values)),
161 marshal(int16, 1),
162 marshal(int16, format(ResultFormat))])]})};
163
164 [{_, Types}]
165 when length(Types) == length(Values),
166 length(Types) > 0 ->
167 3942 {keep_state_and_data,
168 nei({send,
169 ["B",
170 size_inclusive(
171 [marshal(string, Portal),
172 marshal(string, Statement),
173 marshal(int16, 1),
174 marshal(int16, format(ParameterFormat)),
175
176 marshal(int16, length(Values)),
177
178 lists:foldl(
179 fun
180 ({_, null}, A) ->
181
:-(
[A, marshal(int32, -1)];
182
183 ({TypeOID, Value}, A) ->
184 3961 Encoded = pgmp_data_row:encode(
185 Parameters,
186 [{#{format => ParameterFormat,
187 type_oid => TypeOID},
188 Value}],
189 pgmp_types:cache(Config)),
190 3960 [A,
191 marshal(int32, iolist_size(Encoded)),
192 Encoded]
193 end,
194 [],
195 lists:zip(Types, Values)),
196 marshal(int16, 1),
197 marshal(
198 int16,
199 format(ResultFormat))])]})}
200 end
201 catch
202 error:badarg ->
203 1 {next_state,
204 error,
205 Data,
206 [nei({process, {error, badarg}}),
207 nei({complete, Action}),
208 nei(sync)]}
209 end;
210
211 handle_event(internal, {execute = EventName, [Portal, MaxRows]}, _, _) ->
212 3951 {keep_state_and_data,
213 [nei({telemetry,
214 EventName,
215 #{count => 1},
216 #{args => #{portal => Portal, max_rows => MaxRows}}}),
217
218 nei({send,
219 ["E",
220 size_inclusive(
221 [marshal(string, Portal),
222 marshal(int32, MaxRows)])]})]};
223
224 handle_event(internal,
225 {recv = EventName, {ready_for_query = Tag, _} = TM},
226 State,
227 Data)
228 when State == unsynchronized;
229 State == error ->
230 5 {next_state,
231 TM,
232 Data,
233 [nei(gc_unnamed_portal),
234 pop_callback_module,
235 nei({telemetry,
236 EventName,
237 #{count => 1},
238 #{tag => Tag}})]};
239
240 handle_event(internal,
241 {recv = EventName, {data_row = Tag, Columns}},
242 execute,
243 #{config := Config,
244 parameters := Parameters,
245 types := Types}) ->
246 147 {keep_state_and_data,
247 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
248 nei({process,
249 {Tag,
250 decode(Parameters,
251 lists:zip(Types, Columns),
252 pgmp_types:cache(Config))}})]};
253
254 handle_event(internal,
255 {recv = EventName, {parse_complete = Tag, _}},
256 {named_statements, _},
257 #{args := [Statement, _]} = Data) ->
258
:-(
?LOG_DEBUG(#{statement => Statement}),
259
:-(
Args = [$S, Statement],
260
:-(
{keep_state,
261 Data#{args := Args},
262 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
263 nei({describe, Args}),
264 nei(flush)]};
265
266 handle_event(internal,
267 {recv = EventName, {parameter_description = Tag, Decoded}},
268 {named_statements, _},
269 #{args := [$S, Statement],
270 cache := Cache}) ->
271
:-(
ets:insert(Cache, {{Tag, Statement}, Decoded}),
272
:-(
{keep_state_and_data,
273 nei({telemetry, EventName, #{count => 1}, #{tag => Tag}})};
274
275 handle_event(internal,
276 {recv = EventName, {row_description = Tag, Decoded}},
277 {named_statements, _},
278 #{args := [$S, Statement], cache := Cache} = Data) ->
279
:-(
ets:insert(Cache, {{Tag, Statement}, Decoded}),
280
:-(
{keep_state,
281 maps:without([args], Data),
282 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
283 nei(next_named)]};
284
285 handle_event(internal,
286 {recv = EventName, {no_data = Tag, _}},
287 {named_statements, _},
288 #{args := [$S, _]} = Data) ->
289
:-(
{keep_state,
290 maps:without([args], Data),
291 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
292 nei(next_named)]};
293
294 handle_event(internal,
295 {recv = EventName, {ready_for_query = Tag, idle}},
296 {named_statements, Previous},
297 Data) ->
298
:-(
{next_state,
299 Previous,
300 Data,
301 [nei(gc_unnamed_portal),
302 pop_callback_module,
303 nei({telemetry, EventName, #{count => 1}, #{tag => Tag}})]};
304
305 handle_event(internal,
306 next_named,
307 {named_statements, _},
308 #{named := Named} = Data) ->
309
:-(
case maps:next(Named) of
310 {Statement, SQL, Iterator} ->
311
:-(
{keep_state,
312 Data#{named := Iterator,
313 args => [Statement, SQL]},
314 [nei({parse, [Statement, SQL]}), nei(flush)]};
315
316 none ->
317
:-(
{keep_state, maps:without([named], Data), nei(sync)}
318 end;
319
320 handle_event(internal,
321 {recv = EventName, {parse_complete = Tag, _} = TM},
322 parse = Action,
323 #{args := [Statement, _]} = Data) ->
324 3801 ?LOG_DEBUG(#{statement => Statement}),
325 3801 {next_state,
326 unsynchronized,
327 Data,
328 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
329 nei({process, TM}),
330 nei({complete, Action}),
331 nei({describe_statement, Statement}),
332 nei({sync_when_named, Statement})]};
333
334 handle_event(internal, {sync_when_named, <<>>}, _, _) ->
335 7750 keep_state_and_data;
336
337 handle_event(internal, {sync_when_named, _}, _, _) ->
338
:-(
{keep_state_and_data, nei(sync)};
339
340 handle_event(internal,
341 {recv = EventName, {error_response = Tag, _} = TM},
342 parse = Action,
343 Data) ->
344 2 ?LOG_DEBUG(#{tm => TM}),
345 2 {next_state,
346 error,
347 Data,
348 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
349 nei({process, TM}),
350 nei({complete, Action}),
351 nei(sync)]};
352
353 handle_event(internal,
354 {recv = EventName, {bind_complete = Tag, _} = Reply},
355 bind = Action,
356 #{args := [_, Portal, _, _, _]} = Data) ->
357 3949 {next_state,
358 unsynchronized,
359 Data,
360 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
361 nei({process, Reply}),
362 nei({complete, Action}),
363 nei({describe_portal, Portal}),
364 nei({sync_when_named, Portal})]};
365
366 handle_event(internal,
367 {recv = EventName, {error_response = Tag, _} = Reply},
368 bind = Action,
369 #{args := [_, Portal, _, _, _], cache := Cache} = Data) ->
370 3 ets:delete(Cache, {parameter_description, Portal}),
371 3 ets:delete(Cache, {row_description, Portal}),
372 3 {next_state,
373 error,
374 Data,
375 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
376 nei({process, Reply}),
377 nei({complete, Action}),
378 nei(sync)]};
379
380 handle_event(internal,
381 {recv = EventName, {error_response = Tag, _} = Reply},
382 execute = Action,
383 #{args := [Portal, _], cache := Cache} = Data) ->
384 4 ets:delete(Cache, {parameter_description, Portal}),
385 4 ets:delete(Cache, {row_description, Portal}),
386 4 {next_state,
387 error,
388 Data,
389 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
390 nei({process, Reply}),
391 nei({complete, Action}),
392 nei(sync)]};
393
394 handle_event(internal,
395 {recv = EventName, {data_row = Tag, Columns}},
396 execute,
397 #{args := [Portal, _],
398 config := Config,
399 parameters := Parameters,
400 cache := Cache} = Data) ->
401 3941 [{_, Types}] = ets:lookup(Cache, {row_description, Portal}),
402 3941 {keep_state,
403 Data#{types => Types},
404 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
405 nei({process, {row_description, field_names(Types)}}),
406 nei({process, {Tag, decode(Parameters, lists:zip(Types, Columns), pgmp_types:cache(Config))}})]};
407
408 handle_event(internal,
409 {recv = EventName,
410 {command_complete = Tag, {Command, Rows}} = Reply},
411 execute = Action,
412 #{span := #{metadata := Metadata,
413 measurements := Measurements} = Span} = Data)
414 when is_atom(Command),
415 is_integer(Rows) ->
416 3945 {next_state,
417 unsynchronized,
418 Data#{span := Span#{metadata := Metadata#{command => Command},
419 measurements := Measurements#{rows => Rows}}},
420 [nei({telemetry,
421 EventName,
422 #{count => 1, rows => Rows},
423 #{tag => Tag, command => Command}}),
424 nei({process, Reply}),
425 nei({complete, Action})]};
426
427 handle_event(internal,
428 {recv = EventName, {Tag, _} = Reply},
429 execute = Action,
430 Data)
431 when Tag == portal_suspended;
432 Tag == empty_query_response;
433 Tag == command_complete ->
434 2 {next_state,
435 unsynchronized,
436 Data,
437 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
438 nei({process, Reply}),
439 nei({complete, Action})]};
440
441 handle_event(internal, {recv = EventName, {Tag, _} = Reply}, describe, _)
442 when Tag == parameter_description ->
443 5 {keep_state_and_data,
444 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
445 nei({process, Reply})]};
446
447 handle_event(internal,
448 {recv = EventName, {error_response = Tag, _} = Reply},
449 describe = Action,
450 Data) ->
451
:-(
{next_state,
452 error,
453 Data,
454 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
455 nei({process, Reply}),
456 nei({complete, Action}),
457 nei(sync)]};
458
459 handle_event(internal,
460 {recv = EventName, {Tag, _} = Reply},
461 describe = Action,
462 Data)
463 when Tag == row_description;
464 Tag == no_data ->
465 5 {next_state,
466 unsynchronized,
467 Data,
468 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
469 nei({process, Reply}),
470 nei({complete, Action})]};
471
472 handle_event(internal,
473 {recv = EventName, {parameter_description = Tag, Decoded}},
474 describe_statement,
475 #{args := [$S, Statement],
476 cache := Cache}) ->
477 3800 ets:insert(Cache, {{Tag, Statement}, Decoded}),
478 3800 {keep_state_and_data,
479 nei({telemetry, EventName, #{count => 1}, #{tag => Tag}})};
480
481 handle_event(internal,
482 {recv = EventName, {row_description = Tag, Decoded}},
483 describe_statement,
484 #{args := [$S, Statement],
485 cache := Cache} = Data) ->
486 3798 ets:insert(Cache, {{Tag, Statement}, Decoded}),
487 3798 {next_state,
488 unsynchronized,
489 Data,
490 nei({telemetry, EventName, #{count => 1}, #{tag => Tag}})};
491
492 handle_event(internal,
493 {recv = EventName, {no_data = Tag, _}},
494 describe_statement,
495 #{args := [$S, _]} = Data) ->
496 2 {next_state,
497 unsynchronized,
498 Data,
499 nei({telemetry, EventName, #{count => 1}, #{tag => Tag}})};
500
501 handle_event(internal,
502 {recv = EventName, {parameter_description = Tag, Decoded}},
503 describe_portal,
504 #{args := [$P, Portal],
505 cache := Cache}) ->
506
:-(
ets:insert(Cache, {{Tag, Portal}, Decoded}),
507
:-(
{keep_state_and_data,
508 nei({telemetry, EventName, #{count => 1}, #{tag => Tag}})};
509
510 handle_event(internal,
511 {recv = EventName, {row_description = Tag, Decoded}},
512 describe_portal,
513 #{args := [$P, Portal],
514 cache := Cache} = Data) ->
515 3946 ets:insert(Cache, {{Tag, Portal}, Decoded}),
516 3946 {next_state,
517 unsynchronized,
518 Data,
519 nei({telemetry, EventName, #{count => 1}, #{tag => Tag}})};
520
521 handle_event(internal,
522 {recv = EventName, {no_data = Tag, _}},
523 describe_portal,
524 #{args := [$P, _]} = Data) ->
525 2 {next_state,
526 unsynchronized,
527 Data,
528 nei({telemetry, EventName, #{count => 1}, #{tag => Tag}})};
529
530 handle_event(internal,
531 {recv = EventName, {error_response = Tag, _}},
532 describe_portal,
533 #{args := [$P, _]} = Data) ->
534
:-(
{next_state,
535 error,
536 Data,
537 nei({telemetry, EventName, #{count => 1}, #{tag => Tag}})};
538
539 handle_event(internal,
540 {complete, Action},
541 _,
542 #{replies := Replies, from := From} = Data) ->
543 11712 {keep_state,
544 maps:without([from, replies], Data),
545 [{reply, From, lists:reverse(Replies)}, nei({span_stop, Action})]};
546
547 handle_event(enter, _, unsynchronized, Data) ->
548 19450 {keep_state, maps:without([args, types], Data)};
549
550 handle_event(Type, Content, State, Data) ->
551 417345 pgmp_mm_common:handle_event(Type, Content, State, Data).
552
553
554 format(text) ->
555
:-(
0;
556
557 format(binary) ->
558 7894 1.
Line Hits Source