_site/cover/pgmp_codec.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_codec).
17
18
19 -export([demarshal/1]).
20 -export([demarshal/2]).
21 -export([marshal/2]).
22 -export([size_exclusive/1]).
23 -export([size_inclusive/1]).
24 -export_type([command_complete_response/0]).
25 -export_type([row_description/0]).
26 -include_lib("kernel/include/logger.hrl").
27
28
29 -type pg_integer() :: int8
30 | int16
31 | int32
32 | int64.
33
34 -type type() :: {byte, pos_integer()}
35 | pg_integer()
36 | {int, pgmp:int_bit_sizes()}
37 | clock
38 | string
39 | empty_query_response
40 | bind_complete
41 | portal_suspended
42 | no_data
43 | parse_complete
44 | notice_response
45 | error_response
46 | authentication
47 | parameter_description
48 | x_log_data
49 | tuple_data
50 | copy_data
51 | copy_both_response
52 | parameter_status
53 | backend_key_data
54 | command_complete
55 | ready_for_query
56 | row_description
57 | data_row.
58
%% Decode a tagged message. The argument is a {Tag, Payload} pair; both
%% parts are logged at debug level and then handed to demarshal/2, whose
%% result is returned unchanged.
demarshal({Tag, Payload}) ->
    ?LOG_DEBUG(#{type => Tag, encoded => Payload}),
    demarshal(Tag, Payload).
62
63 -type x_log_begin_transaction() :: #{final_lsn := pgmp:int64(),
64 commit_timestamp := pgmp:int64(),
65 xid := pgmp:int32()}.
66
67 -type x_log_logical_decoding() :: #{xid := pgmp:int32(),
68 flags := pgmp:int8(),
69 lsn := pgmp:int64(),
70 prefix := binary(),
71 content := pgmp:int32()}.
72
73 -type x_log_commit() :: #{flags := pgmp:int8(),
74 commit_lsn := pgmp:int64(),
75 end_lsn := pgmp:int64(),
76 commit_timestamp := pgmp:int64()}.
77
78 -type x_log_origin() :: #{commit_lsn := pgmp:int64(),
79 name := binary()}.
80
81 -type x_log_relation() :: #{id := pgmp:int32(),
82 namespace := binary(),
83 name := binary(),
84 replica_identity := pgmp:int8(),
85 ncols := pgmp:int16()}.
86
87 -type x_log_type() :: #{xid := pgmp:int32(),
88 type := pgmp:int32(),
89 namespace := binary(),
90 name := binary()}.
91
92 -type x_log_insert() :: #{relation := pgmp:int32(),
93 tuple := [tuple_data()]}.
94
95 -type x_log_update() :: #{relation := pgmp:int32(),
96 key := [tuple_data()],
97 new := [tuple_data()]}.
98
99 -type x_log_delete() :: #{relation := pgmp:int32(),
100 key := [tuple_data()]}.
101
102 -type x_log_truncate() :: #{relations := [pgmp:int32()],
103 options := pgmp:int8()}.
104
105 -type x_log_data() :: {begin_transaction, x_log_begin_transaction()}
106 | {logical_decoding, x_log_logical_decoding()}
107 | {commit, x_log_commit()}
108 | {origin, x_log_origin()}
109 | {relation, x_log_relation()}
110 | {type, x_log_type()}
111 | {insert, x_log_insert()}
112 | {update, x_log_update()}
113 | {delete, x_log_delete()}
114 | {truncate, x_log_truncate()}.
115
116 -type startup() :: #{major := integer(),
117 minor := integer(),
118 parameters := #{binary() => binary()}}.
119
120 -type copy_both_response() :: #{format := text | binary,
121 n_of_cols := non_neg_integer()}.
122
123 -type copy_out_response() :: #{format := text | binary,
124 col_formats := [text | binary]}.
125
%% demarshal(Type, Encoded) -> {Decoded, Remainder}
%%
%% Decode the leading part of Encoded according to Type and return the
%% decoded term together with whatever bytes were not consumed.
%%
%% Type is either a single type tag (an atom, {byte, N} or {int, Bits})
%% or a list of type tags. A list is decoded left to right, threading
%% the remaining bytes through each step, and yields a list of decoded
%% values in the same order.
%%
%% Input that does not match any clause, or that is too short for the
%% requested type, raises function_clause or badmatch.
%%
%% NOTE(review): the `clock' overload below is declared as returning
%% pgmp:int64(), but the clause returns the result of
%% pgmp_calendar:decode/1, which is not visible from this module.
%% Confirm the real return type against pgmp_calendar.
-spec demarshal(nonempty_list(type()), binary()) -> {[any()], binary()};
               ({byte, pos_integer()}, binary()) -> {binary(), binary()};
               ({int, 8}, binary()) -> {pgmp:int8(), binary()};
               (int8, binary()) -> {pgmp:int8(), binary()};
               ({int, 16}, binary()) -> {pgmp:int16(), binary()};
               (int16, binary()) -> {pgmp:int16(), binary()};
               ({int, 32}, binary()) -> {pgmp:int32(), binary()};
               (int32, binary()) -> {pgmp:int32(), binary()};
               ({int, 64}, binary()) -> {pgmp:int64(), binary()};
               (int64, binary()) -> {pgmp:int64(), binary()};
               (clock, binary()) -> {pgmp:int64(), binary()};
               (string, binary()) -> {binary(), binary()};
               (empty_query_response, binary()) -> {[], <<>>};
               (bind_complete, binary()) -> {[], <<>>};
               (portal_suspended, binary()) -> {[], <<>>};
               (no_data, binary()) -> {[], <<>>};
               (parse_complete, binary()) -> {[], <<>>};
               (notice_response, binary()) -> {any(), binary()};
               (error_response, binary()) -> {any(), binary()};
               (authentication, binary()) -> {any(), binary()};
               (parameter_description, binary()) -> {[pgmp:int32()], binary()};
               (password, binary()) -> {binary(), binary()};
               (startup, binary()) -> {startup(), binary()};
               (x_log_data, binary()) -> {x_log_data(), binary()};
               (tuple_data, binary()) -> {[tuple_data()], binary()};
               (pairs, binary()) -> {#{}, binary()};
               (copy_out_response, binary()) -> {copy_out_response(), binary()};
               (copy_both_response, binary()) -> {copy_both_response(), binary()};
               (copy_data, binary()) -> {{x_log_data, x_log_data()}, binary()};
               (ssl, binary()) -> {boolean(), binary()}.

%% A list of types: decode each in turn, threading the remainder.
demarshal(Types, Encoded) when is_list(Types) ->
    ?LOG_DEBUG(#{types => Types, encoded => Encoded}),
    lists:mapfoldl(fun demarshal/2, Encoded, Types);

%% Exactly N raw bytes.
demarshal({byte, N}, Encoded) ->
    <<Decoded:N/bytes, Remainder/bytes>> = Encoded,
    {Decoded, Remainder};

%% Named integer widths are aliases for {int, Bits}.
demarshal(int8, Encoded) ->
    ?FUNCTION_NAME({int, 8}, Encoded);

demarshal(int16, Encoded) ->
    ?FUNCTION_NAME({int, 16}, Encoded);

demarshal(int32, Encoded) ->
    ?FUNCTION_NAME({int, 32}, Encoded);

demarshal(int64, Encoded) ->
    ?FUNCTION_NAME({int, 64}, Encoded);

%% A 64-bit value passed through pgmp_calendar:decode/1.
demarshal(clock, Encoded) ->
    {Clock, Remainder} = ?FUNCTION_NAME({int, 64}, Encoded),
    {pgmp_calendar:decode(Clock), Remainder};

%% Signed big-endian integer of the given width.
demarshal({int, Bits}, Encoded)
  when Bits == 8; Bits == 16; Bits == 32; Bits == 64 ->
    <<Decoded:Bits/signed, Remainder/bytes>> = Encoded,
    {Decoded, Remainder};

%% NUL-terminated string: split on the first zero byte. The terminator
%% is dropped; the two halves become {String, Remainder}.
demarshal(string, Encoded) ->
    list_to_tuple(binary:split(Encoded, <<0>>));

%% Messages that carry no payload.
demarshal(Type, <<>>)
  when Type == empty_query_response;
       Type == bind_complete;
       Type == portal_suspended;
       Type == no_data;
       Type == parse_complete ->
    {[], <<>>};

%% Notice and error responses share one field-list layout, decoded by
%% demarshal/3 with an empty accumulator.
demarshal(Type, Encoded) when Type == notice_response; Type == error_response ->
    ?FUNCTION_NAME(Type, Encoded, []);

%% Authentication: a 32-bit code selects the sub-message in demarshal/3.
demarshal(authentication, Encoded) ->
    {Code, R} = ?FUNCTION_NAME(int32, Encoded),
    ?FUNCTION_NAME(authentication, Code, R);

%% A 16-bit count followed by that many 32-bit values.
demarshal(parameter_description, Encoded) ->
    {N, R0} = ?FUNCTION_NAME(int16, Encoded),
    ?FUNCTION_NAME(lists:duplicate(N, int32), R0);

%% Logical replication stream messages, selected by their leading byte.
demarshal(x_log_data, <<"B", BeginMessage/bytes>>) ->
    ?LOG_DEBUG(#{start_transaction => BeginMessage}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [final_lsn, commit_timestamp, xid],
                              [int64, int64, int32],
                              BeginMessage),
    {{begin_transaction, Decoded}, Remainder};

demarshal(x_log_data, <<"M", LogicalDecoding/bytes>>) ->
    ?LOG_DEBUG(#{logical => LogicalDecoding}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [xid, flags, lsn, prefix, content],
                              [int32, int8, int64, string, int32],
                              LogicalDecoding),
    {{logical_decoding, Decoded}, Remainder};

demarshal(x_log_data, <<"C", CommitMessage/bytes>>) ->
    ?LOG_DEBUG(#{commit => CommitMessage}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [flags, commit_lsn, end_lsn, commit_timestamp],
                              [int8, int64, int64, int64],
                              CommitMessage),
    {{commit, Decoded}, Remainder};

demarshal(x_log_data, <<"O", OriginMessage/bytes>>) ->
    ?LOG_DEBUG(#{origin => OriginMessage}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [commit_lsn, name],
                              [int64, string],
                              OriginMessage),
    {{origin, Decoded}, Remainder};

%% Relation: a fixed header followed by column descriptions. Everything
%% after the header is folded into column maps, so no remainder is left.
demarshal(x_log_data, <<"R", RelationMessage/bytes>>) ->
    ?LOG_DEBUG(#{relation => RelationMessage}),
    {Relation, R0} = ?FUNCTION_NAME(
                        [id, namespace, name, replica_identity, ncols],
                        [int32, string, string, int8, int16],
                        RelationMessage),
    Columns = lists:reverse(
                pgmp_binary:foldl(
                  fun
                      (EncodedColumn, A) ->
                          {Column, Remainder} = ?FUNCTION_NAME(
                                                   [flags, name, type, modifier],
                                                   [int8, string, int32, int32],
                                                   EncodedColumn),
                          {Remainder, [Column | A]}
                  end,
                  [],
                  R0)),
    {{relation, Relation#{columns => Columns}}, <<>>};

%% Type message. The first field is a 32-bit xid; it was previously
%% spelled `in32', which no clause accepts, so decoding any Type
%% message failed with function_clause.
demarshal(x_log_data, <<"Y", TypeMessage/bytes>>) ->
    ?LOG_DEBUG(#{type => TypeMessage}),
    {Decoded, Remainder} = ?FUNCTION_NAME([xid, type, namespace, name],
                                          [int32, int32, string, string],
                                          TypeMessage),
    {{type, Decoded}, Remainder};

demarshal(x_log_data, <<"I", Id:32, "N", TupleData/bytes>>) ->
    ?LOG_DEBUG(#{id => Id, tuple => TupleData}),
    {Tuple, Remainder} = ?FUNCTION_NAME(tuple_data, TupleData),
    {{insert, #{relation => Id, tuple => Tuple}}, Remainder};

%% Update carrying a key tuple ("K") followed by the new tuple ("N").
demarshal(x_log_data, <<"U", Relation:32, "K", KeyData/bytes>>) ->
    ?LOG_DEBUG(#{update => Relation, key => KeyData}),
    {Key, <<"N", NewData/bytes>>} = ?FUNCTION_NAME(tuple_data, KeyData),
    {New, Remainder} = ?FUNCTION_NAME(tuple_data, NewData),
    {{update, #{key => Key, relation => Relation, new => New}}, Remainder};

%% Update carrying only the new tuple.
demarshal(x_log_data, <<"U", Relation:32, "N", NewData/bytes>>) ->
    ?LOG_DEBUG(#{update => Relation, new => NewData}),
    {New, Remainder} = ?FUNCTION_NAME(tuple_data, NewData),
    {{update, #{relation => Relation, new => New}}, Remainder};

demarshal(x_log_data, <<"D", Relation:32, "K", KeyData/bytes>>) ->
    ?LOG_DEBUG(#{update => Relation, delete => KeyData}),
    {Key, Remainder} = ?FUNCTION_NAME(tuple_data, KeyData),
    {{delete, #{key => Key, relation => Relation}}, Remainder};

%% Truncate: relation count, option bits, then one 32-bit id per relation.
demarshal(x_log_data, <<"T", N:32, Options:8, Data/bytes>>) ->
    ?LOG_DEBUG(#{truncate => Data}),
    {Relations, Remainder} = ?FUNCTION_NAME(
                                lists:duplicate(N, int32),
                                Data),
    {{truncate, #{options => Options, relations => Relations}}, Remainder};

demarshal(x_log_data, <<"b", BeginPrepare/bytes>>) ->
    ?LOG_DEBUG(#{begin_prepare => BeginPrepare}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [lsn, end_lsn, timestamp, xid, gid],
                              [int64, int64, int64, int32, string],
                              BeginPrepare),
    {{begin_prepare, Decoded}, Remainder};

demarshal(x_log_data, <<"P", Prepare/bytes>>) ->
    ?LOG_DEBUG(#{prepare => Prepare}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [flags, lsn, end_lsn, timestamp, xid, gid],
                              [int8, int64, int64, int64, int32, string],
                              Prepare),
    {{prepare, Decoded}, Remainder};

demarshal(x_log_data, <<"K", CommitPrepared/bytes>>) ->
    ?LOG_DEBUG(#{commit_prepared => CommitPrepared}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [flags, lsn, end_lsn, timestamp, xid, gid],
                              [int8, int64, int64, int64, int32, string],
                              CommitPrepared),
    {{commit_prepared, Decoded}, Remainder};

demarshal(x_log_data, <<"r", RollbackPrepared/bytes>>) ->
    ?LOG_DEBUG(#{rollback_prepared => RollbackPrepared}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [flags,
                               prepared_end_lsn,
                               rollback_end_lsn,
                               prepare_timestamp,
                               rollback_timestamp,
                               xid,
                               gid],
                              [int8, int64, int64, int64, int64, int32, string],
                              RollbackPrepared),
    {{rollback_prepared, Decoded}, Remainder};

%% Tuple data: a 16-bit column count, then the columns (demarshal/4).
demarshal(tuple_data, <<Columns:16, TupleData/bytes>>) ->
    ?LOG_DEBUG(#{columns => Columns, tuple => TupleData}),
    ?FUNCTION_NAME(tuple_data, Columns, TupleData, []);

%% CopyData payloads used by streaming replication, selected by their
%% leading byte. "w" wraps a logical replication message.
demarshal(copy_data, <<"w", XLogData/bytes>>) ->
    ?LOG_DEBUG(#{x_log => XLogData}),
    {Decoded, Stream} = ?FUNCTION_NAME(
                           [start_wal, end_wal, clock],
                           [int64, int64, int64],
                           XLogData),
    {WAL, Remainder} = ?FUNCTION_NAME(x_log_data, Stream),
    {{x_log_data, Decoded#{stream => WAL}}, Remainder};

%% Primary keepalive; the single reply byte is mapped to a boolean.
demarshal(copy_data, <<"k", PrimaryKeepalive/bytes>>) ->
    ?LOG_DEBUG(#{primary_keepalive => PrimaryKeepalive}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [end_wal, clock, reply],
                              [int64, int64, {byte, 1}],
                              PrimaryKeepalive),
    {{keepalive,
      maps:map(
        fun
            (reply, <<0:8>>) ->
                false;

            (reply, <<1:8>>) ->
                true;

            (_, V) ->
                V
        end,
        Decoded)},
     Remainder};

%% Standby status update; the single reply byte is mapped to a boolean.
demarshal(copy_data, <<"r", StandbyStatusUpdate/bytes>>) ->
    ?LOG_DEBUG(#{standby_status_update => StandbyStatusUpdate}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [written,
                               flushed,
                               applied,
                               clock,
                               reply],
                              [int64,
                               int64,
                               int64,
                               int64,
                               {byte, 1}],
                              StandbyStatusUpdate),
    {{standby_status_update,
      maps:map(
        fun
            (reply, <<0:8>>) ->
                false;

            (reply, <<1:8>>) ->
                true;

            (_, V) ->
                V
        end,
        Decoded)},
     Remainder};

%% Any other CopyData payload is returned undecoded.
demarshal(copy_data = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {Encoded, <<>>};

%% Overall format, column count, then one 16-bit format code per column.
demarshal(copy_out_response = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {[Format, NumOfCols], R0} = ?FUNCTION_NAME([int8, int16], Encoded),
    {ColFormats, R1} = ?FUNCTION_NAME(
                          lists:duplicate(NumOfCols, int16),
                          R0),

    {#{format => text_binary(Format),
       col_formats => lists:map(
                        fun text_binary/1,
                        ColFormats)},
     R1};

demarshal(copy_done = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {Encoded, <<>>};

%% Only the format byte and column count are accepted here; any
%% trailing bytes fail the match.
demarshal(copy_both_response = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    case ?FUNCTION_NAME([int8, int16], Encoded) of
        {[0, N], <<>>} ->
            {#{format => text, n_of_cols => N}, <<>>};

        {[1, N], <<>>} ->
            {#{format => binary, n_of_cols => N}, <<>>}
    end;

demarshal(parameter_status = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {[K, V], R0} = ?FUNCTION_NAME([string, string], Encoded),
    {{K, V}, R0};

demarshal(backend_key_data = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    ?FUNCTION_NAME([int32, int32], Encoded);

%% The command tag string is interpreted by demarshal/3.
demarshal(command_complete = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {Decoded, Remainder} = demarshal(string, Encoded),
    ?FUNCTION_NAME(Tag, Decoded, Remainder);


demarshal(ready_for_query, TxStatus) ->
    {tx_status(TxStatus), <<>>};

%% A 16-bit field count, then the field descriptions (demarshal/4).
demarshal(row_description = Type, Encoded) ->
    ?LOG_DEBUG(#{Type => Encoded}),
    {Columns, Remainder} = demarshal(int16, Encoded),
    ?FUNCTION_NAME(Type, Columns, Remainder, []);

%% A 16-bit column count, then the column values (demarshal/4).
demarshal(data_row = Type, Encoded) ->
    ?LOG_DEBUG(#{Type => Encoded}),
    {Columns, Remainder} = demarshal(int16, Encoded),
    ?FUNCTION_NAME(Type, Columns, Remainder, []);

demarshal(startup, Encoded) ->
    ?LOG_DEBUG(#{startup => Encoded}),
    {[Major, Minor], R0} = ?FUNCTION_NAME([int16, int16], Encoded),
    {Parameters, R1} = ?FUNCTION_NAME(pairs, R0),
    {#{major => Major, minor => Minor, parameters => Parameters}, R1};

%% Key/value strings collected into a map; a lone zero byte ends the
%% list.
demarshal(pairs, Encoded) ->
    ?LOG_DEBUG(#{pairs => Encoded}),
    {pgmp_binary:foldl(
       fun
           (<<0>>, A) ->
               {<<>>, A};

           (EncodedPair, A) ->
               {[Key, Value], Remainder} = ?FUNCTION_NAME(
                                              [string, string],
                                              EncodedPair),
               {Remainder, A#{Key => Value}}
       end,
       #{},
       Encoded),
     <<>>};

demarshal(password, Encoded) ->
    ?FUNCTION_NAME(string, Encoded);

demarshal(query, Encoded) ->
    ?FUNCTION_NAME(string, Encoded);

%% Single-byte reply to an SSL request: "S" accepts, "N" declines.
demarshal(ssl, <<"N">>) ->
    {false, <<>>};
demarshal(ssl, <<"S">>) ->
    {true, <<>>};

demarshal(terminate, <<>>) ->
    {#{}, <<>>}.
491
492
493 -type command_complete_operation() :: insert
494 | delete
495 | select
496 | update
497 | move
498 | fetch
499 | copy.
500
501 -type command_complete_response() :: {{command_complete_operation(), non_neg_integer()}, binary()}
502 | {atom(), binary()}.
503
%% Result of decoding an authentication message, keyed by the 32-bit
%% authentication code (see the `authentication' clauses below).
-type authentication_response() :: {authenticated, binary()}
                                 | {kerberos, binary()}
                                 | {clear_text_password, binary()}
                                 | {{md5_password, binary()}, binary()}
                                 | {scm_credentials, binary()}
                                 | {gss, binary()}
                                 | {sspi, binary()}
                                 | {{sasl, any()}, binary()}
                                 | {{sasl_continue, binary()}, <<>>}
                                 | {{sasl_final, binary()}, <<>>}.

%% demarshal/3 has four unrelated uses, selected by the first argument:
%%
%%  - command_complete: interpret an already extracted command tag
%%    string; the third argument is passed through as the remainder.
%%  - authentication: interpret the body for a given authentication code.
%%  - notice_response | error_response: accumulate {Tag, Value} fields
%%    until the terminating zero byte.
%%  - a list of keys: decode the matching list of types with demarshal/2
%%    and zip keys and values into a map.
-spec demarshal(command_complete, binary(), binary()) -> command_complete_response();
               (authentication, non_neg_integer(), binary()) -> authentication_response();
               (notice_response, binary(), [{<<_:8>>, binary()}]) -> {[{<<_:8>>, binary()}], binary()};
               (error_response, binary(), [{<<_:8>>, binary()}]) -> {[{<<_:8>>, binary()}], binary()};
               ([atom()], [type()], binary()) -> {#{atom() => any()}, binary()}.


%% Tags with a row count are returned as {Operation, Rows}.
demarshal(command_complete, <<"INSERT 0 ", Rows/bytes>>, Remainder) ->
    {{insert, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"DELETE ", Rows/bytes>>, Remainder) ->
    {{delete, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"SELECT ", Rows/bytes>>, Remainder) ->
    {{select, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"UPDATE ", Rows/bytes>>, Remainder) ->
    {{update, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"MOVE ", Rows/bytes>>, Remainder) ->
    {{move, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"FETCH ", Rows/bytes>>, Remainder) ->
    {{fetch, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"COPY ", Rows/bytes>>, Remainder) ->
    {{copy, binary_to_integer(Rows)}, Remainder};

%% Any other tag: split on spaces, lower-case each word and hand the
%% words to pgmp_util:snake_case/1.
%% NOTE(review): command_complete_response() suggests snake_case/1
%% returns an atom. If it creates atoms from this server-supplied text,
%% confirm that cannot exhaust the atom table.
demarshal(command_complete, Decoded, Remainder) ->
    {pgmp_util:snake_case(
       lists:map(
         fun
             (Name) ->
                 binary_to_list(string:lowercase(Name))
         end,
         binary:split(
           Decoded,
           <<" ">>,
           [global]))),
     Remainder};

demarshal(authentication, 0, R) ->
    {authenticated, R};

demarshal(authentication, 2, R) ->
    {kerberos, R};

demarshal(authentication, 3, R) ->
    {clear_text_password, R};

%% MD5 password request: the body starts with a four byte salt.
demarshal(authentication, 5, R0) ->
    {Salt, R1} = ?FUNCTION_NAME({byte, 4}, R0),
    {{md5_password, Salt}, R1};

demarshal(authentication, 6, R) ->
    {scm_credentials, R};

demarshal(authentication, 7, R) ->
    {gss, R};

demarshal(authentication, 9, R) ->
    {sspi, R};

%% SASL: a list of mechanism name strings ended by a lone zero byte.
demarshal(authentication, 10, R) ->
    {{sasl,
      lists:reverse(
        pgmp_binary:foldl(
          fun
              (<<0:8>>, A) ->
                  {<<>>, A};

              (Encoded, A) ->
                  {Mechanism, Remainder} = ?FUNCTION_NAME(string, Encoded),
                  {Remainder, [Mechanism | A]}
          end,
          [],
          R))},
     <<>>};

demarshal(authentication, 11, R) ->
    {{sasl_continue, R}, <<>>};

demarshal(authentication, 12, R) ->
    {{sasl_final, R}, <<>>};

%% Notice/error fields: a zero byte ends the list; the accumulator was
%% built in reverse, so restore the order on the way out.
demarshal(Type, <<0:8, Remainder/bytes>>, A)
  when Type == notice_response;
       Type == error_response ->
    {lists:reverse(A), Remainder};

%% Otherwise a one byte field tag followed by a NUL-terminated value.
demarshal(Type, <<Tag:1/bytes, R0/bytes>>, A)
  when Type == notice_response;
       Type == error_response ->
    {Value, R1} = ?FUNCTION_NAME(string, R0),
    ?FUNCTION_NAME(Type, R1, [{Tag, Value} | A]);

%% Decode Types with demarshal/2 and label the values with Keys.
demarshal(Keys, Types, Encoded) when is_list(Keys), is_list(Types) ->
    {Values, Remainder} = ?FUNCTION_NAME(Types, Encoded),
    {maps:from_list(lists:zip(Keys, Values)), Remainder}.
613
614
615 -type row_description() :: #{field_name := binary(),
616 table_oid := pgmp:int32(),
617 column_number := pgmp:int16(),
618 type_oid := pgmp:int32(),
619 type_size := pgmp:int16(),
620 type_modifier := pgmp:int32(),
621 format := text | binary}.
622
%% One column of a logical replication tuple: `null', or a value tagged
%% with the format it arrived in. Both `text' and `binary' are produced
%% by the tuple_data clauses of demarshal/4.
-type tuple_data() :: #{format := text | binary, value := any()}
                    | null.

%% demarshal(Type, Count, Encoded, Acc) -> {Items, Remainder}
%%
%% Decode Count items of the given kind from Encoded, pushing each onto
%% Acc. When Count reaches zero the accumulator is reversed and returned
%% with the unconsumed bytes. A count of zero with an empty accumulator
%% therefore yields an empty list.
-spec demarshal(row_description, non_neg_integer(), binary(), [row_description()]) -> {[row_description()], binary()};
               (data_row, non_neg_integer(), binary(), [pgmp_data_row:decoded()]) -> {[pgmp_data_row:decoded()], binary()};
               (tuple_data, non_neg_integer(), binary(), [tuple_data()]) -> {[tuple_data()], binary()}.


demarshal(row_description, 0, Remainder, A) ->
    {lists:reverse(A), Remainder};

%% One field description; the numeric format code is replaced by
%% `text' or `binary'.
demarshal(row_description = Type, Columns, D0, A) ->
    Keys = [field_name,
            table_oid,
            column_number,
            type_oid,
            type_size,
            type_modifier,
            format],

    Types = [string,
             int32,
             int16,
             int32,
             int16,
             int32,
             int16],

    {#{format := Format} = KV, D1} = ?FUNCTION_NAME(Keys, Types, D0),
    ?FUNCTION_NAME(Type,
                   Columns - 1,
                   D1,
                   [KV#{format := text_binary(Format)} | A]);

demarshal(data_row, 0, Remainder, A) ->
    {lists:reverse(A), Remainder};

%% One column value, prefixed by a signed 32-bit length: 0 is an empty
%% value, -1 becomes `null', anything else is that many bytes.
demarshal(data_row = Type, Column, D0, A) ->
    case demarshal(int32, D0) of
        {0, D1} ->
            ?FUNCTION_NAME(Type, Column - 1, D1, [<<>> | A]);

        {-1, D1} ->
            ?FUNCTION_NAME(Type, Column - 1, D1, [null | A]);

        {Size, D1} ->
            <<Data:Size/bytes, D2/bytes>> = D1,
            ?FUNCTION_NAME(Type, Column - 1, D2, [Data | A])
    end;

demarshal(tuple_data, 0, Remainder, A) ->
    {lists:reverse(A), Remainder};


%% Tuple columns are tagged with a kind byte: "n" is null; "b" and "t"
%% carry a 32-bit length and that many bytes of binary or text value.
demarshal(tuple_data, Columns, <<"n", Remainder/bytes>>, A) ->
    ?FUNCTION_NAME(tuple_data,
                   Columns - 1,
                   Remainder,
                   [null | A]);

demarshal(tuple_data,
          Columns,
          <<"b", Length:32, Value:Length/bytes, Remainder/bytes>>,
          A) ->
    ?FUNCTION_NAME(tuple_data,
                   Columns - 1,
                   Remainder,
                   [#{format => binary, value => Value} | A]);

demarshal(tuple_data,
          Columns,
          <<"t", Length:32, Value:Length/bytes, Remainder/bytes>>,
          A) ->
    ?FUNCTION_NAME(tuple_data,
                   Columns - 1,
                   Remainder,
                   [#{format => text, value => Value} | A]).
700
701
%% Prefix Data with its length as a 32-bit integer. The length counts
%% the four length bytes themselves as well as the payload.
-spec size_inclusive(iodata()) -> iolist().

size_inclusive(Data) ->
    Length = iolist_size(Data) + 4,
    [marshal(int32, Length), Data].
706
707
%% Prefix Data with its length as a 32-bit integer. The length counts
%% the payload only, not the length bytes.
-spec size_exclusive(iodata()) -> iolist().

size_exclusive(Data) ->
    Length = iolist_size(Data),
    [marshal(int32, Length), Data].
712
713
%% Encode several named fields in order. Names and Types are parallel
%% lists; each name is looked up in the Bindings map (a missing key
%% raises) and its value is encoded with marshal/2 using the paired type.
marshal(Names, Types, Bindings) ->
    ?LOG_DEBUG(#{names => Names, types => Types, bindings => Bindings}),
    [marshal(Type, maps:get(Name, Bindings))
     || {Name, Type} <- lists:zip(Names, Types)].
722
723
%% marshal(Type, Value) -> iodata()
%%
%% Encode Value according to Type. Primitive types (string, byte,
%% {byte, N}, intN, {int, Bits}) produce just the encoded bytes. Message
%% types produce the one byte message tag followed by a length prefixed
%% body, except x_log_data and tuple_data, which produce an unframed
%% body meant to be embedded in a copy_data message.
%%
%% An unsupported Type/Value combination raises error:badarg.
-spec marshal(string, iodata()) -> iolist();
             (byte, <<_:8>>) -> iodata();
             ({byte, pos_integer()}, binary()) -> binary();
             (int8, pgmp:int8()) -> nonempty_binary();
             (int16, pgmp:int16()) -> nonempty_binary();
             (int32, pgmp:int32()) -> nonempty_binary();
             (int64, pgmp:int64()) -> nonempty_binary();
             ({int, 8}, pgmp:int8()) -> nonempty_binary();
             ({int, 16}, pgmp:int16()) -> nonempty_binary();
             ({int, 32}, pgmp:int32()) -> nonempty_binary();
             ({int, 64}, pgmp:int64()) -> nonempty_binary();
             (x_log_data, x_log_data()) -> iodata();
             (authentication, ok) -> iodata();
             (ready_for_query, tx_status()) -> iodata();
             (row_description, [row_description()]) -> iodata();
             (data_row, [iodata() | null]) -> iodata();
             (command_complete, {iodata() | atom(), integer()} | iodata() | atom()) -> iodata();
             (copy_data,
              iodata()
              | {standby_status_update, map()}
              | {keepalive, map()}
              | {x_log_data, map()}) -> iodata();
             (copy_done, <<>>) -> iodata();
             (copy_both_response, copy_both_response()) -> iodata();
             (copy_out_response, copy_out_response()) -> iodata();
             (parameter_status, {iodata(), iodata()}) -> iodata();
             (tuple_data, [tuple_data()]) -> iodata().

%% NUL-terminated string.
marshal(string, Value) ->
    [Value, <<0:8>>];

marshal(byte, Value) ->
    Value;

%% Fixed-width byte field; the value must already be exactly N bytes.
marshal({byte, N}, Value) when byte_size(Value) == N ->
    Value;

%% Named integer widths are aliases for {int, Bits}.
marshal(int8, Value) ->
    ?FUNCTION_NAME({int, 8}, Value);

marshal(int16, Value) ->
    ?FUNCTION_NAME({int, 16}, Value);

marshal(int32, Value) ->
    ?FUNCTION_NAME({int, 32}, Value);

marshal(int64, Value) ->
    ?FUNCTION_NAME({int, 64}, Value);

marshal({int, Size}, Value) when is_integer(Value)->
    <<Value:Size>>;

marshal(authentication, ok) ->
    ["R", size_inclusive([<<0:32>>])];

marshal(ready_for_query, TxStatus) ->
    ["Z", size_inclusive(tx_status(TxStatus))];

%% {Command, Rows} with an atom command becomes "COMMAND Rows".
marshal(command_complete = Tag, {Command, Rows}) when is_atom(Command),
                                                      is_integer(Rows) ->
    ?FUNCTION_NAME(
       Tag,
       [string:uppercase(atom_to_list(Command)),
        " ",
        integer_to_list(Rows)]);

marshal(command_complete = Tag, Command) when is_atom(Command) ->
    ?FUNCTION_NAME(Tag, string:uppercase(atom_to_list(Command)));

marshal(command_complete, Description) ->
    ["C", size_inclusive(marshal(string, Description))];

%% Field count followed by one description per field; the `text' or
%% `binary' format is turned back into its numeric code.
marshal(row_description, FieldDescriptions) ->
    ["T",
     size_inclusive(
       [marshal(int16, length(FieldDescriptions)),
        lists:map(
          fun
              (#{field_name := FieldName,
                 table_oid := TableOID,
                 column_number := ColumnNumber,
                 type_oid := TypeOID,
                 type_size := TypeSize,
                 type_modifier := TypeModifier,
                 format := Format}) ->
                  [marshal(string, FieldName),
                   marshal(int32, TableOID),
                   marshal(int16, ColumnNumber),
                   marshal(int32, TypeOID),
                   marshal(int16, TypeSize),
                   marshal(int32, TypeModifier),
                   marshal(int16, text_binary(Format))]
          end,
          FieldDescriptions)])];

%% Column count followed by length prefixed column values.
marshal(data_row, Columns) ->
    ["D",
     size_inclusive(
       [marshal(int16, length(Columns)),
        lists:map(
          fun
              (null) ->
                  %% demarshal(data_row, ...) decodes a length of -1 as
                  %% `null'; accept that atom here so a decoded row can
                  %% be encoded again. A null column is the -1 length
                  %% with no value bytes.
                  <<-1:32/signed>>;

              (<<-1:32/signed>> = Column) ->
                  %% Already the encoded null marker: pass it through.
                  Column;

              (Column) ->
                  size_exclusive(Column)
          end,
          Columns)])];

%% Replication CopyData payloads. The boolean `reply' flag is turned
%% into a single byte before the fields are encoded.
marshal(copy_data, {standby_status_update, StandbyStatusUpdate}) ->
    ["d",
     size_inclusive(
       ["r",
        ?FUNCTION_NAME(
           [written,
            flushed,
            applied,
            clock,
            reply],
           [int64,
            int64,
            int64,
            int64,
            {byte, 1}],
           maps:map(
             fun
                 (reply, false) ->
                     <<0:8>>;

                 (reply, true) ->
                     <<1:8>>;

                 (_, V) ->
                     V
             end,
             StandbyStatusUpdate))])];

marshal(copy_data, {keepalive, PrimaryKeepalive}) ->
    ["d",
     size_inclusive(
       ["k",
        ?FUNCTION_NAME(
           [end_wal, clock, reply],
           [int64, int64, {byte, 1}],
           maps:map(
             fun
                 (reply, false) ->
                     <<0:8>>;

                 (reply, true) ->
                     <<1:8>>;

                 (_, V) ->
                     V
             end,
             PrimaryKeepalive))])];

marshal(copy_data, {x_log_data, #{stream := Stream} = XLogData}) ->
    ["d",
     size_inclusive(
       ["w",
        ?FUNCTION_NAME(
           [start_wal, end_wal, clock],
           [int64, int64, int64],
           XLogData),
        ?FUNCTION_NAME(x_log_data, Stream)])];

%% Logical replication stream messages.
marshal(x_log_data, {begin_transaction, BeginTransaction}) ->
    ["B",
     ?FUNCTION_NAME(
        [final_lsn, commit_timestamp, xid],
        [int64, int64, int32],
        BeginTransaction)];

%% `ncols' defaults to the number of columns supplied; an explicit
%% `ncols' in Relation takes precedence because it is merged last.
marshal(x_log_data, {relation, #{columns := Columns} = Relation}) ->
    ["R",
     ?FUNCTION_NAME(
        [id, namespace, name, replica_identity, ncols],
        [int32, string, string, int8, int16],
        maps:merge(
          #{ncols => length(Columns)},
          Relation)),
     lists:map(
       fun
           (Column) ->
               ?FUNCTION_NAME(
                  [flags, name, type, modifier],
                  [int8, string, int32, int32],
                  Column)
       end,
       Columns)];

marshal(x_log_data,
        {insert,
         #{relation := Relation, tuple := Tuple}}) ->
    ["I",
     ?FUNCTION_NAME(int32, Relation),
     "N",
     ?FUNCTION_NAME(tuple_data, Tuple)];

marshal(x_log_data,
        {update, #{key := Key, relation := Relation, new := New}}) ->
    ["U",
     ?FUNCTION_NAME(int32, Relation),
     "K",
     ?FUNCTION_NAME(tuple_data, Key),
     "N",
     ?FUNCTION_NAME(tuple_data, New)];

marshal(x_log_data,
        {update, #{relation := Relation, new := New}}) ->
    ["U",
     ?FUNCTION_NAME(int32, Relation),
     "N",
     ?FUNCTION_NAME(tuple_data, New)];

marshal(x_log_data,
        {delete, #{key := Key, relation := Relation}}) ->
    ["D",
     ?FUNCTION_NAME(int32, Relation),
     "K",
     ?FUNCTION_NAME(tuple_data, Key)];

marshal(x_log_data,
        {truncate, #{options := Options, relations := Relations}}) ->
    ["T",
     ?FUNCTION_NAME(int32, length(Relations)),
     ?FUNCTION_NAME(int8, Options),
     [?FUNCTION_NAME(int32, Relation) || Relation <- Relations]];

%% Column count, then each column as a kind byte ("n", "b" or "t") and,
%% for non-null columns, a 32-bit length and the value bytes.
marshal(tuple_data, Columns) ->
    [?FUNCTION_NAME(int16, length(Columns)),
     lists:map(
       fun
           (null) ->
               "n";

           (#{format := Format, value := Value}) ->
               [case Format of
                    binary ->
                        "b";

                    text ->
                        "t"
                end,
                ?FUNCTION_NAME(int32, byte_size(Value)), Value]
       end,
       Columns)];

marshal(x_log_data, {commit, Commit}) ->
    ["C",
     ?FUNCTION_NAME(
        [flags, commit_lsn, end_lsn, commit_timestamp],
        [int8, int64, int64, int64],
        Commit)];

marshal(copy_both_response, #{format := Format, n_of_cols := N}) ->
    ["W",
     size_inclusive(
       [?FUNCTION_NAME(int8, text_binary(Format)),
        ?FUNCTION_NAME(int16, N)])];

marshal(copy_out_response, #{format := Format, col_formats := ColFormats}) ->
    ["H",
     size_inclusive(
       [?FUNCTION_NAME(int8, text_binary(Format)),
        ?FUNCTION_NAME(int16, length(ColFormats)),
        lists:map(
          fun
              (ColFormat) ->
                  ?FUNCTION_NAME(int16, text_binary(ColFormat))
          end,
          ColFormats)])];

%% Any other CopyData payload is framed as-is. This clause must stay
%% after the tagged copy_data clauses above.
marshal(copy_data, Encoded) ->
    ["d", size_inclusive(Encoded)];

marshal(copy_done, <<>> = Encoded) ->
    ["c", size_inclusive(Encoded)];

marshal(parameter_status, {Key, Value}) ->
    ["S",
     size_inclusive(
       [?FUNCTION_NAME(string, Key),
        ?FUNCTION_NAME(string, Value)])];

%% Nothing matched: report the offending arguments.
marshal(Type, Value) ->
    error(badarg, [Type, Value]).
1007
1008
-type tx_status() :: idle
                   | in_tx_block
                   | in_failed_tx_block.

%% Two-way mapping between the transaction status atoms and the one
%% byte indicators used in ReadyForQuery. A one byte binary is
%% translated to its atom and an atom to its one byte binary; any other
%% argument raises function_clause.

%% Indicator byte to atom.
tx_status(<<"I">>) ->
    idle;
tx_status(<<"T">>) ->
    in_tx_block;
tx_status(<<"E">>) ->
    in_failed_tx_block;

%% Atom to indicator byte.
tx_status(idle) ->
    <<"I">>;
tx_status(in_tx_block) ->
    <<"T">>;
tx_status(in_failed_tx_block) ->
    <<"E">>.
1025
1026
%% Two-way mapping between the format atoms and their numeric wire
%% codes: text <-> 0 and binary <-> 1. Any other argument raises
%% function_clause.

%% Wire code to atom.
text_binary(0) -> text;
text_binary(1) -> binary;

%% Atom to wire code.
text_binary(text) -> 0;
text_binary(binary) -> 1.
Line Hits Source