/home/runner/work/pgec/pgec/_site/ct/ct_run.ct_pgec@fv-az651-965.2023-12-04_14.30.54/pgec_replica_backfill.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgec_replica_backfill).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -import(pgec_replica_common, [metadata/4]).
22 -import(pgec_replica_common, [table_name/3]).
23 -import(pgec_statem, [nei/1]).
24 -include_lib("kernel/include/logger.hrl").
25
26
%% gen_statem callback: every event, in every state, is routed through
%% handle_event/4.
-spec callback_mode() -> handle_event_function.
callback_mode() -> handle_event_function.
29
30
%% Calls are never answered by this callback module: whatever the
%% state or request, the event is postponed and the state data kept.
handle_event({call, _From}, _Request, _State, _Data) ->
    {keep_state_and_data, postpone};
33
%% A {relation, _} event starts the lookup of the relation in
%% pg_catalog.pg_publication_tables: request a parse of the lookup
%% statement, labelled with the relation so that the response can be
%% matched to it. The three parameters are supplied when binding.
handle_event(internal, {relation, _Relation} = Label, _State, _Data) ->
    SQL = <<"select * from pg_catalog.pg_publication_tables "
            "where pubname = $1 and schemaname = $2 and tablename = $3">>,
    {keep_state_and_data, nei({parse, #{label => Label, sql => SQL}})};
40
%% The publication lookup statement has been parsed: bind it with the
%% configured publication name and the namespace and name taken from
%% the relation label, moving back to the unready state.
handle_event(internal,
             {response,
              #{reply := [{parse_complete, []}],
                label := {relation,
                          #{namespace := Namespace,
                            name := Name}} = Label}},
             parse,
             #{config := #{publication := Publication}} = Data) ->
    Args = [Publication, Namespace, Name],
    {next_state, unready, Data, nei({bind, #{label => Label, args => Args}})};
51
%% The publication lookup has been bound: request its execution,
%% carrying the relation label forward.
handle_event(internal,
             {response,
              #{reply := [{bind_complete, []}],
                label := {relation, _} = Label}},
             bind,
             Data) ->
    Execute = {execute, #{label => Label}},
    {next_state, unready, Data, nei(Execute)};
62
63
%% The publication lookup returned no rows for this relation: record
%% the relation's state as dropped in the metadata, become ready and
%% pop this callback module.
handle_event(internal,
             {response,
              #{reply := [{command_complete, {select, 0}}],
                label := {relation,
                          #{namespace := Namespace,
                            name := Name}}}},
             execute,
             #{metadata := Metadata} = Data) ->
    Dropped = metadata({Namespace, Name}, state, dropped, Metadata),
    {next_state, ready, Data#{metadata := Dropped}, pop_callback_module};
74
%% The publication lookup returned rows. The reply is a row
%% description, followed by the data rows, followed by a select
%% command completion (asserted here). Each data row becomes a map
%% from column to value, and the resulting list is handed on as a
%% {fetch, Rows} event.
handle_event(internal,
             {response,
              #{label := {relation, _},
                reply := [{row_description, Columns} | Rest]}},
             execute,
             Data) ->
    {command_complete, {select, _}} = lists:last(Rest),
    ToRow = fun
                ({data_row, Values}) ->
                    maps:from_list(lists:zip(Columns, Values))
            end,
    Rows = lists:map(ToRow, lists:droplast(Rest)),
    {next_state, unready, Data, nei({fetch, Rows})};
91
%% While unready, a query, parse, bind, describe or execute action is
%% issued by calling the pgmp_connection function of the same name.
%% The argument map is extended with the server reference derived from
%% the configuration and with the current requests; the value returned
%% by the call replaces requests in the state data, and the machine
%% enters the state named after the action.
handle_event(internal,
             {Action, Arg},
             unready,
             #{config := Config, requests := Requests} = Data)
  when Action =:= query;
       Action =:= parse;
       Action =:= bind;
       Action =:= describe;
       Action =:= execute ->
    Request = Arg#{server_ref => pgmp_connection:server_ref(Config),
                   requests => Requests},
    Updated = pgmp_connection:Action(Request),
    {next_state, Action, Data#{requests := Updated}};
106
%% Nothing is left to fetch: become ready and pop this callback
%% module.
handle_event(internal, {fetch, []}, unready, Data) ->
    {next_state, ready, Data, pop_callback_module};
115
%% There are tables left to fetch: request a parse of the statement
%% that looks up the index key columns (pg_index.indkey) of a table
%% identified by schema ($1) and table name ($2). The fetch list is
%% used as the label so that the response can be matched to it.
%%
%% The clause of this function that consumes the execute reply matches
%% exactly one data row ({select, 1}). pg_index holds one row per
%% index, so without a restriction a table with a primary key and any
%% secondary index would produce a reply with several rows that this
%% module does not handle. The statement therefore orders the primary
%% key index first and limits the result to a single row; tables with
%% only one index get the same row as before.
handle_event(internal,
             {fetch, _} = Label,
             unready,
             _) ->
    {keep_state_and_data,
     nei({parse,
          #{label => Label,
            sql => <<"select i.indkey from pg_catalog.pg_index i"
                     ", pg_catalog.pg_namespace n"
                     ", pg_catalog.pg_class c"
                     " where "
                     "i.indrelid = c.oid"
                     " and "
                     "c.relnamespace = n.oid"
                     " and "
                     "n.nspname = $1"
                     " and "
                     "c.relname = $2"
                     " order by i.indisprimary desc"
                     " limit 1">>}})};
134
%% The index lookup statement has been parsed: bind it with the schema
%% and table name of the row at the head of the fetch list.
handle_event(internal,
             {response,
              #{reply := [{parse_complete, []}],
                label := {fetch,
                          [#{<<"schemaname">> := Schema,
                             <<"tablename">> := Table} | _]} = Label}},
             parse,
             Data) ->
    Bind = {bind, #{label => Label, args => [Schema, Table]}},
    {next_state, unready, Data, nei(Bind)};
147
%% The index lookup has been bound: request its execution, carrying
%% the fetch label forward.
handle_event(internal,
             {response,
              #{reply := [{bind_complete, []}],
                label := {fetch, _} = Label}},
             bind,
             Data) ->
    Execute = {execute, #{label => Label}},
    {next_state, unready, Data, nei(Execute)};
158
%% The index lookup returned exactly one indkey row for the table at
%% the head of the fetch list. Store the key under `keys' in the
%% table's metadata, then emit two follow-up events: a parse of the
%% statement built by pub_fetch_sql/1 for this table (labelled
%% {table, ...}), and a fetch for the remaining rows of the list.
handle_event(
  internal,
  {response,
   #{label := {fetch,
               [#{<<"schemaname">> := Namespace,
                  <<"tablename">> := Name} = Row | Remaining]},
     reply := [{row_description, [<<"indkey">>]},
               {data_row, [Key]},
               {command_complete, {select, 1}}]}},
  execute,
  #{metadata := Metadata} = Data) ->
    Relation = {Namespace, Name},
    TableLabel = {table, #{namespace => Namespace, name => Name}},
    {next_state,
     unready,
     Data#{metadata := metadata(Relation, keys, Key, Metadata)},
     [nei({parse, #{label => TableLabel, sql => pub_fetch_sql(Row)}}),
      nei({fetch, Remaining})]};
183
%% The table select statement has been parsed: bind it (no arguments
%% are supplied), keeping the table label.
handle_event(internal,
             {response,
              #{reply := [{parse_complete, []}],
                label := {table, _} = Label}},
             parse,
             Data) ->
    Bind = {bind, #{label => Label}},
    {next_state, unready, Data, nei(Bind)};
191
%% The table select statement has been bound: request a describe of
%% type $P, keeping the table label.
handle_event(internal,
             {response,
              #{reply := [{bind_complete, []}],
                label := {table, _} = Label}},
             bind,
             Data) ->
    Describe = {describe, #{type => $P, label => Label}},
    {next_state, unready, Data, nei(Describe)};
199
%% The describe of the table select returned its row description.
%% Record the type OIDs and then the field names of the columns in the
%% table's metadata (under `oids' and `columns'), and emit a notify
%% event announcing that the relation has been added.
handle_event(internal,
             {response,
              #{reply := [{row_description, Columns}],
                label := {table,
                          #{namespace := Namespace,
                            name := Name}}}},
             describe,
             #{config := #{publication := Publication},
               metadata := Metadata} = Data) ->
    Relation = {Namespace, Name},
    FieldNames = [FieldName || #{field_name := FieldName} <- Columns],
    OIDs = [OID || #{type_oid := OID} <- Columns],
    WithOIDs = metadata(Relation, oids, OIDs, Metadata),
    WithColumns = metadata(Relation, columns, FieldNames, WithOIDs),
    {next_state,
     unready,
     Data#{metadata := WithColumns},
     nei({notify,
          #{action => add,
            relation => table_name(Publication, Namespace, Name)}})};
223
%% A query, parse, bind, describe or execute action that reaches this
%% clause is postponed, leaving state and data untouched.
handle_event(internal, {Action, _Arg}, _State, _Data)
  when Action =:= query;
       Action =:= parse;
       Action =:= bind;
       Action =:= describe;
       Action =:= execute ->
    {keep_state_and_data, postpone};
231
%% A fetch event that reaches this clause is postponed, leaving state
%% and data untouched.
handle_event(internal, {fetch, _Pending}, _State, _Data) ->
    {keep_state_and_data, postpone};
234
%% Every other event is delegated unchanged to pgec_replica_common.
handle_event(EventType, EventContent, State, Data) ->
    pgec_replica_common:handle_event(EventType, EventContent, State, Data).
237
238
%% Build, as iodata, the select statement that reads the published
%% rows of a table. The argument is a row map with binary keys:
%% <<"schemaname">> and <<"tablename">> are required; <<"attnames">>
%% (a list of column names) and <<"rowfilter">> are optional.
%%
%% Schema, table and column names are emitted as double-quoted
%% identifiers. Unquoted, a name with upper-case letters would be
%% folded to lower case, and reserved words or names containing
%% spaces or quotes would produce invalid SQL.
%%
%% A row filter that is present and not null is appended verbatim as
%% the where clause.
pub_fetch_sql(#{<<"schemaname">> := Schema,
                <<"tablename">> := Table} = Publication) ->
    ["select ",
     pub_fetch_columns(Publication),
     " from ",
     pub_fetch_quote(Schema),
     ".",
     pub_fetch_quote(Table),
     case maps:find(<<"rowfilter">>, Publication) of
         {ok, RowFilter} when RowFilter /= null ->
             [" where ", RowFilter];

         _Otherwise ->
             []
     end].


%% The select list: the quoted, comma-separated column names when the
%% row carries <<"attnames">>, otherwise every column.
pub_fetch_columns(#{<<"attnames">> := Attributes}) ->
    lists:join(",", [pub_fetch_quote(Attribute) || Attribute <- Attributes]);

pub_fetch_columns(#{}) ->
    "*".


%% Quote an identifier (iodata) for use in SQL: wrap it in double
%% quotes and double any double quote it contains.
pub_fetch_quote(Identifier) ->
    [$",
     binary:replace(iolist_to_binary(Identifier),
                    <<"\"">>,
                    <<"\"\"">>,
                    [global]),
     $"].
Line Hits Source