%% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.


-module(pgec_replica_snapshot).


%% gen_statem callbacks.
-export([callback_mode/0,
         handle_event/4]).

-import(pgec_replica_common, [metadata/4]).
-import(pgec_statem, [nei/1]).

-include_lib("kernel/include/logger.hrl").


%% gen_statem callback: every event is routed through handle_event/4
%% rather than through one function per state.
callback_mode() ->
    handle_event_function.


%% gen_statem event handler for this callback module.
%%
%% The clauses below drive a sequence of requests (query, parse, bind,
%% describe, execute) through pgmp_connection and react to the
%% responses, which arrive as info messages and are re-injected as
%% internal {response, #{label := Label, reply := Reply}} events.
%% Follow-up work is queued with pgec_statem:nei/1. Any event that is
%% not matched here is delegated to pgec_replica_common:handle_event/4.

%% Calls received while this module is handling events are postponed.
handle_event({call, _} = EventType, EventContent, State, Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {keep_state_and_data, postpone};

%% "begin" has completed: back to unready so the next request can be
%% issued.
handle_event(internal = EventType,
             {response,
              #{reply := [{command_complete, 'begin'}]}} = EventContent,
             query = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {next_state, unready, Data};

%% "commit" has completed: drop the stream from the data, pop this
%% callback module and move to ready.
handle_event(internal = EventType,
             {response,
              #{reply := [{command_complete, commit}]}} = EventContent,
             State,
             #{config := #{publication := Publication},
               stream := Stream} = Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {next_state,
     ready,
     maps:without([stream], Data),
     [pop_callback_module,

      %% this reply will start streaming replication with pgmp.
      {reply, Stream, ok},

      pgec_statem:nei({notify,
                       #{action => progress,
                         status => completed,
                         activity => ?MODULE}}),

      %% inform storage that the publication is now ready, enabling
      %% read requests.
      pgec_statem:nei({storage_request,
                       #{action => ready,
                         publication => Publication}})]};

%% The publication tables statement has been parsed: bind it to the
%% configured publication name.
handle_event(internal = EventType,
             {response,
              #{label := sync_publication_tables = Label,
                reply := [{parse_complete, []}]}} = EventContent,
             parse = State,
             #{config := #{publication := Publication}} = Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {next_state,
     unready,
     Data,
     pgec_statem:nei({bind, #{label => Label, args => [Publication]}})};

%% The publication tables statement has been bound: execute it.
handle_event(internal = EventType,
             {response,
              #{label := sync_publication_tables = Label,
                reply := [{bind_complete, []}]}} = EventContent,
             bind = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {next_state,
     unready,
     Data,
     pgec_statem:nei({execute, #{label => Label}})};

%% There are no publication tables to sync, initiate streaming
%% replication.
handle_event(internal = EventType,
             {response,
              #{label := sync_publication_tables,
                reply := [{command_complete, {select, 0}}]}} = EventContent,
             execute = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {next_state, unready, Data, pgec_statem:nei(commit)};

%% The publication has tables: turn each data row into a map keyed by
%% column name and queue them all as a single fetch work list.
handle_event(internal = EventType,
             {response,
              #{label := sync_publication_tables,
                reply := [{row_description, Columns} | Rest]}} = EventContent,
             execute = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    %% the reply must be terminated by the select completion.
    {command_complete, {select, _}} = lists:last(Rest),
    RowToMap = fun
                   ({data_row, Values}) ->
                       maps:from_list(lists:zip(Columns, Values))
               end,
    Tables = lists:map(RowToMap, lists:droplast(Rest)),
    {next_state, unready, Data, pgec_statem:nei({fetch, Tables})};

%% The fetch work list is exhausted: commit the transaction.
handle_event(internal = EventType,
             {fetch, []} = EventContent,
             unready = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {keep_state_and_data, pgec_statem:nei(commit)};

%% Tables remain on the work list: parse the primary key lookup, using
%% the whole {fetch, Tables} term as the request label.
handle_event(internal = EventType,
             {fetch, _} = Label = EventContent,
             unready = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {keep_state_and_data,
     pgec_statem:nei(
       {parse,
        #{label => Label,
          sql => <<"select i.indkey from pg_catalog.pg_index i"
                   ", pg_catalog.pg_namespace n"
                   ", pg_catalog.pg_class c"
                   " where i.indisprimary"
                   " and i.indrelid = c.oid"
                   " and c.relnamespace = n.oid"
                   " and n.nspname = $1"
                   " and c.relname = $2">>}})};

%% The primary key lookup has been parsed: bind it to the schema and
%% table at the head of the work list.
handle_event(internal = EventType,
             {response,
              #{label := {fetch,
                          [#{<<"schemaname">> := Schema,
                             <<"tablename">> := Table} | _]} = Label,
                reply := [{parse_complete, []}]}} = EventContent,
             parse = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {next_state,
     unready,
     Data,
     pgec_statem:nei({bind,
                      #{label => Label,
                        args => [Schema, Table]}})};

%% The primary key lookup has been bound: execute it.
handle_event(internal = EventType,
             {response,
              #{label := {fetch, _} = Label,
                reply := [{bind_complete, []}]}} = EventContent,
             bind = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {next_state,
     unready,
     Data,
     pgec_statem:nei({execute, #{label => Label}})};

%% The table at the head of the work list has no primary key: log a
%% warning and carry on with the remaining tables.
handle_event(internal = EventType,
             {response,
              #{label := {fetch,
                          [#{<<"tablename">> := _} = Publication | Rest]},
                reply := [{command_complete, {select, 0}}]}} = EventContent,
             execute = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    ?LOG_WARNING(#{publication => Publication,
                   reason => "no primary key found"}),
    {next_state, unready, Data, pgec_statem:nei({fetch, Rest})};

%% A primary key was found: record it in the metadata, parse the
%% statement that reads the table, and queue the remaining tables.
handle_event(internal = EventType,
             {response,
              #{label := {fetch,
                          [#{<<"schemaname">> := Namespace,
                             <<"tablename">> := Name} = Info | Rest]},
                reply := [{row_description, [<<"indkey">>]},
                          {data_row, [Key]},
                          {command_complete, {select, 1}}]}} = EventContent,
             execute = State,
             #{metadata := Metadata} = Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    WithKeys = pgec_replica_common:metadata({Namespace, Name},
                                            keys,
                                            Key,
                                            Metadata),
    {next_state,
     unready,
     Data#{metadata := WithKeys},
     [pgec_statem:nei(
        {parse,
         #{label => {table, #{namespace => Namespace, name => Name}},
           sql => pub_fetch_sql(Info)}}),
      pgec_statem:nei({fetch, Rest})]};

%% The table statement has been parsed: describe it (type $S).
handle_event(internal = EventType,
             {response,
              #{label := {table, _} = Label,
                reply := [{parse_complete, []}]}} = EventContent,
             parse = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {next_state,
     unready,
     Data,
     pgec_statem:nei({describe, #{type => $S, label => Label}})};

%% The table statement has been bound: execute it, limited to the
%% configured maximum number of rows per reply.
handle_event(internal = EventType,
             {response,
              #{label := {table, _} = Label,
                reply := [{bind_complete, []}]}} = EventContent,
             bind = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {next_state,
     unready,
     Data,
     pgec_statem:nei(
       {execute,
        #{label => Label,
          max_rows => pgmp_config:replication(logical, max_rows)}})};

%% The table statement has been described: record the type OIDs and the
%% field names of its columns in the metadata, then bind the statement
%% using the result format chosen by column_format/2.
handle_event(internal = EventType,
             {response,
              #{label := {table,
                          #{namespace := Namespace, name := Name}} = Label,
                reply := [{parameter_description,[]},
                          {row_description, Columns}]}} = EventContent,
             describe = State,
             #{metadata := Metadata, config := Config} = Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    FieldNames = [FieldName || #{field_name := FieldName} <- Columns],
    TypeOIDs = [OID || #{type_oid := OID} <- Columns],
    WithOIDs = pgec_replica_common:metadata({Namespace, Name},
                                            oids,
                                            TypeOIDs,
                                            Metadata),
    WithColumns = pgec_replica_common:metadata({Namespace, Name},
                                               columns,
                                               FieldNames,
                                               WithOIDs),
    {next_state,
     unready,
     Data#{metadata := WithColumns},
     pgec_statem:nei({bind,
                      #{label => Label,
                        result => column_format(Config, Columns)}})};

%% The table is empty: send storage the table mapping only, merged with
%% whatever metadata has been recorded for the table.
handle_event(internal = EventType,
             {response,
              #{label := {table, #{namespace := Namespace, name := Name}},
                reply := [{command_complete, {select, 0}}]}} = EventContent,
             execute = State,
             #{metadata := Metadata,
               config := #{publication := Publication}} = Data)
  when is_map_key({Namespace, Name}, Metadata) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    #{{Namespace, Name} := Mapping} = Metadata,
    TableMap = maps:merge(#{action => table_map,
                            publication => Publication,
                            schema => Namespace,
                            table => Name},
                          Mapping),
    {next_state,
     unready,
     Data,
     pgec_statem:nei({storage_request, TableMap})};

%% A batch of rows has arrived: send storage the table mapping followed
%% by one write request per row. When the portal is suspended a further
%% execute is queued after the writes to obtain the next batch.
handle_event(internal = EventType,
             {response,
              #{label := {table,
                          #{namespace := Namespace,
                            name := Name}} = Label,
                reply := [{row_description, Columns} | Rest]}} = EventContent,
             execute = State,
             #{metadata := Metadata,
               config := #{publication := Publication}} = Data)
  when is_map_key({Namespace, Name}, Metadata) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),

    #{{Namespace, Name} := #{keys := Keys, oids := OIDs}} = Metadata,

    NewData = Data#{metadata := pgec_replica_common:metadata(
                                  {Namespace, Name},
                                  columns,
                                  Columns,
                                  Metadata)},

    TableMap = pgec_statem:nei({storage_request,
                                #{action => table_map,
                                  publication => Publication,
                                  schema => Namespace,
                                  table => Name,
                                  keys => Keys,
                                  oids => OIDs,
                                  columns => Columns}}),

    %% actions that follow the row writes.
    Continuation =
        case lists:last(Rest) of
            {command_complete, {select, _}} ->
                [];

            {portal_suspended, _} ->
                [pgec_statem:nei(
                   {execute,
                    #{label => Label,
                      max_rows => pgmp_config:replication(
                                    logical,
                                    max_rows)}})]
        end,

    %% foldr keeps the writes in the order the rows were received.
    Writes = lists:foldr(
               fun
                   ({data_row, Row}, Acc) ->
                       [pgec_statem:nei({storage_request,
                                         #{action => write,
                                           publication => Publication,
                                           schema => Namespace,
                                           table => Name,
                                           row => list_to_tuple(Row)}}) | Acc]
               end,
               Continuation,
               lists:droplast(Rest)),

    {next_state, unready, NewData, [TableMap | Writes]};

