_site/cover/pgec_replica_snapshot.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgec_replica_snapshot).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -import(pgec_replica_common, [metadata/4]).
22 -import(pgec_statem, [nei/1]).
23 -include_lib("kernel/include/logger.hrl").
24
25
26 callback_mode() ->
27 8 handle_event_function.
28
29
30 handle_event({call, _} = EventType,
31 EventContent,
32 State,
33 Data) ->
34
:-(
?LOG_DEBUG(#{event_type => EventType,
35 event_content => EventContent,
36 state => State,
37
:-(
data => Data}),
38
:-(
{keep_state_and_data, postpone};
39
40 handle_event(internal = EventType,
41 {response, #{reply := [{command_complete, 'begin'}]}} = EventContent,
42 query = State,
43 Data) ->
44 8 ?LOG_DEBUG(#{event_type => EventType,
45 event_content => EventContent,
46 state => State,
47 8 data => Data}),
48 8 {next_state, unready, Data};
49
50 handle_event(internal = EventType,
51 {response, #{reply := [{command_complete, commit}]}} = EventContent,
52 State,
53 #{config := #{publication := Publication},
54 stream := Stream} = Data) ->
55 8 ?LOG_DEBUG(#{event_type => EventType,
56 event_content => EventContent,
57 state => State,
58 8 data => Data}),
59 8 {next_state,
60 ready,
61 maps:without([stream], Data),
62 [pop_callback_module,
63
64 %% this reply will start streaming replication with pgmp.
65 {reply, Stream, ok},
66
67 nei({notify,
68 #{action => progress,
69 status => completed,
70 activity => ?MODULE}}),
71
72 %% inform storage that the publication is now ready, enabling
73 %% read requests.
74 nei({storage_request,
75 #{action => ready,
76 publication => Publication}})]};
77
78 handle_event(internal = EventType,
79 {response,
80 #{label := sync_publication_tables = Label,
81 reply := [{parse_complete, []}]}} = EventContent,
82 parse = State,
83 #{config := #{publication := Publication}} = Data) ->
84 8 ?LOG_DEBUG(#{event_type => EventType,
85 event_content => EventContent,
86 state => State,
87 8 data => Data}),
88 8 {next_state,
89 unready,
90 Data,
91 nei({bind, #{label => Label, args => [Publication]}})};
92
93 handle_event(internal = EventType,
94 {response,
95 #{label := sync_publication_tables = Label,
96 reply := [{bind_complete, []}]}} = EventContent,
97 bind = State,
98 Data) ->
99 8 ?LOG_DEBUG(#{event_type => EventType,
100 event_content => EventContent,
101 state => State,
102 8 data => Data}),
103 8 {next_state,
104 unready,
105 Data,
106 nei({execute, #{label => Label}})};
107
108 handle_event(internal = EventType,
109 {response, #{label := sync_publication_tables,
110 reply := [{command_complete, {select, 0}}]}} = EventContent,
111 execute = State,
112 Data) ->
113
:-(
?LOG_DEBUG(#{event_type => EventType,
114 event_content => EventContent,
115 state => State,
116
:-(
data => Data}),
117 %% There are no publication tables to sync, initiate streaming
118 %% replication.
119 %%
120
:-(
{next_state, unready, Data, nei(commit)};
121
122 handle_event(internal = EventType,
123 {response, #{label := sync_publication_tables,
124 reply := [{row_description, Columns} | T]}} = EventContent,
125 execute = State,
126 Data) ->
127 8 ?LOG_DEBUG(#{event_type => EventType,
128 event_content => EventContent,
129 state => State,
130 8 data => Data}),
131 8 {command_complete, {select, _}} = lists:last(T),
132 8 {next_state,
133 unready,
134 Data,
135 nei({fetch,
136 lists:map(
137 fun
138 ({data_row, Values}) ->
139 8 maps:from_list(lists:zip(Columns, Values))
140 end,
141 lists:droplast(T))})};
142
143 handle_event(internal = EventType,
144 {fetch, []} = EventContent,
145 unready = State,
146 Data) ->
147 8 ?LOG_DEBUG(#{event_type => EventType,
148 event_content => EventContent,
149 state => State,
150 8 data => Data}),
151 8 {keep_state_and_data, nei(commit)};
152
153 handle_event(internal = EventType,
154 {fetch, _} = Label = EventContent,
155 unready = State,
156 Data) ->
157 8 ?LOG_DEBUG(#{event_type => EventType,
158 event_content => EventContent,
159 state => State,
160 8 data => Data}),
161 8 {keep_state_and_data,
162 nei({parse,
163 #{label => Label,
164 sql => <<"select i.indkey from pg_catalog.pg_index i"
165 ", pg_catalog.pg_namespace n"
166 ", pg_catalog.pg_class c"
167 " where "
168 "i.indisprimary"
169 " and "
170 "i.indrelid = c.oid"
171 " and "
172 "c.relnamespace = n.oid"
173 " and "
174 "n.nspname = $1"
175 " and "
176 "c.relname = $2">>}})};
177
178 handle_event(internal = EventType,
179 {response,
180 #{label := {fetch, [#{<<"schemaname">> := Schema, <<"tablename">> := Table} | _]} = Label,
181 reply := [{parse_complete, []}]}} = EventContent,
182 parse = State,
183 Data) ->
184 8 ?LOG_DEBUG(#{event_type => EventType,
185 event_content => EventContent,
186 state => State,
187 8 data => Data}),
188 8 {next_state,
189 unready,
190 Data,
191 nei({bind,
192 #{label => Label,
193 args => [Schema, Table]}})};
194
195 handle_event(internal = EventType,
196 {response,
197 #{label := {fetch, _} = Label,
198 reply := [{bind_complete, []}]}} = EventContent,
199 bind = State,
200 Data) ->
201 8 ?LOG_DEBUG(#{event_type => EventType,
202 event_content => EventContent,
203 state => State,
204 8 data => Data}),
205 8 {next_state,
206 unready,
207 Data,
208 nei({execute, #{label => Label}})};
209
210 handle_event(
211 internal = EventType,
212 {response,
213 #{label := {fetch, [#{<<"tablename">> := _} = Publication | T]},
214 reply := [{command_complete, {select, 0}}]}} = EventContent,
215 execute = State,
216 Data) ->
217
:-(
?LOG_DEBUG(#{event_type => EventType,
218 event_content => EventContent,
219 state => State,
220
:-(
data => Data}),
221
:-(
?LOG_WARNING(
222 #{publication => Publication,
223
:-(
reason => "no primary key found"}),
224
:-(
{next_state, unready, Data, nei({fetch, T})};
225
226 handle_event(
227 internal = EventType,
228 {response,
229 #{label := {fetch,
230 [#{<<"schemaname">> := Namespace,
231 <<"tablename">> := Name} = Info | T]},
232 reply := [{row_description, [<<"indkey">>]},
233 {data_row, [Key]},
234 {command_complete, {select, 1}}]}} = EventContent,
235 execute = State,
236 #{metadata := Metadata} = Data) ->
237 8 ?LOG_DEBUG(#{event_type => EventType,
238 event_content => EventContent,
239 state => State,
240 8 data => Data}),
241 8 {next_state,
242 unready,
243 Data#{metadata := metadata(
244 {Namespace, Name},
245 keys,
246 Key,
247 Metadata)},
248 [nei({parse,
249 #{label => {table, #{namespace => Namespace, name => Name}},
250 sql => pub_fetch_sql(Info)}}),
251 nei({fetch, T})]};
252
253 handle_event(internal = EventType,
254 {response,
255 #{label := {table, _} = Label,
256 reply := [{parse_complete, []}]}} = EventContent,
257 parse = State,
258 Data) ->
259 8 ?LOG_DEBUG(#{event_type => EventType,
260 event_content => EventContent,
261 state => State,
262 8 data => Data}),
263 8 {next_state,
264 unready,
265 Data,
266 nei({describe, #{type => $S, label => Label}})};
267
268 handle_event(internal = EventType,
269 {response,
270 #{label := {table, _} = Label,
271 reply := [{bind_complete, []}]}} = EventContent,
272 bind = State,
273 Data) ->
274 8 ?LOG_DEBUG(#{event_type => EventType,
275 event_content => EventContent,
276 state => State,
277 8 data => Data}),
278 8 {next_state,
279 unready,
280 Data,
281 nei({execute,
282 #{label => Label,
283 max_rows => pgmp_config:replication(
284 logical,
285 max_rows)}})};
286
287 handle_event(internal = EventType,
288 {response,
289 #{label := {table,
290 #{namespace := Namespace, name := Name}} = Label,
291 reply := [{parameter_description,[]},
292 {row_description, Columns}]}} = EventContent,
293 describe = State,
294 #{metadata := Metadata, config := Config} = Data) ->
295 8 ?LOG_DEBUG(#{event_type => EventType,
296 event_content => EventContent,
297 state => State,
298 8 data => Data}),
299 8 {next_state,
300 unready,
301 Data#{metadata := metadata(
302 {Namespace, Name},
303 columns,
304 23 [FieldName || #{field_name := FieldName} <- Columns],
305 metadata(
306 {Namespace, Name},
307 oids,
308 23 [OID || #{type_oid := OID} <- Columns],
309 Metadata))},
310 nei({bind, #{label => Label, result => column_format(Config, Columns)}})};
311
312 handle_event(internal = EventType,
313 {response, #{label := {table, #{namespace := Namespace, name := Name}},
314 reply := [{command_complete, {select, 0}}]}} = EventContent,
315 execute = State,
316 #{metadata := Metadata,
317 config := #{publication := Publication}} = Data)
318 when is_map_key({Namespace, Name}, Metadata) ->
319 3 ?LOG_DEBUG(#{event_type => EventType,
320 event_content => EventContent,
321 state => State,
322 3 data => Data}),
323 3 #{{Namespace, Name} := Mapping} = Metadata,
324 3 {next_state,
325 unready,
326 Data,
327 nei({storage_request,
328 maps:merge(
329 #{action => table_map,
330 publication => Publication,
331 schema => Namespace,
332 table => Name},
333 Mapping)})};
334
335 handle_event(internal = EventType,
336 {response,
337 #{label := {table,
338 #{namespace := Namespace,
339 name := Name}} = Label,
340 reply := [{row_description, Columns} | T]}} = EventContent,
341 execute = State,
342 #{metadata := Metadata,
343 config := #{publication := Publication}} = Data)
344 when is_map_key({Namespace, Name}, Metadata) ->
345 5 ?LOG_DEBUG(#{event_type => EventType,
346 event_content => EventContent,
347 state => State,
348 5 data => Data}),
349
350 5 #{{Namespace, Name} := #{keys := Keys, oids := OIDs}} = Metadata,
351
352 5 {next_state,
353 unready,
354 Data#{metadata := metadata(
355 {Namespace, Name},
356 columns,
357 Columns,
358 Metadata)},
359 [nei({storage_request,
360 #{action => table_map,
361 publication => Publication,
362 schema => Namespace,
363 table => Name,
364 keys => Keys,
365 oids => OIDs,
366 columns => Columns}}) |
367 lists:foldr(
368 fun
369 ({data_row, Row}, A) ->
370 250 [nei({storage_request,
371 #{action => write,
372 publication => Publication,
373 schema => Namespace,
374 table => Name,
375 row => list_to_tuple(Row)}}) | A]
376 end,
377 case lists:last(T) of
378 {command_complete, {select, _}} ->
379 5 [];
380
381 {portal_suspended, _} ->
382
:-(
[nei({execute,
383 #{label => Label,
384 max_rows => pgmp_config:replication(
385 logical,
386 max_rows)}})]
387 end,
388 lists:droplast(T))]};
389
390 handle_event(internal = EventType,
391 {response, #{reply := [{command_complete, set}]}} = EventContent,
392 query = State,
393 Data) ->
394 8 ?LOG_DEBUG(#{event_type => EventType,
395 event_content => EventContent,
396 state => State,
397 8 data => Data}),
398
399 8 {next_state, unready, Data};
400
401 handle_event(internal = EventType,
402 sync_publication_tables = Label = EventContent,
403 State,
404 Data) ->
405 8 ?LOG_DEBUG(#{event_type => EventType,
406 event_content => EventContent,
407 state => State,
408 8 data => Data}),
409 8 {keep_state_and_data,
410 nei({parse,
411 #{label => Label,
412 sql => <<"select * from pg_catalog.pg_publication_tables "
413 "where pubname = $1">>}})};
414
415 handle_event(internal = EventType,
416 begin_transaction = Label = EventContent,
417 State,
418 Data) ->
419 8 ?LOG_DEBUG(#{event_type => EventType,
420 event_content => EventContent,
421 state => State,
422 8 data => Data}),
423 8 {keep_state_and_data,
424 nei({query,
425 #{label => Label,
426 sql => <<"begin isolation level repeatable read">>}})};
427
428 handle_event(internal = EventType,
429 commit = Label = EventContent,
430 State,
431 Data) ->
432 8 ?LOG_DEBUG(#{event_type => EventType,
433 event_content => EventContent,
434 state => State,
435 8 data => Data}),
436 8 {keep_state_and_data,
437 nei({query, #{label => Label, sql => <<"commit">>}})};
438
439 handle_event(internal = EventType,
440 {set_transaction_snapshot = Label, Id} = EventContent,
441 State,
442 Data) ->
443 8 ?LOG_DEBUG(#{event_type => EventType,
444 event_content => EventContent,
445 state => State,
446 8 data => Data}),
447 8 {keep_state_and_data,
448 nei({query,
449 #{label => Label,
450 sql => io_lib:format(
451 "SET TRANSACTION SNAPSHOT '~s'",
452 [Id])}})};
453
454 handle_event(internal = EventType,
455 {Action, Arg} = EventContent,
456 unready = State,
457 #{requests := Requests, config := Config} = Data)
458 when Action == query;
459 Action == parse;
460 Action == bind;
461 Action == describe;
462 Action == execute ->
463 104 ?LOG_DEBUG(#{event_type => EventType,
464 event_content => EventContent,
465 state => State,
466 104 data => Data}),
467 104 {next_state,
468 Action,
469 Data#{requests := pgmp_connection:Action(
470 Arg#{requests => Requests,
471 server_ref => pgmp_connection:server_ref(
472 Config)})}};
473
474 handle_event(internal = EventType,
475 {Action, _} = EventContent,
476 State,
477 Data)
478 when Action == query;
479 Action == parse;
480 Action == bind;
481 Action == describe;
482 Action == execute ->
483 24 ?LOG_DEBUG(#{event_type => EventType,
484 event_content => EventContent,
485 state => State,
486 24 data => Data}),
487 24 {keep_state_and_data, postpone};
488
489 handle_event(internal = EventType,
490 {fetch, _} = EventContent,
491 State,
492 Data) ->
493 32 ?LOG_DEBUG(#{event_type => EventType,
494 event_content => EventContent,
495 state => State,
496 32 data => Data}),
497 32 {keep_state_and_data, postpone};
498
499 handle_event(info, Msg, _, #{requests := Existing} = Data) ->
500 113 case gen_statem:check_response(Msg, Existing, true) of
501 {{reply, Reply}, Label, Updated} ->
502 113 {keep_state,
503 Data#{requests := Updated},
504 nei({response, #{label => Label, reply => Reply}})};
505
506 {{error, {Reason, ServerRef}}, Label, UpdatedRequests} ->
507
:-(
{stop,
508 #{reason => Reason,
509 server_ref => ServerRef,
510 label => Label},
511 Data#{requests := UpdatedRequests}}
512 end;
513
514 handle_event(Type, Content, State, Data) ->
515 309 pgec_replica_common:handle_event(Type, Content, State, Data).
516
517
518 pub_fetch_sql(#{<<"schemaname">> := Schema,
519 <<"tablename">> := Table} = Publication) ->
520 8 ?LOG_DEBUG(#{publication => Publication}),
521 8 ["select ",
522 pub_fetch_columns(Publication),
523 " from ",
524 Schema,
525 ".",
526 Table,
527 case maps:find(<<"rowfilter">>, Publication) of
528 {ok, RowFilter} when RowFilter /= null ->
529
:-(
[" where ", RowFilter];
530
531 _Otherwise ->
532 8 []
533 end].
534
535
536 pub_fetch_columns(#{<<"attnames">> := Attributes}) ->
537 8 ?LOG_DEBUG(#{attributes => Attributes}),
538 8 lists:join(",", Attributes);
539
540 pub_fetch_columns(#{}) ->
541
:-(
"*".
542
543 column_format(Config, Columns) ->
544 8 ?LOG_DEBUG(#{config => Config, columns => Columns}),
545 8 Types = pgmp_types:cache(Config),
546 8 case lists:any(
547 fun
548 (OID) ->
549 23 #{<<"typreceive">> := R,
550 <<"typsend">> := S} = maps:get(OID, Types),
551 23 R == <<"-">> orelse S == <<"-">>
552 end,
553 23 [OID || #{type_oid := OID} <- Columns]) of
554
555 true ->
556
:-(
text;
557
558 false ->
559 8 binary
560 end.
Line Hits Source