1 |
|
%% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com> |
2 |
|
%% |
3 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
%% you may not use this file except in compliance with the License. |
5 |
|
%% You may obtain a copy of the License at |
6 |
|
%% |
7 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
%% |
9 |
|
%% Unless required by applicable law or agreed to in writing, software |
10 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 |
|
%% See the License for the specific language governing permissions and |
13 |
|
%% limitations under the License. |
14 |
|
|
15 |
|
|
16 |
|
%% @doc Replication protocol events |
17 |
|
|
18 |
|
|
19 |
|
-module(msmp_binlog_event). |
20 |
|
|
21 |
|
|
22 |
|
-feature(maybe_expr, enable). |
23 |
|
|
24 |
|
|
25 |
|
-define(FL_COMPLETED_XA, 128). |
26 |
|
-define(FL_GROUP_COMMIT_ID, 2). |
27 |
|
-define(FL_PREPARED_XA, 64). |
28 |
|
-export([decode/0]). |
29 |
|
-export([decode/1]). |
30 |
|
-export([field_metadata/2]). |
31 |
|
-export([rows/1]). |
32 |
|
-import(scran_bits, [into_boolean/0]). |
33 |
|
-import(scran_bytes, [length_encoded/1]). |
34 |
|
-import(scran_bytes, [tag/1]). |
35 |
|
-import(scran_bytes, [take/1]). |
36 |
|
-import(scran_combinator, [condition/2]). |
37 |
|
-import(scran_combinator, [condition/3]). |
38 |
|
-import(scran_combinator, [map_parser/2]). |
39 |
|
-import(scran_combinator, [map_result/2]). |
40 |
|
-import(scran_combinator, [rest/0]). |
41 |
|
-import(scran_combinator, [success/1]). |
42 |
|
-import(scran_multi, [count/2]). |
43 |
|
-import(scran_multi, [many1/1]). |
44 |
|
-import(scran_result, [ignore/1]). |
45 |
|
-import(scran_result, [into_bits/2]). |
46 |
|
-import(scran_result, [into_map/1]). |
47 |
|
-import(scran_result, [into_tuple/1]). |
48 |
|
-import(scran_result, [kv/2]). |
49 |
|
-import(scran_sequence, [combined_with/2]). |
50 |
|
-import(scran_sequence, [followed_with/2]). |
51 |
|
-import(scran_sequence, [pair/2]). |
52 |
|
-import(scran_sequence, [sequence/1]). |
53 |
|
-include_lib("kernel/include/logger.hrl"). |
54 |
|
-on_load(on_load/0). |
55 |
|
|
56 |
|
|
57 |
|
%% Module load hook: consult the priv file "event-type.terms" and cache
%% the result in persistent_term under this module's name, which is
%% where event_type/1 looks it up.
on_load() ->
    EventTypes = msmp_enum:priv_consult("event-type.terms"),
    persistent_term:put(?MODULE, EventTypes).
61 |
|
|
62 |
|
|
63 |
|
%% Binlog event parser with no mapped tables; all other options take
%% their defaults.
decode() ->
    decode(#{mapped => #{}}).
65 |
|
|
66 |
|
|
67 |
|
-type binlog_event() :: #{action := log_event,
                          header := header(),
                          event := event_body(),
                          footer := binary()}.

-spec decode(map()) -> scran:parser(binary(), binlog_event()).

