_site/cover/pgmp_mm_rep_log.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_mm_rep_log).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -export([terminate/3]).
22 -import(pgmp_codec, [marshal/2]).
23 -import(pgmp_codec, [size_inclusive/1]).
24 -import(pgmp_data_row, [decode/3]).
25 -import(pgmp_statem, [cancel_generic_timeout/1]).
26 -import(pgmp_statem, [generic_timeout/1]).
27 -import(pgmp_statem, [nei/1]).
28 -include_lib("kernel/include/logger.hrl").
29
30
31 callback_mode() ->
32 5 [handle_event_function, state_enter].
33
34
35 terminate(Reason, State, Data) ->
36 5 pgmp_mm_common:terminate(Reason, State, Data).
37
38
39 handle_event(internal,
40 {recv, {copy_both_response, CopyBothRespone}},
41 _,
42 _) ->
43 5 ?LOG_DEBUG(#{copy_both_response => CopyBothRespone}),
44 5 keep_state_and_data;
45
46 handle_event(internal, {recv, {copy_data, {Tag, _} = TM}}, _, _)
47 when Tag == keepalive; Tag == x_log_data ->
48 365 ?LOG_DEBUG(#{copy_data => TM}),
49 365 {keep_state_and_data,
50 [nei(TM),
51 generic_timeout(replication_ping),
52 cancel_generic_timeout(replication_ping_no_reply)]};
53
54 handle_event(internal,
55 {x_log_data,
56 #{clock := Clock,
57 end_wal := EndWAL,
58 start_wal := StartWAL,
59 stream := {Command, Arg}}},
60 _,
61 #{wal := WAL} = Data) ->
62 202 {keep_state,
63 Data#{wal := WAL#{received := EndWAL,
64 flushed := EndWAL,
65 clock := Clock,
66 applied := EndWAL}},
67 nei({Command,
68 Arg#{x_log => #{clock => pgmp_calendar:decode(Clock),
69 end_wal => EndWAL,
70 start_wal => StartWAL}}})};
71
72 handle_event(internal,
73 {optional_callback, F, A},
74 _,
75 #{config := #{module := M}}) ->
76 130 case pgmp_util:is_exported(M, F, 1) of
77 true ->
78 130 {keep_state_and_data, nei({callback, F, A})};
79
80 false ->
81
:-(
keep_state_and_data
82 end;
83
84 handle_event(internal,
85 {callback, F, A},
86 _,
87 #{config := #{manager := Manager,
88 module := M},
89 requests := Requests} = Data) ->
90 200 {keep_state,
91 Data#{requests => M:F(A#{server_ref => Manager,
92 label => F,
93 requests => Requests})}};
94
95 handle_event(internal, {begin_prepare = Change, Arg}, _, _) ->
96
:-(
?LOG_DEBUG(#{Change => Arg}),
97
:-(
{keep_state_and_data,
98 nei({rep_telemetry, Change, #{count => 1}})};
99
100 handle_event(internal, {prepare = Change, Arg}, _, _) ->
101
:-(
?LOG_DEBUG(#{Change => Arg}),
102
:-(
{keep_state_and_data,
103 nei({rep_telemetry, Change, #{count => 1}})};
104
105 handle_event(internal, {commit_prepared = Change, Arg}, _, _) ->
106
:-(
?LOG_DEBUG(#{Change => Arg}),
107
:-(
{keep_state_and_data,
108 nei({rep_telemetry, Change, #{count => 1}})};
109
110 handle_event(internal, {rollback_prepared = Change, Arg}, _, _) ->
111
:-(
?LOG_DEBUG(#{Change => Arg}),
112
:-(
{keep_state_and_data,
113 nei({rep_telemetry, Change, #{count => 1}})};
114
115 handle_event(internal,
116 {begin_transaction = Change, Arg},
117 _,
118 _) ->
119 65 ?LOG_DEBUG(#{Change => Arg}),
120 65 {keep_state_and_data,
121 [nei({optional_callback,
122 Change,
123 maps:with([commit_timestamp,
124 final_lsn,
125 xid,
126 x_log], Arg)}),
127
128 nei({rep_telemetry, Change, #{count => 1}})]};
129
130 handle_event(internal,
131 {insert = Change,
132 #{relation := Relation,
133 x_log := XLog,
134 tuple := Values} = Arg},
135 _,
136 #{relations := Relations,
137 config := Config,
138 parameters := Parameters}) ->
139 55 ?LOG_DEBUG(#{Change => Arg}),
140 55 #{Relation := #{columns := Columns} = Detail} = Relations,
141 55 {keep_state_and_data,
142 [nei({callback,
143 Change,
144 #{relation => relation(Detail),
145 x_log => XLog,
146 tuple => row_tuple(Config, Parameters, Columns, Values)}}),
147
148 nei({rep_telemetry,
149 Change,
150 #{count => 1},
151 #{relation => relation(Detail)}})]};
152
153 handle_event(internal,
154 {update = Change,
155 #{relation := Relation,
156 x_log := XLog,
157 new := Values} = Arg},
158 _,
159 #{relations := Relations,
160 config := Config,
161 parameters := Parameters}) ->
162 4 ?LOG_DEBUG(#{Change => Arg}),
163 4 #{Relation := #{columns := Columns} = Detail} = Relations,
164 4 {keep_state_and_data,
165 [nei({callback,
166 Change,
167 #{relation => relation(Detail),
168 x_log => XLog,
169 tuple => row_tuple(Config, Parameters, Columns, Values)}}),
170
171 nei({rep_telemetry,
172 Change,
173 #{count => 1},
174 #{relation => relation(Detail)}})]};
175
176 handle_event(internal,
177 {delete = Change,
178 #{relation := Relation, x_log := XLog, key := Values} = Arg},
179 _,
180 #{relations := Relations,
181 config := Config,
182 parameters := Parameters}) ->
183 4 ?LOG_DEBUG(#{Change => Arg}),
184 4 #{Relation := #{columns := Columns} = Detail} = Relations,
185 4 {keep_state_and_data,
186 [nei({callback,
187 Change,
188 #{relation => relation(Detail),
189 x_log => XLog,
190 tuple => row_tuple(Config, Parameters, Columns, Values)}}),
191
192 nei({rep_telemetry,
193 Change,
194 #{count => 1},
195 #{relation => relation(Detail)}})]};
196
197 handle_event(internal,
198 {truncate = Change,
199 #{x_log := XLog,
200 relations := Truncates} = Arg},
201 _,
202 #{relations := Relations}) ->
203 2 ?LOG_DEBUG(#{Change => Arg}),
204 2 Names = lists:map(
205 fun
206 (Relation) ->
207 2 #{Relation := Detail} = Relations,
208 2 relation(Detail)
209 end,
210 Truncates),
211 2 {keep_state_and_data,
212 [nei({callback, Change, #{relations => Names, x_log => XLog}}),
213
214 nei({rep_telemetry, Change, #{count => 1}, #{relations => Names}})]};
215
216 handle_event(internal, {commit = Change, Arg}, _, _) ->
217 65 ?LOG_DEBUG(#{Change => Arg}),
218 65 {keep_state_and_data,
219 [nei({optional_callback,
220 Change,
221 maps:with([commit_lsn,
222 commit_timestamp,
223 end_lsn],
224 Arg)}),
225
226 nei({rep_telemetry, Change, #{count => 1}})]};
227
228 handle_event(internal,
229 {relation, #{id := Id} = Relation},
230 _,
231 #{relations := Relations} = Data) ->
232 7 ?LOG_DEBUG(#{relation => Relation}),
233 7 {keep_state,
234 Data#{relations := Relations#{Id => maps:without([id], Relation)}}};
235
236 handle_event(internal,
237 {keepalive = Change,
238 #{clock := Clock,
239 end_wal := EndWAL,
240 reply := true}},
241 _,
242 #{wal := WAL} = Data) ->
243 2 {keep_state,
244 Data#{wal := WAL#{clock := Clock, received := EndWAL}},
245 [nei(ping),
246
247 nei({rep_telemetry, Change, #{count => 1}})]};
248
249 handle_event(internal, {keepalive, _}, _, _) ->
250 161 keep_state_and_data;
251
252 handle_event({timeout, replication_ping}, _, replication, _) ->
253
:-(
{keep_state_and_data,
254 [nei(ping), generic_timeout(replication_ping_no_reply)]};
255
256 handle_event(internal,
257 ping,
258 _,
259 #{wal := #{received := ReceivedWAL,
260 clock := Clock,
261 flushed := FlushedWAL,
262 applied := AppliedWAL}}) ->
263 2 {keep_state_and_data,
264 nei({standby_status_update,
265 #{received_wal => ReceivedWAL,
266 flushed_wal => FlushedWAL,
267 applied_wal => AppliedWAL,
268 clock => Clock,
269 reply => true}})};
270
271 handle_event(internal,
272 {response, #{label := Change, reply := ok}},
273 _,
274 _)
275 when Change == insert;
276 Change == update;
277 Change == delete;
278 Change == truncate;
279 Change == begin_transaction;
280 Change == commit ->
281 195 keep_state_and_data;
282
283 handle_event(internal,
284 {response,
285 #{label := snapshot,
286 reply := {error, Reason}}},
287 waiting_for_snapshot_completion,
288 _) ->
289
:-(
{stop, Reason};
290
291 handle_event(internal,
292 {lsn, LSN},
293 _,
294 Data) when is_integer(LSN) ->
295 5 {keep_state,
296 Data#{lsn => pgmp_lsn:encode(LSN),
297 wal => #{received => LSN,
298 clock => 0,
299 flushed => LSN,
300 applied => LSN}}};
301
302 handle_event(internal,
303 {response,
304 #{label := snapshot,
305 reply := ok}},
306 waiting_for_snapshot_completion,
307 Data) ->
308 5 {next_state,
309 replication,
310 Data,
311 [nei({lsn, 0}),
312 nei(start_replication),
313 generic_timeout(replication_ping)]};
314
315 handle_event(
316 internal,
317 {response, #{label := lsn, reply := {ok, LSN}}},
318 waiting_for_lsn,
319 Data) when is_integer(LSN) ->
320
:-(
{next_state,
321 replication,
322 Data,
323 [nei({lsn, LSN}),
324 nei(start_replication),
325 generic_timeout(replication_ping)]};
326
327 handle_event(
328 internal,
329 {response, #{label := lsn, reply := not_found}},
330 waiting_for_lsn,
331 Data) ->
332
:-(
{next_state,
333 replication,
334 Data,
335 [nei({lsn, 0}),
336 nei(start_replication),
337 generic_timeout(replication_ping)]};
338
339 handle_event(internal,
340 {response, #{label := pgmp_types, reply := ready}},
341 waiting_for_types,
342 #{types_ready := false} = Data) ->
343 1 {next_state,
344 identify_system,
345 Data#{types_ready := true},
346 [nei(manager), nei(identify_system)]};
347
348 handle_event(internal,
349 server_version,
350 _,
351 #{parameters := #{<<"server_version">> := ServerVersion}} = Data) ->
352 5 {keep_state,
353 Data#{server_version => pgmp_util:semantic_version(ServerVersion)}};
354
355 handle_event(internal,
356 bootstrap_complete,
357 _,
358 #{types_ready := false} = Data) ->
359 1 {next_state, waiting_for_types, Data, nei(server_version)};
360
361 handle_event(internal,
362 bootstrap_complete,
363 _,
364 #{types_ready := true} = Data) ->
365 4 {next_state,
366 identify_system,
367 Data,
368 [nei(server_version),
369 nei(manager),
370 nei(identify_system)]};
371
372 handle_event(internal,
373 manager,
374 _,
375 #{config := Config} = Data) ->
376 5 [_, LogicalSup | _] = get('$ancestors'),
377 5 case pgmp_sup:get_child(LogicalSup, manager) of
378 {_, Manager, worker, _} when is_pid(Manager) ->
379 5 {keep_state,
380 Data#{config := Config#{manager => Manager,
381 module => pgmp_config:replication(
382 logical, module)}}}
383 end;
384
385 handle_event(internal, identify_system, _, _) ->
386 5 {keep_state_and_data, nei({query, <<"IDENTIFY_SYSTEM">>})};
387
388 handle_event(internal,
389 {recv, {command_complete, {copy, 0}}},
390 _,
391 _) ->
392 %% Sent during DB shutdown to indicate that the replication
393 %% process is finishing.
394
:-(
stop;
395
396 handle_event(internal,
397 {recv, {command_complete, Command}},
398 _,
399 Data) when Command == identify_system;
400 Command == create_replication_slot ->
401 10 {keep_state, maps:without([columns], Data)};
402
403 handle_event(internal,
404 {recv, {command_complete, select}},
405 State,
406 #{server_version := #{major := 12}} = Data)
407 when State == identify_system;
408 State == replication_slot ->
409
:-(
{keep_state, maps:without([columns], Data)};
410
411
412 handle_event(internal,
413 {recv, {ready_for_query, idle}},
414 identify_system,
415 Data) ->
416 %% logger:set_module_level([pgmp_mm_rep_log, pgmp_codec], debug),
417 5 {next_state, replication_slot, Data, nei(create_replication_slot)};
418
419 handle_event(
420 internal,
421 {recv, {ready_for_query, idle}},
422 replication_slot,
423 #{replication_slot := #{<<"snapshot_name">> := Snapshot}} = Data) ->
424 5 {next_state,
425 waiting_for_snapshot_completion,
426 Data,
427 nei({callback, snapshot, #{id => Snapshot}})};
428
429 handle_event(
430 internal,
431 {recv, {ready_for_query, idle}},
432 replication_slot,
433 Data) ->
434
:-(
{next_state,
435 waiting_for_lsn,
436 Data,
437 nei({callback, lsn, #{}})};
438
439 handle_event(
440 internal,
441 {recv, {error_response, _} = TM},
442 replication_slot,
443 Data) ->
444
:-(
case pgmp_error_notice_fields:map(TM) of
445 {error_response, #{code := <<"42710">>}} ->
446
:-(
keep_state_and_data;
447
448 {Tag, Message} ->
449
:-(
{next_state,
450 limbo,
451 Data,
452 [nei({telemetry,
453 error,
454 #{count => 1},
455 maps:merge(
456 #{event => Tag},
457 maps:with(
458 [code, message, severity],
459 Message))}),
460 {state_timeout,
461 timer:seconds(
462 backoff:rand_increment(
463 pgmp_config:backoff(rand_increment))),
464 {backoff, #{action => Tag, reason => Message}}}]}
465 end;
466
467 handle_event(internal,
468 create_replication_slot = Command,
469 _,
470 #{config := #{publication := Publication}}) ->
471 5 {keep_state_and_data,
472 nei({Command, pgmp_rep_log:slot_name(Publication)})};
473
474 handle_event(internal,
475 {create_replication_slot, SlotName},
476 _,
477 #{server_version := ServerVersion}) ->
478 5 {keep_state_and_data,
479 nei({query,
480 lists:join(
481 " ",
482 ["CREATE_REPLICATION_SLOT",
483 ["\"", SlotName, "\""],
484 5 ["TEMPORARY" || pgmp_config:replication(logical, temporary)],
485 "LOGICAL",
486 "pgoutput",
487 create_slot_options(
488 protocol_version(ServerVersion))])})};
489
490 handle_event(internal,
491 start_replication,
492 _,
493 #{config := #{publication := PublicationNames},
494 server_version := ServerVersion}) ->
495 5 {keep_state_and_data,
496 nei({start_replication,
497 pgoutput_options(
498 protocol_version(ServerVersion),
499 PublicationNames)})};
500
501 handle_event(internal,
502 {start_replication, Options},
503 _,
504 #{lsn := LSN,
505 config := #{publication := Publication}} = Data) ->
506 5 {keep_state,
507 Data#{relations => #{}},
508 nei({query,
509 lists:join(
510 " ",
511 ["START_REPLICATION SLOT",
512 pgmp_rep_log:slot_name(Publication),
513 "LOGICAL",
514 LSN,
515 ["(",
516 maps:fold(
517 fun
518 (K, V, A) ->
519 35 [A,
520 30 [", " || A /= []],
521 atom_to_list(K),
522 " ",
523 "'",
524 any:to_list(V),
525 "'"]
526 end,
527 [],
528 Options),
529 ")"]])})};
530
531 handle_event(internal, {recv, {row_description, Columns}}, _, Data) ->
532 10 ?LOG_DEBUG(#{row_description => Columns}),
533 10 {keep_state, Data#{columns => Columns}};
534
535 handle_event(internal,
536 {recv, {data_row, Values}},
537 State,
538 #{parameters := Parameters,
539 config := Config,
540 columns := Columns} = Data) ->
541 10 ?LOG_DEBUG(#{data_row => Values}),
542 10 {keep_state,
543 maps:put(State,
544 lists:foldl(
545 fun
546 ({#{field_name := FieldName}, Value}, A) ->
547 40 A#{FieldName => Value}
548 end,
549 maps:get(State, Data, #{}),
550 lists:zip(Columns,
551 decode(
552 Parameters,
553 lists:zip(Columns, Values),
554 pgmp_types:cache(Config)))),
555 Data)};
556
557 handle_event(internal, {query, SQL}, _, _) ->
558 15 ?LOG_DEBUG(#{query => SQL}),
559 15 {keep_state_and_data,
560 nei({send, ["Q", size_inclusive([marshal(string, SQL)])]})};
561
562 handle_event(internal,
563 {standby_status_update,
564 #{received_wal := ReceivedWAL,
565 flushed_wal := FlushedWAL,
566 applied_wal := AppliedWAL,
567 clock := Clock,
568 reply := Reply}},
569 _,
570 _) ->
571 2 {keep_state_and_data,
572 nei({send,
573 ["d",
574 size_inclusive(
575 ["r",
576 <<ReceivedWAL:64,
577 FlushedWAL:64,
578 AppliedWAL:64,
579 Clock:64,
580 (b(Reply)):8>>])]})};
581
582 handle_event(internal, {rep_telemetry, EventName, Measurements}, _, _) ->
583 132 {keep_state_and_data, nei({rep_telemetry, EventName, Measurements, #{}})};
584
585 handle_event(internal,
586 {rep_telemetry, EventName, Measurements, Metadata},
587 _,
588 #{config := #{publication := Publication}} = Data) ->
589 197 {keep_state_and_data,
590 nei({telemetry,
591 [rep, EventName],
592 maps:merge(
593 Measurements,
594 maps:with([wal], Data)),
595 maps:merge(
596 Metadata#{publication => Publication},
597 maps:with([identify_system, replication_slot], Data))})};
598
599 handle_event(info,
600 {'DOWN' ,_ , process, Manager, noproc},
601 _,
602 #{manager := Manager}) ->
603
:-(
stop;
604
605 handle_event(EventType, EventContent, State, Data) ->
606 941 pgmp_mm_common:handle_event(EventType,
607 EventContent,
608 State,
609 Data).
610
611
612 row_tuple(Config, Parameters, Columns, Values) ->
613 63 list_to_tuple(
614 pgmp_data_row:decode(
615 Parameters,
616 lists:map(
617 fun
618 ({#{type := Type}, null = Value}) ->
619 4 {#{format => text, type_oid => Type}, Value};
620
621 ({#{type := Type}, #{format := Format, value := Value}}) ->
622 125 {#{format => Format, type_oid => Type}, Value}
623 end,
624 lists:zip(Columns, Values)),
625 pgmp_types:cache(Config))).
626
627
628
:-(
b(false) -> 0;
629 2 b(true) -> 1;
630
:-(
b(0) -> false;
631
:-(
b(1) -> true.
632
633
634 relation(Detail) ->
635 128 maps:with([name, namespace], Detail).
636
637 create_slot_options(ProtoVersion) when ProtoVersion >= 3 ->
638 5 ["(",
639 lists:foldl(
640 fun
641 (Option, A) ->
642 15 case pgmp_config:replication(logical, Option) of
643 false ->
644 10 A;
645
646 true ->
647
:-(
[A,
648
:-(
[", " || A /= []],
649 slot_option_name(Option)];
650
651 Value ->
652 5 [A,
653
:-(
[", " || A /= []],
654 slot_option(Option, Value)]
655 end
656 end,
657 [],
658 [two_phase, reserve_wal, snapshot]),
659 ")"];
660
661 create_slot_options(2) ->
662
:-(
case pgmp_config:replication(logical, snapshot) of
663 export ->
664
:-(
"EXPORT_SNAPSHOT";
665 no ->
666
:-(
"NOEXPORT_SNAPSHOT";
667 use ->
668
:-(
"USE_SNAPSHOT"
669 end.
670
671
672 slot_option(Name, Value) when is_boolean(Value) ->
673
:-(
lists:join(
674 " ",
675 [slot_option_name(Name), slot_option_value(Value)]);
676
677 slot_option(Name, Value) ->
678 5 lists:join(
679 " ",
680 [slot_option_name(Name), slot_option_value(Value)]).
681
682 slot_option_name(Name) ->
683 5 string:uppercase(atom_to_list(Name)).
684
685 slot_option_value(Value) when is_boolean(Value) ->
686
:-(
["'", atom_to_list(Value), "'"];
687
688 slot_option_value(Value) ->
689 5 ["'", atom_to_list(Value), "'"].
690
691
692 pgoutput_options(ProtoVersion, PublicationNames) ->
693 5 lists:foldl(
694 fun
695 (K, A) ->
696 25 A#{K => pgmp_config:pgoutput(K, ProtoVersion)}
697 end,
698 #{proto_version => ProtoVersion,
699 publication_names => PublicationNames},
700 pgoutput_options(ProtoVersion)).
701
702
703 protocol_version(#{major := Major}) when Major >= 16 ->
704 10 4;
705
706 protocol_version(#{major := Major}) when Major >= 15 ->
707
:-(
3;
708
709 protocol_version(#{major := Major}) when Major >= 14 ->
710
:-(
2;
711
712 protocol_version(#{major := _}) ->
713
:-(
1.
714
715
716 pgoutput_options(ProtocolVersion) when ProtocolVersion >= 4 ->
717 5 [binary, messages, streaming, two_phase, origin];
718
719 pgoutput_options(ProtocolVersion) when ProtocolVersion >= 3 ->
720
:-(
[binary, messages, streaming, two_phase];
721
722 pgoutput_options(ProtocolVersion) when ProtocolVersion >= 2 ->
723
:-(
[binary, messages, streaming];
724
725 pgoutput_options(_) ->
726
:-(
[].
Line Hits Source