| 1 |
|
%% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com> |
| 2 |
|
%% |
| 3 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 |
|
%% you may not use this file except in compliance with the License. |
| 5 |
|
%% You may obtain a copy of the License at |
| 6 |
|
%% |
| 7 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
| 8 |
|
%% |
| 9 |
|
%% Unless required by applicable law or agreed to in writing, software |
| 10 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
| 11 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 |
|
%% See the License for the specific language governing permissions and |
| 13 |
|
%% limitations under the License. |
| 14 |
|
|
| 15 |
|
|
| 16 |
|
-module(pgmp_codec). |
| 17 |
|
|
| 18 |
|
|
| 19 |
|
-export([demarshal/1]). |
| 20 |
|
-export([demarshal/2]). |
| 21 |
|
-export([marshal/2]). |
| 22 |
|
-export([size_exclusive/1]). |
| 23 |
|
-export([size_inclusive/1]). |
| 24 |
|
-export_type([command_complete_response/0]). |
| 25 |
|
-export_type([row_description/0]). |
| 26 |
|
-include_lib("kernel/include/logger.hrl"). |
| 27 |
|
|
| 28 |
|
|
| 29 |
|
-type pg_integer() :: int8 |
| 30 |
|
| int16 |
| 31 |
|
| int32 |
| 32 |
|
| int64. |
| 33 |
|
|
| 34 |
|
-type type() :: {byte, pos_integer()} |
| 35 |
|
| pg_integer() |
| 36 |
|
| {int, pgmp:int_bit_sizes()} |
| 37 |
|
| clock |
| 38 |
|
| string |
| 39 |
|
| empty_query_response |
| 40 |
|
| bind_complete |
| 41 |
|
| portal_suspended |
| 42 |
|
| no_data |
| 43 |
|
| parse_complete |
| 44 |
|
| notice_response |
| 45 |
|
| error_response |
| 46 |
|
| authentication |
| 47 |
|
| parameter_description |
| 48 |
|
| x_log_data |
| 49 |
|
| tuple_data |
| 50 |
|
| copy_data |
| 51 |
|
| copy_both_response |
| 52 |
|
| parameter_status |
| 53 |
|
| backend_key_data |
| 54 |
|
| command_complete |
| 55 |
|
| ready_for_query |
| 56 |
|
| row_description |
| 57 |
|
| data_row. |
| 58 |
|
|
| 59 |
|
%% Decode a `{Type, Encoded}' pair by delegating to demarshal/2.
%% The pair is logged at debug level before decoding.
demarshal({Kind, Payload}) ->
    ?LOG_DEBUG(#{type => Kind, encoded => Payload}),
    demarshal(Kind, Payload).
| 62 |
|
|
| 63 |
|
-type x_log_begin_transaction() :: #{final_lsn := pgmp:int64(), |
| 64 |
|
commit_timestamp := pgmp:int64(), |
| 65 |
|
xid := pgmp:int32()}. |
| 66 |
|
|
| 67 |
|
-type x_log_logical_decoding() :: #{xid := pgmp:int32(), |
| 68 |
|
flags := pgmp:int8(), |
| 69 |
|
lsn := pgmp:int64(), |
| 70 |
|
prefix := binary(), |
| 71 |
|
content := pgmp:int32()}. |
| 72 |
|
|
| 73 |
|
-type x_log_commit() :: #{flags := pgmp:int8(), |
| 74 |
|
commit_lsn := pgmp:int64(), |
| 75 |
|
end_lsn := pgmp:int64(), |
| 76 |
|
commit_timestamp := pgmp:int64()}. |
| 77 |
|
|
| 78 |
|
-type x_log_origin() :: #{commit_lsn := pgmp:int64(), |
| 79 |
|
name := binary()}. |
| 80 |
|
|
| 81 |
|
%% Relation message as produced by demarshal(x_log_data, <<"R", ...>>).
%% The decoder adds a `columns' entry holding one map per decoded column, so
%% the key is part of the type.
-type x_log_relation() :: #{id := pgmp:int32(),
                            namespace := binary(),
                            name := binary(),
                            replica_identity := pgmp:int8(),
                            ncols := pgmp:int16(),
                            columns := [#{flags := pgmp:int8(),
                                          name := binary(),
                                          type := pgmp:int32(),
                                          modifier := pgmp:int32()}]}.
