_site/cover/msmp_binlog_field.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 %% @doc Replication protocol support for decoding of fields
17
18 -module(msmp_binlog_field).
19
20
21 -export([decode/3]).
22 -import(scran_bytes, [take/1]).
23 -import(scran_combinator, [map_result/2]).
24 -import(scran_combinator, [rest/0]).
25 -include_lib("kernel/include/logger.hrl").
26
27
28 decode(long, Unsigned, _Metadata) ->
29 9 fun
30 (Input) ->
31 9 (fixed(4, Unsigned))(Input)
32 end;
33
34 decode(longlong = Type, Unsigned, Metadata) ->
35 84 fun
36 (Input) ->
37 84 ?LOG_DEBUG(#{type => Type,
38 unsigned => Unsigned,
39 metadata => Metadata,
40 84 input => Input}),
41 84 (fixed(8, Unsigned))(Input)
42 end;
43
44 decode(tiny, Unsigned, _Metadata) ->
45 17 fun
46 (Input) ->
47 15 (fixed(1, Unsigned))(Input)
48 end;
49
50 decode(short, Unsigned, _Metadata) ->
51 7 fun
52 (Input) ->
53 7 (fixed(2, Unsigned))(Input)
54 end;
55
56 decode(int24, Unsigned, _) ->
57 7 fun
58 (Input) ->
59 7 (fixed(3, Unsigned))(Input)
60 end;
61
62 decode(string, _, #{length := Length, field_type := enum}) ->
63 5 fun
64 (Input) ->
65 5 (msmp_integer_fixed:decode(Length))(Input)
66 end;
67
68 decode(string, _, #{length := Length, field_type := string}) when Length < 256 ->
69 1 fun
70 (Input) ->
71 1 (scran_bytes:length_encoded(
72 msmp_integer_fixed:decode(1)))(Input)
73 end;
74
75 decode(string, _, #{field_type := string}) ->
76
:-(
fun
77 (Input) ->
78
:-(
(scran_bytes:length_encoded(
79 msmp_integer_fixed:decode(2)))(Input)
80 end;
81
82 decode(blob = Type, Unsigned, Length) ->
83 711 fun
84 (Input) ->
85 711 ?LOG_DEBUG(#{type => Type,
86 unsigned => Unsigned,
87 length => Length,
88 711 input => Input}),
89
90 711 (scran_bytes:length_encoded(
91 msmp_integer_fixed:decode(Length)))(Input)
92 end;
93
94 decode(varchar, _, Length) when is_integer(Length), Length < 256 ->
95 143 fun
96 (Input) ->
97 145 (scran_bytes:length_encoded(
98 msmp_integer_fixed:decode(1)))(Input)
99 end;
100
101 decode(varchar, _, Length) when is_integer(Length) ->
102 1 fun
103 (Input) ->
104 3 (scran_bytes:length_encoded(
105 msmp_integer_fixed:decode(2)))(Input)
106 end;
107
108 decode(date, Unsigned, _Metadata) ->
109 3 fun
110 (Input) ->
111 3 (map_result(
112 fixed(3, Unsigned),
113 fun
114 (Encoded) ->
115 3 <<Year:15, Month:4, Date:5>> = <<Encoded:24>>,
116 3 {Year, Month, Date}
117 end))(Input)
118 end;
119
120 decode(time, Unsigned, _Metadata) ->
121
:-(
fun
122 (Input) ->
123
:-(
(map_result(
124 fixed(3, Unsigned),
125 fun
126 (Encoded) ->
127
:-(
{Encoded div 10_000,
128 (Encoded div 100) rem 100,
129 Encoded rem 100}
130 end))(Input)
131 end;
132
133 decode(time2, _, 0) ->
134 2 fun
135 (Input) ->
136 3 (map_result(
137 take(3),
138 fun
139 (<<Encoded:24>>) ->
140 3 case Encoded - 16#80_00_00 of
141 Negative when Negative < 0 ->
142 1 <<_:2, H:10, M:6, S:6>> = <<(abs(Negative)):24>>,
143 1 {-H, M, S};
144
145 Positive ->
146 2 <<_:2, H:10, M:6, S:6>> = <<Positive:24>>,
147 2 {H, M, S}
148 end
149 end))(Input)
150 end;
151
152 decode(time2, _, Metadata) when Metadata == 5;
153 Metadata == 6 ->
154 2 fun
155 (Input) ->
156 4 (map_result(
157 take(6),
158 fun
159 (<<Encoded:48>>) ->
160 4 case Encoded - 16#80_00_00_00_00_00 of
161 Negative when Negative < 0 ->
162 2 <<_:2, H:10, M:6, S:6, Frac:24>> = <<(abs(Negative)):48>>,
163 2 {-H, M, S + (Frac / 1_000_000)};
164
165 Positive ->
166 2 <<_:2, H:10, M:6, S:6, Frac:24>> = <<Positive:48>>,
167 2 {H, M, S + (Frac / 1_000_000)}
168 end
169 end))(Input)
170 end;
171
172 decode(time2, _, Metadata) ->
173 4 FracBytes = (Metadata + 1) div 2,
174 4 {Delta, Divisor} = case Metadata of
175 Metadata when Metadata =< 2 ->
176 2 {16#1_00, 100};
177
178 Metadata when Metadata =< 4 ->
179 2 {16#1_00_00, 10_000}
180 end,
181 4 fun
182 (Input) ->
183 8 (map_result(
184 take(3 + FracBytes),
185 fun
186 (<<IntPart:24, FracPart:FracBytes/unit:8>>) ->
187 8 case IntPart - 16#80_00_00 of
188 Negative when Negative < 0 ->
189 4 <<_:2, H:10, M:6, S:6>> = <<(abs(Negative)):24>>,
190 4 {-H, M, S - 1 - ((FracPart - Delta) / Divisor)};
191
192 Positive ->
193 4 <<_:2, H:10, M:6, S:6>> = <<Positive:24>>,
194 4 {H, M, S + (FracPart / Divisor)}
195 end
196 end))(Input)
197 end;
198
199 decode(timestamp2, _, 0) ->
200 5 fun
201 (Input) ->
202 6 (map_result(
203 take(4),
204 fun
205 (<<Second:32>>) ->
206 6 erlang:convert_time_unit(
207 Second,
208 second,
209 microsecond)
210 end))(Input)
211 end;
212
213 decode(timestamp2, _, Decimals) when Decimals =< 2 ->
214 2 fun
215 (Input) ->
216 4 (map_result(
217 take(5),
218 fun
219 (<<Second:32, Micro:8>>) ->
220 erlang:convert_time_unit(
221 Second,
222 second,
223 microsecond)
224 4 + (Micro * 10_000)
225 end))(Input)
226 end;
227
228 decode(timestamp2, _, Decimals) when Decimals =< 4 ->
229 2 fun
230 (Input) ->
231 4 (map_result(
232 take(6),
233 fun
234 (<<Second:32, Micro:16>>) ->
235 erlang:convert_time_unit(
236 Second,
237 second,
238 microsecond)
239 4 + (Micro * 100)
240 end))(Input)
241 end;
242
243 decode(timestamp2, _, Decimals) when Decimals =< 6 ->
244 2 fun
245 (Input) ->
246 4 (map_result(
247 take(7),
248 fun
249 (<<Second:32, Micro:24>>) ->
250 erlang:convert_time_unit(
251 Second,
252 second,
253 microsecond)
254 4 + Micro
255 end))(Input)
256 end;
257
258 decode(year, _, _) ->
259 1 fun
260 (Input) ->
261 3 (map_result(
262 take(1),
263 fun
264 (<<0:8>>) ->
265 1 0;
266
267 (<<Year:8>>) ->
268 2 Year + 1_900
269 end))(Input)
270 end;
271
272 decode(datetime = Type, Unsigned, Metadata = 0) ->
273
:-(
fun
274 (Input) ->
275
:-(
?LOG_DEBUG(#{type => Type,
276 unsigned => Unsigned,
277 metadata => Metadata,
278
:-(
input => Input}),
279
:-(
(map_result(
280 take(5),
281 fun
282 (<<_:1, YM:17, Day:5, Hour:5, Min:6, Sec:6>>) ->
283
:-(
{{YM div 13, YM rem 13, Day}, {Hour, Min, Sec}}
284 end))(Input)
285 end;
286
287 decode(datetime2 = Type, Unsigned, Metadata) when Metadata >= 5 ->
288 4 fun
289 (Input) ->
290 8 ?LOG_DEBUG(#{type => Type,
291 unsigned => Unsigned,
292 metadata => Metadata,
293 8 input => Input}),
294 8 (map_result(
295 take(8),
296 fun
297 (<<_:1, YM:17, Day:5, Hour:5, Min:6, Sec:6, Micro:24>>) ->
298 8 datetime_to_system_microsecond(
299 {{YM div 13, YM rem 13, Day}, {Hour, Min, Sec}},
300 Micro)
301 %% {{YM div 13, YM rem 13, Day}, {Hour, Min, Sec + (Micro / 1_000_000)}}
302 end))(Input)
303 end;
304
305 decode(datetime2 = Type, Unsigned, Metadata) when Metadata >= 3 ->
306 2 fun
307 (Input) ->
308 4 ?LOG_DEBUG(#{type => Type,
309 unsigned => Unsigned,
310 metadata => Metadata,
311 4 input => Input}),
312 4 (map_result(
313 take(7),
314 fun
315 (<<_:1, YM:17, Day:5, Hour:5, Min:6, Sec:6, Micro:16>>) ->
316 4 datetime_to_system_microsecond(
317 {{YM div 13, YM rem 13, Day}, {Hour, Min, Sec}},
318 Micro * 100)
319 %% {{YM div 13, YM rem 13, Day}, {Hour, Min, Sec + (Micro * 100 / 1_000_000)}}
320 end))(Input)
321 end;
322
323 decode(datetime2 = Type, Unsigned, Metadata) when Metadata >= 1 ->
324 2 fun
325 (Input) ->
326 4 ?LOG_DEBUG(#{type => Type, unsigned => Unsigned, metadata => Metadata, input => Input}),
327 4 (map_result(
328 take(6),
329 fun
330 (<<_:1, YM:17, Day:5, Hour:5, Min:6, Sec:6, Micro:8>>) ->
331 4 datetime_to_system_microsecond(
332 {{YM div 13, YM rem 13, Day}, {Hour, Min, Sec}},
333 erlang:convert_time_unit(
334 Micro * 10,
335 millisecond,
336 microsecond))
337
338 %% datetime_to_system_microsecond({{YM div 13, YM rem 13, Day}, {Hour, Min, Sec}})
339 %% + erlang:convert_time_unit(
340 %% Micro * 10,
341 %% millisecond,
342 %% microsecond)
343 %% + (Micro * 10_000 / 1_000_000)
344 end))(Input)
345 end;
346
347 decode(datetime2 = Type, Unsigned, 0 = Metadata) ->
348 10 fun
349 (Input) ->
350 9 ?LOG_DEBUG(#{type => Type, unsigned => Unsigned, metadata => Metadata, input => Input}),
351 9 (map_result(
352 take(5),
353 fun
354 (<<_:1, YM:17, Day:5, Hour:5, Min:6, Sec:6>>) ->
355 9 datetime_to_system_microsecond({{YM div 13, YM rem 13, Day}, {Hour, Min, Sec}})
356 end))(Input)
357 end;
358
359 decode(float, _, _) ->
360 1 fun
361 (Input) ->
362 3 (map_result(
363 scran_number_le:f32(),
364 scran_number:precision(6)))(Input)
365 end;
366
367 decode(double, _, _) ->
368 1 fun
369 (Input) ->
370 3 (map_result(
371 scran_number_le:f64(),
372 scran_number:precision(15)))(Input)
373 end;
374
375 decode(newdecimal, _, #{precision := Precision, scale := Scale}) ->
376 9 msmp_decimal:decode(Precision, Scale);
377
378 decode(bit, _, 0) ->
379
:-(
fun
380 (Input) ->
381
:-(
(fixed(1, false))(Input)
382 end;
383
384 decode(bit, _, Bits) ->
385 5 fun
386 (Input) ->
387 8 (map_result(
388 take((Bits + 7) div 8),
389 fun
390 (<<_:(8 - (Bits rem 8))/bits, Value:Bits/bits>>) ->
391 8 Value
392 end))(Input)
393 end;
394
395 decode(json, _, Metadata) ->
396
:-(
msmp_jsonb:decode(Metadata);
397
398 decode(Type, Unsigned, Metadata) ->
399
:-(
fun
400 (Input) ->
401
:-(
?LOG_WARNING(#{type => Type,
402 unsigned => Unsigned,
403 metadata => Metadata,
404
:-(
input => Input}),
405
:-(
(rest())(Input)
406 end.
407
408
409 datetime_to_system_microsecond(DateTime) ->
410 9 ?FUNCTION_NAME(DateTime, 0).
411
412 datetime_to_system_microsecond(DateTime, Microsecond) ->
413 25 case erlang:convert_time_unit(
414 calendar:datetime_to_gregorian_seconds(DateTime) -
415 epoch(posix),
416 second,
417 microsecond) of
418 SystemTime when SystemTime >= 0 ->
419 13 SystemTime + Microsecond;
420
421 SystemTime ->
422 12 SystemTime - Microsecond
423 end.
424
425
426 epoch(posix) ->
427 25 calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}).
428
429
430 fixed(Bytes, true) ->
431 90 scran_number_le:u(Bytes * 8);
432 fixed(Bytes, false) ->
433 35 scran_number_le:i(Bytes * 8).
Line Hits Source