%% Return a parser for one complete binlog event.
%%
%% Arg carries the decoding options: `mapped' (table id to table map,
%% used by the rows events) and optionally `version' and `header_size'.
%%
%% The header is parsed first; its `event_size' covers header, body and
%% the trailing four byte footer, so the body is cut out as
%% `EventSize - header_size(Arg) - 4' bytes and handed to the parser
%% chosen by event/2 for the header's event type.
decode(Arg) ->
    ?LOG_DEBUG(#{arg => Arg}),
    fun
        (Input) ->
            ?LOG_DEBUG(#{input => Input}),

            (followed_with(
               header(Arg),
               fun
                   (#{event_size := EventSize} = Header) ->

                       %% Clamp to the bytes actually available: with a
                       %% truncated input binary:part/3 would raise
                       %% badarg from what is only a diagnostic.
                       ?LOG_DEBUG(#{encoded => binary:part(
                                                 Input,
                                                 0,
                                                 min(EventSize,
                                                     byte_size(Input)))}),

                       into_map(
                         sequence(
                           [kv(action, success(log_event)),
                            kv(header, success(Header)),
                            kv(event,
                               map_parser(
                                 scran_bytes:take(
                                   EventSize - header_size(Arg) - 4),
                                 event(Arg, Header))),
                            kv(footer, scran_bytes:take(4))]))
               end))(Input)
    end.
100 |
|
|
101 |
|
|
102 |
|
-type header() :: #{timestamp := msmp:u4(),
                    event_type := event_type(),
                    server_id := msmp:u4(),
                    event_size := msmp:u4(),
                    log_pos => msmp:u4(),
                    flags => msmp:u2()}.

-spec header(map()) -> scran:parser(binary(), header()).

%% Return a parser for the common event header.
%%
%% The first four fields (timestamp, event type, server id and event
%% size) occupy 13 bytes and are always present. `log_pos' (4 bytes)
%% and `flags' (2 bytes) bring the header to 19 bytes, so they are only
%% parsed when the configured header size has room for them. The
%% previous test, `header_size(Arg) > 1', was true for the 13 byte
%% header as well.
header(Arg) ->
    fun
        (Input) ->
            (into_map(
               sequence(
                 [kv(timestamp, msmp_integer_fixed:decode(4)),
                  kv(event_type, event_type()),
                  kv(server_id, msmp_integer_fixed:decode(4)),
                  kv(event_size, msmp_integer_fixed:decode(4)),
                  condition(
                    header_size(Arg) >= 19,
                    sequence(
                      [kv(log_pos, msmp_integer_fixed:decode(4)),
                       kv(flags, msmp_integer_fixed:decode(2))]))])))(Input)
    end.
126 |
|
|
127 |
|
%% Event type names as produced by event_type/1. The names are read
%% from the priv file "event-type.terms" at module load; the list below
%% covers the names this module refers to. A type code that is not in
%% that file is reported as `{unknown, Code}'.
-type event_type() :: start_v3
                    | query
                    | stop
                    | rotate
                    | intvar
                    | slave
                    | append_block
                    | delete_file
                    | rand
                    | user_var
                    | format_description
                    | xid
                    | begin_load_query
                    | execute_load_query
                    | table_map
                    | write_rows_v1
                    | update_rows_v1
                    | delete_rows_v1
                    | incident_event
                    | heartbeat_log
                    | ignorable_log
                    | rows_query_log
                    | write_rows
                    | update_rows
                    | delete_rows
                    | gtid_log
                    | anonymous_gtid_log
                    | previous_gtids_log
                    | transaction_context
                    | view_change
                    | xa_prepare_log
                    | partial_update_rows
                    | transaction_payload
                    | heartbeat
                    %% names matched by event/2 that were missing here
                    | annotate_rows
                    | binlog_checkpoint
                    | gtid
                    | gtid_list
                    | query_compressed
                    | write_rows_compressed_v1
                    | update_rows_compressed_v1
                    | delete_rows_compressed_v1
                    %% fallback of event_type/1 for an unrecognised code
                    | {unknown, msmp:u1()}.
161 |
|
|
162 |
|
-spec event_type() -> scran:parser(binary(), event_type()).

%% Parser for the one byte event type code, translated to its name by
%% event_type/1.
event_type() ->
    fun
        (Input) ->
            TypeCode = msmp_integer_fixed:decode(1),
            Named = map_result(TypeCode, fun event_type/1),
            Named(Input)
    end.