| 86 |
|
|
| 87 |
|
-type x_log_type() :: #{xid := pgmp:int32(), |
| 88 |
|
type := pgmp:int32(), |
| 89 |
|
namespace := binary(), |
| 90 |
|
name := binary()}. |
| 91 |
|
|
| 92 |
|
-type x_log_insert() :: #{relation := pgmp:int32(), |
| 93 |
|
tuple := [tuple_data()]}. |
| 94 |
|
|
| 95 |
|
%% Update message as produced by demarshal(x_log_data, <<"U", ...>>).
%% `key' is optional: it is only present when the message carries a "K"
%% tuple ahead of the "N" (new) tuple.
-type x_log_update() :: #{relation := pgmp:int32(),
                          key => [tuple_data()],
                          new := [tuple_data()]}.
| 98 |
|
|
| 99 |
|
-type x_log_delete() :: #{relation := pgmp:int32(), |
| 100 |
|
key := [tuple_data()]}. |
| 101 |
|
|
| 102 |
|
-type x_log_truncate() :: #{relations := [pgmp:int32()], |
| 103 |
|
options := pgmp:int8()}. |
| 104 |
|
|
| 105 |
|
-type x_log_data() :: {begin_transaction, x_log_begin_transaction()} |
| 106 |
|
| {logical_decoding, x_log_logical_decoding()} |
| 107 |
|
| {commit, x_log_commit()} |
| 108 |
|
| {origin, x_log_origin()} |
| 109 |
|
| {relation, x_log_relation()} |
| 110 |
|
| {type, x_log_type()} |
| 111 |
|
| {insert, x_log_insert()} |
| 112 |
|
| {update, x_log_update()} |
| 113 |
|
| {delete, x_log_delete()} |
| 114 |
|
| {truncate, x_log_truncate()}. |
| 115 |
|
|
| 116 |
|
-type startup() :: #{major := integer(), |
| 117 |
|
minor := integer(), |
| 118 |
|
parameters := #{binary() => binary()}}. |
| 119 |
|
|
| 120 |
|
-type copy_both_response() :: #{format := text | binary, |
| 121 |
|
n_of_cols := non_neg_integer()}. |
| 122 |
|
|
| 123 |
|
-type copy_out_response() :: #{format := text | binary, |
| 124 |
|
col_formats := [text | binary]}. |
| 125 |
|
|
| 126 |
|
%% Every clause returns `{Decoded, Remainder}'. The `ssl' clause yields a
%% boolean with an empty remainder, and `pairs' yields a map of binaries.
-spec demarshal(nonempty_list(type()), binary()) -> {[any()], binary()};
               ({byte, pos_integer()}, binary()) -> {binary(), binary()};
               ({int, 8}, binary()) -> {pgmp:int8(), binary()};
               (int8, binary()) -> {pgmp:int8(), binary()};
               ({int, 16}, binary()) -> {pgmp:int16(), binary()};
               (int16, binary()) -> {pgmp:int16(), binary()};
               ({int, 32}, binary()) -> {pgmp:int32(), binary()};
               (int32, binary()) -> {pgmp:int32(), binary()};
               ({int, 64}, binary()) -> {pgmp:int64(), binary()};
               (int64, binary()) -> {pgmp:int64(), binary()};
               (clock, binary()) -> {pgmp:int64(), binary()};
               (string, binary()) -> {binary(), binary()};
               (empty_query_response, binary()) -> {[], <<>>};
               (bind_complete, binary()) -> {[], <<>>};
               (portal_suspended, binary()) -> {[], <<>>};
               (no_data, binary()) -> {[], <<>>};
               (parse_complete, binary()) -> {[], <<>>};
               (notice_response, binary()) -> {any(), binary()};
               (error_response, binary()) -> {any(), binary()};
               (authentication, binary()) -> {any(), binary()};
               (parameter_description, binary()) -> {[pgmp:int32()], binary()};
               (password, binary()) -> {binary(), binary()};
               (startup, binary()) -> {startup(), binary()};
               (x_log_data, binary()) -> {x_log_data(), binary()};
               (tuple_data, binary()) -> {[tuple_data()], binary()};
               (pairs, binary()) -> {#{binary() => binary()}, binary()};
               (copy_out_response, binary()) -> {copy_out_response(), binary()};
               (copy_both_response, binary()) -> {copy_both_response(), binary()};
               (copy_data, binary()) -> {{x_log_data, x_log_data()}, binary()};
               (ssl, binary()) -> {boolean(), binary()}.
| 156 |
|
|
| 157 |
|
%% Decode the leading portion of `Encoded' as directed by the first argument,
%% returning `{Decoded, Remainder}'.
%%
%% The first argument is either a list of types, decoded left to right with
%% the remainder threaded through, or a single type or message tag. Input that
%% matches no clause, or that is too short for the requested type, raises.
demarshal(Types, Encoded) when is_list(Types) ->
    ?LOG_DEBUG(#{types => Types, encoded => Encoded}),
    lists:mapfoldl(fun demarshal/2, Encoded, Types);

%% Exactly N raw bytes.
demarshal({byte, N}, Encoded) ->
    <<Decoded:N/bytes, Remainder/bytes>> = Encoded,
    {Decoded, Remainder};

%% Named integer widths are shorthands for `{int, Bits}'.
demarshal(int8, Encoded) ->
    ?FUNCTION_NAME({int, 8}, Encoded);

demarshal(int16, Encoded) ->
    ?FUNCTION_NAME({int, 16}, Encoded);

demarshal(int32, Encoded) ->
    ?FUNCTION_NAME({int, 32}, Encoded);

demarshal(int64, Encoded) ->
    ?FUNCTION_NAME({int, 64}, Encoded);

%% A 64 bit integer handed to pgmp_calendar:decode/1.
demarshal(clock, Encoded) ->
    {Clock, Remainder} = ?FUNCTION_NAME({int, 64}, Encoded),
    {pgmp_calendar:decode(Clock), Remainder};

%% Signed integer of the given width (default big-endian bit syntax).
demarshal({int, Bits}, Encoded)
  when Bits == 8; Bits == 16; Bits == 32; Bits == 64 ->
    <<Decoded:Bits/signed, Remainder/bytes>> = Encoded,
    {Decoded, Remainder};

%% Zero terminated string: everything before the first zero byte is the value,
%% everything after it is the remainder.
demarshal(string, Encoded) ->
    list_to_tuple(binary:split(Encoded, <<0>>));

%% Messages that carry no payload.
demarshal(Type, <<>>)
  when Type == empty_query_response;
       Type == bind_complete;
       Type == portal_suspended;
       Type == no_data;
       Type == parse_complete ->
    {[], <<>>};

%% Notice and error responses are accumulated field by field in demarshal/3.
demarshal(Type, Encoded) when Type == notice_response; Type == error_response ->
    ?FUNCTION_NAME(Type, Encoded, []);

%% An int32 code selects the authentication clause in demarshal/3.
demarshal(authentication, Encoded) ->
    {Code, R} = ?FUNCTION_NAME(int32, Encoded),
    ?FUNCTION_NAME(authentication, Code, R);

%% An int16 count followed by that many int32 values.
demarshal(parameter_description, Encoded) ->
    {N, R0} = ?FUNCTION_NAME(int16, Encoded),
    ?FUNCTION_NAME(lists:duplicate(N, int32), R0);

%% x_log_data: the first byte selects the logical replication message.
demarshal(x_log_data, <<"B", BeginMessage/bytes>>) ->
    ?LOG_DEBUG(#{start_transaction => BeginMessage}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [final_lsn, commit_timestamp, xid],
                              [int64, int64, int32],
                              BeginMessage),
    {{begin_transaction, Decoded}, Remainder};

demarshal(x_log_data, <<"M", LogicalDecoding/bytes>>) ->
    ?LOG_DEBUG(#{logical => LogicalDecoding}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [xid, flags, lsn, prefix, content],
                              [int32, int8, int64, string, int32],
                              LogicalDecoding),
    {{logical_decoding, Decoded}, Remainder};

demarshal(x_log_data, <<"C", CommitMessage/bytes>>) ->
    ?LOG_DEBUG(#{commit => CommitMessage}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [flags, commit_lsn, end_lsn, commit_timestamp],
                              [int8, int64, int64, int64],
                              CommitMessage),
    {{commit, Decoded}, Remainder};

