_site/cover/pgmp_rep_log_ets_snapshot.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_rep_log_ets_snapshot).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -import(pgmp_rep_log_ets_common, [insert_new/6]).
22 -import(pgmp_rep_log_ets_common, [metadata/4]).
23 -import(pgmp_rep_log_ets_common, [new_table/4]).
24 -import(pgmp_statem, [nei/1]).
25 -include_lib("kernel/include/logger.hrl").
26
27
28 callback_mode() ->
29 5 handle_event_function.
30
31
32 handle_event({call, _}, _, _, _) ->
33 44 {keep_state_and_data, postpone};
34
35 handle_event(internal,
36 {response, #{reply := [{command_complete, 'begin'}]}},
37 query,
38 Data) ->
39 5 {next_state, unready, Data};
40
41 handle_event(internal,
42 {response, #{reply := [{command_complete, commit}]}},
43 _,
44 #{stream := Stream} = Data) ->
45 5 {next_state,
46 ready,
47 maps:without([stream], Data),
48 [pop_callback_module,
49 {reply, Stream, ok},
50 nei({notify,
51 #{action => progress,
52 status => completed,
53 activity => ?MODULE}})]};
54
55 handle_event(internal,
56 {response,
57 #{label := sync_publication_tables = Label,
58 reply := [{parse_complete, []}]}},
59 parse,
60 #{config := #{publication := Publication}} = Data) ->
61 5 {next_state,
62 unready,
63 Data,
64 nei({bind, #{label => Label, args => [Publication]}})};
65
66 handle_event(internal,
67 {response,
68 #{label := sync_publication_tables = Label,
69 reply := [{bind_complete, []}]}},
70 bind,
71 Data) ->
72 5 {next_state,
73 unready,
74 Data,
75 nei({execute, #{label => Label}})};
76
77 handle_event(internal,
78 {response, #{label := sync_publication_tables,
79 reply := [{command_complete, {select, 0}}]}},
80 execute,
81 Data) ->
82 %% There are no publication tables to sync, initiate streaming
83 %% replication.
84 %%
85
:-(
{next_state, unready, Data, nei(commit)};
86
87 handle_event(internal,
88 {response, #{label := sync_publication_tables,
89 reply := [{row_description, Columns} | T]}},
90 execute,
91 Data) ->
92 5 {command_complete, {select, _}} = lists:last(T),
93 5 {next_state,
94 unready,
95 Data,
96 nei({fetch,
97 lists:map(
98 fun
99 ({data_row, Values}) ->
100 5 maps:from_list(lists:zip(Columns, Values))
101 end,
102 lists:droplast(T))})};
103
104 handle_event(internal,
105 {fetch, []},
106 unready,
107 _) ->
108 5 {keep_state_and_data, nei(commit)};
109
110 handle_event(internal,
111 {fetch, _} = Label,
112 unready,
113 _) ->
114 5 {keep_state_and_data,
115 nei({parse,
116 #{label => Label,
117 sql => <<"select i.indkey from pg_catalog.pg_index i"
118 ", pg_catalog.pg_namespace n"
119 ", pg_catalog.pg_class c"
120 " where "
121 "i.indisprimary"
122 " and "
123 "i.indrelid = c.oid"
124 " and "
125 "c.relnamespace = n.oid"
126 " and "
127 "n.nspname = $1"
128 " and "
129 "c.relname = $2">>}})};
130
131 handle_event(internal,
132 {response,
133 #{label := {fetch, [#{<<"schemaname">> := Schema, <<"tablename">> := Table} | _]} = Label,
134 reply := [{parse_complete, []}]}},
135 parse,
136 Data) ->
137 5 {next_state,
138 unready,
139 Data,
140 nei({bind,
141 #{label => Label,
142 args => [Schema, Table]}})};
143
144 handle_event(internal,
145 {response,
146 #{label := {fetch, _} = Label,
147 reply := [{bind_complete, []}]}},
148 bind,
149 Data) ->
150 5 {next_state,
151 unready,
152 Data,
153 nei({execute, #{label => Label}})};
154
155 handle_event(
156 internal,
157 {response,
158 #{label := {fetch, [#{<<"tablename">> := _} = Publication | T]},
159 reply := [{command_complete, {select, 0}}]}},
160 execute,
161 Data) ->
162
:-(
?LOG_WARNING(
163 #{publication => Publication,
164
:-(
reason => "no primary key found"}),
165
:-(
{next_state, unready, Data, nei({fetch, T})};
166
167 handle_event(
168 internal,
169 {response,
170 #{label := {fetch,
171 [#{<<"schemaname">> := Namespace,
172 <<"tablename">> := Name} = Info | T]},
173 reply := [{row_description, [<<"indkey">>]},
174 {data_row, [Key]},
175 {command_complete, {select, 1}}]}},
176 execute,
177 #{metadata := Metadata,
178 config := #{publication := Publication}} = Data) ->
179 5 {next_state,
180 unready,
181 Data#{metadata := metadata(
182 {Namespace, Name},
183 keys,
184 Key,
185 metadata(
186 {Namespace, Name},
187 table,
188 new_table(Publication, Namespace, Name, Key),
189 Metadata))},
190 [nei({parse,
191 #{label => {table, #{namespace => Namespace, name => Name}},
192 sql => pub_fetch_sql(Info)}}),
193 nei({fetch, T})]};
194
195 handle_event(internal,
196 {response,
197 #{label := {table, _} = Label,
198 reply := [{parse_complete, []}]}},
199 parse,
200 Data) ->
201 5 {next_state,
202 unready,
203 Data,
204 nei({describe, #{type => $S, label => Label}})};
205
206 handle_event(internal,
207 {response,
208 #{label := {table, _} = Label,
209 reply := [{bind_complete, []}]}},
210 bind,
211 Data) ->
212 5 {next_state,
213 unready,
214 Data,
215 nei({execute,
216 #{label => Label,
217 max_rows => pgmp_config:replication(
218 logical,
219 max_rows)}})};
220
221 handle_event(internal,
222 {response,
223 #{label := {table,
224 #{namespace := Namespace, name := Name}} = Label,
225 reply := [{parameter_description,[]},
226 {row_description, Columns}]}},
227 describe,
228 #{metadata := Metadata, config := Config} = Data) ->
229 5 {next_state,
230 unready,
231 Data#{metadata := metadata(
232 {Namespace, Name},
233 columns,
234 11 [FieldName || #{field_name := FieldName} <- Columns],
235 metadata(
236 {Namespace, Name},
237 oids,
238 11 [OID || #{type_oid := OID} <- Columns],
239 Metadata))},
240 nei({bind, #{label => Label, result => column_format(Config, Columns)}})};
241
242 handle_event(internal,
243 {response, #{label := {table, _},
244 reply := [{command_complete, {select, 0}}]}},
245 execute,
246 Data) ->
247 2 {next_state, unready, Data};
248
249 handle_event(internal,
250 {response,
251 #{label := {table,
252 #{namespace := Namespace,
253 name := Name}} = Label,
254 reply := [{row_description, Columns} | T]}},
255 execute,
256 #{metadata := Metadata,
257 config := #{scope := Scope,
258 publication := Publication}} = Data) ->
259
260 3 #{{Namespace, Name} := #{keys := Keys}} = Metadata,
261
262 3 ok = insert_new(
263 Scope,
264 Publication,
265 Namespace,
266 Name,
267 150 [list_to_tuple(Values) || {data_row, Values} <- lists:droplast(T)],
268 Keys),
269
270 3 case lists:last(T) of
271 {command_complete, {select, _}} ->
272 3 {next_state,
273 unready,
274 Data#{metadata := metadata({Namespace, Name}, columns, Columns, Metadata)}};
275
276 {portal_suspended, _} ->
277
:-(
{next_state,
278 unready,
279 Data,
280 nei({execute,
281 #{label => Label,
282 max_rows => pgmp_config:replication(
283 logical,
284 max_rows)}})}
285 end;
286
287 handle_event(internal,
288 {response, #{reply := [{command_complete, set}]}},
289 query,
290 Data) ->
291 5 {next_state, unready, Data};
292
293 handle_event(internal,
294 sync_publication_tables = Label,
295 _,
296 _) ->
297 5 {keep_state_and_data,
298 nei({parse,
299 #{label => Label,
300 sql => <<"select * from pg_catalog.pg_publication_tables "
301 "where pubname = $1">>}})};
302
303 handle_event(internal, begin_transaction = Label, _, _) ->
304 5 {keep_state_and_data,
305 nei({query,
306 #{label => Label,
307 sql => <<"begin isolation level repeatable read">>}})};
308
309 handle_event(internal, commit = Label, _, _) ->
310 5 {keep_state_and_data,
311 nei({query, #{label => Label, sql => <<"commit">>}})};
312
313 handle_event(internal, {set_transaction_snapshot = Label, Id}, _, _) ->
314 5 {keep_state_and_data,
315 nei({query,
316 #{label => Label,
317 sql => io_lib:format(
318 "SET TRANSACTION SNAPSHOT '~s'",
319 [Id])}})};
320
321 handle_event(internal,
322 {Action, Arg},
323 unready,
324 #{requests := Requests, config := Config} = Data)
325 when Action == query;
326 Action == parse;
327 Action == bind;
328 Action == describe;
329 Action == execute ->
330 65 {next_state,
331 Action,
332 Data#{requests := pgmp_connection:Action(
333 Arg#{requests => Requests,
334 server_ref => pgmp_connection:server_ref(
335 Config)})}};
336
337 handle_event(internal, {Action, _}, _, _)
338 when Action == query;
339 Action == parse;
340 Action == bind;
341 Action == describe;
342 Action == execute ->
343 15 {keep_state_and_data, postpone};
344
345 handle_event(internal, {fetch, _}, _, _) ->
346 20 {keep_state_and_data, postpone};
347
348 handle_event(info, Msg, _, #{requests := Existing} = Data) ->
349 65 case gen_statem:check_response(Msg, Existing, true) of
350 {{reply, Reply}, Label, Updated} ->
351 65 {keep_state,
352 Data#{requests := Updated},
353 nei({response, #{label => Label, reply => Reply}})};
354
355 {{error, {Reason, ServerRef}}, Label, UpdatedRequests} ->
356
:-(
{stop,
357 #{reason => Reason,
358 server_ref => ServerRef,
359 label => Label},
360 Data#{requests := UpdatedRequests}}
361 end;
362
363 handle_event(Type, Content, State, Data) ->
364
:-(
pgmp_rep_log_ets_common:handle_event(Type, Content, State, Data).
365
366
367 pub_fetch_sql(#{<<"schemaname">> := Schema,
368 <<"tablename">> := Table} = Publication) ->
369 5 ["select ",
370 pub_fetch_columns(Publication),
371 " from ",
372 Schema,
373 ".",
374 Table,
375 case maps:find(<<"rowfilter">>, Publication) of
376 {ok, RowFilter} when RowFilter /= null ->
377
:-(
[" where ", RowFilter];
378
379 _Otherwise ->
380 5 []
381 end].
382
383
384 pub_fetch_columns(#{<<"attnames">> := Attributes}) ->
385 5 lists:join(",", Attributes);
386
387 pub_fetch_columns(#{}) ->
388
:-(
"*".
389
390 column_format(Config, Columns) ->
391 5 Types = pgmp_types:cache(Config),
392 5 case lists:any(
393 fun
394 (OID) ->
395 11 #{<<"typreceive">> := R,
396 <<"typsend">> := S} = maps:get(OID, Types),
397 11 R == <<"-">> orelse S == <<"-">>
398 end,
399 11 [OID || #{type_oid := OID} <- Columns]) of
400
401 true ->
402
:-(
text;
403
404 false ->
405 5 binary
406 end.
Line Hits Source