1 |
|
%% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com> |
2 |
|
%% |
3 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
%% you may not use this file except in compliance with the License. |
5 |
|
%% You may obtain a copy of the License at |
6 |
|
%% |
7 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
%% |
9 |
|
%% Unless required by applicable law or agreed to in writing, software |
10 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 |
|
%% See the License for the specific language governing permissions and |
13 |
|
%% limitations under the License. |
14 |
|
|
15 |
|
|
16 |
|
-module(pgmp_codec). |
17 |
|
|
18 |
|
|
19 |
|
-export([demarshal/1]). |
20 |
|
-export([demarshal/2]). |
21 |
|
-export([marshal/2]). |
22 |
|
-export([size_exclusive/1]). |
23 |
|
-export([size_inclusive/1]). |
24 |
|
-export_type([command_complete_response/0]). |
25 |
|
-export_type([row_description/0]). |
26 |
|
-include_lib("kernel/include/logger.hrl"). |
27 |
|
|
28 |
|
|
29 |
|
-type pg_integer() :: int8 |
30 |
|
| int16 |
31 |
|
| int32 |
32 |
|
| int64. |
33 |
|
|
34 |
|
-type type() :: {byte, pos_integer()} |
35 |
|
| pg_integer() |
36 |
|
| {int, pgmp:int_bit_sizes()} |
37 |
|
| clock |
38 |
|
| string |
39 |
|
| empty_query_response |
40 |
|
| bind_complete |
41 |
|
| portal_suspended |
42 |
|
| no_data |
43 |
|
| parse_complete |
44 |
|
| notice_response |
45 |
|
| error_response |
46 |
|
| authentication |
47 |
|
| parameter_description |
48 |
|
| x_log_data |
49 |
|
| tuple_data |
50 |
|
| copy_data |
51 |
|
| copy_both_response |
52 |
|
| parameter_status |
53 |
|
| backend_key_data |
54 |
|
| command_complete |
55 |
|
| ready_for_query |
56 |
|
| row_description |
57 |
|
| data_row. |
58 |
|
|
59 |
|
%% Decode a tagged message: unpack the `{Type, Encoded}' pair, log it
%% at debug level, and delegate to demarshal/2.
demarshal({Type, Encoded}) ->
    ?LOG_DEBUG(#{type => Type, encoded => Encoded}),
    demarshal(Type, Encoded).
62 |
|
|
63 |
|
-type x_log_begin_transaction() :: #{final_lsn := pgmp:int64(), |
64 |
|
commit_timestamp := pgmp:int64(), |
65 |
|
xid := pgmp:int32()}. |
66 |
|
|
67 |
|
-type x_log_logical_decoding() :: #{xid := pgmp:int32(), |
68 |
|
flags := pgmp:int8(), |
69 |
|
lsn := pgmp:int64(), |
70 |
|
prefix := binary(), |
71 |
|
content := pgmp:int32()}. |
72 |
|
|
73 |
|
-type x_log_commit() :: #{flags := pgmp:int8(), |
74 |
|
commit_lsn := pgmp:int64(), |
75 |
|
end_lsn := pgmp:int64(), |
76 |
|
commit_timestamp := pgmp:int64()}. |
77 |
|
|
78 |
|
-type x_log_origin() :: #{commit_lsn := pgmp:int64(), |
79 |
|
name := binary()}. |
80 |
|
|
81 |
|
-type x_log_relation() :: #{id := pgmp:int32(), |
82 |
|
namespace := binary(), |
83 |
|
name := binary(), |
84 |
|
replica_identity := pgmp:int8(), |
85 |
|
ncols := pgmp:int16()}. |
86 |
|
|
87 |
|
-type x_log_type() :: #{xid := pgmp:int32(), |
88 |
|
type := pgmp:int32(), |
89 |
|
namespace := binary(), |
90 |
|
name := binary()}. |
91 |
|
|
92 |
|
-type x_log_insert() :: #{relation := pgmp:int32(), |
93 |
|
tuple := [tuple_data()]}. |
94 |
|
|
95 |
|
-type x_log_update() :: #{relation := pgmp:int32(), |
96 |
|
key := [tuple_data()], |
97 |
|
new := [tuple_data()]}. |
98 |
|
|
99 |
|
-type x_log_delete() :: #{relation := pgmp:int32(), |
100 |
|
key := [tuple_data()]}. |
101 |
|
|
102 |
|
-type x_log_truncate() :: #{relations := [pgmp:int32()], |
103 |
|
options := pgmp:int8()}. |
104 |
|
|
105 |
|
-type x_log_data() :: {begin_transaction, x_log_begin_transaction()} |
106 |
|
| {logical_decoding, x_log_logical_decoding()} |
107 |
|
| {commit, x_log_commit()} |
108 |
|
| {origin, x_log_origin()} |
109 |
|
| {relation, x_log_relation()} |
110 |
|
| {type, x_log_type()} |
111 |
|
| {insert, x_log_insert()} |
112 |
|
| {update, x_log_update()} |
113 |
|
| {delete, x_log_delete()} |
114 |
|
| {truncate, x_log_truncate()}. |
115 |
|
|
116 |
|
-type startup() :: #{major := integer(), |
117 |
|
minor := integer(), |
118 |
|
parameters := #{binary() => binary()}}. |
119 |
|
|
120 |
|
-type copy_both_response() :: #{format := text | binary, |
121 |
|
n_of_cols := non_neg_integer()}. |
122 |
|
|
123 |
|
-type copy_out_response() :: #{format := text | binary, |
124 |
|
col_formats := [text | binary]}. |
125 |
|
|
126 |
|
%% Decode a value of the given type (or a list of types, in order) from
%% the front of a binary. Every clause returns `{Decoded, Remainder}'
%% where Remainder is the part of the input that was not consumed.
%%
%% The list clause accepts the empty list: callers build the type list
%% with lists:duplicate/2 using a count read from the wire, which can
%% be zero.
-spec demarshal([type()], binary()) -> {[any()], binary()};
               ({byte, pos_integer()}, binary()) -> {binary(), binary()};
               ({int, 8}, binary()) -> {pgmp:int8(), binary()};
               (int8, binary()) -> {pgmp:int8(), binary()};
               ({int, 16}, binary()) -> {pgmp:int16(), binary()};
               (int16, binary()) -> {pgmp:int16(), binary()};
               ({int, 32}, binary()) -> {pgmp:int32(), binary()};
               (int32, binary()) -> {pgmp:int32(), binary()};
               ({int, 64}, binary()) -> {pgmp:int64(), binary()};
               (int64, binary()) -> {pgmp:int64(), binary()};
               (clock, binary()) -> {pgmp:int64(), binary()};
               (string, binary()) -> {binary(), binary()};
               (empty_query_response, binary()) -> {[], <<>>};
               (bind_complete, binary()) -> {[], <<>>};
               (portal_suspended, binary()) -> {[], <<>>};
               (no_data, binary()) -> {[], <<>>};
               (parse_complete, binary()) -> {[], <<>>};
               (notice_response, binary()) -> {any(), binary()};
               (error_response, binary()) -> {any(), binary()};
               (authentication, binary()) -> {any(), binary()};
               (parameter_description, binary()) -> {[pgmp:int32()], binary()};
               (password, binary()) -> {binary(), binary()};
               (startup, binary()) -> {startup(), binary()};
               (x_log_data, binary()) -> {x_log_data(), binary()};
               (tuple_data, binary()) -> {[tuple_data()], binary()};
               %% pairs accumulates string keys and string values into a map.
               (pairs, binary()) -> {#{binary() => binary()}, binary()};
               (copy_out_response, binary()) -> {copy_out_response(), binary()};
               (copy_both_response, binary()) -> {copy_both_response(), binary()};
               (copy_data, binary()) -> {{x_log_data, x_log_data()}, binary()};
               %% ssl yields the boolean first, like every other clause.
               (ssl, binary()) -> {boolean(), binary()}.
156 |
|
|
157 |
|
%% A list of types is decoded left to right; each step consumes the
%% front of the binary and hands what is left to the next type.
demarshal(Types, Encoded) when is_list(Types) ->
    ?LOG_DEBUG(#{types => Types, encoded => Encoded}),
    lists:mapfoldl(
      fun (Type, Rest) -> demarshal(Type, Rest) end,
      Encoded,
      Types);

%% Exactly N raw bytes.
demarshal({byte, N}, Encoded) ->
    <<Bytes:N/bytes, Rest/bytes>> = Encoded,
    {Bytes, Rest};

%% The named integer widths are shorthand for {int, Bits}.
demarshal(int8, Encoded) ->
    demarshal({int, 8}, Encoded);

demarshal(int16, Encoded) ->
    demarshal({int, 16}, Encoded);

demarshal(int32, Encoded) ->
    demarshal({int, 32}, Encoded);

demarshal(int64, Encoded) ->
    demarshal({int, 64}, Encoded);

%% A 64 bit integer that is passed through pgmp_calendar:decode/1.
demarshal(clock, Encoded) ->
    {Raw, Rest} = demarshal({int, 64}, Encoded),
    {pgmp_calendar:decode(Raw), Rest};

%% Signed integer of 8, 16, 32 or 64 bits (default, i.e. big, endianness).
demarshal({int, Bits}, Encoded)
  when Bits == 8; Bits == 16; Bits == 32; Bits == 64 ->
    <<Value:Bits/signed, Rest/bytes>> = Encoded,
    {Value, Rest};

%% Split at the first NUL byte: the text before it is the string, the
%% bytes after it are the remainder.
demarshal(string, Encoded) ->
    Parts = binary:split(Encoded, <<0>>),
    list_to_tuple(Parts);
188 |
|
|
189 |
|
%% Messages that carry no payload decode to an empty list.
demarshal(Type, <<>>)
  when Type =:= empty_query_response;
       Type =:= bind_complete;
       Type =:= portal_suspended;
       Type =:= no_data;
       Type =:= parse_complete ->
    {[], <<>>};

%% Notice and error responses are a sequence of tagged fields, which
%% demarshal/3 collects starting from an empty accumulator.
demarshal(Type, Encoded)
  when Type =:= notice_response;
       Type =:= error_response ->
    demarshal(Type, Encoded, []);

%% An authentication message begins with a 32 bit code that selects
%% how demarshal/3 interprets the rest.
demarshal(authentication, Encoded) ->
    {Code, Rest} = demarshal(int32, Encoded),
    demarshal(authentication, Code, Rest);

%% A 16 bit count followed by that many 32 bit integers.
demarshal(parameter_description, Encoded) ->
    {Count, Rest} = demarshal(int16, Encoded),
    Types = lists:duplicate(Count, int32),
    demarshal(Types, Rest);
207 |
|
|
208 |
|
%% "B": decoded into {begin_transaction, Map}.
demarshal(x_log_data, <<"B", BeginMessage/bytes>>) ->
    ?LOG_DEBUG(#{start_transaction => BeginMessage}),
    Keys = [final_lsn, commit_timestamp, xid],
    Types = [int64, int64, int32],
    {Begin, Rest} = demarshal(Keys, Types, BeginMessage),
    {{begin_transaction, Begin}, Rest};

%% "M": decoded into {logical_decoding, Map}.
demarshal(x_log_data, <<"M", LogicalDecoding/bytes>>) ->
    ?LOG_DEBUG(#{logical => LogicalDecoding}),
    Keys = [xid, flags, lsn, prefix, content],
    Types = [int32, int8, int64, string, int32],
    {Logical, Rest} = demarshal(Keys, Types, LogicalDecoding),
    {{logical_decoding, Logical}, Rest};

%% "C": decoded into {commit, Map}.
demarshal(x_log_data, <<"C", CommitMessage/bytes>>) ->
    ?LOG_DEBUG(#{commit => CommitMessage}),
    Keys = [flags, commit_lsn, end_lsn, commit_timestamp],
    Types = [int8, int64, int64, int64],
    {Commit, Rest} = demarshal(Keys, Types, CommitMessage),
    {{commit, Commit}, Rest};

%% "O": decoded into {origin, Map}.
demarshal(x_log_data, <<"O", OriginMessage/bytes>>) ->
    ?LOG_DEBUG(#{origin => OriginMessage}),
    Keys = [commit_lsn, name],
    Types = [int64, string],
    {Origin, Rest} = demarshal(Keys, Types, OriginMessage),
    {{origin, Origin}, Rest};

%% "R": a fixed relation header followed by the column descriptions.
%% The columns are folded over whatever follows the header, so this
%% clause always reports an empty remainder.
demarshal(x_log_data, <<"R", RelationMessage/bytes>>) ->
    ?LOG_DEBUG(#{relation => RelationMessage}),
    {Relation, EncodedColumns} =
        demarshal([id, namespace, name, replica_identity, ncols],
                  [int32, string, string, int8, int16],
                  RelationMessage),
    %% Each step decodes one column and returns the bytes still to be
    %% processed together with the accumulator (newest column first).
    DecodeColumn =
        fun (EncodedColumn, Acc) ->
                {Column, Rest} =
                    demarshal([flags, name, type, modifier],
                              [int8, string, int32, int32],
                              EncodedColumn),
                {Rest, [Column | Acc]}
        end,
    Columns = lists:reverse(
                pgmp_binary:foldl(DecodeColumn, [], EncodedColumns)),
    {{relation, Relation#{columns => Columns}}, <<>>};
259 |
|
|
260 |
|
%% "Y": decoded into {type, Map} with keys xid, type, namespace and
%% name.
%%
%% The first field type was previously misspelt `in32'. No demarshal/2
%% clause accepts that atom, so every "Y" message crashed with
%% function_clause; it is the 32 bit integer `int32'.
demarshal(x_log_data, <<"Y", TypeMessage/bytes>>) ->
    ?LOG_DEBUG(#{type => TypeMessage}),
    {Decoded, Remainder} = demarshal([xid, type, namespace, name],
                                     [int32, int32, string, string],
                                     TypeMessage),
    {{type, Decoded}, Remainder};
266 |
|
|
267 |
|
%% "I": relation id, then "N" and the new tuple.
demarshal(x_log_data, <<"I", Id:32, "N", TupleData/bytes>>) ->
    ?LOG_DEBUG(#{id => Id, tuple => TupleData}),
    {Tuple, Rest} = demarshal(tuple_data, TupleData),
    Insert = #{relation => Id, tuple => Tuple},
    {{insert, Insert}, Rest};

%% "U" with a "K" key tuple: the key tuple must be followed by "N" and
%% the new tuple.
demarshal(x_log_data, <<"U", Relation:32, "K", KeyData/bytes>>) ->
    ?LOG_DEBUG(#{update => Relation, key => KeyData}),
    {Key, <<"N", NewData/bytes>>} = demarshal(tuple_data, KeyData),
    {New, Rest} = demarshal(tuple_data, NewData),
    Update = #{key => Key, relation => Relation, new => New},
    {{update, Update}, Rest};

%% "U" carrying only the new tuple.
demarshal(x_log_data, <<"U", Relation:32, "N", NewData/bytes>>) ->
    ?LOG_DEBUG(#{update => Relation, new => NewData}),
    {New, Rest} = demarshal(tuple_data, NewData),
    Update = #{relation => Relation, new => New},
    {{update, Update}, Rest};

%% "D" with a "K" key tuple.
demarshal(x_log_data, <<"D", Relation:32, "K", KeyData/bytes>>) ->
    ?LOG_DEBUG(#{update => Relation, delete => KeyData}),
    {Key, Rest} = demarshal(tuple_data, KeyData),
    Delete = #{key => Key, relation => Relation},
    {{delete, Delete}, Rest};

%% "T": a relation count, an options byte, then that many 32 bit
%% relation ids.
demarshal(x_log_data, <<"T", N:32, Options:8, Data/bytes>>) ->
    ?LOG_DEBUG(#{truncate => Data}),
    Types = lists:duplicate(N, int32),
    {Relations, Rest} = demarshal(Types, Data),
    Truncate = #{options => Options, relations => Relations},
    {{truncate, Truncate}, Rest};
294 |
|
|
295 |
|
%% "b": decoded into {begin_prepare, Map}.
demarshal(x_log_data, <<"b", BeginPrepare/bytes>>) ->
    ?LOG_DEBUG(#{begin_prepare => BeginPrepare}),
    Keys = [lsn, end_lsn, timestamp, xid, gid],
    Types = [int64, int64, int64, int32, string],
    {Fields, Rest} = demarshal(Keys, Types, BeginPrepare),
    {{begin_prepare, Fields}, Rest};

%% "P": decoded into {prepare, Map}.
demarshal(x_log_data, <<"P", Prepare/bytes>>) ->
    ?LOG_DEBUG(#{prepare => Prepare}),
    Keys = [flags, lsn, end_lsn, timestamp, xid, gid],
    Types = [int8, int64, int64, int64, int32, string],
    {Fields, Rest} = demarshal(Keys, Types, Prepare),
    {{prepare, Fields}, Rest};

%% "K": decoded into {commit_prepared, Map}; same layout as "P".
demarshal(x_log_data, <<"K", CommitPrepared/bytes>>) ->
    ?LOG_DEBUG(#{commit_prepared => CommitPrepared}),
    Keys = [flags, lsn, end_lsn, timestamp, xid, gid],
    Types = [int8, int64, int64, int64, int32, string],
    {Fields, Rest} = demarshal(Keys, Types, CommitPrepared),
    {{commit_prepared, Fields}, Rest};

%% "r": decoded into {rollback_prepared, Map}.
demarshal(x_log_data, <<"r", RollbackPrepared/bytes>>) ->
    ?LOG_DEBUG(#{rollback_prepared => RollbackPrepared}),
    Keys = [flags,
            prepared_end_lsn,
            rollback_end_lsn,
            prepare_timestamp,
            rollback_timestamp,
            xid,
            gid],
    Types = [int8, int64, int64, int64, int64, int32, string],
    {Fields, Rest} = demarshal(Keys, Types, RollbackPrepared),
    {{rollback_prepared, Fields}, Rest};
332 |
|
|
333 |
|
%% Tuple data starts with a 16 bit column count; demarshal/4 then
%% decodes that many columns into a list.
demarshal(tuple_data, <<Columns:16, TupleData/bytes>>) ->
    ?LOG_DEBUG(#{columns => Columns, tuple => TupleData}),
    demarshal(tuple_data, Columns, TupleData, []);
336 |
|
|
337 |
|
%% "w": a header of three 64 bit integers followed by one x_log_data
%% message, which is stored under the `stream' key of the header map.
demarshal(copy_data, <<"w", XLogData/bytes>>) ->
    ?LOG_DEBUG(#{x_log => XLogData}),
    {Header, Stream} = demarshal([start_wal, end_wal, clock],
                                 [int64, int64, int64],
                                 XLogData),
    {WAL, Rest} = demarshal(x_log_data, Stream),
    {{x_log_data, Header#{stream => WAL}}, Rest};

%% "k": keepalive. The single `reply' byte is turned into a boolean
%% when it is 0 or 1; any other value is left as the raw byte.
demarshal(copy_data, <<"k", PrimaryKeepalive/bytes>>) ->
    ?LOG_DEBUG(#{primary_keepalive => PrimaryKeepalive}),
    {Fields, Rest} = demarshal([end_wal, clock, reply],
                               [int64, int64, {byte, 1}],
                               PrimaryKeepalive),
    ReplyAsBoolean =
        fun (reply, <<0:8>>) -> false;
            (reply, <<1:8>>) -> true;
            (_, Value) -> Value
        end,
    {{keepalive, maps:map(ReplyAsBoolean, Fields)}, Rest};

%% "r": standby status update, with the same treatment of `reply'.
demarshal(copy_data, <<"r", StandbyStatusUpdate/bytes>>) ->
    ?LOG_DEBUG(#{standby_status_update => StandbyStatusUpdate}),
    {Fields, Rest} = demarshal([written, flushed, applied, clock, reply],
                               [int64, int64, int64, int64, {byte, 1}],
                               StandbyStatusUpdate),
    ReplyAsBoolean =
        fun (reply, <<0:8>>) -> false;
            (reply, <<1:8>>) -> true;
            (_, Value) -> Value
        end,
    {{standby_status_update, maps:map(ReplyAsBoolean, Fields)}, Rest};

%% Any other copy data is returned undecoded.
demarshal(copy_data = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {Encoded, <<>>};
399 |
|
|
400 |
|
%% An 8 bit overall format, a 16 bit column count, then one 16 bit
%% format code per column. Codes are mapped through text_binary/1.
demarshal(copy_out_response = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {[Format, NumOfCols], R0} = demarshal([int8, int16], Encoded),
    {ColFormats, R1} = demarshal(lists:duplicate(NumOfCols, int16), R0),
    Response = #{format => text_binary(Format),
                 col_formats => [text_binary(ColFormat)
                                 || ColFormat <- ColFormats]},
    {Response, R1};

%% The payload is returned undecoded.
demarshal(copy_done = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {Encoded, <<>>};

%% An 8 bit format (0 is text, 1 is binary) and a 16 bit column count.
%% Only messages with nothing after the count are accepted.
demarshal(copy_both_response = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    case demarshal([int8, int16], Encoded) of
        {[0, N], <<>>} ->
            {#{format => text, n_of_cols => N}, <<>>};

        {[1, N], <<>>} ->
            {#{format => binary, n_of_cols => N}, <<>>}
    end;
426 |
|
|
427 |
|
%% Two strings, returned as a {Key, Value} pair.
demarshal(parameter_status = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {[Key, Value], Rest} = demarshal([string, string], Encoded),
    {{Key, Value}, Rest};

%% Two 32 bit integers, returned as a two element list.
demarshal(backend_key_data = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    demarshal([int32, int32], Encoded);

%% The command tag string is interpreted further by demarshal/3.
demarshal(command_complete = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {CommandTag, Rest} = demarshal(string, Encoded),
    demarshal(Tag, CommandTag, Rest);

%% The whole payload is the transaction status, mapped by tx_status/1.
demarshal(ready_for_query, TxStatus) ->
    {tx_status(TxStatus), <<>>};

%% A 16 bit field count, then the fields, decoded by demarshal/4.
demarshal(row_description = Type, Encoded) ->
    ?LOG_DEBUG(#{Type => Encoded}),
    {Count, Rest} = demarshal(int16, Encoded),
    demarshal(Type, Count, Rest, []);

%% A 16 bit column count, then the columns, decoded by demarshal/4.
demarshal(data_row = Type, Encoded) ->
    ?LOG_DEBUG(#{Type => Encoded}),
    {Count, Rest} = demarshal(int16, Encoded),
    demarshal(Type, Count, Rest, []);
454 |
|
|
455 |
|
%% Two 16 bit integers (major, minor) followed by the parameter pairs.
demarshal(startup, Encoded) ->
    ?LOG_DEBUG(#{startup => Encoded}),
    {[Major, Minor], R0} = demarshal([int16, int16], Encoded),
    {Parameters, R1} = demarshal(pairs, R0),
    Startup = #{major => Major, minor => Minor, parameters => Parameters},
    {Startup, R1};

%% Repeated key and value strings collected into a map; a lone zero
%% byte ends the sequence. The remainder is always reported as empty.
demarshal(pairs, Encoded) ->
    ?LOG_DEBUG(#{pairs => Encoded}),
    CollectPair =
        fun (<<0>>, Acc) ->
                {<<>>, Acc};

            (EncodedPair, Acc) ->
                {[Key, Value], Rest} =
                    demarshal([string, string], EncodedPair),
                {Rest, Acc#{Key => Value}}
        end,
    {pgmp_binary:foldl(CollectPair, #{}, Encoded), <<>>};

%% A password is a single string.
demarshal(password, Encoded) ->
    demarshal(string, Encoded);

%% A query is a single string.
demarshal(query, Encoded) ->
    demarshal(string, Encoded);

%% Reply to an SSL request: "N" decodes to false, "S" to true.
demarshal(ssl, <<"N">>) ->
    {false, <<>>};
demarshal(ssl, <<"S">>) ->
    {true, <<>>};

%% Terminate carries no payload.
demarshal(terminate, <<>>) ->
    {#{}, <<>>}.
491 |
|
|
492 |
|
|
493 |
|
-type command_complete_operation() :: insert |
494 |
|
| delete |
495 |
|
| select |
496 |
|
| update |
497 |
|
| move |
498 |
|
| fetch |
499 |
|
| copy. |
500 |
|
|
501 |
|
-type command_complete_response() :: {{command_complete_operation(), non_neg_integer()}, binary()} |
502 |
|
| {atom(), binary()}. |
503 |
|
|
504 |
|
%% Result of decoding an authentication message: the decoded request
%% paired with the bytes that were not consumed.
%%
%% `clear_text_password' was missing from this union although the
%% decoder returns it for authentication code 3.
-type authentication_response() :: {authenticated, binary()}
                                 | {kerberos, binary()}
                                 | {clear_text_password, binary()}
                                 | {{md5_password, binary()}, binary()}
                                 | {scm_credentials, binary()}
                                 | {gss, binary()}
                                 | {sspi, binary()}
                                 | {{sasl, any()}, binary()}
                                 | {{sasl_continue, binary()}, <<>>}
                                 | {{sasl_final, binary()}, <<>>}.
513 |
|
|
514 |
|
-spec demarshal(command_complete, binary(), binary()) -> command_complete_response(); |
515 |
|
(authentication, non_neg_integer(), binary()) -> authentication_response(); |
516 |
|
(notice_response, binary(), [{<<_:8>>, binary()}]) -> {[{<<_:8>>, binary()}], binary()}; |
517 |
|
(error_response, binary(), [{<<_:8>>, binary()}]) -> {[{<<_:8>>, binary()}], binary()}; |
518 |
|
([atom()], [type()], binary()) -> {#{atom() => any()}, binary()}. |
519 |
|
|
520 |
|
|
521 |
|
%% Command tags that end in a row count decode to {Operation, Rows}.
%% INSERT has an extra "0" before the count.
demarshal(command_complete, <<"INSERT 0 ", Rows/bytes>>, Remainder) ->
    {{insert, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"DELETE ", Rows/bytes>>, Remainder) ->
    {{delete, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"SELECT ", Rows/bytes>>, Remainder) ->
    {{select, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"UPDATE ", Rows/bytes>>, Remainder) ->
    {{update, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"MOVE ", Rows/bytes>>, Remainder) ->
    {{move, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"FETCH ", Rows/bytes>>, Remainder) ->
    {{fetch, binary_to_integer(Rows)}, Remainder};

demarshal(command_complete, <<"COPY ", Rows/bytes>>, Remainder) ->
    {{copy, binary_to_integer(Rows)}, Remainder};

%% Any other tag is split on spaces, lower-cased word by word and
%% handed to pgmp_util:snake_case/1.
demarshal(command_complete, Decoded, Remainder) ->
    Words = [binary_to_list(string:lowercase(Word))
             || Word <- binary:split(Decoded, <<" ">>, [global])],
    {pgmp_util:snake_case(Words), Remainder};
554 |
|
|
555 |
|
%% Authentication requests, selected by the 32 bit code that leads the
%% message. Most codes carry no further data and pass the rest of the
%% message through as the remainder.
demarshal(authentication, 0, Rest) ->
    {authenticated, Rest};

demarshal(authentication, 2, Rest) ->
    {kerberos, Rest};

demarshal(authentication, 3, Rest) ->
    {clear_text_password, Rest};

%% MD5: a four byte salt follows the code.
demarshal(authentication, 5, R0) ->
    {Salt, R1} = demarshal({byte, 4}, R0),
    {{md5_password, Salt}, R1};

demarshal(authentication, 6, Rest) ->
    {scm_credentials, Rest};

demarshal(authentication, 7, Rest) ->
    {gss, Rest};

demarshal(authentication, 9, Rest) ->
    {sspi, Rest};

%% SASL: mechanism name strings, ended by a lone zero byte. Names are
%% returned in the order they appear in the message.
demarshal(authentication, 10, EncodedMechanisms) ->
    CollectMechanism =
        fun (<<0:8>>, Acc) ->
                {<<>>, Acc};

            (Encoded, Acc) ->
                {Mechanism, Rest} = demarshal(string, Encoded),
                {Rest, [Mechanism | Acc]}
        end,
    Mechanisms = lists:reverse(
                   pgmp_binary:foldl(CollectMechanism, [], EncodedMechanisms)),
    {{sasl, Mechanisms}, <<>>};

%% SASL continue and final carry the whole rest of the message.
demarshal(authentication, 11, Data) ->
    {{sasl_continue, Data}, <<>>};

demarshal(authentication, 12, Data) ->
    {{sasl_final, Data}, <<>>};
598 |
|
|
599 |
|
%% Notice and error fields: a zero byte ends the list, and the
%% accumulated fields are returned in message order.
demarshal(Type, <<0:8, Remainder/bytes>>, Fields)
  when Type =:= notice_response;
       Type =:= error_response ->
    {lists:reverse(Fields), Remainder};

%% Otherwise a field is a one byte tag followed by a string value.
demarshal(Type, <<Tag:1/bytes, R0/bytes>>, Fields)
  when Type =:= notice_response;
       Type =:= error_response ->
    {Value, R1} = demarshal(string, R0),
    demarshal(Type, R1, [{Tag, Value} | Fields]);

%% Decode Types in order and pair each value with the key in the same
%% position, giving a map.
demarshal(Keys, Types, Encoded) when is_list(Keys), is_list(Types) ->
    {Values, Remainder} = demarshal(Types, Encoded),
    Pairs = lists:zip(Keys, Values),
    {maps:from_list(Pairs), Remainder}.
613 |
|
|
614 |
|
|
615 |
|
-type row_description() :: #{field_name := binary(), |
616 |
|
table_oid := pgmp:int32(), |
617 |
|
column_number := pgmp:int16(), |
618 |
|
type_oid := pgmp:int32(), |
619 |
|
type_size := pgmp:int16(), |
620 |
|
type_modifier := pgmp:int32(), |
621 |
|
format := text | binary}. |
622 |
|
|
623 |
|
%% One column of logical replication tuple data: `null', or the column
%% value together with the format it was sent in. The decoder produces
%% both `text' and `binary' formats, so the type admits both.
-type tuple_data() :: #{format := text | binary, value := any()}
                    | null.
625 |
|
|
626 |
|
-spec demarshal(row_description, non_neg_integer(), binary(), [row_description()]) -> {[row_description(), ...], binary()}; |
627 |
|
(data_row, non_neg_integer(), binary(), [pgmp_data_row:decoded()]) -> {[pgmp_data_row:decoded()], binary()}; |
628 |
|
(tuple_data, non_neg_integer(), binary(), [tuple_data()]) -> {[tuple_data()], binary()}. |
629 |
|
|
630 |
|
|
631 |
|
%% All fields decoded: return them in message order.
demarshal(row_description, 0, Remainder, Fields) ->
    {lists:reverse(Fields), Remainder};

%% Decode one field description, replace its numeric format code using
%% text_binary/1, and carry on with the remaining fields.
demarshal(row_description = Type, Columns, D0, Fields) ->
    {#{format := Format} = Field, D1} =
        demarshal([field_name,
                   table_oid,
                   column_number,
                   type_oid,
                   type_size,
                   type_modifier,
                   format],
                  [string, int32, int16, int32, int16, int32, int16],
                  D0),
    Described = Field#{format := text_binary(Format)},
    demarshal(Type, Columns - 1, D1, [Described | Fields]);
656 |
|
|
657 |
|
%% All columns decoded: return them in message order.
demarshal(data_row, 0, Remainder, Values) ->
    {lists:reverse(Values), Remainder};

%% Each column is a signed 32 bit length followed by that many bytes.
%% A length of -1 stands for null and has no bytes after it.
demarshal(data_row = Type, Column, D0, Values) ->
    {Length, D1} = demarshal(int32, D0),
    case Length of
        0 ->
            demarshal(Type, Column - 1, D1, [<<>> | Values]);

        -1 ->
            demarshal(Type, Column - 1, D1, [null | Values]);

        _ ->
            <<Data:Length/bytes, D2/bytes>> = D1,
            demarshal(Type, Column - 1, D2, [Data | Values])
    end;
672 |
|
|
673 |
|
%% All columns decoded: return them in message order.
demarshal(tuple_data, 0, Remainder, Values) ->
    {lists:reverse(Values), Remainder};

%% "n": a null column.
demarshal(tuple_data, Columns, <<"n", Remainder/bytes>>, Values) ->
    demarshal(tuple_data, Columns - 1, Remainder, [null | Values]);

%% "b": a binary formatted value with a 32 bit length prefix.
demarshal(tuple_data,
          Columns,
          <<"b", Length:32, Value:Length/bytes, Remainder/bytes>>,
          Values) ->
    Column = #{format => binary, value => Value},
    demarshal(tuple_data, Columns - 1, Remainder, [Column | Values]);

%% "t": a text formatted value with a 32 bit length prefix.
demarshal(tuple_data,
          Columns,
          <<"t", Length:32, Value:Length/bytes, Remainder/bytes>>,
          Values) ->
    Column = #{format => text, value => Value},
    demarshal(tuple_data, Columns - 1, Remainder, [Column | Values]).
700 |
|
|
701 |
|
|
702 |
|
%% Prefix Data with its length as a 32 bit integer, where the length
%% also counts the four bytes of the prefix itself.
-spec size_inclusive(iodata()) -> iolist().

size_inclusive(Data) ->
    Length = iolist_size(Data) + 4,
    [marshal(int32, Length), Data].
706 |
|
|
707 |
|
|
708 |
|
%% Prefix Data with its length as a 32 bit integer; the length covers
%% Data only, not the prefix.
-spec size_exclusive(iodata()) -> iolist().

size_exclusive(Data) ->
    Length = iolist_size(Data),
    [marshal(int32, Length), Data].
712 |
|
|
713 |
|
|
714 |
|
%% Encode the values bound to Names in the Bindings map, each with the
%% type at the same position in Types. The result is a list of encoded
%% values in the order of Names. Every name must be present in
%% Bindings.
marshal(Names, Types, Bindings) ->
    ?LOG_DEBUG(#{names => Names, types => Types, bindings => Bindings}),
    [marshal(Type, maps:get(Name, Bindings))
     || {Name, Type} <- lists:zip(Names, Types)].
722 |
|
|
723 |
|
|
724 |
|
%% Encode a value of the given type as iodata.
%%
%% `int64' takes a 64 bit integer (it was declared as pgmp:int32()),
%% and the `{byte, N}' form accepted by marshal/2 is now listed.
-spec marshal(string, iodata()) -> iolist();
             (byte, <<_:8>>) -> iodata();
             ({byte, pos_integer()}, binary()) -> binary();
             (int8, pgmp:int8()) -> nonempty_binary();
             (int16, pgmp:int16()) -> nonempty_binary();
             (int32, pgmp:int32()) -> nonempty_binary();
             (int64, pgmp:int64()) -> nonempty_binary();
             ({int, 8}, pgmp:int8()) -> nonempty_binary();
             ({int, 16}, pgmp:int16()) -> nonempty_binary();
             ({int, 32}, pgmp:int32()) -> nonempty_binary();
             ({int, 64}, pgmp:int64()) -> nonempty_binary();
             (x_log_data, x_log_data()) -> iodata();
             (authentication, ok) -> iodata();
             (ready_for_query, tx_status()) -> iodata();
             (row_description, [row_description()]) -> iodata();
             (data_row, iodata()) -> iodata();
             (command_complete, {iodata() | atom(), integer()} | iodata() | atom()) -> iodata();
             (copy_data, iodata()) -> iodata();
             (copy_both_response, copy_both_response()) -> iodata();
             (copy_out_response, copy_out_response()) -> iodata();
             (tuple_data, tuple_data()) -> iodata().
744 |
|
|
745 |
|
marshal(string, Value) -> |
746 |
27912 |
[Value, <<0:8>>]; |
747 |
|
|
748 |
|
marshal(byte, Value) -> |
749 |
198 |
Value; |
750 |
|
|
751 |
|
marshal({byte, N}, Value) when byte_size(Value) == N -> |
752 |
2 |
Value; |
753 |
|
|
754 |
|
marshal(int8, Value) -> |
755 |
6 |
?FUNCTION_NAME({int, 8}, Value); |
756 |
|
|
757 |
|
marshal(int16, Value) -> |
758 |
23579 |
?FUNCTION_NAME({int, 16}, Value); |
759 |
|
|
760 |
|
marshal(int32, Value) -> |
761 |
48703 |
?FUNCTION_NAME({int, 32}, Value); |
762 |
|
|
763 |
|
marshal(int64, Value) -> |
764 |
326 |
?FUNCTION_NAME({int, 64}, Value); |
765 |
|
|
766 |
|
marshal({int, Size}, Value) when is_integer(Value)-> |
767 |
73987 |
<<Value:Size>>; |
768 |
|
|
769 |
|
marshal(authentication, ok) -> |
770 |
:-( |
["R", size_inclusive([<<0:32>>])]; |
771 |
|
|
772 |
|
marshal(ready_for_query, TxStatus) -> |
773 |
:-( |
["Z", size_inclusive(tx_status(TxStatus))]; |
774 |
|
|
775 |
|
marshal(command_complete = Tag, {Command, Rows}) when is_atom(Command), |
776 |
|
is_integer(Rows) -> |
777 |
1 |
?FUNCTION_NAME( |
778 |
|
Tag, |
779 |
|
[string:uppercase(atom_to_list(Command)), |
780 |
|
" ", |
781 |
|
integer_to_list(Rows)]); |
782 |
|
|
783 |
|
marshal(command_complete = Tag, Command) when is_atom(Command) -> |
784 |
:-( |
?FUNCTION_NAME(Tag, string:uppercase(atom_to_list(Command))); |
785 |
|
|
786 |
|
marshal(command_complete, Description) -> |
787 |
1 |
["C", size_inclusive(marshal(string, Description))]; |
788 |
|
|
789 |
|
marshal(row_description, FieldDescriptions) -> |
790 |
2 |
["T", |
791 |
|
size_inclusive( |
792 |
|
[marshal(int16, length(FieldDescriptions)), |
793 |
|
lists:map( |
794 |
|
fun |
795 |
|
(#{field_name := FieldName, |
796 |
|
table_oid := TableOID, |
797 |
|
column_number := ColumnNumber, |
798 |
|
type_oid := TypeOID, |
799 |
|
type_size := TypeSize, |
800 |
|
type_modifier := TypeModifier, |
801 |
|
format := Format}) -> |
802 |
5 |
[marshal(string, FieldName), |
803 |
|
marshal(int32, TableOID), |
804 |
|
marshal(int16, ColumnNumber), |
805 |
|
marshal(int32, TypeOID), |
806 |
|
marshal(int16, TypeSize), |
807 |
|
marshal(int32, TypeModifier), |
808 |
|
marshal(int16, text_binary(Format))] |
809 |
|
end, |
810 |
|
FieldDescriptions)])]; |
811 |
|
|
812 |
|
marshal(data_row, Columns) -> |
813 |
:-( |
["D", |
814 |
|
size_inclusive( |
815 |
|
[marshal(int16, length(Columns)), |
816 |
|
lists:map( |
817 |
|
fun |
818 |
|
(<<-1:32/signed>> = Column) -> |
819 |
:-( |
Column; |
820 |
|
|
821 |
|
(Column) -> |
822 |
:-( |
size_exclusive(Column) |
823 |
|
end, |
824 |
|
Columns)])]; |
825 |
|
|
826 |
|
marshal(copy_data, {standby_status_update, StandbyStatusUpdate}) -> |
827 |
1 |
["d", |
828 |
|
size_inclusive( |
829 |
|
["r", |
830 |
|
?FUNCTION_NAME( |
831 |
|
[written, |
832 |
|
flushed, |
833 |
|
applied, |
834 |
|
clock, |
835 |
|
reply], |
836 |
|
[int64, |
837 |
|
int64, |
838 |
|
int64, |
839 |
|
int64, |
840 |
|
{byte, 1}], |
841 |
|
maps:map( |
842 |
|
fun |
843 |
|
(reply, false) -> |
844 |
1 |
<<0:8>>; |
845 |
|
|
846 |
|
(reply, true) -> |
847 |
|
|
848 |
:-( |
<<1:8>>; |
849 |
|
|
850 |
|
(_, V) -> |
851 |
4 |
V |
852 |
|
end, |
853 |
|
StandbyStatusUpdate))])]; |
854 |
|
|
855 |
|
marshal(copy_data, {keepalive, PrimaryKeepalive}) -> |
856 |
1 |
["d", |
857 |
|
size_inclusive( |
858 |
|
["k", |
859 |
|
?FUNCTION_NAME( |
860 |
|
[end_wal, clock, reply], |
861 |
|
[int64, int64, {byte, 1}], |
862 |
|
maps:map( |
863 |
|
fun |
864 |
|
(reply, false) -> |
865 |
1 |
<<0:8>>; |
866 |
|
|
867 |
|
(reply, true) -> |
868 |
|
|
869 |
:-( |
<<1:8>>; |
870 |
|
|
871 |
|
(_, V) -> |
872 |
2 |
V |
873 |
|
end, |
874 |
|
PrimaryKeepalive))])]; |
875 |
|
|
876 |
|
%% copy_data carrying x_log_data: the tag "d", then size_inclusive/1 over
%% "w", the start_wal, end_wal and clock fields (int64 each, via marshal/3)
%% and the stream entry encoded by the x_log_data clauses of this function.
marshal(copy_data, {x_log_data, #{stream := Stream} = XLogData}) ->
    Header = ?FUNCTION_NAME(
                [start_wal, end_wal, clock],
                [int64, int64, int64],
                XLogData),
    Payload = ?FUNCTION_NAME(x_log_data, Stream),
    ["d", size_inclusive(["w", Header, Payload])];
885 |
|
|
886 |
|
%% begin_transaction: the tag "B" followed by final_lsn and commit_timestamp
%% (int64 each) and xid (int32), taken from the supplied map by marshal/3.
marshal(x_log_data, {begin_transaction, BeginTransaction}) ->
    Keys = [final_lsn, commit_timestamp, xid],
    Types = [int64, int64, int32],
    ["B", ?FUNCTION_NAME(Keys, Types, BeginTransaction)];
892 |
|
|
893 |
|
%% relation: the tag "R", then id (int32), namespace and name (string),
%% replica_identity (int8) and the column count (int16), followed by one
%% entry per column made of flags (int8), name (string), type and modifier
%% (int32 each). Field encoding is delegated to marshal/3, which is defined
%% outside this excerpt.
%%
%% Relation must contain a columns list; any ncols entry it carries is
%% ignored.
marshal(x_log_data, {relation, #{columns := Columns} = Relation}) ->
    ["R",
     ?FUNCTION_NAME(
        [id, namespace, name, replica_identity, ncols],
        [int32, string, string, int8, int16],
        %% Always derive ncols from Columns: a stale ncols key already
        %% present in Relation must not win, otherwise the declared count
        %% could disagree with the number of columns encoded below.
        Relation#{ncols => length(Columns)}),
     [?FUNCTION_NAME(
         [flags, name, type, modifier],
         [int8, string, int32, int32],
         Column) || Column <- Columns]];
910 |
|
|
911 |
|
%% insert: the tag "I", the relation as int32, the marker "N" and the tuple
%% encoded by the tuple_data clause.
marshal(x_log_data, {insert, #{relation := Relation, tuple := Tuple}}) ->
    EncodedRelation = ?FUNCTION_NAME(int32, Relation),
    EncodedTuple = ?FUNCTION_NAME(tuple_data, Tuple),
    ["I", EncodedRelation, "N", EncodedTuple];
918 |
|
|
919 |
|
%% update carrying a key: the tag "U", the relation as int32, the marker "K"
%% with the key tuple, then the marker "N" with the new tuple.
marshal(x_log_data, {update, #{key := Key, relation := Relation, new := New}}) ->
    EncodedRelation = ?FUNCTION_NAME(int32, Relation),
    EncodedKey = ?FUNCTION_NAME(tuple_data, Key),
    EncodedNew = ?FUNCTION_NAME(tuple_data, New),
    ["U", EncodedRelation, "K", EncodedKey, "N", EncodedNew];
927 |
|
|
928 |
|
%% update without a key entry: the tag "U", the relation as int32, then the
%% marker "N" with the new tuple.
marshal(x_log_data, {update, #{relation := Relation, new := New}}) ->
    EncodedRelation = ?FUNCTION_NAME(int32, Relation),
    EncodedNew = ?FUNCTION_NAME(tuple_data, New),
    ["U", EncodedRelation, "N", EncodedNew];
934 |
|
|
935 |
|
%% delete: the tag "D", the relation as int32, then the marker "K" with the
%% key tuple.
marshal(x_log_data, {delete, #{key := Key, relation := Relation}}) ->
    EncodedRelation = ?FUNCTION_NAME(int32, Relation),
    EncodedKey = ?FUNCTION_NAME(tuple_data, Key),
    ["D", EncodedRelation, "K", EncodedKey];
941 |
|
|
942 |
|
%% truncate: the tag "T", the number of relations as int32, the options as
%% int8, then every relation as int32 in list order.
marshal(x_log_data, {truncate, #{options := Options, relations := Relations}}) ->
    Count = ?FUNCTION_NAME(int32, length(Relations)),
    Flags = ?FUNCTION_NAME(int8, Options),
    Encoded = lists:map(
                fun
                    (Relation) ->
                        ?FUNCTION_NAME(int32, Relation)
                end,
                Relations),
    ["T", Count, Flags, Encoded];
948 |
|
|
949 |
|
%% tuple_data: the int16 column count followed by one entry per column.
%% The atom null becomes "n"; a map with format and value becomes "b"
%% (binary) or "t" (text), the int32 byte size of the value and the value.
marshal(tuple_data, Columns) ->
    Count = ?FUNCTION_NAME(int16, length(Columns)),
    Encode = fun
                 (null) ->
                     "n";

                 (#{format := Format, value := Value}) ->
                     Kind = case Format of
                                text ->
                                    "t";

                                binary ->
                                    "b"
                            end,
                     [Kind, ?FUNCTION_NAME(int32, byte_size(Value)), Value]
             end,
    [Count, [Encode(Column) || Column <- Columns]];
967 |
|
|
968 |
|
%% commit: the tag "C" followed by flags (int8) and commit_lsn, end_lsn and
%% commit_timestamp (int64 each), taken from the supplied map by marshal/3.
marshal(x_log_data, {commit, Commit}) ->
    Keys = [flags, commit_lsn, end_lsn, commit_timestamp],
    Types = [int8, int64, int64, int64],
    ["C", ?FUNCTION_NAME(Keys, Types, Commit)];
974 |
|
|
975 |
|
%% copy_both_response: the tag "W", then size_inclusive/1 over the format
%% (text | binary, mapped by text_binary/1) as int8 and n_of_cols as int16.
%% No per column format codes are written by this clause.
marshal(copy_both_response, #{format := Format, n_of_cols := N}) ->
    Overall = ?FUNCTION_NAME(int8, text_binary(Format)),
    Count = ?FUNCTION_NAME(int16, N),
    ["W", size_inclusive([Overall, Count])];
980 |
|
|
981 |
|
%% copy_out_response: the tag "H", then size_inclusive/1 over the overall
%% format as int8, the number of column formats as int16 and each column
%% format as int16 (formats are mapped by text_binary/1).
marshal(copy_out_response, #{format := Format, col_formats := ColFormats}) ->
    Overall = ?FUNCTION_NAME(int8, text_binary(Format)),
    Count = ?FUNCTION_NAME(int16, length(ColFormats)),
    PerColumn = [?FUNCTION_NAME(int16, text_binary(ColFormat))
                 || ColFormat <- ColFormats],
    ["H", size_inclusive([Overall, Count, PerColumn])];
992 |
|
|
993 |
|
%% copy_data with an already encoded payload (any payload not matched by the
%% earlier copy_data clauses): the tag "d" followed by size_inclusive/1 of
%% the payload.
marshal(copy_data, Payload) ->
    Framed = size_inclusive(Payload),
    ["d", Framed];
995 |
|
|
996 |
|
%% copy_done: only the empty binary is accepted as payload; the result is
%% the tag "c" followed by size_inclusive/1 of that empty binary.
marshal(copy_done, <<>>) ->
    ["c", size_inclusive(<<>>)];
998 |
|
|
999 |
|
%% parameter_status: the tag "S", then size_inclusive/1 over the key and the
%% value, each encoded as a string.
marshal(parameter_status, {Key, Value}) ->
    Name = ?FUNCTION_NAME(string, Key),
    Setting = ?FUNCTION_NAME(string, Value),
    ["S", size_inclusive([Name, Setting])];
1004 |
|
|
1005 |
|
%% Final clause: any type/value combination not handled above raises badarg,
%% reporting both arguments.
marshal(Unsupported, Payload) ->
    erlang:error(badarg, [Unsupported, Payload]).
1007 |
|
|
1008 |
|
|
1009 |
|
-type tx_status() :: idle | in_tx_block | in_failed_tx_block.

%% Two-way mapping between a tx_status() atom and its single byte binary
%% form: an atom argument yields the binary, a binary argument yields the
%% atom. Any other argument fails with function_clause.
tx_status(idle) -> <<"I">>;
tx_status(<<"I">>) -> idle;

tx_status(in_tx_block) -> <<"T">>;
tx_status(<<"T">>) -> in_tx_block;

tx_status(in_failed_tx_block) -> <<"E">>;
tx_status(<<"E">>) -> in_failed_tx_block.
1025 |
|
|
1026 |
|
|
1027 |
9 |
%% Two-way mapping between the format atoms and their numeric codes:
%% text <-> 0 and binary <-> 1. Any other argument fails with
%% function_clause.
text_binary(text) ->
    0;
text_binary(0) ->
    text;
text_binary(binary) ->
    1;
text_binary(1) ->
    binary.