1 |
|
%% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com> |
2 |
|
%% |
3 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
%% you may not use this file except in compliance with the License. |
5 |
|
%% You may obtain a copy of the License at |
6 |
|
%% |
7 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
%% |
9 |
|
%% Unless required by applicable law or agreed to in writing, software |
10 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 |
|
%% See the License for the specific language governing permissions and |
13 |
|
%% limitations under the License. |
14 |
|
|
15 |
|
|
16 |
|
-module(pgec_resp_emulator). |
17 |
|
|
18 |
|
|
19 |
|
-export([info/1]). |
20 |
|
-export([init/1]). |
21 |
|
-export([lookup/1]). |
22 |
|
-export([ptk/1]). |
23 |
|
-export([recv/1]). |
24 |
|
-import(pgmp_connection_sync, [bind/1]). |
25 |
|
-import(pgmp_connection_sync, [execute/1]). |
26 |
|
-import(pgmp_connection_sync, [parse/1]). |
27 |
|
-import(pgmp_connection_sync, [query/1]). |
28 |
|
-include_lib("kernel/include/logger.hrl"). |
29 |
|
-include_lib("stdlib/include/ms_transform.hrl"). |
30 |
|
|
31 |
|
|
32 |
|
%% Build the initial per-connection state: RESP protocol version 2
%% and an empty gen_statem request-id collection.
init([]) ->
    Protocol = #{version => 2},
    Requests = gen_statem:reqids_new(),
    {ok, #{protocol => Protocol, requests => Requests}}.
36 |
|
|
37 |
|
|
38 |
|
%% Handle one decoded RESP request.
%%
%% The argument is a map holding the resolved `command' (dispatched on
%% its `name'), the raw `message' (`{array, [CommandName | Arguments]}')
%% and, for HELLO, the connection `data'. Every clause answers with
%% `{continue, Action}' or `{continue, Data, Action}', where Action is
%% `{encode, Reply}'. Requests matching no clause above the last one
%% are answered with an "unknown command" error reply.
recv(#{command := #{name := info},
       message := {array, [_]}}) ->
    {continue,
     {encode,
      {bulk,
       ["# Server\r\nredis_version:", pgec:version(), "\r\n"]}}};

recv(#{command := #{name := ping},
       message := {array, [_]}}) ->
    {continue, {encode, {string, "pong"}}};

recv(#{command := #{name := ping},
       message := {array, [_, {bulk, _} = Greeting]}}) ->
    %% PING with an argument echoes the argument back.
    {continue, {encode, Greeting}};

recv(#{command := #{name := command},
       message := {array, _}}) ->
    {continue, {encode, {array, []}}};

recv(#{command := #{name := hello},
       message := {array, [_, {bulk, <<"3">>}]},
       data := #{protocol := _} = Data}) ->
    recv_hello(3, Data);

recv(#{command := #{name := hello},
       message := {array, [_, {bulk, <<"2">>}]},
       data := #{protocol := _} = Data}) ->
    recv_hello(2, Data);

recv(#{command := #{name := hello},
       message := {array, [_]},
       data := #{protocol := _} = Data}) ->
    %% HELLO without a version selects protocol version 2.
    recv_hello(2, Data);

recv(#{command := #{name := exists},
       message := {array, [_ | Keys]}}) ->
    %% Count the keys that are present; malformed keys count as absent.
    {continue,
     {encode,
      {integer,
       lists:foldl(
         fun
             ({bulk, Key}, A) ->
                 try lookup(ptk(Key)) of
                     {ok, _} ->
                         A + 1;

                     not_found ->
                         A

                 catch
                     error:badarg ->
                         A
                 end
         end,
         0,
         Keys)}}};

recv(#{command := #{name := hexists},
       message := {array, [_, {bulk, Key}, {bulk, Field}]}}) ->
    {continue,
     {encode,
      {integer,
       recv_row(
         Key,
         0,
         fun
             (Columns) ->
                 case Columns of
                     #{Field := Value} when Value /= null ->
                         1;

                     #{} ->
                         0
                 end
         end)}}};

recv(#{command := #{name := hset},
       message := {array, [_, {bulk, Key} | NameValues]}} = Recv) ->
    ?LOG_DEBUG(#{recv => Recv}),
    {continue, {encode, recv_hset(Key, NameValues)}};

recv(#{command := #{name := del},
       message := {array, [_ | Keys]}}) ->
    %% All keys are deleted in one transaction; any failure rolls the
    %% whole transaction back and reports zero deletions.
    {continue,
     {encode,
      {integer,
       try
           start(),

           N = lists:foldl(
                 fun
                     ({bulk, Key}, A) ->
                         #{key := Encoded} = PTK = ptk(Key),

                         [{_, Metadata}] = metadata(PTK),

                         [{parse_complete, []}] = parse(
                                                    #{sql => delete(
                                                               maps:merge(
                                                                 PTK,
                                                                 Metadata))}),

                         [{bind_complete, []}] = bind(
                                                   #{args => pgec_kv:keys(
                                                               Metadata,
                                                               string:split(
                                                                 Encoded,
                                                                 "/",
                                                                 all))}),

                         [{command_complete,
                           {delete, Count}}] = execute(#{}),
                         A + Count
                 end,
                 0,
                 Keys),

           commit(),
           N

       catch
           Class:Exception:Stacktrace ->
               ?LOG_DEBUG(#{class => Class,
                            exception => Exception,
                            stacktrace => Stacktrace}),
               rollback(),
               0
       end}}};

recv(#{command := #{name := hget},
       message := {array, [_, {bulk, Key}, {bulk, Field}]}} = Recv) ->
    ?LOG_DEBUG(#{recv => Recv}),
    {continue,
     {encode,
      {bulk,
       recv_row(
         Key,
         null,
         fun
             (Columns) ->
                 case Columns of
                     #{Field := Value} when Value /= null ->
                         ?LOG_DEBUG(#{field => Field, value => Value}),
                         Value;

                     Otherwise ->
                         ?LOG_DEBUG(#{otherwise => Otherwise}),
                         null
                 end
         end)}}};

recv(#{command := #{name := hgetall},
       message := {array, [_, {bulk, Key}]}}) ->
    %% Flat list of name, value pairs; null columns are omitted.
    {continue,
     {encode,
      {array,
       recv_row(
         Key,
         [],
         fun
             (Columns) ->
                 maps:fold(
                   fun
                       (_, null, A) ->
                           A;

                       (K, V, A) ->
                           [{bulk, K}, {bulk, V} | A]
                   end,
                   [],
                   Columns)
         end)}}};

recv(#{command := #{name := hlen},
       message := {array, [_, {bulk, Key}]}}) ->
    %% Number of columns holding a non-null value.
    {continue,
     {encode,
      {integer,
       recv_row(
         Key,
         0,
         fun
             (Columns) ->
                 maps:fold(
                   fun
                       (_, null, A) ->
                           A;

                       (_, _, A) ->
                           A + 1
                   end,
                   0,
                   Columns)
         end)}}};

recv(#{command := #{name := hkeys},
       message := {array, [_, {bulk, Key}]}}) ->
    %% Names of the columns holding a non-null value.
    {continue,
     {encode,
      {array,
       recv_row(
         Key,
         [],
         fun
             (Columns) ->
                 maps:fold(
                   fun
                       (_, null, A) ->
                           A;

                       (K, _, A) ->
                           [{bulk, K} | A]
                   end,
                   [],
                   Columns)
         end)}}};

recv(#{command := #{name := psubscribe},
       message := {array, [_, {bulk, Pattern}]}}) ->
    %% The pattern is "<prefix>:<publication>.<table>.<key>". A pattern
    %% without ":" or with a malformed key is answered with an error
    %% reply rather than crashing or returning a non-action term.
    try
        case string:split(Pattern, ":") of
            [_, PTK] ->
                ptk(PTK);

            _ ->
                error(badarg, [Pattern])
        end
    of
        #{publication := Publication, table := Table} ->
            pgec_pg:join(#{m => pgec_replica,
                           publication => Publication,
                           name => Table}),
            {continue,
             {encode,
              {array,
               [{bulk, "subscribe"},
                {bulk, Pattern},
                {integer, 1}]}}}

    catch
        error:badarg ->
            {continue,
             {encode, {error, ["invalid pattern '", Pattern, "'"]}}}
    end;

recv(#{message := {array, [{bulk, Command} | _]}} = Unknown) ->
    ?LOG_DEBUG(#{unknown => Unknown}),
    {continue, {encode, {error, ["unknown command '", Command, "'"]}}}.


%% Answer HELLO: record the negotiated protocol Version in the
%% connection data and describe the server.
recv_hello(Version, #{protocol := Protocol} = Data) ->
    Properties = [{<<"server">>, {bulk, <<"pgec">>}},
                  {<<"version">>, {bulk, pgec:version()}},
                  {<<"proto">>, {integer, Version}},
                  {<<"id">>, {integer, erlang:phash2(self())}},
                  {<<"mode">>, {bulk, <<"standalone">>}},
                  {<<"role">>, {bulk, <<"master">>}},
                  {<<"modules">>, {array, []}}],
    {continue,
     Data#{protocol := Protocol#{version => Version}},
     {encode, recv_hello_reply(Version, Properties)}}.


%% Version 3 replies with a map, version 2 with a flat array of
%% alternating names and values.
recv_hello_reply(3, Properties) ->
    {map, [{{bulk, Name}, Value} || {Name, Value} <- Properties]};

recv_hello_reply(2, Properties) ->
    {array,
     lists:append([[{bulk, Name}, Value] || {Name, Value} <- Properties])}.


%% Look up the row addressed by Key, convert it with row/2 and apply
%% Fun to the result. Default is returned when the key is malformed
%% (ptk/1 or the lookup raises badarg), when nothing is stored under
%% the key, or when the conversion or Fun raises.
recv_row(Key, Default, Fun) ->
    try lookup(ptk(Key)) of
        {ok, #{metadata := Metadata, row := Row}} ->
            try
                Fun(row(Metadata, Row))

            catch
                Class:Exception:Stacktrace ->
                    ?LOG_DEBUG(#{class => Class,
                                 exception => Exception,
                                 stacktrace => Stacktrace}),
                    Default
            end;

        not_found ->
            Default

    catch
        error:badarg ->
            Default
    end.


%% Reply term for HSET: update the row when it already exists, insert
%% it otherwise. A malformed key or an unknown publication/table
%% yields `{integer, 0}'.
recv_hset(Key, NameValues) ->
    try recv_hset_target(ptk(Key)) of
        {PTK, [{_, Metadata}], {ok, _}} ->
            recv_hset_execute(
              fun update/2,
              maps:merge(Metadata, PTK),
              PTK,
              Metadata,
              NameValues);

        {PTK, [{_, Metadata}], not_found} ->
            recv_hset_execute(
              fun insert/2,
              maps:merge(PTK, Metadata),
              PTK,
              Metadata,
              NameValues);

        {_, [], _} ->
            {integer, 0}

    catch
        error:badarg ->
            {integer, 0}
    end.


%% Gather everything HSET needs to choose between update and insert.
recv_hset_target(PTK) ->
    {PTK, metadata(PTK), lookup(PTK)}.


%% Build the statement with Statement (update/2 or insert/2) and run
%% it with the key components followed by the decoded field values.
recv_hset_execute(Statement, Arg, #{key := Encoded}, Metadata, NameValues) ->
    try pbe(Statement(Arg, names(Metadata, NameValues)),
            pgec_kv:keys(
              Metadata,
              string:split(Encoded, "/", all)) ++ values(Metadata, NameValues)) of

        {ok, Changed} ->
            {integer, Changed};

        {error, Reason} ->
            {error, Reason}

    catch
        error:badarg:Stacktrace ->
            %% values/3 raises badarg for a field that is not a column.
            ?LOG_DEBUG(#{stacktrace => Stacktrace}),
            {integer, 0};

        Class:Exception:Stacktrace ->
            ?LOG_DEBUG(#{class => Class,
                         exception => Exception,
                         stacktrace => Stacktrace}),
            recv_exception(Exception)
    end.


%% Error reply for an exception. encode/1 only accepts atoms, so any
%% other reason gets a generic message; the detail is already logged.
recv_exception(Reason) when is_atom(Reason) ->
    encode({error, Reason});

recv_exception(_) ->
    {error, ["ERROR", ": ", "internal error"]}.
426 |
|
|
427 |
|
|
428 |
|
%% Handle a non-RESP message delivered to the connection process.
%%
%% A `notify' message is turned into one "message" push per key,
%% addressed to the "__keyspace@0__" topic of that key. Anything else
%% is logged and ignored.
info({notify,
      #{action := Action,
        keys := Keys,
        name := Name,
        publication := Publication}}) ->
    Notification =
        fun
            (Key) ->
                Topic = topic("__keyspace@0__", Publication, Name, Key),
                {encode,
                 {array,
                  [{bulk, "message"},
                   {bulk, Topic},
                   {bulk, action(Action)}]}}
        end,
    {continue, lists:map(Notification, Keys)};

info(Message) ->
    ?LOG_DEBUG(#{info => Message}),
    continue.
448 |
|
|
449 |
|
|
450 |
|
%% Find the row addressed by a parsed key (see ptk/1).
%%
%% Returns `{ok, #{metadata, row, ptk}}' when both the table metadata
%% and the row are found, otherwise `not_found'.
lookup(PTK) ->
    ?LOG_DEBUG(#{ptk => PTK}),

    case metadata(PTK) of
        [] ->
            not_found;

        [{_, Metadata}] ->
            ?LOG_DEBUG(#{metadata => Metadata}),

            case lookup(Metadata, PTK) of
                [] ->
                    not_found;

                [Row] ->
                    ?LOG_DEBUG(#{row => Row}),
                    {ok, #{metadata => Metadata, row => Row, ptk => PTK}}
            end
    end.
469 |
|
|
470 |
|
|
471 |
|
%% Read the stored value for PTK using the storage key derived from
%% Metadata. Returns a one-element list holding the row, or an empty
%% list when nothing is stored.
lookup(Metadata, PTK) ->
    StorageKey = key(Metadata, PTK),
    ?LOG_DEBUG(#{metadata => Metadata, ptk => PTK, key => StorageKey}),
    case pgec_storage_sync:read(PTK#{key := StorageKey}) of
        not_found ->
            [];

        {ok, Stored} ->
            [pgec_storage_common:row(StorageKey, Stored, Metadata)]
    end.
481 |
|
|
482 |
|
%% Derive the storage key from the "/"-separated key component of PTK.
key(Metadata, #{key := Encoded} = PTK) ->
    ?LOG_DEBUG(#{metadata => Metadata, ptk => PTK}),
    Components = string:split(Encoded, "/", all),
    pgec_kv:key(Metadata, Components).
485 |
|
|
486 |
|
|
487 |
|
%% Fetch the table metadata for a publication/table pair.
%%
%% Returns `[{{Publication, Table}, Metadata}]' when known, otherwise
%% an empty list.
metadata(#{publication := Publication, table := Table} = Arg) ->
    ?LOG_DEBUG(#{arg => Arg}),
    case pgec_storage_sync:metadata(Arg) of
        not_found ->
            [];

        {ok, Metadata} ->
            [{{Publication, Table}, Metadata}]
    end.
496 |
|
|
497 |
|
|
498 |
|
%% Split a "publication.table.key" string into its three components.
%%
%% Raises `badarg' unless the input has exactly three "."-separated
%% parts.
ptk(PTK) ->
    ?LOG_DEBUG(#{ptk => PTK}),
    Components = string:split(PTK, ".", all),
    case Components of
        [Publication, Table, Key] ->
            #{publication => Publication, table => Table, key => Key};

        _ ->
            error(badarg, [PTK])
    end.
507 |
|
|
508 |
|
|
509 |
|
%% Convert a stored row with pgec_kv:row/3 using the "text/plain"
%% content type, logging the conversion.
row(Metadata, Row) ->
    Converted = pgec_kv:row(Metadata, <<"text/plain">>, Row),
    ?LOG_DEBUG(#{metadata => Metadata,
                 row => Row,
                 result => Converted}),
    Converted.
515 |
|
|
516 |
|
|
517 |
|
%% Build the iolist "Prefix:Publication.Name.Key". An integer key is
%% rendered in decimal; any other key is used as is.
topic(Prefix, Publication, Name, Key) ->
    Suffix = if
                 is_integer(Key) ->
                     integer_to_list(Key);

                 true ->
                     Key
             end,
    [Prefix, ":", Publication, ".", Name, ".", Suffix].
522 |
|
|
523 |
|
|
524 |
|
%% Map a replication action onto the event name sent to subscribers:
%% deletions are "del", inserts and updates are both "set".
action(delete) ->
    "del";

action(Action) when Action =:= insert_new; Action =:= update ->
    "set".
532 |
|
|
533 |
|
|
534 |
|
%% Build the SQL text (an iolist) of a parameterised UPDATE.
%%
%% The key columns, taken from Columns at the given Positions, form
%% the WHERE clause and use parameters $1..$K. The columns in Names
%% are assigned from the parameters that follow, $K+1 onwards.
update(#{keys := Positions,
         columns := Columns,
         schema := Schema,
         table := Table},
       Names) ->
    KeyCount = length(Positions),
    Assignments = [io_lib:format("~s = $~b", [Name, KeyCount + N])
                   || {Name, N} <- lists:zip(
                                     Names,
                                     lists:seq(1, length(Names)))],
    Conditions = [io_lib:format(
                    "~s = $~b",
                    [lists:nth(Position, Columns), N])
                  || {Position, N} <- lists:zip(
                                        Positions,
                                        lists:seq(1, KeyCount))],
    [io_lib:format("update ~s.~s set ", [Schema, Table]),
     lists:join(", ", Assignments),
     " where ",
     lists:join(" and ", Conditions)].
571 |
|
|
572 |
|
|
573 |
|
%% Build the SQL text (an iolist) of a parameterised INSERT.
%%
%% The column list is the key columns (Columns at the given Positions)
%% followed by Names; the value list is one numbered parameter per
%% column, in the same order.
insert(#{keys := Positions,
         columns := Columns,
         schema := Schema,
         table := Table} = Arg,
       Names) ->
    ?LOG_DEBUG(#{arg => Arg, names => Names}),
    KeyColumns = [lists:nth(Position, Columns) || Position <- Positions],
    Parameters = [[$$, integer_to_list(N)]
                  || N <- lists:seq(1, length(Positions) + length(Names))],
    [io_lib:format("insert into ~s.~s (", [Schema, Table]),
     lists:join(", ", KeyColumns ++ Names),
     ") values (",
     lists:join(", ", Parameters),
     ")"].
599 |
|
|
600 |
|
|
601 |
|
%% Pick the field names out of a flat list of alternating
%% `{bulk, Name}', `{bulk, Value}' elements, keeping their order.
names(_, []) ->
    [];

names(Metadata, [{bulk, Name}, {bulk, _Value} | Rest]) ->
    [Name | names(Metadata, Rest)].
606 |
|
|
607 |
|
|
608 |
|
%% Decode the values of a flat name/value list into bind parameters.
%%
%% Each column is first paired with the type description that the
%% pgmp type cache holds for its OID; the result is stored under
%% `coids' so that values/3 can find the type of each named field.
values(#{columns := Columns, oids := OIDs} = Metadata, NameValues) ->
    Types = pgmp_types:cache(pgec_util:db()),
    ColumnOIDs = maps:from_list(lists:zip(Columns, OIDs)),
    ColumnTypes = maps:map(
                    fun
                        (_Column, OID) ->
                            maps:get(OID, Types)
                    end,
                    ColumnOIDs),
    values(Metadata#{coids => ColumnTypes}, NameValues, []).
622 |
|
|
623 |
|
|
624 |
|
%% Walk the name/value pairs, decoding each value by the type of its
%% column. Raises `badarg' when a name is not a known column. The
%% decoded values are returned in the order the pairs were given.
values(_, [], Decoded) ->
    lists:reverse(Decoded);

values(#{coids := ColumnTypes} = Metadata,
       [{bulk, Name}, {bulk, Encoded} | Rest] = Remaining,
       Decoded) ->
    case maps:find(Name, ColumnTypes) of
        error ->
            error(badarg, [Metadata, Remaining, Decoded]);

        {ok, Type} ->
            values(Metadata, Rest, [decode(Type, Encoded) | Decoded])
    end.
640 |
|
|
641 |
|
|
642 |
|
%% Convert the textual Value received from the client into the Erlang
%% term to bind for a column of the given type. The type is a map
%% whose <<"typname">> entry is matched against PostgreSQL type names.
%%
%% - bool:                    the atom `true' or `false';
%% - int2, int4, int8:        an integer;
%% - float*:                  a float, or an integer when the text has
%%                            no fractional part;
%% - timestamp, timestamptz:  RFC 3339 text as system time in
%%                            microseconds;
%% - date, time:              a three-element tuple of integers;
%% - anything else:           the value unchanged.
decode(#{<<"typname">> := <<"bool">>}, Value)
  when Value == <<"true">>; Value == <<"false">> ->
    %% The guard limits the input to two values, so no arbitrary atoms
    %% are created from client data.
    binary_to_atom(Value);

decode(#{<<"typname">> := Name}, Value)
  when Name == <<"int2">>;
       Name == <<"int4">>;
       Name == <<"int8">> ->
    binary_to_integer(Value);

decode(#{<<"typname">> := <<"float", _/bytes>>}, Value) ->
    try
        binary_to_float(Value)

    catch
        error:badarg ->
            %% binary_to_float/1 rejects text without a decimal point.
            binary_to_integer(Value)
    end;

decode(#{<<"typname">> := Name}, Value)
  when Name == <<"timestamp">>;
       Name == <<"timestamptz">> ->
    %% "timestamptz" is the PostgreSQL type name for timestamp with
    %% time zone.
    calendar:rfc3339_to_system_time(
      binary_to_list(Value),
      [{unit, microsecond}]);

decode(#{<<"typname">> := <<"date">>},
       <<Ye:4/bytes, "-", Mo:2/bytes, "-", Da:2/bytes>>) ->
    triple(Ye, Mo, Da);

decode(#{<<"typname">> := <<"time">>},
       <<Ho:2/bytes, ":", Mi:2/bytes, ":", Se:2/bytes>>) ->
    triple(Ho, Mi, Se);

decode(Type, Value) ->
    ?LOG_DEBUG(#{type => Type, value => Value}),
    Value.
678 |
|
|
679 |
|
|
680 |
|
%% Build the SQL text (an iolist) of a parameterised DELETE whose
%% WHERE clause compares each key column (Columns at the given
%% Positions) with the parameters $1..$K.
delete(#{keys := Positions,
         columns := Columns,
         schema := Schema,
         table := Table} = Arg) ->
    ?LOG_DEBUG(#{arg => Arg}),
    Conditions = [io_lib:format(
                    "~s = $~b",
                    [lists:nth(Position, Columns), N])
                  || {Position, N} <- lists:zip(
                                        Positions,
                                        lists:seq(1, length(Positions)))],
    [io_lib:format("delete from ~s.~s where ", [Schema, Table]),
     lists:join(" and ", Conditions)].
703 |
|
|
704 |
|
|
705 |
|
%% Run one statement as parse, bind, execute inside its own
%% transaction.
%%
%% Returns `{ok, Changed}' after committing an update, insert or
%% delete, where Changed is the count reported by the command.
%% Returns `{error, Text}' when a step reports an error.
%%
%% The transaction is rolled back whenever a step does not produce its
%% expected response, including responses this module cannot turn into
%% a reply (those still raise, but only after the rollback), so a
%% failure never leaves the transaction open.
pbe(SQL, Bindings) ->
    ?LOG_DEBUG(#{sql => SQL, bindings => Bindings}),
    start(),
    pbe_parsed(parse(#{sql => SQL}), Bindings).


%% Continue with bind once the statement has been parsed.
pbe_parsed([{parse_complete, []}], Bindings) ->
    pbe_bound(bind(#{args => Bindings}));

pbe_parsed(Otherwise, _) ->
    pbe_rollback(Otherwise).


%% Continue with execute once the parameters have been bound.
pbe_bound([{bind_complete, []}]) ->
    pbe_executed(execute(#{}));

pbe_bound(Otherwise) ->
    pbe_rollback(Otherwise).


%% Commit on a completed update, insert or delete.
pbe_executed([{command_complete, {Reason, Changed}}])
  when Reason == update;
       Reason == insert;
       Reason == delete ->
    commit(),
    {ok, Changed};

pbe_executed(Otherwise) ->
    pbe_rollback(Otherwise).


%% Abandon the transaction, then describe the failure.
pbe_rollback(Response) ->
    rollback(),
    pbe_error(Response).


%% Turn a failed step into an `{error, Text}' reply.
pbe_error([{error, _} = Error]) ->
    encode(Error);

pbe_error([{error_response,
            #{code := Code,
              message := Message,
              severity_localized := SeverityLocalized}}]) ->
    {error, [SeverityLocalized, " ", Code, ": ", Message]}.
738 |
|
|
739 |
|
|
740 |
|
%% Open a transaction; crashes unless the server confirms "begin".
start() ->
    Response = query(#{sql => "begin"}),
    [{command_complete, 'begin'}] = Response,
    ok.
743 |
|
|
744 |
|
%% Commit the transaction; crashes unless the server confirms it.
commit() ->
    Response = query(#{sql => "commit"}),
    [{command_complete, commit}] = Response,
    ok.
747 |
|
|
748 |
|
%% Roll the transaction back; crashes unless the server confirms it.
rollback() ->
    Response = query(#{sql => "rollback"}),
    [{command_complete, rollback}] = Response,
    ok.
751 |
|
|
752 |
|
|
753 |
|
%% Render an atom error reason as the text of an error reply,
%% "ERROR: <reason>". Only atom reasons are accepted.
encode({error, Reason}) when is_atom(Reason) ->
    Text = atom_to_list(Reason),
    {error, ["ERROR", ": ", Text]}.
755 |
|
|
756 |
|
%% Convert three binaries of decimal digits into a tuple of integers.
triple(X, Y, Z) ->
    {binary_to_integer(X), binary_to_integer(Y), binary_to_integer(Z)}.