169 |
|
|
170 |
|
|
171 |
|
%% Translate a numeric event type code into its name using the lookup
%% cached by on_load/0; codes that are not present come back as
%% `{unknown, Type}'.
event_type(Type) ->
    Lookup = persistent_term:get(?MODULE),
    Fallback = {unknown, Type},
    maps:get(Type, Lookup, Fallback).
175 |
|
|
176 |
|
|
177 |
|
%% Decoded event bodies as built by event/2. Only the shapes below are
%% spelled out; every other event type is decoded into a plain map of
%% atom keys.
-type event_body() :: rotate()
                    | format_description()
                    | table_map()
                    | #{atom() => term()}.

%% `position' is only decoded when the binlog version is greater than 1.
-type rotate() :: #{position => msmp:u8(),
                    new_log_ident => binary()}.

-type format_description() :: #{binlog_version := msmp:u2(),
                                mysql_server_version := binary(),
                                create_timestamp := msmp:u4(),
                                header_length := msmp:u1(),
                                stuff := binary()}.

%% `field_metadata' is the per column result of field_metadata/2, keyed
%% by column number starting at 1.
-type table_map() :: #{table_id := msmp:u6(),
                       flags := msmp:u2(),
                       database := binary(),
                       table := binary(),
                       coltypes := [msmp_field:type()],
                       field_metadata := #{pos_integer() => term()},
                       null_bitmap := binary(),
                       metadata := msmp_field_optional_metadata:metadata()}.
198 |
|
|
199 |
|
|
200 |
|
%% Return the body parser for an event, selected by the `event_type' of
%% its already decoded header.
%%
%% Arg is the option map given to decode/1; the rows events require its
%% `mapped' key (table id to table map). Every parser produces a map.
%% Event types without a dedicated clause are logged at warning level
%% and returned undecoded under the `body' key.