%% "set" has completed: back to unready so the next request can be
%% issued.
handle_event(internal = EventType,
             {response,
              #{reply := [{command_complete, set}]}} = EventContent,
             query = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {next_state, unready, Data};

%% Parse the statement selecting the tables of a publication.
handle_event(internal = EventType,
             sync_publication_tables = Label = EventContent,
             State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {keep_state_and_data,
     pgec_statem:nei(
       {parse,
        #{label => Label,
          sql => <<"select * from pg_catalog.pg_publication_tables "
                   "where pubname = $1">>}})};

%% Begin a repeatable read transaction.
handle_event(internal = EventType,
             begin_transaction = Label = EventContent,
             State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {keep_state_and_data,
     pgec_statem:nei(
       {query,
        #{label => Label,
          sql => <<"begin isolation level repeatable read">>}})};

%% Commit the transaction.
handle_event(internal = EventType,
             commit = Label = EventContent,
             State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {keep_state_and_data,
     pgec_statem:nei({query, #{label => Label, sql => <<"commit">>}})};

%% Adopt the snapshot identified by Id for the current transaction.
handle_event(internal = EventType,
             {set_transaction_snapshot = Label, Id} = EventContent,
             State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {keep_state_and_data,
     pgec_statem:nei(
       {query,
        #{label => Label,
          sql => io_lib:format("SET TRANSACTION SNAPSHOT '~s'", [Id])}})};

%% In the unready state a request is issued through the pgmp_connection
%% function named after the action. The returned value replaces the
%% outstanding requests, and the state machine moves to the state named
%% after the action while awaiting the response.
handle_event(internal = EventType,
             {Action, Arg} = EventContent,
             unready = State,
             #{requests := Requests, config := Config} = Data)
  when Action == query;
       Action == parse;
       Action == bind;
       Action == describe;
       Action == execute ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {next_state,
     Action,
     Data#{requests := pgmp_connection:Action(
                         Arg#{requests => Requests,
                              server_ref => pgmp_connection:server_ref(
                                              Config)})}};

%% In any other state a request is postponed.
handle_event(internal = EventType,
             {Action, _} = EventContent,
             State,
             Data)
  when Action == query;
       Action == parse;
       Action == bind;
       Action == describe;
       Action == execute ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {keep_state_and_data, postpone};

%% A fetch that was not handled by the unready clauses above is
%% postponed.
handle_event(internal = EventType,
             {fetch, _} = EventContent,
             State,
             Data) ->
    ?LOG_DEBUG(#{event_type => EventType,
                 event_content => EventContent,
                 state => State,
                 data => Data}),
    {keep_state_and_data, postpone};

%% Match an incoming message against the outstanding requests. A reply
%% is re-injected as an internal response event carrying the request
%% label; an error stops the state machine.
%%
%% NOTE(review): gen_statem:check_response/3 can also return no_request
%% or no_reply, neither of which is matched here, so a message that is
%% not a response to an outstanding request raises case_clause. Confirm
%% that this is intended.
handle_event(info, Msg, _, #{requests := Existing} = Data) ->
    case gen_statem:check_response(Msg, Existing, true) of
        {{reply, Reply}, Label, Updated} ->
            {keep_state,
             Data#{requests := Updated},
             pgec_statem:nei({response, #{label => Label, reply => Reply}})};

        {{error, {Reason, ServerRef}}, Label, UpdatedRequests} ->
            {stop,
             #{reason => Reason,
               server_ref => ServerRef,
               label => Label},
             Data#{requests := UpdatedRequests}}
    end;

%% Everything else is handled by the common replica event handler.
handle_event(Type, Content, State, Data) ->
    pgec_replica_common:handle_event(Type, Content, State, Data).


%% Build the SQL (as an iolist) that reads the rows of one published
%% table.
%%
%% Publication is a row of pg_catalog.pg_publication_tables represented
%% as a map keyed by column name. <<"schemaname">> and <<"tablename">>
%% are required. <<"attnames">> (when present) restricts the selected
%% columns, and a non-null <<"rowfilter">> is appended as the where
%% clause.
%%
%% Schema, table and column names are emitted as quoted identifiers.
%% The catalog holds the exact, case sensitive names, whereas an
%% unquoted identifier is folded to lower case and cannot contain
%% reserved words or special characters, so unquoted names can fail to
%% parse or resolve to a different relation.
pub_fetch_sql(#{<<"schemaname">> := Schema,
                <<"tablename">> := Table} = Publication) ->
    ?LOG_DEBUG(#{publication => Publication}),
    ["select ",
     pub_fetch_columns(Publication),
     " from ",
     quote_ident(Schema),
     ".",
     quote_ident(Table),
     case maps:find(<<"rowfilter">>, Publication) of
         {ok, RowFilter} when RowFilter /= null ->
             %% the row filter is an SQL expression, not an identifier,
             %% so it is used verbatim.
             [" where ", RowFilter];

         _Otherwise ->
             []
     end].


%% The select list for a published table: the quoted, comma separated
%% column names when the publication row has <<"attnames">>, otherwise
%% every column.
pub_fetch_columns(#{<<"attnames">> := Attributes}) ->
    ?LOG_DEBUG(#{attributes => Attributes}),
    lists:join(",", [quote_ident(Attribute) || Attribute <- Attributes]);

pub_fetch_columns(#{}) ->
    "*".


%% Wrap an identifier in double quotes, doubling any double quote that
%% it contains. The result is an iolist.
quote_ident(Identifier) ->
    [$", string:replace(Identifier, "\"", "\"\"", all), $"].

%% Choose the result format used when binding a table statement.
%%
%% The type of every column is looked up, by OID, in the cache returned
%% by pgmp_types:cache/1. If any of those types has <<"-">> as its
%% <<"typreceive">> or <<"typsend">> the result is text, otherwise it
%% is binary.
column_format(Config, Columns) ->
    ?LOG_DEBUG(#{config => Config, columns => Columns}),
    Types = pgmp_types:cache(Config),
    ColumnOIDs = [OID || #{type_oid := OID} <- Columns],
    LacksBinaryIO =
        fun
            (TypeOID) ->
                #{<<"typreceive">> := Receive,
                  <<"typsend">> := Send} = maps:get(TypeOID, Types),
                Receive == <<"-">> orelse Send == <<"-">>
        end,
    case lists:any(LacksBinaryIO, ColumnOIDs) of
        true ->
            text;

        false ->
            binary
    end.