_site/cover/pgmp_rep_log_ets_backfill.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_rep_log_ets_backfill).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -import(pgmp_rep_log_ets_common, [metadata/4]).
22 -import(pgmp_rep_log_ets_common, [new_table/4]).
23 -import(pgmp_rep_log_ets_common, [table_name/3]).
24 -import(pgmp_statem, [nei/1]).
25 -include_lib("kernel/include/logger.hrl").
26
27
28 callback_mode() ->
29
:-(
handle_event_function.
30
31
32 handle_event({call, _}, _, _, _) ->
33
:-(
{keep_state_and_data, postpone};
34
35 handle_event(internal, {relation, _} = Label, _, _) ->
36
:-(
{keep_state_and_data,
37 nei({parse,
38 #{label => Label,
39 sql => <<"select * from pg_catalog.pg_publication_tables "
40 "where pubname = $1 and schemaname = $2 and tablename = $3">>}})};
41
42 handle_event(internal,
43 {response,
44 #{label := {relation, #{namespace := Namespace, name := Name}} = Label,
45 reply := [{parse_complete, []}]}},
46 parse,
47 #{config := #{publication := Publication}} = Data) ->
48
:-(
{next_state,
49 unready,
50 Data,
51 nei({bind, #{label => Label, args => [Publication, Namespace, Name]}})};
52
53 handle_event(internal,
54 {response,
55 #{label := {relation, _} = Label,
56 reply := [{bind_complete, []}]}},
57 bind,
58 Data) ->
59
:-(
{next_state,
60 unready,
61 Data,
62 nei({execute, #{label => Label}})};
63
64 handle_event(internal,
65 {response, #{label := {relation, _},
66 reply := [{row_description, Columns} | T]}},
67 execute,
68 Data) ->
69
:-(
{command_complete, {select, _}} = lists:last(T),
70
:-(
{next_state,
71 unready,
72 Data,
73 nei({fetch,
74 lists:map(
75 fun
76 ({data_row, Values}) ->
77
:-(
maps:from_list(lists:zip(Columns, Values))
78 end,
79 lists:droplast(T))})};
80
81 handle_event(internal,
82 {Action, Arg},
83 unready,
84 #{config := Config, requests := Requests} = Data)
85 when Action == query;
86 Action == parse;
87 Action == bind;
88 Action == describe;
89 Action == execute ->
90
:-(
{next_state,
91 Action,
92 Data#{requests := pgmp_connection:Action(
93 Arg#{server_ref => pgmp_connection:server_ref(Config),
94 requests => Requests})}};
95
96
97 handle_event(internal,
98 {fetch, []},
99 unready,
100 Data) ->
101
:-(
{next_state,
102 ready,
103 Data,
104 pop_callback_module};
105
106 handle_event(internal,
107 {fetch, _} = Label,
108 unready,
109 _) ->
110
:-(
{keep_state_and_data,
111 nei({parse,
112 #{label => Label,
113 sql => <<"select i.indkey from pg_catalog.pg_index i"
114 ", pg_catalog.pg_namespace n"
115 ", pg_catalog.pg_class c"
116 " where "
117 "i.indrelid = c.oid"
118 " and "
119 "c.relnamespace = n.oid"
120 " and "
121 "n.nspname = $1"
122 " and "
123 "c.relname = $2">>}})};
124
125 handle_event(internal,
126 {response,
127 #{label := {fetch,
128 [#{<<"schemaname">> := Schema,
129 <<"tablename">> := Table} | _]} = Label,
130 reply := [{parse_complete, []}]}},
131 parse,
132 Data) ->
133
:-(
{next_state,
134 unready,
135 Data,
136 nei({bind, #{label => Label, args => [Schema, Table]}})};
137
138 handle_event(internal,
139 {response,
140 #{label := {fetch, _} = Label,
141 reply := [{bind_complete, []}]}},
142 bind,
143 Data) ->
144
:-(
{next_state,
145 unready,
146 Data,
147 nei({execute, #{label => Label}})};
148
149 handle_event(
150 internal,
151 {response,
152 #{label := {fetch,
153 [#{<<"pubname">> := Publication,
154 <<"schemaname">> := Namespace,
155 <<"tablename">> := Name} = Row | T]},
156 reply := [{row_description, [<<"indkey">>]},
157 {data_row, [Key]},
158 {command_complete, {select, 1}}]}},
159 execute,
160 #{metadata := Metadata} = Data) ->
161
:-(
{next_state,
162 unready,
163 Data#{metadata := metadata(
164 {Namespace, Name},
165 keys,
166 Key,
167 metadata(
168 {Namespace, Name},
169 table,
170 new_table(Publication, Namespace, Name, Key),
171 Metadata))},
172 [nei({parse,
173 #{label => {table,
174 #{namespace => Namespace,
175 name => Name}},
176 sql => pub_fetch_sql(Row)}}),
177 nei({fetch, T})]};
178
179 handle_event(internal,
180 {response,
181 #{label := {table, _} = Label,
182 reply := [{parse_complete, []}]}},
183 parse,
184 Data) ->
185
:-(
{next_state, unready, Data, nei({bind, #{label => Label}})};
186
187 handle_event(internal,
188 {response,
189 #{label := {table, _} = Label,
190 reply := [{bind_complete, []}]}},
191 bind,
192 Data) ->
193
:-(
{next_state, unready, Data, nei({describe, #{type => $P, label => Label}})};
194
195 handle_event(internal,
196 {response,
197 #{label := {table,
198 #{namespace := Namespace,
199 name := Name}},
200 reply := [{row_description, Columns}]}},
201 describe,
202 #{config := #{publication := Publication},
203 metadata := Metadata} = Data) ->
204
:-(
{next_state,
205 unready,
206 Data#{metadata := metadata(
207 {Namespace, Name},
208 columns,
209
:-(
[FieldName || #{field_name := FieldName} <- Columns],
210 metadata(
211 {Namespace, Name},
212 oids,
213
:-(
[OID || #{type_oid := OID} <- Columns],
214 Metadata))},
215 nei({notify,
216 #{action => add,
217 relation => table_name(Publication, Namespace, Name)}})};
218
219 handle_event(internal, {Action, _}, _, _)
220 when Action == query;
221 Action == parse;
222 Action == bind;
223 Action == describe;
224 Action == execute ->
225
:-(
{keep_state_and_data, postpone};
226
227 handle_event(internal, {fetch, _}, _, _) ->
228
:-(
{keep_state_and_data, postpone};
229
230 handle_event(info, Msg, _, #{requests := Existing} = Data) ->
231
:-(
case gen_statem:check_response(Msg, Existing, true) of
232 {{reply, Reply}, Label, Updated} ->
233
:-(
{keep_state,
234 Data#{requests := Updated},
235 nei({response, #{label => Label, reply => Reply}})};
236
237 {{error, {Reason, ServerRef}}, Label, UpdatedRequests} ->
238
:-(
{stop,
239 #{reason => Reason,
240 server_ref => ServerRef,
241 label => Label},
242 Data#{requests := UpdatedRequests}}
243 end;
244
245 handle_event(Type, Content, State, Data) ->
246
:-(
pgmp_rep_log_ets_common:handle_event(Type, Content, State, Data).
247
248
249 pub_fetch_sql(#{<<"schemaname">> := Schema,
250 <<"tablename">> := Table} = Publication) ->
251
:-(
["select ",
252 pub_fetch_columns(Publication),
253 " from ",
254 Schema,
255 ".",
256 Table,
257 case maps:find(<<"rowfilter">>, Publication) of
258 {ok, RowFilter} when RowFilter /= null ->
259
:-(
[" where ", RowFilter];
260
261 _Otherwise ->
262
:-(
[]
263 end].
264
265
266 pub_fetch_columns(#{<<"attnames">> := Attributes}) ->
267
:-(
lists:join(",", Attributes);
268
269 pub_fetch_columns(#{}) ->
270
:-(
"*".
Line Hits Source