_site/cover/pgec_replica.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgec_replica).
17
18
19 -export([begin_transaction/1]).
20 -export([callback_mode/0]).
21 -export([commit/1]).
22 -export([delete/1]).
23 -export([handle_event/4]).
24 -export([init/1]).
25 -export([insert/1]).
26 -export([lsn/1]).
27 -export([metadata/1]).
28 -export([snapshot/1]).
29 -export([start_link/1]).
30 -export([terminate/3]).
31 -export([truncate/1]).
32 -export([update/1]).
33 -export([when_ready/1]).
34 -import(pgec_statem, [nei/1]).
35 -include_lib("kernel/include/logger.hrl").
36
37
%% Starts and links a gen_statem running this module.
%% Arg is handed to init/1 wrapped in a single-element list; the start
%% options come from envy_gen:options/1 for this module.
start_link(Arg) ->
    Options = envy_gen:options(?MODULE),
    gen_statem:start_link(?MODULE, [Arg], Options).
40
41
%% Sends a `snapshot` request. The `id` key of Arg is moved into the
%% request payload by send_request/3; the rest of Arg is passed along.
snapshot(Arg) ->
    Keys = [id],
    send_request(?FUNCTION_NAME, Keys, Arg).
44
45
%% Sends an `lsn` request. No keys are selected, so send_request/3
%% builds the request with an empty map as its payload.
lsn(Arg) ->
    Keys = [],
    send_request(?FUNCTION_NAME, Keys, Arg).
48
49
%% Sends a `begin_transaction` request whose payload is made of the
%% commit_timestamp, final_lsn, xid and x_log keys taken from Arg.
begin_transaction(Arg) ->
    Keys = [commit_timestamp, final_lsn, xid, x_log],
    send_request(?FUNCTION_NAME, Keys, Arg).
54
55
%% Sends a `commit` request whose payload is made of the commit_lsn,
%% commit_timestamp, end_lsn and x_log keys taken from Arg.
commit(Arg) ->
    Keys = [commit_lsn, commit_timestamp, end_lsn, x_log],
    send_request(?FUNCTION_NAME, Keys, Arg).
60
61
%% Sends an `insert` request whose payload is made of the relation,
%% tuple and x_log keys taken from Arg.
insert(Arg) ->
    Keys = [relation, tuple, x_log],
    send_request(?FUNCTION_NAME, Keys, Arg).
64
65
%% Sends an `update` request whose payload is made of the relation,
%% tuple and x_log keys taken from Arg.
update(Arg) ->
    Keys = [relation, tuple, x_log],
    send_request(?FUNCTION_NAME, Keys, Arg).
68
69
%% Sends a `delete` request whose payload is made of the relation,
%% tuple and x_log keys taken from Arg.
delete(Arg) ->
    Keys = [relation, tuple, x_log],
    send_request(?FUNCTION_NAME, Keys, Arg).
72
73
%% Sends a `truncate` request whose payload is made of the relations
%% and x_log keys taken from Arg.
truncate(Arg) ->
    Keys = [relations, x_log],
    send_request(?FUNCTION_NAME, Keys, Arg).
76
77
%% Sends a `metadata` request. No keys are selected, so send_request/3
%% builds the request with an empty map as its payload.
metadata(Arg) ->
    Keys = [],
    send_request(?FUNCTION_NAME, Keys, Arg).
80
81
%% Builds the request `{Action, Payload}` where Payload holds only the
%% entries of Arg named in Keys, stores it under `request`, strips Keys
%% from the resulting map and hands that map to send_request/1.
send_request(Action, Keys, Arg) ->
    Payload = maps:with(Keys, Arg),
    WithRequest = Arg#{request => {Action, Payload}},
    %% Keys are removed only after `request` has been set, exactly as in
    %% the nested form this replaces.
    send_request(maps:without(Keys, WithRequest)).