demarshal(x_log_data, <<"O", OriginMessage/bytes>>) ->
    ?LOG_DEBUG(#{origin => OriginMessage}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [commit_lsn, name],
                              [int64, string],
                              OriginMessage),
    {{origin, Decoded}, Remainder};

%% Relation: a fixed header followed by column descriptions. The columns are
%% collected via pgmp_binary:foldl/3 (presumably applied until the binary is
%% exhausted - confirm against pgmp_binary) and the remainder is reported as
%% empty.
demarshal(x_log_data, <<"R", RelationMessage/bytes>>) ->
    ?LOG_DEBUG(#{relation => RelationMessage}),
    {Relation, R0} = ?FUNCTION_NAME(
                        [id, namespace, name, replica_identity, ncols],
                        [int32, string, string, int8, int16],
                        RelationMessage),
    Columns = lists:reverse(
                pgmp_binary:foldl(
                  fun
                      (EncodedColumn, A) ->
                          {Column, Remainder} = ?FUNCTION_NAME(
                                                   [flags, name, type, modifier],
                                                   [int8, string, int32, int32],
                                                   EncodedColumn),
                          {Remainder, [Column | A]}
                  end,
                  [],
                  R0)),
    {{relation, Relation#{columns => Columns}}, <<>>};

%% Type message. The xid was previously decoded with the non-existent type
%% `in32', which matched no clause; it is an int32 like the type id.
demarshal(x_log_data, <<"Y", TypeMessage/bytes>>) ->
    ?LOG_DEBUG(#{type => TypeMessage}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [xid, type, namespace, name],
                              [int32, int32, string, string],
                              TypeMessage),
    {{type, Decoded}, Remainder};

demarshal(x_log_data, <<"I", Id:32, "N", TupleData/bytes>>) ->
    ?LOG_DEBUG(#{id => Id, tuple => TupleData}),
    {Tuple, Remainder} = ?FUNCTION_NAME(tuple_data, TupleData),
    {{insert, #{relation => Id, tuple => Tuple}}, Remainder};

%% Update carrying a key tuple ("K") which must be followed by the new tuple
%% ("N").
demarshal(x_log_data, <<"U", Relation:32, "K", KeyData/bytes>>) ->
    ?LOG_DEBUG(#{update => Relation, key => KeyData}),
    {Key, <<"N", NewData/bytes>>} = ?FUNCTION_NAME(tuple_data, KeyData),
    {New, Remainder} = ?FUNCTION_NAME(tuple_data, NewData),
    {{update, #{key => Key, relation => Relation, new => New}}, Remainder};

%% Update carrying only the new tuple; the result has no `key'.
demarshal(x_log_data, <<"U", Relation:32, "N", NewData/bytes>>) ->
    ?LOG_DEBUG(#{update => Relation, new => NewData}),
    {New, Remainder} = ?FUNCTION_NAME(tuple_data, NewData),
    {{update, #{relation => Relation, new => New}}, Remainder};

demarshal(x_log_data, <<"D", Relation:32, "K", KeyData/bytes>>) ->
    ?LOG_DEBUG(#{update => Relation, delete => KeyData}),
    {Key, Remainder} = ?FUNCTION_NAME(tuple_data, KeyData),
    {{delete, #{key => Key, relation => Relation}}, Remainder};

%% Truncate: a relation count, an options byte, then that many int32 ids.
demarshal(x_log_data, <<"T", N:32, Options:8, Data/bytes>>) ->
    ?LOG_DEBUG(#{truncate => Data}),
    {Relations, Remainder} = ?FUNCTION_NAME(
                                lists:duplicate(N, int32),
                                Data),
    {{truncate, #{options => Options, relations => Relations}}, Remainder};

demarshal(x_log_data, <<"b", BeginPrepare/bytes>>) ->
    ?LOG_DEBUG(#{begin_prepare => BeginPrepare}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [lsn, end_lsn, timestamp, xid, gid],
                              [int64, int64, int64, int32, string],
                              BeginPrepare),
    {{begin_prepare, Decoded}, Remainder};

demarshal(x_log_data, <<"P", Prepare/bytes>>) ->
    ?LOG_DEBUG(#{prepare => Prepare}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [flags, lsn, end_lsn, timestamp, xid, gid],
                              [int8, int64, int64, int64, int32, string],
                              Prepare),
    {{prepare, Decoded}, Remainder};

demarshal(x_log_data, <<"K", CommitPrepared/bytes>>) ->
    ?LOG_DEBUG(#{commit_prepared => CommitPrepared}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [flags, lsn, end_lsn, timestamp, xid, gid],
                              [int8, int64, int64, int64, int32, string],
                              CommitPrepared),
    {{commit_prepared, Decoded}, Remainder};

demarshal(x_log_data, <<"r", RollbackPrepared/bytes>>) ->
    ?LOG_DEBUG(#{rollback_prepared => RollbackPrepared}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [flags,
                               prepared_end_lsn,
                               rollback_end_lsn,
                               prepare_timestamp,
                               rollback_timestamp,
                               xid,
                               gid],
                              [int8, int64, int64, int64, int64, int32, string],
                              RollbackPrepared),
    {{rollback_prepared, Decoded}, Remainder};

%% Tuple data: a 16 bit column count, then the columns (see demarshal/4).
demarshal(tuple_data, <<Columns:16, TupleData/bytes>>) ->
    ?LOG_DEBUG(#{columns => Columns, tuple => TupleData}),
    ?FUNCTION_NAME(tuple_data, Columns, TupleData, []);

%% copy_data "w": WAL positions and a clock, followed by an x_log_data message
%% that is decoded and stored under `stream'.
demarshal(copy_data, <<"w", XLogData/bytes>>) ->
    ?LOG_DEBUG(#{x_log => XLogData}),
    {Decoded, Stream} = ?FUNCTION_NAME(
                           [start_wal, end_wal, clock],
                           [int64, int64, int64],
                           XLogData),
    {WAL, Remainder} = ?FUNCTION_NAME(x_log_data, Stream),
    {{x_log_data, Decoded#{stream => WAL}}, Remainder};

%% copy_data "k": primary keepalive; the reply byte becomes a boolean.
demarshal(copy_data, <<"k", PrimaryKeepalive/bytes>>) ->
    ?LOG_DEBUG(#{primary_keepalive => PrimaryKeepalive}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [end_wal, clock, reply],
                              [int64, int64, {byte, 1}],
                              PrimaryKeepalive),
    {{keepalive, maps:map(fun demarshal_reply_flag/2, Decoded)}, Remainder};

%% copy_data "r": standby status update; the reply byte becomes a boolean.
demarshal(copy_data, <<"r", StandbyStatusUpdate/bytes>>) ->
    ?LOG_DEBUG(#{standby_status_update => StandbyStatusUpdate}),
    {Decoded, Remainder} = ?FUNCTION_NAME(
                              [written,
                               flushed,
                               applied,
                               clock,
                               reply],
                              [int64,
                               int64,
                               int64,
                               int64,
                               {byte, 1}],
                              StandbyStatusUpdate),
    {{standby_status_update, maps:map(fun demarshal_reply_flag/2, Decoded)},
     Remainder};

%% Any other copy_data payload is returned undecoded.
demarshal(copy_data = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {Encoded, <<>>};

%% An overall format, a column count, then one int16 format code per column.
demarshal(copy_out_response = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {[Format, NumOfCols], R0} = ?FUNCTION_NAME([int8, int16], Encoded),
    {ColFormats, R1} = ?FUNCTION_NAME(
                          lists:duplicate(NumOfCols, int16),
                          R0),

    {#{format => text_binary(Format),
       col_formats => lists:map(
                        fun text_binary/1,
                        ColFormats)},
     R1};

demarshal(copy_done = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {Encoded, <<>>};

%% Only a format of 0 (text) or 1 (binary) with nothing following the column
%% count is accepted.
demarshal(copy_both_response = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    case ?FUNCTION_NAME([int8, int16], Encoded) of
        {[0, N], <<>>} ->
            {#{format => text, n_of_cols => N}, <<>>};

        {[1, N], <<>>} ->
            {#{format => binary, n_of_cols => N}, <<>>}
    end;

%% Two strings, returned as a `{Key, Value}' pair.
demarshal(parameter_status = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {[K, V], R0} = ?FUNCTION_NAME([string, string], Encoded),
    {{K, V}, R0};

demarshal(backend_key_data = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    ?FUNCTION_NAME([int32, int32], Encoded);

%% The command tag string is interpreted by demarshal/3.
demarshal(command_complete = Tag, Encoded) ->
    ?LOG_DEBUG(#{Tag => Encoded}),
    {Decoded, Remainder} = demarshal(string, Encoded),
    ?FUNCTION_NAME(Tag, Decoded, Remainder);

demarshal(ready_for_query, TxStatus) ->
    {tx_status(TxStatus), <<>>};

%% An int16 column count, then the columns (see demarshal/4).
demarshal(row_description = Type, Encoded) ->
    ?LOG_DEBUG(#{Type => Encoded}),
    {Columns, Remainder} = demarshal(int16, Encoded),
    ?FUNCTION_NAME(Type, Columns, Remainder, []);

demarshal(data_row = Type, Encoded) ->
    ?LOG_DEBUG(#{Type => Encoded}),
    {Columns, Remainder} = demarshal(int16, Encoded),
    ?FUNCTION_NAME(Type, Columns, Remainder, []);

%% Major and minor version as int16 values, followed by key/value pairs.
demarshal(startup, Encoded) ->
    ?LOG_DEBUG(#{startup => Encoded}),
    {[Major, Minor], R0} = ?FUNCTION_NAME([int16, int16], Encoded),
    {Parameters, R1} = ?FUNCTION_NAME(pairs, R0),
    {#{major => Major, minor => Minor, parameters => Parameters}, R1};

%% String pairs collected into a map; a lone zero byte ends the sequence.
demarshal(pairs, Encoded) ->
    ?LOG_DEBUG(#{pairs => Encoded}),
    {pgmp_binary:foldl(
       fun
           (<<0>>, A) ->
               {<<>>, A};

           (EncodedPair, A) ->
               {[Key, Value], Remainder} = ?FUNCTION_NAME(
                                              [string, string],
                                              EncodedPair),
               {Remainder, A#{Key => Value}}
       end,
       #{},
       Encoded),
     <<>>};

demarshal(password, Encoded) ->
    ?FUNCTION_NAME(string, Encoded);

demarshal(query, Encoded) ->
    ?FUNCTION_NAME(string, Encoded);

%% Single byte answer to an SSL request: "N" maps to false, "S" to true.
demarshal(ssl, <<"N">>) ->
    {false, <<>>};
demarshal(ssl, <<"S">>) ->
    {true, <<>>};

demarshal(terminate, <<>>) ->
    {#{}, <<>>}.

%% Map the single `reply' byte of a keepalive or standby status update to a
%% boolean. Every other key, and any other reply value, passes through as is.
demarshal_reply_flag(reply, <<0:8>>) ->
    false;
demarshal_reply_flag(reply, <<1:8>>) ->
    true;
demarshal_reply_flag(_, Value) ->
    Value.
| 491 |
|
|
| 492 |
|
|
| 493 |
|
-type command_complete_operation() :: insert |
| 494 |
|
| delete |
| 495 |
|
| select |
| 496 |
|
| update |
| 497 |
|
| move |
| 498 |
|
| fetch |
| 499 |
|
| copy. |
| 500 |
|
|
| 501 |
|
-type command_complete_response() :: {{command_complete_operation(), non_neg_integer()}, binary()} |
| 502 |
|
| {atom(), binary()}. |
| 503 |
|
|
| 504 |
|
%% Results of demarshal(authentication, Code, Remainder). Code 3 yields
%% `clear_text_password', which was missing from this union.
-type authentication_response() :: {authenticated, binary()}
                                 | {kerberos, binary()}
                                 | {clear_text_password, binary()}
                                 | {{md5_password, binary()}, binary()}
                                 | {scm_credentials, binary()}
                                 | {gss, binary()}
                                 | {sspi, binary()}
                                 | {{sasl, any()}, binary()}
                                 | {{sasl_continue, binary()}, <<>>}
                                 | {{sasl_final, binary()}, <<>>}.
| 513 |
|
|
| 514 |
|
-spec demarshal(command_complete, binary(), binary()) -> command_complete_response(); |
| 515 |
|
(authentication, non_neg_integer(), binary()) -> authentication_response(); |
| 516 |
|
(notice_response, binary(), [{<<_:8>>, binary()}]) -> {[{<<_:8>>, binary()}], binary()}; |
| 517 |
|
(error_response, binary(), [{<<_:8>>, binary()}]) -> {[{<<_:8>>, binary()}], binary()}; |
| 518 |
|
([atom()], [type()], binary()) -> {#{atom() => any()}, binary()}. |
| 519 |
|
|
| 520 |
|
|
| 521 |
|
%% Three argument decoders:
%%  - command_complete: interpret an already extracted command tag;
%%  - authentication: interpret the payload for a given authentication code;
%%  - notice_response / error_response: accumulate `{FieldCode, Value}' pairs
%%    until a zero byte is reached;
%%  - (Keys, Types, Encoded): decode `Types' and return the values as a map
%%    keyed by `Keys'.
demarshal(command_complete, <<"INSERT 0 ", Count/bytes>>, Rest) ->
    {{insert, binary_to_integer(Count)}, Rest};

demarshal(command_complete, <<"DELETE ", Count/bytes>>, Rest) ->
    {{delete, binary_to_integer(Count)}, Rest};

demarshal(command_complete, <<"SELECT ", Count/bytes>>, Rest) ->
    {{select, binary_to_integer(Count)}, Rest};

demarshal(command_complete, <<"UPDATE ", Count/bytes>>, Rest) ->
    {{update, binary_to_integer(Count)}, Rest};

demarshal(command_complete, <<"MOVE ", Count/bytes>>, Rest) ->
    {{move, binary_to_integer(Count)}, Rest};

demarshal(command_complete, <<"FETCH ", Count/bytes>>, Rest) ->
    {{fetch, binary_to_integer(Count)}, Rest};

demarshal(command_complete, <<"COPY ", Count/bytes>>, Rest) ->
    {{copy, binary_to_integer(Count)}, Rest};

%% Any other tag: lower case each space separated word and hand the list of
%% words to pgmp_util:snake_case/1.
demarshal(command_complete, CommandTag, Rest) ->
    Words = [binary_to_list(string:lowercase(Word))
             || Word <- binary:split(CommandTag, <<" ">>, [global])],
    {pgmp_util:snake_case(Words), Rest};

demarshal(authentication, 0, Payload) ->
    {authenticated, Payload};

demarshal(authentication, 2, Payload) ->
    {kerberos, Payload};

demarshal(authentication, 3, Payload) ->
    {clear_text_password, Payload};

%% MD5: the payload starts with a four byte salt.
demarshal(authentication, 5, Payload) ->
    {Salt, Rest} = demarshal({byte, 4}, Payload),
    {{md5_password, Salt}, Rest};

demarshal(authentication, 6, Payload) ->
    {scm_credentials, Payload};

demarshal(authentication, 7, Payload) ->
    {gss, Payload};

demarshal(authentication, 9, Payload) ->
    {sspi, Payload};

%% SASL: a sequence of mechanism names ended by a lone zero byte, returned in
%% the order in which they were received.
demarshal(authentication, 10, Payload) ->
    Collected = pgmp_binary:foldl(
                  fun
                      (<<0:8>>, Mechanisms) ->
                          {<<>>, Mechanisms};

                      (Encoded, Mechanisms) ->
                          {Name, Rest} = demarshal(string, Encoded),
                          {Rest, [Name | Mechanisms]}
                  end,
                  [],
                  Payload),
    {{sasl, lists:reverse(Collected)}, <<>>};

demarshal(authentication, 11, Payload) ->
    {{sasl_continue, Payload}, <<>>};

demarshal(authentication, 12, Payload) ->
    {{sasl_final, Payload}, <<>>};

%% A zero byte terminates the field list.
demarshal(Type, <<0:8, Rest/bytes>>, Fields)
  when Type == notice_response;
       Type == error_response ->
    {lists:reverse(Fields), Rest};

%% Otherwise: a one byte field code followed by a zero terminated value.
demarshal(Type, <<FieldCode:1/bytes, Encoded/bytes>>, Fields)
  when Type == notice_response;
       Type == error_response ->
    {FieldValue, Rest} = demarshal(string, Encoded),
    demarshal(Type, Rest, [{FieldCode, FieldValue} | Fields]);

demarshal(Keys, Types, Encoded) when is_list(Keys), is_list(Types) ->
    {Values, Rest} = demarshal(Types, Encoded),
    Pairs = lists:zip(Keys, Values),
    {maps:from_list(Pairs), Rest}.
| 613 |
|
|
| 614 |
|
|
| 615 |
|
-type row_description() :: #{field_name := binary(), |
| 616 |
|
table_oid := pgmp:int32(), |
| 617 |
|
column_number := pgmp:int16(), |
| 618 |
|
type_oid := pgmp:int32(), |
| 619 |
|
type_size := pgmp:int16(), |
| 620 |
|
type_modifier := pgmp:int32(), |
| 621 |
|
format := text | binary}. |
| 622 |
|
|
| 623 |
|
%% One column of logical replication tuple data. The decoder emits
%% `format => binary' for "b" columns as well as `format => text' for "t"
%% columns, so both are part of the type.
-type tuple_data() :: #{format := text | binary, value := any()}
                    | null.
| 625 |
|
|
| 626 |
|
-spec demarshal(row_description, non_neg_integer(), binary(), [row_description()]) -> {[row_description(), ...], binary()}; |
| 627 |
|
(data_row, non_neg_integer(), binary(), [pgmp_data_row:decoded()]) -> {[pgmp_data_row:decoded()], binary()}; |
| 628 |
|
(tuple_data, non_neg_integer(), binary(), [tuple_data()]) -> {[tuple_data()], binary()}. |
| 629 |
|
|
| 630 |
|
|
| 631 |
|
%% Counted decoders: decode `Count' further columns from the binary, building
%% the result in reverse in the accumulator and returning
%% `{Columns, Remainder}' once the count reaches zero.
demarshal(row_description, 0, Rest, Acc) ->
    {lists:reverse(Acc), Rest};

%% Each row description column is a fixed sequence of fields; the numeric
%% format code is translated by text_binary/1.
demarshal(row_description = Type, Count, Encoded, Acc) ->
    {#{format := Code} = Field, Rest} =
        demarshal(
          [field_name,
           table_oid,
           column_number,
           type_oid,
           type_size,
           type_modifier,
           format],
          [string,
           int32,
           int16,
           int32,
           int16,
           int32,
           int16],
          Encoded),
    Described = Field#{format := text_binary(Code)},
    demarshal(Type, Count - 1, Rest, [Described | Acc]);

demarshal(data_row, 0, Rest, Acc) ->
    {lists:reverse(Acc), Rest};

%% Each data row column is an int32 length followed by that many bytes; a
%% length of -1 denotes null and carries no bytes.
demarshal(data_row = Type, Count, Encoded, Acc) ->
    {Value, Rest} =
        case demarshal(int32, Encoded) of
            {0, Following} ->
                {<<>>, Following};

            {-1, Following} ->
                {null, Following};

            {Size, Following} ->
                <<Data:Size/bytes, Trailing/bytes>> = Following,
                {Data, Trailing}
        end,
    demarshal(Type, Count - 1, Rest, [Value | Acc]);

demarshal(tuple_data, 0, Rest, Acc) ->
    {lists:reverse(Acc), Rest};

%% Tuple data columns are tagged: "n" is null, while "b" (binary) and "t"
%% (text) are followed by a 32 bit length and the value.
demarshal(tuple_data, Count, <<"n", Rest/bytes>>, Acc) ->
    demarshal(tuple_data, Count - 1, Rest, [null | Acc]);

demarshal(tuple_data,
          Count,
          <<"b", Size:32, Data:Size/bytes, Rest/bytes>>,
          Acc) ->
    Column = #{format => binary, value => Data},
    demarshal(tuple_data, Count - 1, Rest, [Column | Acc]);

demarshal(tuple_data,
          Count,
          <<"t", Size:32, Data:Size/bytes, Rest/bytes>>,
          Acc) ->
    Column = #{format => text, value => Data},
    demarshal(tuple_data, Count - 1, Rest, [Column | Acc]).
| 700 |
|
|
| 701 |
|
|
| 702 |
|
-spec size_inclusive(iodata()) -> iolist(). |
| 703 |
|
|
| 704 |
|
%% Prefix `Data' with its size as an int32, where the size also counts the
%% four bytes of the size field itself.
size_inclusive(Data) ->
    Size = iolist_size(Data) + 4,
    [marshal(int32, Size), Data].
| 706 |
|
|
| 707 |
|
|
| 708 |
|
-spec size_exclusive(iodata()) -> iolist(). |
| 709 |
|
|
| 710 |
|
%% Prefix `Data' with its size as an int32; the size field itself is not
%% counted.
size_exclusive(Data) ->
    Size = iolist_size(Data),
    [marshal(int32, Size), Data].
| 712 |
|
|
| 713 |
|
|
| 714 |
|
%% Encode the values found under `Names' in the `Bindings' map, each with the
%% type at the same position in `Types'. Returns the encoded values in order.
%% Every name must be present in `Bindings' and both lists must have the same
%% length.
marshal(Names, Types, Bindings) ->
    ?LOG_DEBUG(#{names => Names, types => Types, bindings => Bindings}),
    [marshal(Type, maps:get(Name, Bindings))
     || {Name, Type} <- lists:zip(Names, Types)].
| 722 |
|
|
| 723 |
|
|
| 724 |
|
%% The `int64' clause takes a 64 bit value; it was previously declared with
%% pgmp:int32(), at odds with the equivalent `{int, 64}' clause.
-spec marshal(string, iodata()) -> iolist();
             (byte, <<_:8>>) -> iodata();
             (int8, pgmp:int8()) -> nonempty_binary();
             (int16, pgmp:int16()) -> nonempty_binary();
             (int32, pgmp:int32()) -> nonempty_binary();
             (int64, pgmp:int64()) -> nonempty_binary();
             ({int, 8}, pgmp:int8()) -> nonempty_binary();
             ({int, 16}, pgmp:int16()) -> nonempty_binary();
             ({int, 32}, pgmp:int32()) -> nonempty_binary();
             ({int, 64}, pgmp:int64()) -> nonempty_binary();
             (x_log_data, x_log_data()) -> iodata();
             (authentication, ok) -> iodata();
             (ready_for_query, tx_status()) -> iodata();
             (row_description, [row_description()]) -> iodata();
             (data_row, iodata()) -> iodata();
             (command_complete, {iodata() | atom(), integer()} | iodata() | atom()) -> iodata();
             (copy_data, iodata()) -> iodata();
             (copy_both_response, copy_both_response()) -> iodata();
             (copy_out_response, copy_out_response()) -> iodata();
             (tuple_data, tuple_data()) -> iodata().
| 744 |
|
|
| 745 |
|
marshal(string, Value) -> |
| 746 |
27912 |
[Value, <<0:8>>]; |
| 747 |
|
|
| 748 |
|
marshal(byte, Value) -> |
| 749 |
198 |
Value; |
| 750 |
|
|
| 751 |
|
marshal({byte, N}, Value) when byte_size(Value) == N -> |
| 752 |
2 |
Value; |
| 753 |
|
|
| 754 |
|
marshal(int8, Value) -> |
| 755 |
6 |
?FUNCTION_NAME({int, 8}, Value); |
| 756 |
|
|
| 757 |
|
marshal(int16, Value) -> |
| 758 |
23579 |
?FUNCTION_NAME({int, 16}, Value); |
| 759 |
|
|
| 760 |
|
marshal(int32, Value) -> |
| 761 |
48703 |
?FUNCTION_NAME({int, 32}, Value); |
| 762 |
|
|
| 763 |
|
marshal(int64, Value) -> |
| 764 |
326 |
?FUNCTION_NAME({int, 64}, Value); |
| 765 |
|
|
| 766 |
|
marshal({int, Size}, Value) when is_integer(Value)-> |
| 767 |
73987 |
<<Value:Size>>; |
| 768 |
|
|
| 769 |
|
marshal(authentication, ok) -> |
| 770 |
:-( |
["R", size_inclusive([<<0:32>>])]; |
| 771 |
|
|
| 772 |
|
marshal(ready_for_query, TxStatus) -> |
| 773 |
:-( |
["Z", size_inclusive(tx_status(TxStatus))]; |
| 774 |
|
|
| 775 |
|
marshal(command_complete = Tag, {Command, Rows}) when is_atom(Command), |
| 776 |
|
is_integer(Rows) -> |
| 777 |
1 |
?FUNCTION_NAME( |
| 778 |
|
Tag, |
| 779 |
|
[string:uppercase(atom_to_list(Command)), |
| 780 |
|
" ", |
| 781 |
|
integer_to_list(Rows)]); |
| 782 |
|
|
| 783 |
|
marshal(command_complete = Tag, Command) when is_atom(Command) -> |
| 784 |
:-( |
?FUNCTION_NAME(Tag, string:uppercase(atom_to_list(Command))); |
| 785 |
|
|
| 786 |
|
marshal(command_complete, Description) -> |
| 787 |
1 |
["C", size_inclusive(marshal(string, Description))]; |
| 788 |
|
|
| 789 |
|
marshal(row_description, FieldDescriptions) -> |
| 790 |
2 |
["T", |
| 791 |
|
size_inclusive( |
| 792 |
|
[marshal(int16, length(FieldDescriptions)), |
| 793 |
|
lists:map( |
| 794 |
|
fun |
| 795 |
|
(#{field_name := FieldName, |
| 796 |
|
table_oid := TableOID, |
| 797 |
|
column_number := ColumnNumber, |
| 798 |
|
type_oid := TypeOID, |
| 799 |
|
type_size := TypeSize, |
| 800 |
|
type_modifier := TypeModifier, |
| 801 |
|
format := Format}) -> |
| 802 |
5 |
[marshal(string, FieldName), |
| 803 |
|
marshal(int32, TableOID), |
| 804 |
|
marshal(int16, ColumnNumber), |
| 805 |
|
marshal(int32, TypeOID), |
| 806 |
|
marshal(int16, TypeSize), |
| 807 |
|
marshal(int32, TypeModifier), |
| 808 |
|
marshal(int16, text_binary(Format))] |
| 809 |
|
end, |
| 810 |
|
FieldDescriptions)])]; |
| 811 |
|
|
| 812 |
|
marshal(data_row, Columns) -> |
| 813 |
:-( |
["D", |
| 814 |
|
size_inclusive( |
| 815 |
|
[marshal(int16, length(Columns)), |
| 816 |
|
lists:map( |
| 817 |
|
fun |
| 818 |
|
(<<-1:32/signed>> = Column) -> |
| 819 |
:-( |
Column; |
| 820 |
|
|
| 821 |
|
(Column) -> |
| 822 |
:-( |
size_exclusive(Column) |
| 823 |
|
end, |
| 824 |
|
Columns)])]; |
| 825 |
|
|
| 826 |
|
marshal(copy_data, {standby_status_update, StandbyStatusUpdate}) -> |
| 827 |
1 |
["d", |
| 828 |
|
size_inclusive( |
| 829 |
|
["r", |
| 830 |
|
?FUNCTION_NAME( |
| 831 |
|
[written, |
| 832 |
|
flushed, |
| 833 |
|
applied, |
| 834 |
|
clock, |
| 835 |
|
reply], |
| 836 |
|
[int64, |
| 837 |
|
int64, |
| 838 |
|
int64, |
| 839 |
|
int64, |
| 840 |
|
{byte, 1}], |
| 841 |
|
maps:map( |
| 842 |
|
fun |
| 843 |
|
(reply, false) -> |
| 844 |
1 |
<<0:8>>; |
| 845 |
|
|
| 846 |
|
(reply, true) -> |
| 847 |
|
|
| 848 |
:-( |
<<1:8>>; |
| 849 |
|
|
| 850 |
|
(_, V) -> |
| 851 |
4 |
V |
| 852 |
|
end, |
| 853 |
|
StandbyStatusUpdate))])]; |
| 854 |
|
|
| 855 |
|
marshal(copy_data, {keepalive, PrimaryKeepalive}) -> |
| 856 |
1 |
["d", |
| 857 |
|
size_inclusive( |
| 858 |
|
["k", |
| 859 |
|
?FUNCTION_NAME( |
| 860 |
|
[end_wal, clock, reply], |
| 861 |
|
[int64, int64, {byte, 1}], |
| 862 |
|
maps:map( |
| 863 |
|
fun |
| 864 |
|
(reply, false) -> |
| 865 |
1 |
<<0:8>>; |
| 866 |
|
|
| 867 |
|
(reply, true) -> |
| 868 |
|
|
| 869 |
:-( |
<<1:8>>; |
| 870 |
|
|
| 871 |
|
(_, V) -> |
| 872 |
2 |
V |
| 873 |
|
end, |
| 874 |
|
PrimaryKeepalive))])]; |
| 875 |
|
|
| 876 |
|
marshal(copy_data, {x_log_data, #{stream := Stream} = XLogData}) -> |
| 877 |
5 |
["d", |
| 878 |
|
size_inclusive( |
| 879 |
|
["w", |
| 880 |
|
?FUNCTION_NAME( |
| 881 |
|
[start_wal, end_wal, clock], |
| 882 |
|
[int64, int64, int64], |
| 883 |
|
XLogData), |
| 884 |
|
?FUNCTION_NAME(x_log_data, Stream)])]; |
| 885 |
|
|
| 886 |
|
marshal(x_log_data, {begin_transaction, BeginTransaction}) -> |
| 887 |
1 |
["B", |
| 888 |
|
?FUNCTION_NAME( |
| 889 |
|
[final_lsn, commit_timestamp, xid], |
| 890 |
|
[int64, int64, int32], |
| 891 |
|
BeginTransaction)]; |
| 892 |
|
|
| 893 |
|
marshal(x_log_data, {relation, #{columns := Columns} = Relation}) -> |
| 894 |
1 |
["R", |
| 895 |
|
?FUNCTION_NAME( |
| 896 |
|
[id, namespace, name, replica_identity, ncols], |
| 897 |
|
[int32, string, string, int8, int16], |
| 898 |
|
maps:merge( |
| 899 |
|
#{ncols => length(Columns)}, |
| 900 |
|
Relation)), |
| 901 |
|
lists:map( |
| 902 |
|
fun |
| 903 |
|
(Column) -> |
| 904 |
2 |
?FUNCTION_NAME( |
| 905 |
|
[flags, name, type, modifier], |
| 906 |
|
[int8, string, int32, int32], |
| 907 |
|
Column) |
| 908 |
|
end, |
| 909 |
|
Columns)]; |
| 910 |
|
|
| 911 |
|
marshal(x_log_data, |
| 912 |
|
{insert, |
| 913 |
|
#{relation := Relation, tuple := Tuple}}) -> |
| 914 |
2 |
["I", |
| 915 |
|
?FUNCTION_NAME(int32, Relation), |
| 916 |
|
"N", |
| 917 |
|
?FUNCTION_NAME(tuple_data, Tuple)]; |
| 918 |
|
|
| 919 |
|
marshal(x_log_data, |
| 920 |
|
{update, #{key := Key, relation := Relation, new := New}}) -> |
| 921 |
:-( |
["U", |
| 922 |
|
?FUNCTION_NAME(int32, Relation), |
| 923 |
|
"K", |
| 924 |
|
?FUNCTION_NAME(tuple_data, Key), |
| 925 |
|
"N", |
| 926 |
|
?FUNCTION_NAME(tuple_data, New)]; |
| 927 |
|
|
| 928 |
|
marshal(x_log_data, |
| 929 |
|
{update, #{relation := Relation, new := New}}) -> |
| 930 |
:-( |
["U", |
| 931 |
|
?FUNCTION_NAME(int32, Relation), |
| 932 |
|
"N", |
| 933 |
|
?FUNCTION_NAME(tuple_data, New)]; |
| 934 |
|
|
| 935 |
|
marshal(x_log_data, |
| 936 |
|
{delete, #{key := Key, relation := Relation}}) -> |
| 937 |
:-( |
["D", |
| 938 |
|
?FUNCTION_NAME(int32, Relation), |
| 939 |
|
"K", |
| 940 |
|
?FUNCTION_NAME(tuple_data, Key)]; |
| 941 |
|
|
| 942 |
|
marshal(x_log_data, |
| 943 |
|
{truncate, #{options := Options, relations := Relations}}) -> |
| 944 |
:-( |
["T", |
| 945 |
|
?FUNCTION_NAME(int32, length(Relations)), |
| 946 |
|
?FUNCTION_NAME(int8, Options), |
| 947 |
:-( |
[?FUNCTION_NAME(int32, Relation) || Relation <- Relations]]; |
| 948 |
|
|
| 949 |
|
marshal(tuple_data, Columns) -> |
| 950 |
2 |
[?FUNCTION_NAME(int16, length(Columns)), |
| 951 |
|
lists:map( |
| 952 |
|
fun |
| 953 |
|
(null) -> |
| 954 |
:-( |
"n"; |
| 955 |
|
|
| 956 |
|
(#{format := Format, value := Value}) -> |
| 957 |
3 |
[case Format of |
| 958 |
|
binary -> |
| 959 |
2 |
"b"; |
| 960 |
|
|
| 961 |
|
text -> |
| 962 |
1 |
"t" |
| 963 |
|
end, |
| 964 |
|
?FUNCTION_NAME(int32, byte_size(Value)), Value] |
| 965 |
|
end, |
| 966 |
|
Columns)]; |
| 967 |
|
|
| 968 |
|
marshal(x_log_data, {commit, Commit}) -> |
| 969 |
1 |
["C", |
| 970 |
|
?FUNCTION_NAME( |
| 971 |
|
[flags, commit_lsn, end_lsn, commit_timestamp], |
| 972 |
|
[int8, int64, int64, int64], |
| 973 |
|
Commit)]; |
| 974 |
|
|
| 975 |
|
marshal(copy_both_response, #{format := Format, n_of_cols := N}) -> |
| 976 |
1 |
["W", |
| 977 |
|
size_inclusive( |
| 978 |
|
[?FUNCTION_NAME(int8, text_binary(Format)), |
| 979 |
|
?FUNCTION_NAME(int16, N)])]; |
| 980 |
|
|
| 981 |
|
marshal(copy_out_response, #{format := Format, col_formats := ColFormats}) -> |
| 982 |
1 |
["H", |
| 983 |
|
size_inclusive( |
| 984 |
|
[?FUNCTION_NAME(int8, text_binary(Format)), |
| 985 |
|
?FUNCTION_NAME(int16, length(ColFormats)), |
| 986 |
|
lists:map( |
| 987 |
|
fun |
| 988 |
|
(ColFormat) -> |
| 989 |
2 |
?FUNCTION_NAME(int16, text_binary(ColFormat)) |
| 990 |
|
end, |
| 991 |
|
ColFormats)])]; |
| 992 |
|
|
| 993 |
|
marshal(copy_data, Encoded) -> |
| 994 |
1 |
["d", size_inclusive(Encoded)]; |
| 995 |
|
|
| 996 |
|
marshal(copy_done, <<>> = Encoded) -> |
| 997 |
1 |
["c", size_inclusive(Encoded)]; |
| 998 |
|
|
| 999 |
|
marshal(parameter_status, {Key, Value}) -> |
| 1000 |
1 |
["S", |
| 1001 |
|
size_inclusive( |
| 1002 |
|
[?FUNCTION_NAME(string, Key), |
| 1003 |
|
?FUNCTION_NAME(string, Value)])]; |
| 1004 |
|
|
| 1005 |
|
marshal(Type, Value) -> |
| 1006 |
1 |
error(badarg, [Type, Value]). |
| 1007 |
|
|
| 1008 |
|
|
| 1009 |
|
-type tx_status() :: idle | in_tx_block | in_failed_tx_block.

%% Two-way mapping between a transaction status atom and its single
%% byte binary form:
%%
%%   idle               <-> <<"I">>
%%   in_tx_block        <-> <<"T">>
%%   in_failed_tx_block <-> <<"E">>
%%
%% Given the atom the binary is returned, given the binary the atom is
%% returned. Any other argument fails with function_clause.
-spec tx_status(tx_status()) -> binary();
               (binary()) -> tx_status().

tx_status(<<"I">>) -> idle;
tx_status(<<"T">>) -> in_tx_block;
tx_status(<<"E">>) -> in_failed_tx_block;
tx_status(idle) -> <<"I">>;
tx_status(in_tx_block) -> <<"T">>;
tx_status(in_failed_tx_block) -> <<"E">>.
| 1025 |
|
|
| 1026 |
|
|
| 1027 |
9 |
%% Two-way mapping between the format atoms and their integer codes:
%%
%%   text   <-> 0
%%   binary <-> 1
%%
%% Given the atom the integer is returned, given the integer the atom
%% is returned. Any other argument fails with function_clause.
-spec text_binary(text | binary) -> 0 | 1;
                 (0 | 1) -> text | binary.

text_binary(text) ->
    0;
text_binary(0) ->
    text;
text_binary(binary) ->
    1;
text_binary(1) ->
    binary.