1 |
|
%% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com> |
2 |
|
%% |
3 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
%% you may not use this file except in compliance with the License. |
5 |
|
%% You may obtain a copy of the License at |
6 |
|
%% |
7 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
%% |
9 |
|
%% Unless required by applicable law or agreed to in writing, software |
10 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 |
|
%% See the License for the specific language governing permissions and |
13 |
|
%% limitations under the License. |
14 |
|
|
15 |
|
|
16 |
|
-module(pgmp_mm_rep_log). |
17 |
|
|
18 |
|
|
19 |
|
-export([callback_mode/0]). |
20 |
|
-export([handle_event/4]). |
21 |
|
-export([terminate/3]). |
22 |
|
-import(pgmp_codec, [marshal/2]). |
23 |
|
-import(pgmp_codec, [size_inclusive/1]). |
24 |
|
-import(pgmp_data_row, [decode/3]). |
25 |
|
-import(pgmp_statem, [cancel_generic_timeout/1]). |
26 |
|
-import(pgmp_statem, [generic_timeout/1]). |
27 |
|
-import(pgmp_statem, [nei/1]). |
28 |
|
-include_lib("kernel/include/logger.hrl"). |
29 |
|
|
30 |
|
|
31 |
|
callback_mode() -> |
32 |
5 |
[handle_event_function, state_enter]. |
33 |
|
|
34 |
|
|
35 |
|
terminate(Reason, State, Data) -> |
36 |
5 |
pgmp_mm_common:terminate(Reason, State, Data). |
37 |
|
|
38 |
|
|
39 |
|
handle_event(internal, |
40 |
|
{recv, {copy_both_response, CopyBothRespone}}, |
41 |
|
_, |
42 |
|
_) -> |
43 |
5 |
?LOG_DEBUG(#{copy_both_response => CopyBothRespone}), |
44 |
5 |
keep_state_and_data; |
45 |
|
|
46 |
|
handle_event(internal, {recv, {copy_data, {Tag, _} = TM}}, _, _) |
47 |
|
when Tag == keepalive; Tag == x_log_data -> |
48 |
365 |
?LOG_DEBUG(#{copy_data => TM}), |
49 |
365 |
{keep_state_and_data, |
50 |
|
[nei(TM), |
51 |
|
generic_timeout(replication_ping), |
52 |
|
cancel_generic_timeout(replication_ping_no_reply)]}; |
53 |
|
|
54 |
|
handle_event(internal, |
55 |
|
{x_log_data, |
56 |
|
#{clock := Clock, |
57 |
|
end_wal := EndWAL, |
58 |
|
start_wal := StartWAL, |
59 |
|
stream := {Command, Arg}}}, |
60 |
|
_, |
61 |
|
#{wal := WAL} = Data) -> |
62 |
202 |
{keep_state, |
63 |
|
Data#{wal := WAL#{received := EndWAL, |
64 |
|
flushed := EndWAL, |
65 |
|
clock := Clock, |
66 |
|
applied := EndWAL}}, |
67 |
|
nei({Command, |
68 |
|
Arg#{x_log => #{clock => pgmp_calendar:decode(Clock), |
69 |
|
end_wal => EndWAL, |
70 |
|
start_wal => StartWAL}}})}; |
71 |
|
|
72 |
|
handle_event(internal, |
73 |
|
{optional_callback, F, A}, |
74 |
|
_, |
75 |
|
#{config := #{module := M}}) -> |
76 |
130 |
case pgmp_util:is_exported(M, F, 1) of |
77 |
|
true -> |
78 |
130 |
{keep_state_and_data, nei({callback, F, A})}; |
79 |
|
|
80 |
|
false -> |
81 |
:-( |
keep_state_and_data |
82 |
|
end; |
83 |
|
|
84 |
|
handle_event(internal, |
85 |
|
{callback, F, A}, |
86 |
|
_, |
87 |
|
#{config := #{manager := Manager, |
88 |
|
module := M}, |
89 |
|
requests := Requests} = Data) -> |
90 |
200 |
{keep_state, |
91 |
|
Data#{requests => M:F(A#{server_ref => Manager, |
92 |
|
label => F, |
93 |
|
requests => Requests})}}; |
94 |
|
|
95 |
|
handle_event(internal, {begin_prepare = Change, Arg}, _, _) -> |
96 |
:-( |
?LOG_DEBUG(#{Change => Arg}), |
97 |
:-( |
{keep_state_and_data, |
98 |
|
nei({rep_telemetry, Change, #{count => 1}})}; |
99 |
|
|
100 |
|
handle_event(internal, {prepare = Change, Arg}, _, _) -> |
101 |
:-( |
?LOG_DEBUG(#{Change => Arg}), |
102 |
:-( |
{keep_state_and_data, |
103 |
|
nei({rep_telemetry, Change, #{count => 1}})}; |
104 |
|
|
105 |
|
handle_event(internal, {commit_prepared = Change, Arg}, _, _) -> |
106 |
:-( |
?LOG_DEBUG(#{Change => Arg}), |
107 |
:-( |
{keep_state_and_data, |
108 |
|
nei({rep_telemetry, Change, #{count => 1}})}; |
109 |
|
|
110 |
|
handle_event(internal, {rollback_prepared = Change, Arg}, _, _) -> |
111 |
:-( |
?LOG_DEBUG(#{Change => Arg}), |
112 |
:-( |
{keep_state_and_data, |
113 |
|
nei({rep_telemetry, Change, #{count => 1}})}; |
114 |
|
|
115 |
|
handle_event(internal, |
116 |
|
{begin_transaction = Change, Arg}, |
117 |
|
_, |
118 |
|
_) -> |
119 |
65 |
?LOG_DEBUG(#{Change => Arg}), |
120 |
65 |
{keep_state_and_data, |
121 |
|
[nei({optional_callback, |
122 |
|
Change, |
123 |
|
maps:with([commit_timestamp, |
124 |
|
final_lsn, |
125 |
|
xid, |
126 |
|
x_log], Arg)}), |
127 |
|
|
128 |
|
nei({rep_telemetry, Change, #{count => 1}})]}; |
129 |
|
|
130 |
|
handle_event(internal, |
131 |
|
{insert = Change, |
132 |
|
#{relation := Relation, |
133 |
|
x_log := XLog, |
134 |
|
tuple := Values} = Arg}, |
135 |
|
_, |
136 |
|
#{relations := Relations, |
137 |
|
config := Config, |
138 |
|
parameters := Parameters}) -> |
139 |
55 |
?LOG_DEBUG(#{Change => Arg}), |
140 |
55 |
#{Relation := #{columns := Columns} = Detail} = Relations, |
141 |
55 |
{keep_state_and_data, |
142 |
|
[nei({callback, |
143 |
|
Change, |
144 |
|
#{relation => relation(Detail), |
145 |
|
x_log => XLog, |
146 |
|
tuple => row_tuple(Config, Parameters, Columns, Values)}}), |
147 |
|
|
148 |
|
nei({rep_telemetry, |
149 |
|
Change, |
150 |
|
#{count => 1}, |
151 |
|
#{relation => relation(Detail)}})]}; |
152 |
|
|
153 |
|
handle_event(internal, |
154 |
|
{update = Change, |
155 |
|
#{relation := Relation, |
156 |
|
x_log := XLog, |
157 |
|
new := Values} = Arg}, |
158 |
|
_, |
159 |
|
#{relations := Relations, |
160 |
|
config := Config, |
161 |
|
parameters := Parameters}) -> |
162 |
4 |
?LOG_DEBUG(#{Change => Arg}), |
163 |
4 |
#{Relation := #{columns := Columns} = Detail} = Relations, |
164 |
4 |
{keep_state_and_data, |
165 |
|
[nei({callback, |
166 |
|
Change, |
167 |
|
#{relation => relation(Detail), |
168 |
|
x_log => XLog, |
169 |
|
tuple => row_tuple(Config, Parameters, Columns, Values)}}), |
170 |
|
|
171 |
|
nei({rep_telemetry, |
172 |
|
Change, |
173 |
|
#{count => 1}, |
174 |
|
#{relation => relation(Detail)}})]}; |
175 |
|
|
176 |
|
handle_event(internal, |
177 |
|
{delete = Change, |
178 |
|
#{relation := Relation, x_log := XLog, key := Values} = Arg}, |
179 |
|
_, |
180 |
|
#{relations := Relations, |
181 |
|
config := Config, |
182 |
|
parameters := Parameters}) -> |
183 |
4 |
?LOG_DEBUG(#{Change => Arg}), |
184 |
4 |
#{Relation := #{columns := Columns} = Detail} = Relations, |
185 |
4 |
{keep_state_and_data, |
186 |
|
[nei({callback, |
187 |
|
Change, |
188 |
|
#{relation => relation(Detail), |
189 |
|
x_log => XLog, |
190 |
|
tuple => row_tuple(Config, Parameters, Columns, Values)}}), |
191 |
|
|
192 |
|
nei({rep_telemetry, |
193 |
|
Change, |
194 |
|
#{count => 1}, |
195 |
|
#{relation => relation(Detail)}})]}; |
196 |
|
|
197 |
|
handle_event(internal, |
198 |
|
{truncate = Change, |
199 |
|
#{x_log := XLog, |
200 |
|
relations := Truncates} = Arg}, |
201 |
|
_, |
202 |
|
#{relations := Relations}) -> |
203 |
2 |
?LOG_DEBUG(#{Change => Arg}), |
204 |
2 |
Names = lists:map( |
205 |
|
fun |
206 |
|
(Relation) -> |
207 |
2 |
#{Relation := Detail} = Relations, |
208 |
2 |
relation(Detail) |
209 |
|
end, |
210 |
|
Truncates), |
211 |
2 |
{keep_state_and_data, |
212 |
|
[nei({callback, Change, #{relations => Names, x_log => XLog}}), |
213 |
|
|
214 |
|
nei({rep_telemetry, Change, #{count => 1}, #{relations => Names}})]}; |
215 |
|
|
216 |
|
handle_event(internal, {commit = Change, Arg}, _, _) -> |
217 |
65 |
?LOG_DEBUG(#{Change => Arg}), |
218 |
65 |
{keep_state_and_data, |
219 |
|
[nei({optional_callback, |
220 |
|
Change, |
221 |
|
maps:with([commit_lsn, |
222 |
|
commit_timestamp, |
223 |
|
end_lsn], |
224 |
|
Arg)}), |
225 |
|
|
226 |
|
nei({rep_telemetry, Change, #{count => 1}})]}; |
227 |
|
|
228 |
|
handle_event(internal, |
229 |
|
{relation, #{id := Id} = Relation}, |
230 |
|
_, |
231 |
|
#{relations := Relations} = Data) -> |
232 |
7 |
?LOG_DEBUG(#{relation => Relation}), |
233 |
7 |
{keep_state, |
234 |
|
Data#{relations := Relations#{Id => maps:without([id], Relation)}}}; |
235 |
|
|
236 |
|
handle_event(internal, |
237 |
|
{keepalive = Change, |
238 |
|
#{clock := Clock, |
239 |
|
end_wal := EndWAL, |
240 |
|
reply := true}}, |
241 |
|
_, |
242 |
|
#{wal := WAL} = Data) -> |
243 |
2 |
{keep_state, |
244 |
|
Data#{wal := WAL#{clock := Clock, received := EndWAL}}, |
245 |
|
[nei(ping), |
246 |
|
|
247 |
|
nei({rep_telemetry, Change, #{count => 1}})]}; |
248 |
|
|
249 |
|
handle_event(internal, {keepalive, _}, _, _) -> |
250 |
161 |
keep_state_and_data; |
251 |
|
|
252 |
|
handle_event({timeout, replication_ping}, _, replication, _) -> |
253 |
:-( |
{keep_state_and_data, |
254 |
|
[nei(ping), generic_timeout(replication_ping_no_reply)]}; |
255 |
|
|
256 |
|
handle_event(internal, |
257 |
|
ping, |
258 |
|
_, |
259 |
|
#{wal := #{received := ReceivedWAL, |
260 |
|
clock := Clock, |
261 |
|
flushed := FlushedWAL, |
262 |
|
applied := AppliedWAL}}) -> |
263 |
2 |
{keep_state_and_data, |
264 |
|
nei({standby_status_update, |
265 |
|
#{received_wal => ReceivedWAL, |
266 |
|
flushed_wal => FlushedWAL, |
267 |
|
applied_wal => AppliedWAL, |
268 |
|
clock => Clock, |
269 |
|
reply => true}})}; |
270 |
|
|
271 |
|
handle_event(internal, |
272 |
|
{response, #{label := Change, reply := ok}}, |
273 |
|
_, |
274 |
|
_) |
275 |
|
when Change == insert; |
276 |
|
Change == update; |
277 |
|
Change == delete; |
278 |
|
Change == truncate; |
279 |
|
Change == begin_transaction; |
280 |
|
Change == commit -> |
281 |
195 |
keep_state_and_data; |
282 |
|
|
283 |
|
handle_event(internal, |
284 |
|
{response, |
285 |
|
#{label := snapshot, |
286 |
|
reply := {error, Reason}}}, |
287 |
|
waiting_for_snapshot_completion, |
288 |
|
_) -> |
289 |
:-( |
{stop, Reason}; |
290 |
|
|
291 |
|
handle_event(internal, |
292 |
|
{lsn, LSN}, |
293 |
|
_, |
294 |
|
Data) when is_integer(LSN) -> |
295 |
5 |
{keep_state, |
296 |
|
Data#{lsn => pgmp_lsn:encode(LSN), |
297 |
|
wal => #{received => LSN, |
298 |
|
clock => 0, |
299 |
|
flushed => LSN, |
300 |
|
applied => LSN}}}; |
301 |
|
|
302 |
|
handle_event(internal, |
303 |
|
{response, |
304 |
|
#{label := snapshot, |
305 |
|
reply := ok}}, |
306 |
|
waiting_for_snapshot_completion, |
307 |
|
Data) -> |
308 |
5 |
{next_state, |
309 |
|
replication, |
310 |
|
Data, |
311 |
|
[nei({lsn, 0}), |
312 |
|
nei(start_replication), |
313 |
|
generic_timeout(replication_ping)]}; |
314 |
|
|
315 |
|
handle_event( |
316 |
|
internal, |
317 |
|
{response, #{label := lsn, reply := {ok, LSN}}}, |
318 |
|
waiting_for_lsn, |
319 |
|
Data) when is_integer(LSN) -> |
320 |
:-( |
{next_state, |
321 |
|
replication, |
322 |
|
Data, |
323 |
|
[nei({lsn, LSN}), |
324 |
|
nei(start_replication), |
325 |
|
generic_timeout(replication_ping)]}; |
326 |
|
|
327 |
|
handle_event( |
328 |
|
internal, |
329 |
|
{response, #{label := lsn, reply := not_found}}, |
330 |
|
waiting_for_lsn, |
331 |
|
Data) -> |
332 |
:-( |
{next_state, |
333 |
|
replication, |
334 |
|
Data, |
335 |
|
[nei({lsn, 0}), |
336 |
|
nei(start_replication), |
337 |
|
generic_timeout(replication_ping)]}; |
338 |
|
|
339 |
|
handle_event(internal, |
340 |
|
{response, #{label := pgmp_types, reply := ready}}, |
341 |
|
waiting_for_types, |
342 |
|
#{types_ready := false} = Data) -> |
343 |
1 |
{next_state, |
344 |
|
identify_system, |
345 |
|
Data#{types_ready := true}, |
346 |
|
[nei(manager), nei(identify_system)]}; |
347 |
|
|
348 |
|
handle_event(internal, |
349 |
|
server_version, |
350 |
|
_, |
351 |
|
#{parameters := #{<<"server_version">> := ServerVersion}} = Data) -> |
352 |
5 |
{keep_state, |
353 |
|
Data#{server_version => pgmp_util:semantic_version(ServerVersion)}}; |
354 |
|
|
355 |
|
handle_event(internal, |
356 |
|
bootstrap_complete, |
357 |
|
_, |
358 |
|
#{types_ready := false} = Data) -> |
359 |
1 |
{next_state, waiting_for_types, Data, nei(server_version)}; |
360 |
|
|
361 |
|
handle_event(internal, |
362 |
|
bootstrap_complete, |
363 |
|
_, |
364 |
|
#{types_ready := true} = Data) -> |
365 |
4 |
{next_state, |
366 |
|
identify_system, |
367 |
|
Data, |
368 |
|
[nei(server_version), |
369 |
|
nei(manager), |
370 |
|
nei(identify_system)]}; |
371 |
|
|
372 |
|
handle_event(internal, |
373 |
|
manager, |
374 |
|
_, |
375 |
|
#{config := Config} = Data) -> |
376 |
5 |
[_, LogicalSup | _] = get('$ancestors'), |
377 |
5 |
case pgmp_sup:get_child(LogicalSup, manager) of |
378 |
|
{_, Manager, worker, _} when is_pid(Manager) -> |
379 |
5 |
{keep_state, |
380 |
|
Data#{config := Config#{manager => Manager, |
381 |
|
module => pgmp_config:replication( |
382 |
|
logical, module)}}} |
383 |
|
end; |
384 |
|
|
385 |
|
handle_event(internal, identify_system, _, _) -> |
386 |
5 |
{keep_state_and_data, nei({query, <<"IDENTIFY_SYSTEM">>})}; |
387 |
|
|
388 |
|
handle_event(internal, |
389 |
|
{recv, {command_complete, {copy, 0}}}, |
390 |
|
_, |
391 |
|
_) -> |
392 |
|
%% Sent during DB shutdown to indicate that the replication |
393 |
|
%% process is finishing. |
394 |
:-( |
stop; |
395 |
|
|
396 |
|
handle_event(internal, |
397 |
|
{recv, {command_complete, Command}}, |
398 |
|
_, |
399 |
|
Data) when Command == identify_system; |
400 |
|
Command == create_replication_slot -> |
401 |
10 |
{keep_state, maps:without([columns], Data)}; |
402 |
|
|
403 |
|
handle_event(internal, |
404 |
|
{recv, {command_complete, select}}, |
405 |
|
State, |
406 |
|
#{server_version := #{major := 12}} = Data) |
407 |
|
when State == identify_system; |
408 |
|
State == replication_slot -> |
409 |
:-( |
{keep_state, maps:without([columns], Data)}; |
410 |
|
|
411 |
|
|
412 |
|
handle_event(internal, |
413 |
|
{recv, {ready_for_query, idle}}, |
414 |
|
identify_system, |
415 |
|
Data) -> |
416 |
|
%% logger:set_module_level([pgmp_mm_rep_log, pgmp_codec], debug), |
417 |
5 |
{next_state, replication_slot, Data, nei(create_replication_slot)}; |
418 |
|
|
419 |
|
handle_event( |
420 |
|
internal, |
421 |
|
{recv, {ready_for_query, idle}}, |
422 |
|
replication_slot, |
423 |
|
#{replication_slot := #{<<"snapshot_name">> := Snapshot}} = Data) -> |
424 |
5 |
{next_state, |
425 |
|
waiting_for_snapshot_completion, |
426 |
|
Data, |
427 |
|
nei({callback, snapshot, #{id => Snapshot}})}; |
428 |
|
|
429 |
|
handle_event( |
430 |
|
internal, |
431 |
|
{recv, {ready_for_query, idle}}, |
432 |
|
replication_slot, |
433 |
|
Data) -> |
434 |
:-( |
{next_state, |
435 |
|
waiting_for_lsn, |
436 |
|
Data, |
437 |
|
nei({callback, lsn, #{}})}; |
438 |
|
|
439 |
|
handle_event( |
440 |
|
internal, |
441 |
|
{recv, {error_response, _} = TM}, |
442 |
|
replication_slot, |
443 |
|
Data) -> |
444 |
:-( |
case pgmp_error_notice_fields:map(TM) of |
445 |
|
{error_response, #{code := <<"42710">>}} -> |
446 |
:-( |
keep_state_and_data; |
447 |
|
|
448 |
|
{Tag, Message} -> |
449 |
:-( |
{next_state, |
450 |
|
limbo, |
451 |
|
Data, |
452 |
|
[nei({telemetry, |
453 |
|
error, |
454 |
|
#{count => 1}, |
455 |
|
maps:merge( |
456 |
|
#{event => Tag}, |
457 |
|
maps:with( |
458 |
|
[code, message, severity], |
459 |
|
Message))}), |
460 |
|
{state_timeout, |
461 |
|
timer:seconds( |
462 |
|
backoff:rand_increment( |
463 |
|
pgmp_config:backoff(rand_increment))), |
464 |
|
{backoff, #{action => Tag, reason => Message}}}]} |
465 |
|
end; |
466 |
|
|
467 |
|
handle_event(internal, |
468 |
|
create_replication_slot = Command, |
469 |
|
_, |
470 |
|
#{config := #{publication := Publication}}) -> |
471 |
5 |
{keep_state_and_data, |
472 |
|
nei({Command, pgmp_rep_log:slot_name(Publication)})}; |
473 |
|
|
474 |
|
handle_event(internal, |
475 |
|
{create_replication_slot, SlotName}, |
476 |
|
_, |
477 |
|
#{server_version := ServerVersion}) -> |
478 |
5 |
{keep_state_and_data, |
479 |
|
nei({query, |
480 |
|
lists:join( |
481 |
|
" ", |
482 |
|
["CREATE_REPLICATION_SLOT", |
483 |
|
["\"", SlotName, "\""], |
484 |
5 |
["TEMPORARY" || pgmp_config:replication(logical, temporary)], |
485 |
|
"LOGICAL", |
486 |
|
"pgoutput", |
487 |
|
create_slot_options( |
488 |
|
protocol_version(ServerVersion))])})}; |
489 |
|
|
490 |
|
handle_event(internal, |
491 |
|
start_replication, |
492 |
|
_, |
493 |
|
#{config := #{publication := PublicationNames}, |
494 |
|
server_version := ServerVersion}) -> |
495 |
5 |
{keep_state_and_data, |
496 |
|
nei({start_replication, |
497 |
|
pgoutput_options( |
498 |
|
protocol_version(ServerVersion), |
499 |
|
PublicationNames)})}; |
500 |
|
|
501 |
|
handle_event(internal, |
502 |
|
{start_replication, Options}, |
503 |
|
_, |
504 |
|
#{lsn := LSN, |
505 |
|
config := #{publication := Publication}} = Data) -> |
506 |
5 |
{keep_state, |
507 |
|
Data#{relations => #{}}, |
508 |
|
nei({query, |
509 |
|
lists:join( |
510 |
|
" ", |
511 |
|
["START_REPLICATION SLOT", |
512 |
|
pgmp_rep_log:slot_name(Publication), |
513 |
|
"LOGICAL", |
514 |
|
LSN, |
515 |
|
["(", |
516 |
|
maps:fold( |
517 |
|
fun |
518 |
|
(K, V, A) -> |
519 |
35 |
[A, |
520 |
30 |
[", " || A /= []], |
521 |
|
atom_to_list(K), |
522 |
|
" ", |
523 |
|
"'", |
524 |
|
any:to_list(V), |
525 |
|
"'"] |
526 |
|
end, |
527 |
|
[], |
528 |
|
Options), |
529 |
|
")"]])})}; |
530 |
|
|
531 |
|
handle_event(internal, {recv, {row_description, Columns}}, _, Data) -> |
532 |
10 |
?LOG_DEBUG(#{row_description => Columns}), |
533 |
10 |
{keep_state, Data#{columns => Columns}}; |
534 |
|
|
535 |
|
handle_event(internal, |
536 |
|
{recv, {data_row, Values}}, |
537 |
|
State, |
538 |
|
#{parameters := Parameters, |
539 |
|
config := Config, |
540 |
|
columns := Columns} = Data) -> |
541 |
10 |
?LOG_DEBUG(#{data_row => Values}), |
542 |
10 |
{keep_state, |
543 |
|
maps:put(State, |
544 |
|
lists:foldl( |
545 |
|
fun |
546 |
|
({#{field_name := FieldName}, Value}, A) -> |
547 |
40 |
A#{FieldName => Value} |
548 |
|
end, |
549 |
|
maps:get(State, Data, #{}), |
550 |
|
lists:zip(Columns, |
551 |
|
decode( |
552 |
|
Parameters, |
553 |
|
lists:zip(Columns, Values), |
554 |
|
pgmp_types:cache(Config)))), |
555 |
|
Data)}; |
556 |
|
|
557 |
|
handle_event(internal, {query, SQL}, _, _) -> |
558 |
15 |
?LOG_DEBUG(#{query => SQL}), |
559 |
15 |
{keep_state_and_data, |
560 |
|
nei({send, ["Q", size_inclusive([marshal(string, SQL)])]})}; |
561 |
|
|
562 |
|
handle_event(internal, |
563 |
|
{standby_status_update, |
564 |
|
#{received_wal := ReceivedWAL, |
565 |
|
flushed_wal := FlushedWAL, |
566 |
|
applied_wal := AppliedWAL, |
567 |
|
clock := Clock, |
568 |
|
reply := Reply}}, |
569 |
|
_, |
570 |
|
_) -> |
571 |
2 |
{keep_state_and_data, |
572 |
|
nei({send, |
573 |
|
["d", |
574 |
|
size_inclusive( |
575 |
|
["r", |
576 |
|
<<ReceivedWAL:64, |
577 |
|
FlushedWAL:64, |
578 |
|
AppliedWAL:64, |
579 |
|
Clock:64, |
580 |
|
(b(Reply)):8>>])]})}; |
581 |
|
|
582 |
|
handle_event(internal, {rep_telemetry, EventName, Measurements}, _, _) -> |
583 |
132 |
{keep_state_and_data, nei({rep_telemetry, EventName, Measurements, #{}})}; |
584 |
|
|
585 |
|
handle_event(internal, |
586 |
|
{rep_telemetry, EventName, Measurements, Metadata}, |
587 |
|
_, |
588 |
|
#{config := #{publication := Publication}} = Data) -> |
589 |
197 |
{keep_state_and_data, |
590 |
|
nei({telemetry, |
591 |
|
[rep, EventName], |
592 |
|
maps:merge( |
593 |
|
Measurements, |
594 |
|
maps:with([wal], Data)), |
595 |
|
maps:merge( |
596 |
|
Metadata#{publication => Publication}, |
597 |
|
maps:with([identify_system, replication_slot], Data))})}; |
598 |
|
|
599 |
|
handle_event(info, |
600 |
|
{'DOWN' ,_ , process, Manager, noproc}, |
601 |
|
_, |
602 |
|
#{manager := Manager}) -> |
603 |
:-( |
stop; |
604 |
|
|
605 |
|
handle_event(EventType, EventContent, State, Data) -> |
606 |
941 |
pgmp_mm_common:handle_event(EventType, |
607 |
|
EventContent, |
608 |
|
State, |
609 |
|
Data). |
610 |
|
|
611 |
|
|
612 |
|
row_tuple(Config, Parameters, Columns, Values) -> |
613 |
63 |
list_to_tuple( |
614 |
|
pgmp_data_row:decode( |
615 |
|
Parameters, |
616 |
|
lists:map( |
617 |
|
fun |
618 |
|
({#{type := Type}, null = Value}) -> |
619 |
4 |
{#{format => text, type_oid => Type}, Value}; |
620 |
|
|
621 |
|
({#{type := Type}, #{format := Format, value := Value}}) -> |
622 |
125 |
{#{format => Format, type_oid => Type}, Value} |
623 |
|
end, |
624 |
|
lists:zip(Columns, Values)), |
625 |
|
pgmp_types:cache(Config))). |
626 |
|
|
627 |
|
|
628 |
:-( |
b(false) -> 0; |
629 |
2 |
b(true) -> 1; |
630 |
:-( |
b(0) -> false; |
631 |
:-( |
b(1) -> true. |
632 |
|
|
633 |
|
|
634 |
|
relation(Detail) -> |
635 |
128 |
maps:with([name, namespace], Detail). |
636 |
|
|
637 |
|
create_slot_options(ProtoVersion) when ProtoVersion >= 3 -> |
638 |
5 |
["(", |
639 |
|
lists:foldl( |
640 |
|
fun |
641 |
|
(Option, A) -> |
642 |
15 |
case pgmp_config:replication(logical, Option) of |
643 |
|
false -> |
644 |
10 |
A; |
645 |
|
|
646 |
|
true -> |
647 |
:-( |
[A, |
648 |
:-( |
[", " || A /= []], |
649 |
|
slot_option_name(Option)]; |
650 |
|
|
651 |
|
Value -> |
652 |
5 |
[A, |
653 |
:-( |
[", " || A /= []], |
654 |
|
slot_option(Option, Value)] |
655 |
|
end |
656 |
|
end, |
657 |
|
[], |
658 |
|
[two_phase, reserve_wal, snapshot]), |
659 |
|
")"]; |
660 |
|
|
661 |
|
create_slot_options(2) -> |
662 |
:-( |
case pgmp_config:replication(logical, snapshot) of |
663 |
|
export -> |
664 |
:-( |
"EXPORT_SNAPSHOT"; |
665 |
|
no -> |
666 |
:-( |
"NOEXPORT_SNAPSHOT"; |
667 |
|
use -> |
668 |
:-( |
"USE_SNAPSHOT" |
669 |
|
end. |
670 |
|
|
671 |
|
|
672 |
|
slot_option(Name, Value) when is_boolean(Value) -> |
673 |
:-( |
lists:join( |
674 |
|
" ", |
675 |
|
[slot_option_name(Name), slot_option_value(Value)]); |
676 |
|
|
677 |
|
slot_option(Name, Value) -> |
678 |
5 |
lists:join( |
679 |
|
" ", |
680 |
|
[slot_option_name(Name), slot_option_value(Value)]). |
681 |
|
|
682 |
|
slot_option_name(Name) -> |
683 |
5 |
string:uppercase(atom_to_list(Name)). |
684 |
|
|
685 |
|
slot_option_value(Value) when is_boolean(Value) -> |
686 |
:-( |
["'", atom_to_list(Value), "'"]; |
687 |
|
|
688 |
|
slot_option_value(Value) -> |
689 |
5 |
["'", atom_to_list(Value), "'"]. |
690 |
|
|
691 |
|
|
692 |
|
pgoutput_options(ProtoVersion, PublicationNames) -> |
693 |
5 |
lists:foldl( |
694 |
|
fun |
695 |
|
(K, A) -> |
696 |
25 |
A#{K => pgmp_config:pgoutput(K, ProtoVersion)} |
697 |
|
end, |
698 |
|
#{proto_version => ProtoVersion, |
699 |
|
publication_names => PublicationNames}, |
700 |
|
pgoutput_options(ProtoVersion)). |
701 |
|
|
702 |
|
|
703 |
|
protocol_version(#{major := Major}) when Major >= 16 -> |
704 |
10 |
4; |
705 |
|
|
706 |
|
protocol_version(#{major := Major}) when Major >= 15 -> |
707 |
:-( |
3; |
708 |
|
|
709 |
|
protocol_version(#{major := Major}) when Major >= 14 -> |
710 |
:-( |
2; |
711 |
|
|
712 |
|
protocol_version(#{major := _}) -> |
713 |
:-( |
1. |
714 |
|
|
715 |
|
|
716 |
|
pgoutput_options(ProtocolVersion) when ProtocolVersion >= 4 -> |
717 |
5 |
[binary, messages, streaming, two_phase, origin]; |
718 |
|
|
719 |
|
pgoutput_options(ProtocolVersion) when ProtocolVersion >= 3 -> |
720 |
:-( |
[binary, messages, streaming, two_phase]; |
721 |
|
|
722 |
|
pgoutput_options(ProtocolVersion) when ProtocolVersion >= 2 -> |
723 |
:-( |
[binary, messages, streaming]; |
724 |
|
|
725 |
|
pgoutput_options(_) -> |
726 |
:-( |
[]. |