/home/runner/work/pgec/pgec/_site/ct/ct_run.ct_pgec@fv-az651-965.2023-12-04_14.30.54/pgec_resp_emulator.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgec_resp_emulator).
17
18
19 -export([info/1]).
20 -export([init/1]).
21 -export([lookup/1]).
22 -export([ptk/1]).
23 -export([recv/1]).
24 -import(pgmp_connection_sync, [bind/1]).
25 -import(pgmp_connection_sync, [execute/1]).
26 -import(pgmp_connection_sync, [parse/1]).
27 -import(pgmp_connection_sync, [query/1]).
28 -include_lib("kernel/include/logger.hrl").
29 -include_lib("stdlib/include/ms_transform.hrl").
30
31
32 init([]) ->
33 6 {ok,
34 #{protocol => #{version => 2},
35 requests => gen_statem:reqids_new()}}.
36
37
38 recv(#{command := #{name := info},
39 message := {array, [_]}}) ->
40
:-(
{continue,
41 {encode,
42 {bulk,
43 ["# Server\r\nredis_version:", pgec:version(), "\r\n"]}}};
44
45 recv(#{command := #{name := ping},
46 message := {array, [_]}}) ->
47 1 {continue, {encode, {string, "pong"}}};
48
49 recv(#{command := #{name := ping},
50 message := {array, [_, {bulk, _} = Greeting]}}) ->
51 1 {continue, {encode, Greeting}};
52
53 recv(#{command := #{name := command},
54 message := {array, _}}) ->
55
:-(
{continue, {encode, {array, []}}};
56
57 recv(#{command := #{name := hello},
58 message := {array, [_, {bulk, <<"3">>}]},
59 data := #{protocol := Protocol} = Data}) ->
60 1 {continue,
61 Data#{protocol := Protocol#{version => 3}},
62 {encode,
63 {map,
64 [{{bulk, <<"server">>}, {bulk, <<"pgec">>}},
65 {{bulk, <<"version">>}, {bulk, pgec:version()}},
66 {{bulk,<<"proto">>}, {integer, 3}},
67 {{bulk, <<"id">>}, {integer, erlang:phash2(self())}},
68 {{bulk, <<"mode">>}, {bulk, <<"standalone">>}},
69 {{bulk, <<"role">>}, {bulk, <<"master">>}},
70 {{bulk, <<"modules">>}, {array, []}}]}}};
71
72 recv(#{command := #{name := hello},
73 message := {array, [_, {bulk, <<"2">>}]},
74 data := #{protocol := Protocol} = Data}) ->
75 1 {continue,
76 Data#{protocol := Protocol#{version => 2}},
77 {encode,
78 {array,
79 [{bulk, <<"server">>}, {bulk, <<"pgec">>},
80 {bulk, <<"version">>}, {bulk, pgec:version()},
81 {bulk, <<"proto">>}, {integer, 2},
82 {bulk, <<"id">>}, {integer, erlang:phash2(self())},
83 {bulk, <<"mode">>}, {bulk, <<"standalone">>},
84 {bulk, <<"role">>}, {bulk, <<"master">>},
85 {bulk, <<"modules">>}, {array, []}]}}};
86
87 recv(#{command := #{name := hello},
88 message := {array, [_]},
89 data := #{protocol := Protocol} = Data}) ->
90 1 {continue,
91 Data#{protocol := Protocol#{version => 2}},
92 {encode,
93 {array,
94 [{bulk, <<"server">>}, {bulk, <<"pgec">>},
95 {bulk, <<"version">>}, {bulk, pgec:version()},
96 {bulk, <<"proto">>}, {integer, 2},
97 {bulk, <<"id">>}, {integer, erlang:phash2(self())},
98 {bulk, <<"mode">>}, {bulk, <<"standalone">>},
99 {bulk, <<"role">>}, {bulk, <<"master">>},
100 {bulk, <<"modules">>}, {array, []}]}}};
101
102 recv(#{command := #{name := exists},
103 message := {array, [_ | Keys]}}) ->
104 1 {continue,
105 {encode,
106 {integer,
107 lists:foldl(
108 fun
109 ({bulk, Key}, A) ->
110 4 try lookup(ptk(Key)) of
111 {ok, _} ->
112 1 A + 1;
113
114 not_found ->
115 3 A
116
117 catch
118 error:badarg ->
119
:-(
A
120 end
121 end,
122 0,
123 Keys)}}};
124
125 recv(#{command := #{name := hexists},
126 message := {array, [_, {bulk, Key}, {bulk, Field}]}}) ->
127 5 {continue,
128 {encode,
129 {integer,
130 5 try lookup(ptk(Key)) of
131 {ok, #{metadata := Metadata, row := Row}} ->
132 3 try row(Metadata, Row) of
133 #{Field := Value} when Value /= null ->
134 2 1;
135
136 #{} ->
137 1 0
138
139 catch
140 Class:Exception:Stacktrace ->
141
:-(
?LOG_DEBUG(#{class => Class,
142 exception => Exception,
143
:-(
stacktrace => Stacktrace}),
144
:-(
0
145 end;
146
147 not_found ->
148 2 0
149 catch
150 error:badarg ->
151
:-(
0
152 end}}};
153
154 recv(#{command := #{name := hset},
155 message := {array, [_, {bulk, Key} | NameValues]}} = Recv) ->
156 10 ?LOG_DEBUG(#{recv => Recv}),
157 10 #{key := Encoded} = PTK = ptk(Key),
158 10 try lookup(PTK) of
159 {ok, #{metadata := _Metadata}} ->
160 4 [{_, Metadata}] = metadata(PTK),
161 4 try pbe(update(
162 maps:merge(
163 Metadata,
164 PTK),
165 names(Metadata, NameValues)),
166 pgec_kv:keys(
167 Metadata,
168 string:split(
169 Encoded,
170 "/",
171 all)) ++ values(Metadata, NameValues)) of
172
173 {ok, Changed} ->
174 2 {continue,
175 {encode,
176 {integer, Changed}}};
177
178 {error, Reason} ->
179
:-(
{continue, {encode, {error, Reason}}}
180
181 catch
182 error:badarg:Stacktrace ->
183 2 ?LOG_DEBUG(#{stacktrace => Stacktrace}),
184 2 {continue, {encode, {integer, 0}}};
185
186 Class:Exception:Stacktrace ->
187
:-(
?LOG_DEBUG(#{class => Class,
188 exception => Exception,
189
:-(
stacktrace => Stacktrace}),
190
:-(
{continue, {encode, encode({error, Exception})}}
191 end;
192
193 not_found ->
194 6 [{_, Metadata}] = metadata(PTK),
195
196 6 try pbe(insert(maps:merge(PTK, Metadata), names(Metadata, NameValues)),
197 pgec_kv:keys(
198 Metadata,
199 string:split(
200 Encoded,
201 "/",
202 all)) ++ values(Metadata, NameValues)) of
203 {ok, Changed} ->
204 5 {continue,
205 {encode,
206 {integer, Changed}}};
207
208 {error, Reason} ->
209
:-(
{continue, {encode, {error, Reason}}}
210
211 catch
212 error:badarg:Stacktrace ->
213 1 ?LOG_DEBUG(#{stacktrace => Stacktrace}),
214 1 {continue, {encode, {integer, 0}}};
215
216 Class:Exception:Stacktrace ->
217
:-(
?LOG_DEBUG(#{class => Class,
218 exception => Exception,
219
:-(
stacktrace => Stacktrace}),
220
:-(
{continue, {encode, encode({error, Exception})}}
221 end
222 catch
223 error:badarg ->
224
:-(
{continue, {encode, {integer, 0}}}
225 end;
226
227 recv(#{command := #{name := del},
228 message := {array, [_ | Keys]}}) ->
229 3 {continue,
230 {encode,
231 {integer,
232 try
233 3 start(),
234
235 3 N = lists:foldl(
236 fun
237 ({bulk, Key}, A) ->
238 3 #{key := Encoded} = PTK = ptk(Key),
239
240 3 [{_, Metadata}] = metadata(PTK),
241
242 2 [{parse_complete, []}] = parse(
243 #{sql => delete(
244 maps:merge(
245 PTK,
246 Metadata))}),
247
248 2 [{bind_complete, []}] = bind(
249 #{args => pgec_kv:keys(
250 Metadata,
251 string:split(
252 Encoded,
253 "/",
254 all))}),
255
256 2 [{command_complete,
257 {delete, Count}}] = execute(#{}),
258 2 A + Count
259 end,
260 0,
261 Keys),
262
263 2 commit(),
264 2 N
265
266 catch
267 Class:Exception:Stacktrace ->
268 1 ?LOG_DEBUG(#{class => Class,
269 exception => Exception,
270 1 stacktrace => Stacktrace}),
271 1 rollback(),
272 1 0
273 end}}};
274
275 recv(#{command := #{name := hget},
276 message := {array, [_, {bulk, Key}, {bulk, Field}]}} = Recv) ->
277 8 ?LOG_DEBUG(#{recv => Recv}),
278 8 {continue,
279 {encode,
280 {bulk,
281 8 try lookup(ptk(Key)) of
282 {ok, #{metadata := Metadata, row := Row}} ->
283 5 try row(Metadata, Row) of
284 #{Field := Value} when Value /= null ->
285 4 ?LOG_DEBUG(#{field => Field, value => Value}),
286 4 Value;
287
288 Otherwise ->
289 1 ?LOG_DEBUG(#{otherwise => Otherwise}),
290 1 null
291 catch
292 Class:Exception:Stacktrace ->
293
:-(
?LOG_DEBUG(#{class => Class,
294 exception => Exception,
295
:-(
stacktrace => Stacktrace}),
296
:-(
null
297 end;
298
299 not_found ->
300 3 null
301 catch
302 error:badarg ->
303
:-(
null
304 end}}};
305
306 recv(#{command := #{name := hgetall},
307 message := {array, [_, {bulk, Key}]}}) ->
308 16 {continue,
309 {encode,
310 {array,
311 16 try lookup(ptk(Key)) of
312 {ok, #{metadata := Metadata, row := Row}} ->
313 12 try maps:fold(
314 fun
315 (_, null, A) ->
316 6 A;
317
318 (K, V, A) ->
319 30 [{bulk, K}, {bulk, V} | A]
320 end,
321 [],
322 row(Metadata, Row))
323 catch
324 Class:Exception:Stacktrace ->
325
:-(
?LOG_DEBUG(#{class => Class,
326 exception => Exception,
327
:-(
stacktrace => Stacktrace}),
328
:-(
[]
329 end;
330
331 not_found ->
332 4 []
333 catch
334 error:badarg ->
335
:-(
[]
336 end}}};
337
338 recv(#{command := #{name := hlen},
339 message := {array, [_, {bulk, Key}]}}) ->
340 4 {continue,
341 {encode,
342 {integer,
343 4 try lookup(ptk(Key)) of
344 {ok, #{metadata := Metadata, row := Row}} ->
345 1 try maps:fold(
346 fun
347 (_, null, A) ->
348 1 A;
349
350 (_, _, A) ->
351 2 A + 1
352 end,
353 0,
354 row(Metadata, Row))
355
356 catch
357 Class:Exception:Stacktrace ->
358
:-(
?LOG_DEBUG(#{class => Class,
359 exception => Exception,
360
:-(
stacktrace => Stacktrace}),
361
:-(
0
362 end;
363
364 not_found ->
365 3 0
366 catch
367 error:badarg ->
368
:-(
0
369 end}}};
370
371 recv(#{command := #{name := hkeys},
372 message := {array, [_, {bulk, Key}]}}) ->
373 4 {continue,
374 {encode,
375 {array,
376 4 try lookup(ptk(Key)) of
377 {ok, #{metadata := Metadata, row := Row}} ->
378 1 try maps:fold(
379 fun
380 (_, null, A) ->
381 1 A;
382
383 (K, _, A) ->
384 2 [{bulk, K} | A]
385 end,
386 [],
387 row(Metadata, Row))
388 catch
389 Class:Exception:Stacktrace ->
390
:-(
?LOG_DEBUG(#{class => Class,
391 exception => Exception,
392
:-(
stacktrace => Stacktrace}),
393
:-(
[]
394 end;
395
396 not_found ->
397 3 []
398 catch
399 error:badarg ->
400
:-(
[]
401 end}}};
402
403 recv(#{command := #{name := psubscribe},
404 message := {array, [_, {bulk, Pattern}]}}) ->
405
:-(
[_, PTK] = string:split(Pattern, ":"),
406
:-(
try ptk(PTK) of
407 #{publication := Publication, table := Table} ->
408
:-(
pgec_pg:join(#{m => pgec_replica,
409 publication => Publication,
410 name => Table}),
411
:-(
{continue,
412 {encode,
413 {array,
414 [{bulk, "subscribe"},
415 {bulk, Pattern},
416 {integer, 1}]}}}
417
418 catch
419 error:badarg ->
420
:-(
[]
421 end;
422
423 recv(#{message := {array, [{bulk, Command} | _]}} = Unknown) ->
424
:-(
?LOG_DEBUG(#{unknown => Unknown}),
425
:-(
{continue, {encode, {error, ["unknown command '", Command, "'"]}}}.
426
427
428 info({notify,
429 #{action := Action,
430 keys := Keys,
431 name := Name,
432 publication := Publication}}) ->
433
:-(
{continue,
434 lists:map(
435 fun
436 (Key) ->
437
:-(
{encode,
438 {array,
439 [{bulk, "message"},
440 {bulk, topic("__keyspace@0__", Publication, Name, Key)},
441 {bulk, action(Action)}]}}
442 end,
443 Keys)};
444
445 info(Message) ->
446
:-(
?LOG_DEBUG(#{info => Message}),
447
:-(
continue.
448
449
450 lookup(PTK) ->
451 51 ?LOG_DEBUG(#{ptk => PTK}),
452
453 51 case metadata(PTK) of
454 [{_, Metadata}] ->
455 39 ?LOG_DEBUG(#{metadata => Metadata}),
456
457 39 case lookup(Metadata, PTK) of
458 [Row] ->
459 27 ?LOG_DEBUG(#{row => Row}),
460 27 {ok, #{metadata => Metadata, row => Row, ptk => PTK}};
461
462 [] ->
463 12 not_found
464 end;
465
466 [] ->
467 12 not_found
468 end.
469
470
471 lookup(Metadata, PTK) ->
472 39 Key = key(Metadata, PTK),
473 39 ?LOG_DEBUG(#{metadata => Metadata, ptk => PTK, key => Key}),
474 39 case pgec_storage_sync:read(PTK#{key := Key}) of
475 {ok, Value} ->
476 27 [pgec_storage_common:row(Key, Value, Metadata)];
477
478 not_found ->
479 12 []
480 end.
481
482 key(Metadata, #{key := Encoded} = PTK) ->
483 39 ?LOG_DEBUG(#{metadata => Metadata, ptk => PTK}),
484 39 pgec_kv:key(Metadata, string:split(Encoded, "/", all)).
485
486
487 metadata(#{publication := Publication, table := Table} = Arg) ->
488 64 ?LOG_DEBUG(#{arg => Arg}),
489 64 case pgec_storage_sync:metadata(Arg) of
490 {ok, Metdata} ->
491 51 [{{Publication, Table}, Metdata}];
492
493 not_found ->
494 13 []
495 end.
496
497
498 ptk(PTK) ->
499 54 ?LOG_DEBUG(#{ptk => PTK}),
500 54 case string:split(PTK, ".", all) of
501 [Publication, Table, Key] ->
502 54 #{publication => Publication, table => Table, key => Key};
503
504 _Otherwise ->
505
:-(
error(badarg, [PTK])
506 end.
507
508
509 row(Metadata, Row) ->
510 22 Result = pgec_kv:row(Metadata, <<"text/plain">>, Row),
511 22 ?LOG_DEBUG(#{metadata => Metadata,
512 row => Row,
513 22 result => Result}),
514 22 Result.
515
516
517 topic(Prefix, Publication, Name, Key) when is_integer(Key) ->
518
:-(
?FUNCTION_NAME(Prefix, Publication, Name, integer_to_list(Key));
519
520 topic(Prefix, Publication, Name, Key) ->
521
:-(
[Prefix, ":", Publication, ".", Name, ".", Key].
522
523
524 action(delete) ->
525
:-(
"del";
526
527 action(insert_new) ->
528
:-(
"set";
529
530 action(update) ->
531
:-(
"set".
532
533
534 update(#{keys := Positions,
535 columns := Columns,
536 schema := Schema,
537 table := Table},
538 Names) ->
539 4 [io_lib:format(
540 "update ~s.~s set ",
541 [Schema, Table]),
542
543 lists:join(
544 ", ",
545 lists:map(
546 fun
547 ({Name, Position}) ->
548 4 io_lib:format(
549 "~s = $~b",
550 [Name, length(Positions) + Position])
551 end,
552 lists:zip(Names, lists:seq(1, length(Names))))),
553
554 " where ",
555
556 lists:join(
557 " and ",
558 lists:map(
559 fun
560 ({Column, Position}) ->
561 4 io_lib:format(
562 "~s = $~b",
563 [Column, Position])
564 end,
565 4 [{lists:nth(
566 Position,
567 4 Columns), N} || {Position, N} <- lists:zip(
568 Positions,
569 lists:seq(
570 1, length(Positions)))]))].
571
572
573 insert(#{keys := Positions,
574 columns := Columns,
575 schema := Schema,
576 table := Table} = Arg,
577 Names) ->
578 6 ?LOG_DEBUG(#{arg => Arg, names => Names}),
579 6 [io_lib:format(
580 "insert into ~s.~s (",
581 [Schema, Table]),
582
583 lists:join(
584 ", ",
585 6 [lists:nth(Position, Columns) || Position <- Positions] ++ Names),
586
587 ") values (",
588
589 lists:join(
590 ", ",
591 lists:map(
592 fun
593 (Parameter) ->
594 16 [$$, integer_to_list(Parameter)]
595 end,
596 lists:seq(1, length(Positions) + length(Names)))),
597
598 ")"].
599
600
601 names(Metadata, [{bulk, Name}, {bulk, _} | T]) ->
602 14 [Name | ?FUNCTION_NAME(Metadata, T)];
603
604 names(_, []) ->
605 10 [].
606
607
608 values(#{columns := Columns, oids := OIDs} = Metadata, NameValues) ->
609 10 Types = pgmp_types:cache(pgec_util:db()),
610 10 ?FUNCTION_NAME(
611 Metadata#{coids => maps:map(
612 fun
613 (_, OID) ->
614 30 maps:get(OID, Types)
615 end,
616 maps:from_list(
617 lists:zip(
618 Columns,
619 OIDs)))},
620 NameValues,
621 []).
622
623
624 values(#{coids := COIDs} = Metadata,
625 [{bulk, Name}, {bulk, Encoded} | T] = L,
626 A) ->
627 14 case maps:find(Name, COIDs) of
628 {ok, Type} ->
629 12 ?FUNCTION_NAME(
630 Metadata,
631 T,
632 [decode(Type, Encoded) | A]);
633
634 error ->
635 2 error(badarg, [Metadata, L, A])
636 end;
637
638 values(_, [], A) ->
639 7 lists:reverse(A).
640
641
642 decode(#{<<"typname">> := <<"bool">>}, Value)
643 when Value == <<"true">>; Value == <<"false">> ->
644 2 binary_to_atom(Value);
645
646 decode(#{<<"typname">> := Name}, Value)
647 when Name == <<"int2">>;
648 Name == <<"int4">>;
649 Name == <<"int8">> ->
650 2 binary_to_integer(Value);
651
652 decode(#{<<"typname">> := <<"float", _/bytes>>}, Value) ->
653
:-(
try
654
:-(
binary_to_float(Value)
655 catch
656 error:badarg ->
657
:-(
binary_to_integer(Value)
658 end;
659
660 decode(#{<<"typname">> := Name}, Value)
661 when Name == <<"timestamp">>;
662 Name == <<"timestampz">> ->
663 1 calendar:rfc3339_to_system_time(
664 binary_to_list(Value),
665 [{unit, microsecond}]);
666
667 decode(#{<<"typname">> := <<"date">>},
668 <<Ye:4/bytes, "-", Mo:2/bytes, "-", Da:2/bytes>>) ->
669 1 triple(Ye, Mo, Da);
670
671 decode(#{<<"typname">> := <<"time">>},
672 <<Ho:2/bytes, ":", Mi:2/bytes, ":", Se:2/bytes>>) ->
673 1 triple(Ho, Mi, Se);
674
675 decode(Type, Value) ->
676 5 ?LOG_DEBUG(#{type => Type, value => Value}),
677 5 Value.
678
679
680 delete(#{keys := Positions,
681 columns := Columns,
682 schema := Schema,
683 table := Table} = Arg) ->
684 2 ?LOG_DEBUG(#{arg => Arg}),
685 2 [io_lib:format(
686 "delete from ~s.~s where ",
687 [Schema, Table]),
688 lists:join(
689 " and ",
690 lists:map(
691 fun
692 ({Column, Position}) ->
693 2 io_lib:format(
694 "~s = $~b",
695 [Column, Position])
696 end,
697 2 [{lists:nth(
698 Position,
699 2 Columns), N} || {Position, N} <- lists:zip(
700 Positions,
701 lists:seq(
702 1, length(Positions)))]))].
703
704
705 pbe(SQL, Bindings) ->
706 7 ?LOG_DEBUG(#{sql => SQL, bindings => Bindings}),
707 7 start(),
708 7 case parse(#{sql => SQL}) of
709 [{parse_complete, []}] ->
710 7 case bind(#{args => Bindings}) of
711 [{bind_complete, []}] ->
712 7 case execute(#{}) of
713 [{command_complete,
714 {Reason, Changed}}] when Reason == update;
715 Reason == insert;
716 Reason == delete ->
717 7 commit(),
718 7 {ok, Changed}
719 end;
720
721 [{error, _} = Error] ->
722
:-(
rollback(),
723
:-(
encode(Error)
724 end;
725
726 [{error, _} = Error] ->
727
:-(
rollback(),
728
:-(
encode(Error);
729
730 [{error_response,
731 #{code := Code,
732 message := Message,
733 severity_localized := SeverityLocalized}}] ->
734
735
:-(
rollback(),
736
:-(
{error, [SeverityLocalized, " ", Code, ": ", Message]}
737 end.
738
739
740 start() ->
741 10 [{command_complete, 'begin'}] = query(#{sql => "begin"}),
742 10 ok.
743
744 commit() ->
745 9 [{command_complete, commit}] = query(#{sql => "commit"}),
746 9 ok.
747
748 rollback() ->
749 1 [{command_complete, rollback}] = query(#{sql => "rollback"}),
750 1 ok.
751
752
753 encode({error, Reason}) when is_atom(Reason) ->
754
:-(
{error, ["ERROR", ": ", atom_to_list(Reason)]}.
755
756 triple(X, Y, Z) ->
757 2 list_to_tuple([binary_to_integer(I) || I <- [X, Y, Z]]).
Line Hits Source