87
88
%% Sends the bare `when_ready` request (an atom, not a tagged tuple),
%% replacing any `request` entry already present in Arg.
%% handle_event/4 in this module replies `ok` to it in state `ready`
%% and postpones it in any other state.
when_ready(Arg) ->
    send_request(Arg#{request => ?FUNCTION_NAME}).
94
95
%% Forwards Arg to pgec_statem:send_request/1, adding `label => ?MODULE`
%% only when Arg carries no `label` of its own.
send_request(Arg) ->
    %% The default sits on the left of the merge, so a label already in
    %% Arg takes precedence over it.
    pgec_statem:send_request(maps:merge(#{label => ?MODULE}, Arg)).
101
102
%% gen_statem init callback.
%% Traps exits, then starts in state `unready` with a fresh request-id
%% collection, the supplied Arg stored as `config`, and empty `monitors`
%% and `metadata` maps. The action nei(join) is returned alongside; the
%% `join` clause of handle_event/4 handles an internal `join` event.
init([Arg]) ->
    process_flag(trap_exit, true),
    Data = #{requests => gen_statem:reqids_new(),
             config => Arg,
             monitors => #{},
             metadata => #{}},
    {ok, unready, Data, nei(join)}.
112
113
%% gen_statem callback: every event is dispatched to handle_event/4
%% rather than to one function per state.
callback_mode() ->
    handle_event_function.
116
117
%% gen_statem event handler (handle_event_function mode).
%%
%% Each clause below handles one request or message shape; anything not
%% matched here falls through to pgec_replica_common:handle_event/4.
%% nei/1 is imported from pgec_statem; its results are used as
%% gen_statem actions.

%% Internal `join`: register this process in the pg scope under the
%% group [?MODULE, Publication].
handle_event(internal = Type,
             join = Content,
             State,
             #{config := #{scope := Scope, publication := Publication}} = Data) ->
    ?LOG_DEBUG(#{event_type => Type,
                 event_content => Content,
                 state => State,
                 data => Data}),
    pg:join(Scope, [?MODULE, Publication], self()),
    keep_state_and_data;

%% `metadata` call while `ready`: reply with the metadata map held in
%% Data. The `#{}` pattern matches any map payload.
handle_event({call, From} = Type,
             {metadata, #{}} = Content,
             ready = State,
             #{metadata := Metadata} = Data) ->
    ?LOG_DEBUG(#{event_type => Type,
                 event_content => Content,
                 state => State,
                 data => Data}),
    {keep_state_and_data, {reply, From, Metadata}};

%% `when_ready` call while `ready`: answer immediately.
handle_event({call, From} = Type,
             when_ready = Content,
             ready = State,
             Data) ->
    ?LOG_DEBUG(#{event_type => Type,
                 event_content => Content,
                 state => State,
                 data => Data}),
    {keep_state_and_data, {reply, From, ok}};

%% `when_ready` call in any other state: postpone the call.
handle_event({call, _} = Type,
             when_ready = Content,
             State,
             Data) ->
    ?LOG_DEBUG(#{event_type => Type,
                 event_content => Content,
                 state => State,
                 data => Data}),
    {keep_state_and_data, postpone};

%% `begin_transaction` call: acknowledged, nothing else is done here.
handle_event({call, From} = Type,
             {begin_transaction, _} = Content,
             State,
             Data) ->
    ?LOG_DEBUG(#{event_type => Type,
                 event_content => Content,
                 state => State,
                 data => Data}),
    {keep_state_and_data, {reply, From, ok}};

%% `commit` call: acknowledge, then queue a storage request recording
%% the commit's end_lsn as the publication's position.
handle_event({call, From} = Type,
             {commit, #{end_lsn := LSN}} = Content,
             State,
             #{config := #{publication := Publication}} = Data) ->
    ?LOG_DEBUG(#{event_type => Type,
                 event_content => Content,
                 state => State,
                 data => Data}),
    PositionUpdate = nei({storage_request,
                          #{action => position_update,
                            publication => Publication,
                            position => LSN}}),
    {keep_state_and_data, [{reply, From, ok}, PositionUpdate]};

%% `insert`, `update` or `delete` call for a single row.
handle_event({call, From} = Type,
             {Action,
              #{relation := #{namespace := Namespace, name := Name} = Relation,
                tuple := Tuple}} = Content,
             State,
             #{metadata := Metadata,
               config := #{publication := Publication}} = Data)
  when Action =:= insert orelse
       Action =:= update orelse
       Action =:= delete ->

    ?LOG_DEBUG(#{event_type => Type,
                 event_content => Content,
                 state => State,
                 data => Data}),

    case Metadata of
        %% Relation is marked as dropped: acknowledge and ignore the row.
        #{{Namespace, Name} := #{state := dropped}} ->
            {keep_state_and_data, {reply, From, ok}};

        %% Known relation: acknowledge, then queue the storage request
        %% followed by the change event.
        #{{Namespace, Name} := _} ->
            StorageRequest = nei({storage_request,
                                  #{action => storage_action(Action),
                                    publication => Publication,
                                    schema => Namespace,
                                    table => Name,
                                    row => Tuple}}),
            Change = nei({change,
                          #{namespace => Namespace,
                            action => Action,
                            row => Tuple,
                            name => Name}}),
            {keep_state_and_data, [{reply, From, ok}, StorageRequest, Change]};

        %% Relation not in the metadata: move to `unready`, push
        %% pgec_replica_backfill as the callback module, queue the
        %% relation event and postpone this call.
        #{} ->
            {next_state,
             unready,
             Data,
             [{push_callback_module, pgec_replica_backfill},
              nei({relation, Relation}),
              postpone]}
    end;