%% rotate: the position is only present from binlog version 2 onwards.
event(Arg, #{event_type := rotate}) ->
    fun
        (Input) ->
            (into_map(
               sequence(
                 [condition(
                    version(Arg) > 1,
                    kv(position, msmp_integer_fixed:decode(8))),
                  kv(new_log_ident,
                     msmp_string_rest_of_packet:decode())])))(Input)
    end;

%% format_description: the server version is a null terminated string
%% inside a fixed 50 byte field.
event(_, #{event_type := format_description}) ->
    fun
        (Input) ->
            (into_map(
               sequence(
                 [kv(binlog_version, msmp_integer_fixed:decode(2)),
                  kv(mysql_server_version,
                     map_parser(
                       msmp_string_fixed:decode(50),
                       msmp_string_null_terminated:decode())),
                  kv(create_timestamp, msmp_integer_fixed:decode(4)),
                  kv(header_length, msmp_integer_fixed:decode(1)),
                  kv(stuff, rest())])))(Input)
    end;

%% table_map: the raw field metadata bytes are expanded per column by
%% field_metadata/2, then the null bitmap and optional metadata, which
%% both depend on the column types, are decoded.
event(_, #{event_type := table_map}) ->
    fun
        (Input) ->
            (combined_with(
               map_result(
                 into_map(
                   sequence(
                     [kv(table_id, msmp_integer_fixed:decode(6)),
                      kv(flags, msmp_integer_fixed:decode(2)),
                      kv(database, msmp_string_length_encoded:decode()),
                      ignore(tag(<<0>>)),
                      kv(table, msmp_string_length_encoded:decode()),
                      ignore(tag(<<0>>)),
                      kv(coltypes,
                         count(
                           msmp_integer_variable:decode(),
                           map_result(
                             msmp_integer_fixed:decode(1),
                             fun msmp_field:lookup/1))),
                      kv(field_metadata,
                         take(msmp_integer_variable:decode()))])),
                 fun
                     (#{field_metadata := FieldMetadata,
                        coltypes := ColTypes} = Result) ->
                         Result#{field_metadata := field_metadata(
                                                     ColTypes,
                                                     FieldMetadata)}
                 end),
               fun
                   (#{coltypes := ColTypes}) ->
                       into_map(
                         sequence(
                           [kv(null_bitmap,
                               take(bitmap_bytes(length(ColTypes)))),
                            kv(metadata,
                               msmp_field_optional_metadata:decode(
                                 ColTypes))]))
               end))(Input)
    end;

%% write/update/delete rows, in their v1, v2 and compressed variants.
%%
%% Every update variant carries two column bitmaps and before/after row
%% pairs; every compressed variant carries its rows compressed.
%% Previously only `update_rows' took the two image path, and only
%% `write_rows_compressed_v1' was ever decompressed.
event(#{mapped := Mapped}, #{event_type := EventType})
  when EventType == update_rows;
       EventType == update_rows_v1;
       EventType == update_rows_compressed_v1;
       EventType == write_rows;
       EventType == write_rows_v1;
       EventType == write_rows_compressed_v1;
       EventType == delete_rows;
       EventType == delete_rows_v1;
       EventType == delete_rows_compressed_v1 ->
    IsUpdate = EventType == update_rows orelse
               EventType == update_rows_v1 orelse
               EventType == update_rows_compressed_v1,
    IsCompressed = EventType == update_rows_compressed_v1 orelse
                   EventType == write_rows_compressed_v1 orelse
                   EventType == delete_rows_compressed_v1,
    %% Only the v2 events carry the extra row info section.
    HasExtraRowInfo = EventType == write_rows orelse
                      EventType == update_rows orelse
                      EventType == delete_rows,
    fun
        (Input) ->
            (followed_with(
               msmp_integer_fixed:decode(6),

               %% The table must have been announced by an earlier
               %% table map event; an unknown id is a function_clause.
               fun
                   (TableId) when is_map_key(TableId, Mapped) ->

                       #{TableId := #{coltypes := ColTypes} = Mapping} = Mapped,
                       NumCols = length(ColTypes),

                       into_map(
                         sequence(
                           [kv(table_id, success(TableId)),
                            kv(flags, msmp_integer_fixed:decode(2)),
                            condition(
                              HasExtraRowInfo,
                              kv(extra_row_info,
                                 length_encoded(
                                   %% the two byte length counts itself
                                   map_result(
                                     msmp_integer_fixed:decode(2),
                                     fun
                                         (Length) ->
                                             Length - 2
                                     end)))),
                            kv(columns, msmp_integer_variable:decode()),
                            condition(
                              IsUpdate,
                              sequence(
                                [kv(bitmaps,
                                    into_tuple(
                                      pair(
                                        bitmap(NumCols),
                                        bitmap(NumCols)))),

                                 kv(rows,
                                    maybe_compressed(
                                      IsCompressed,
                                      many1(
                                        into_tuple(
                                          pair(
                                            row(Mapping),
                                            row(Mapping))))))]),

                              sequence(
                                [kv(bitmap, bitmap(NumCols)),
                                 kv(rows,
                                    maybe_compressed(
                                      IsCompressed,
                                      many1(row(Mapping))))]))]))
               end))(Input)
    end;

event(Arg, #{event_type := EventType} = Header)
  when EventType == query;
       EventType == query_compressed ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg, header => Header}),

            (into_map(
               sequence(
                 [kv(slave_proxy_id, msmp_integer_fixed:decode(4)),
                  kv(execution_time, msmp_integer_fixed:decode(4)),
                  kv(schema_length, msmp_integer_fixed:decode(1)),
                  kv(error_code, msmp_integer_fixed:decode(2)),
                  %% NOTE(review): `version(Arg) >= 0' holds for every
                  %% non-negative version, so this branch is in
                  %% practice always taken -- confirm whether a higher
                  %% version bound was intended.
                  condition(
                    version(Arg) >= 0,
                    sequence(
                      [kv(status_vars,
                          map_parser(
                            take(msmp_integer_fixed:decode(2)),
                            msmp_status_variable:decode())),
                       kv(schema, msmp_string_null_terminated:decode()),
                       condition(
                         EventType == query,
                         kv(sql, rest()),
                         kv(sql, uncompress()))]))])))(Input)
    end;

%% intvar: a one byte tag naming the variable, then its 8 byte value.
event(_Arg, #{event_type := intvar}) ->
    fun
        (Input) ->
            (into_map(
               sequence(
                 [scran_branch:alt(
                    [kv(type, scran_combinator:value(last_insert_id, tag(<<1>>))),
                     kv(type, scran_combinator:value(insert_id, tag(<<2>>)))]),
                  kv(value, msmp_integer_fixed:decode(8))])))(Input)
    end;

%% gtid_list: a count and flags, followed by `count' entries of
%% domain, server and sequence.
event(Arg, #{event_type := gtid_list} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),

            (scran_sequence:combined_with(
               into_map(
                 sequence(
                   [kv(count, scran_number:u(little, 28)),
                    kv(flags, scran_number:u(little, 4))])),
               fun
                   (#{count := Count} = Initial) ->
                       ?LOG_DEBUG(#{initial => Initial}),

                       into_map(
                         scran_sequence:sequence(
                           [kv(gtids,
                               scran_multi:count(
                                 Count,
                                 into_map(
                                   sequence(
                                     [kv(domain, msmp_integer_fixed:decode(4)),
                                      kv(server, msmp_integer_fixed:decode(4)),
                                      kv(sequence, msmp_integer_fixed:decode(8))]))))]))
               end))(Input)
    end;

event(Arg, #{event_type := binlog_checkpoint} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),
            (into_map(
               sequence(
                 [kv(filename,
                     scran_bytes:take(scran_number:u32(little)))])))(Input)
    end;

event(Arg, #{event_type := gtid} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),
            (into_map(
               sequence(
                 [kv(sequence, msmp_integer_fixed:decode(8)),
                  kv(domain, msmp_integer_fixed:decode(4)),
                  kv(flags2, msmp_integer_fixed:decode(1)),
                  kv(data, scran_combinator:rest())])))(Input)
    end;

event(Arg, #{event_type := xid} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),
            (into_map(
               sequence(
                 [kv(xid, scran_bytes:take(8))])))(Input)
    end;

event(Arg, #{event_type := heartbeat_log} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),
            (into_map(
               sequence(
                 [kv(log_ident, rest())])))(Input)
    end;

%% stop: the body must be empty.
event(Arg, #{event_type := stop} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),
            (into_map(
               sequence(
                 [ignore(scran_combinator:eof())])))(Input)
    end;

event(Arg, #{event_type := previous_gtids_log} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),
            (into_map(
               scran_sequence:sequence(
                 [kv(gtids, msmp_gtid_set:decode())])))(Input)
    end;

event(Arg, #{event_type := anonymous_gtid_log} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),
            (into_map(
               sequence(
                 [kv(data, rest())])))(Input)
    end;

event(Arg, #{event_type := gtid_log} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),
            (into_map(
               sequence(
                 [kv(flags, msmp_integer_fixed:decode(1)),
                  kv(sid, scran_bytes:take(16)),
                  kv(gno, msmp_integer_fixed:decode(8)),
                  kv(logical_timestamp_typecode, msmp_integer_fixed:decode(1)),
                  kv(last_committed, msmp_integer_fixed:decode(8)),
                  kv(sequence_number, msmp_integer_fixed:decode(8)),
                  gtid_log_timestamp(),
                  kv(transaction_length, msmp_integer_fixed:decode(1)),
                  kv(server_version, msmp_integer_fixed:decode(4))])))(Input)
    end;

event(Arg, #{event_type := heartbeat} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),
            (into_map(
               sequence(
                 [kv(unknown, scran_bytes:take(1)),
                  kv(log_ident, msmp_string_length_encoded:decode()),
                  kv(data, rest())])))(Input)
    end;

event(Arg, #{event_type := annotate_rows} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),
            (into_map(
               sequence(
                 [kv(annotation, rest())])))(Input)
    end;

%% user_var: only the null value marker (a byte of 1 after the name) is
%% recognised; any other value does not match.
event(Arg, #{event_type := user_var} = Header) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{arg => Arg,
                         header => Header,
                         input => Input}),
            (into_map(
               sequence(
                 [kv(name,
                     scran_bytes:length_encoded(
                       msmp_integer_fixed:decode(4))),
                  kv(value,
                     scran_branch:alt(
                       [scran_combinator:value(
                          null,
                          scran_bytes:tag(<<1>>))]))])))(Input)
    end;

%% Anything else: warn, and hand the body back undecoded.
event(Arg, Header) ->
    fun
        (Input) ->
            ?LOG_WARNING(#{arg => Arg,
                           input => Input,
                           header => Header}),
            (into_map(
               sequence(
                 [kv(body, rest())])))(Input)
    end.


%% Wrap a rows parser so that it runs over the uncompressed payload when
%% the event is one of the compressed variants.
maybe_compressed(true, Rows) ->
    map_parser(uncompress(), Rows);
maybe_compressed(false, Rows) ->
    Rows.
560 |
|
|
561 |
|
|
562 |
|
%% Expand the packed field metadata of a table map into a map keyed by
%% column number (starting at 1).
%%
%% ColTypes is the list of column type names, Data the metadata bytes.
%% How many bytes a column consumes, and the shape of its entry, depend
%% on its type; a column that matches none of the typed clauses
%% consumes nothing and is recorded as `undefined'.
field_metadata(ColTypes, Data) ->
    ?LOG_DEBUG(#{col_types => ColTypes, data => Data}),
    field_metadata(ColTypes, Data, []).

%% All columns handled: number the collected entries.
field_metadata([], <<>>, Acc) ->
    numbered_metadata(Acc);

%% All columns handled but bytes are left over: note it and carry on.
field_metadata([], Remaining, Acc) ->
    ?LOG_DEBUG(#{remaining => Remaining}),
    numbered_metadata(Acc);

%% string: two bytes. When the little endian value is 256 or more the
%% first byte is a real type code and the second a length; otherwise the
%% value itself is the length.
field_metadata([string = ColType | Others], <<Meta:16/little, Rest/bytes>>, Acc) ->
    case Meta >= 256 of
        true ->
            RealType = Meta band 16#ff,
            Length = Meta bsr 8,
            ?LOG_DEBUG(#{col_type => ColType,
                         col_types => Others,
                         meta => Meta,
                         data => Rest}),
            Entry = #{field_type => msmp_field:lookup(RealType),
                      length => Length},
            field_metadata(Others, Rest, [Entry | Acc]);

        false ->
            ?LOG_DEBUG(#{type => ColType,
                         meta => Meta,
                         data => Rest}),
            field_metadata(Others, Rest, [#{length => Meta} | Acc])
    end;

%% newdecimal: one byte of precision, one byte of scale.
field_metadata([newdecimal = ColType | Others], <<Precision:8, Scale:8, Rest/bytes>>, Acc) ->
    ?LOG_DEBUG(#{col_type => ColType,
                 col_types => Others,
                 precision => Precision,
                 scale => Scale,
                 data => Rest}),
    Entry = #{precision => Precision, scale => Scale},
    field_metadata(Others, Rest, [Entry | Acc]);

%% Types with a two byte little endian value.
field_metadata([ColType | Others], <<Meta:16/little, Rest/bytes>>, Acc)
  when ColType =:= var_string;
       ColType =:= varchar;
       ColType =:= bit ->
    ?LOG_DEBUG(#{col_type => ColType,
                 col_types => Others,
                 meta => Meta,
                 data => Rest}),
    field_metadata(Others, Rest, [Meta | Acc]);

%% Types with a single byte value.
field_metadata([ColType | Others], <<Meta:8/little, Rest/bytes>>, Acc)
  when ColType =:= blob;
       ColType =:= float;
       ColType =:= double;
       ColType =:= geometry;
       ColType =:= json;
       ColType =:= time2;
       ColType =:= datetime2;
       ColType =:= timestamp2 ->
    ?LOG_DEBUG(#{col_type => ColType,
                 col_types => Others,
                 meta => Meta,
                 data => Rest}),
    field_metadata(Others, Rest, [Meta | Acc]);

%% Everything else (including a typed column with too few bytes left)
%% consumes nothing.
field_metadata([ColType | Others], Data, Acc) ->
    ?LOG_DEBUG(#{col_type => ColType,
                 col_types => Others,
                 data => Data}),
    field_metadata(Others, Data, [undefined | Acc]).


%% The accumulator is built newest first; restore column order and key
%% each entry by its 1-based column number.
numbered_metadata(Acc) ->
    Entries = lists:reverse(Acc),
    ColNos = lists:seq(1, length(Entries)),
    maps:from_list(lists:zip(ColNos, Entries)).
646 |
|
|
647 |
|
|
648 |
|
%% Binlog version from the options, 4 when none is given.
version(Arg) ->
    case maps:find(version, Arg) of
        {ok, Version} ->
            Version;

        error ->
            4
    end.
650 |
|
|
651 |
|
|
652 |
|
%% Size of the common event header in bytes: an explicit `header_size'
%% option wins, otherwise 19 for binlog versions above 1 and 13 for the
%% rest.
header_size(Arg) ->
    Default = case version(Arg) of
                  Version when Version > 1 ->
                      19;

                  _Otherwise ->
                      13
              end,
    maps:get(header_size, Arg, Default).
661 |
|
|
662 |
|
|
663 |
|
%% Parser for one or more rows laid out according to the given table
%% mapping.
rows(Mapping) ->
    fun
        (Input) ->
            ?LOG_DEBUG(#{mapping => Mapping, input => Input}),
            OneOrMore = many1(row(Mapping)),
            OneOrMore(Input)
    end.
669 |
|
|
670 |
|
|
671 |
|
%% Parser for a single row of the mapped table.
%%
%% A row starts with a null bitmap holding one flag per column, which is
%% turned into a map from column number to boolean. The columns follow
%% in order: a column flagged as null yields `null' without consuming
%% input, any other column is decoded by msmp_binlog_field:decode/3
%% from its type, unsignedness and field metadata. The result is a
%% tuple of column values.
row(#{coltypes := ColTypes,
      field_metadata := FieldMetadata,
      metadata := Metadata} = Mapping) ->
    Unsignedness = maps:get(unsignedness, Metadata, #{}),
    fun
        (Input) ->
            ?LOG_DEBUG(#{mapping => Mapping, input => Input}),

            NumCols = length(ColTypes),

            %% The flags are paired with column numbers counting down
            %% from the last column.
            NullFlags = into_map(
                          map_result(
                            map_parser(
                              into_bits(bitmap(NumCols), NumCols),
                              many1(into_boolean())),
                            fun
                                (Flags) ->
                                    lists:zip(
                                      lists:seq(NumCols, 1, -1),
                                      Flags)
                            end)),

            Columns = fun
                          (Nulls) ->
                              ?LOG_DEBUG(#{col_types => ColTypes, nulls => Nulls}),

                              into_tuple(
                                sequence(
                                  [begin
                                       ?LOG_DEBUG(#{col_no => ColNo, col_type => ColType}),

                                       case maps:get(ColNo, Nulls, false) of
                                           true ->
                                               success(null);

                                           false ->
                                               msmp_binlog_field:decode(
                                                 ColType,
                                                 maps:get(ColNo, Unsignedness, false),
                                                 maps:get(ColNo, FieldMetadata, undefined))
                                       end
                                   end || {ColNo, ColType} <- lists:zip(
                                                                lists:seq(1, NumCols),
                                                                ColTypes)]))
                      end,

            (followed_with(NullFlags, Columns))(Input)
    end.
721 |
|
|
722 |
|
|
723 |
|
%% Parser for a bitmap of N columns, read as a fixed width integer of
%% bitmap_bytes(N) bytes.
bitmap(N) ->
    Width = bitmap_bytes(N),
    msmp_integer_fixed:decode(Width).
725 |
|
|
726 |
|
|
727 |
|
%% Number of bytes taken by a bitmap of N columns, as computed by
%% msmp_binary:null_bitmap_bytes/2 with an offset of 0.
bitmap_bytes(N) ->
    Offset = 0,
    msmp_binary:null_bitmap_bytes(N, Offset).
729 |
|
|
730 |
|
|
731 |
|
%% Parser for a zlib compressed payload that runs to the end of input.
%%
%% The low three bits of the first byte are handed to
%% msmp_integer_fixed:decode/1 as the size of the integer that follows
%% (presumably its width in bytes -- confirm against
%% msmp_integer_fixed); that integer is the expected uncompressed
%% length and everything after it is the compressed data.
%%
%% Returns `{<<>>, Uncompressed}' on success. The result is `nomatch'
%% when the length prefix does not parse, when the uncompressed size
%% differs from the announced length, or when zlib rejects the data.
%% The last case previously escaped as an exception.
uncompress() ->
    fun
        (Input) ->
            maybe
                {Compressed, Length} ?= (msmp_integer_fixed:decode(
                                           scran_combinator:map_result(
                                             msmp_integer_fixed:decode(1),
                                             fun
                                                 (Result) ->
                                                     Result band 16#07
                                             end)))(Input),

                try zlib:uncompress(Compressed) of
                    Uncompressed when byte_size(Uncompressed) == Length ->
                        {<<>>, Uncompressed};

                    _ ->
                        nomatch
                catch
                    %% corrupt or truncated compressed data
                    error:Reason when Reason =:= data_error;
                                      Reason =:= buf_error ->
                        nomatch
                end
            end
    end.
752 |
|
|
753 |
|
|
754 |
|
%% Parser for the commit timestamps of a gtid_log event.
%%
%% The immediate commit timestamp is a 7 byte little endian integer
%% whose most significant bit (bit 55) flags that a 7 byte original
%% commit timestamp follows. In a little endian encoding that is the
%% top bit of the seventh byte, i.e. bit offset 48 of the input, not
%% offset 55 as tested before. The flag is removed from the decoded
%% immediate commit timestamp.
%%
%% Yields a list of key/value pairs for the enclosing sequence, or
%% `nomatch' when fewer than seven bytes are available.
gtid_log_timestamp() ->
    fun
        (<<_:48, 1:1, _:7, _/bytes>> = Encoded) ->
            ?LOG_DEBUG(#{encoded => Encoded}),
            (scran_sequence:sequence(
               [scran_result:kv(
                  immediate_commit_timestamp,
                  scran_combinator:map_result(
                    msmp_integer_fixed:decode(7),
                    fun
                        (Flagged) ->
                            %% clear the "original timestamp follows" bit
                            Flagged band ((1 bsl 55) - 1)
                    end)),

                scran_result:kv(
                  original_timestamp,
                  msmp_integer_fixed:decode(7))]))(Encoded);

        (<<_:48, 0:1, _:7, _/bytes>> = Encoded) ->
            ?LOG_DEBUG(#{encoded => Encoded}),
            (scran_sequence:sequence(
               [scran_result:kv(
                  immediate_commit_timestamp,
                  msmp_integer_fixed:decode(7))]))(Encoded);

        (_) ->
            nomatch
    end.