%% `truncate` call covering a list of relations.
handle_event({call, From} = Type,
             {truncate, #{relations := Relations}} = Content,
             State,
             #{metadata := Metadata,
               config := #{publication := Publication}} = Data) ->
    ?LOG_DEBUG(#{event_type => Type,
                 event_content => Content,
                 state => State,
                 data => Data}),

    %% Relations that have no entry in the metadata.
    Unknown = lists:filter(
                fun
                    (#{namespace := Namespace, name := Name}) ->
                        not maps:is_key({Namespace, Name}, Metadata)
                end,
                Relations),

    case Unknown of
        [] ->
            %% NOTE(review): this request carries the key `namespace`
            %% while the row-level storage requests above carry
            %% `schema` - confirm which one the storage handler expects.
            Requests = lists:map(
                         fun
                             (#{namespace := Namespace, name := Name}) ->
                                 nei({storage_request,
                                      #{action => truncate,
                                        publication => Publication,
                                        namespace => Namespace,
                                        table => Name}})
                         end,
                         Relations),
            %% Storage requests end up in reverse relation order, ahead
            %% of the reply.
            {keep_state_and_data,
             lists:reverse(Requests, [{reply, From, ok}])};

        [Relation | _] ->
            %% NOTE(review): unlike the row-level clause, this branch
            %% keeps the current state rather than moving to `unready` -
            %% confirm that the difference is intended.
            {keep_state_and_data,
             [{push_callback_module, pgec_replica_backfill},
              nei({relation, Relation}),
              postpone]}
    end;

%% `snapshot` call: the caller is stored under `stream` in Data and no
%% reply is issued in this clause; pgec_replica_snapshot is pushed as
%% the callback module amid the queued internal events.
handle_event({call, Stream} = Type,
             {snapshot, #{id := Id}} = Content,
             State,
             Data) ->
    ?LOG_DEBUG(#{event_type => Type,
                 event_content => Content,
                 state => State,
                 data => Data}),
    ?LOG_INFO(#{snapshot => Id}),
    {keep_state,
     Data#{stream => Stream},
     [nei(storage),
      {push_callback_module, pgec_replica_snapshot},
      nei(begin_transaction),
      nei({set_transaction_snapshot, Id}),
      nei(sync_publication_tables)]};

%% `lsn` call: LSN is only logged here. No reply is issued in this
%% clause; the caller is passed as `from` in the position request.
handle_event({call, Stream},
             {lsn, LSN},
             _,
             #{config := #{publication := Publication}}) ->
    ?LOG_INFO(#{lsn => LSN}),
    Position = nei({storage_request,
                    #{action => position,
                      publication => Publication,
                      from => Stream}}),
    %% inform storage that the publication is now ready, enabling
    %% read requests.
    Ready = nei({storage_request,
                 #{action => ready,
                   publication => Publication}}),
    {keep_state_and_data, [nei(storage), Position, Ready]};

%% A monitored process went down with reason `shutdown`: stop with the
%% same reason.
handle_event(info = Type,
             {'DOWN', _, process, _, shutdown = Reason} = Content,
             State,
             Data) ->
    ?LOG_DEBUG(#{event_type => Type,
                 event_content => Content,
                 state => State,
                 data => Data}),
    {stop, Reason};

%% Everything else is delegated to the shared handler.
handle_event(EventType, EventContent, State, Data) ->
    pgec_replica_common:handle_event(EventType, EventContent, State, Data).
320
321
%% gen_statem terminate callback: removes this process from the pg
%% group [?MODULE, Publication] in the configured scope. The reason and
%% the state are ignored.
terminate(_Reason, _State, #{config := #{scope := Scope,
                                         publication := Publication}}) ->
    Group = [?MODULE, Publication],
    pg:leave(Scope, Group, self()).
326
327
%% Maps a replication action onto the storage action used in the
%% storage request: inserts and updates are both writes, a delete
%% stays a delete. Any other value raises function_clause.
storage_action(insert) -> write;
storage_action(update) -> write;
storage_action(delete) -> delete.
Line Hits Source