_site/cover/pgmp_rep_log_ets.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_rep_log_ets).
17
18
19 -behaviour(pgmp_rep_log).
20 -export([begin_transaction/1]).
21 -export([callback_mode/0]).
22 -export([commit/1]).
23 -export([delete/1]).
24 -export([handle_event/4]).
25 -export([init/1]).
26 -export([insert/1]).
27 -export([lsn/1]).
28 -export([metadata/1]).
29 -export([snapshot/1]).
30 -export([start_link/1]).
31 -export([terminate/3]).
32 -export([truncate/1]).
33 -export([update/1]).
34 -export([when_ready/1]).
35 -import(pgmp_rep_log_ets_common, [delete/6]).
36 -import(pgmp_rep_log_ets_common, [insert_new/6]).
37 -import(pgmp_rep_log_ets_common, [metadata/4]).
38 -import(pgmp_rep_log_ets_common, [truncate/3]).
39 -import(pgmp_rep_log_ets_common, [update/6]).
40 -import(pgmp_statem, [nei/1]).
41 -include_lib("kernel/include/logger.hrl").
42
43
%% Starts and links the replication log state machine.
%% Arg is wrapped in a single-element list and handed to init/1; the
%% gen_statem start options come from envy_gen:options/1 for this module.
start_link(Arg) ->
    Options = envy_gen:options(?MODULE),
    gen_statem:start_link(?MODULE, [Arg], Options).
46
47
%% Sends a `snapshot` request. The `id` key of Arg becomes the request
%% payload; every other key of Arg is passed on as a send option.
snapshot(Arg) ->
    PayloadKeys = [id],
    send_request(snapshot, PayloadKeys, Arg).
50
51
%% Sends an `lsn` request. No keys are moved into the payload, so the
%% request carries an empty map.
lsn(Arg) ->
    PayloadKeys = [],
    send_request(lsn, PayloadKeys, Arg).
54
55
%% Sends a `begin_transaction` request whose payload is made of the
%% commit_timestamp, final_lsn, xid and x_log keys of Arg.
begin_transaction(Arg) ->
    PayloadKeys = [commit_timestamp, final_lsn, xid, x_log],
    send_request(begin_transaction, PayloadKeys, Arg).
60
61
%% Sends a `commit` request whose payload is made of the commit_lsn,
%% commit_timestamp, end_lsn and x_log keys of Arg.
commit(Arg) ->
    PayloadKeys = [commit_lsn, commit_timestamp, end_lsn, x_log],
    send_request(commit, PayloadKeys, Arg).
66
67
%% Sends an `insert` request whose payload is made of the relation,
%% tuple and x_log keys of Arg.
insert(Arg) ->
    PayloadKeys = [relation, tuple, x_log],
    send_request(insert, PayloadKeys, Arg).
70
71
%% Sends an `update` request whose payload is made of the relation,
%% tuple and x_log keys of Arg.
update(Arg) ->
    PayloadKeys = [relation, tuple, x_log],
    send_request(update, PayloadKeys, Arg).
74
75
%% Sends a `delete` request whose payload is made of the relation,
%% tuple and x_log keys of Arg.
delete(Arg) ->
    PayloadKeys = [relation, tuple, x_log],
    send_request(delete, PayloadKeys, Arg).
78
79
%% Sends a `truncate` request whose payload is made of the relations
%% and x_log keys of Arg.
truncate(Arg) ->
    PayloadKeys = [relations, x_log],
    send_request(truncate, PayloadKeys, Arg).
82
83
%% Sends a `metadata` request. No keys are moved into the payload, so
%% the request carries an empty map.
metadata(Arg) ->
    PayloadKeys = [],
    send_request(metadata, PayloadKeys, Arg).
86
87
%% Builds the `request` entry as {Action, Payload}, where Payload holds
%% only the entries of Arg named in Keys, then strips those same keys
%% from the outgoing map before delegating to send_request/1.
send_request(Action, Keys, Arg) ->
    Payload = maps:with(Keys, Arg),
    Tagged = Arg#{request => {Action, Payload}},
    %% Keys are removed after `request` is set, exactly as before.
    send_request(maps:without(Keys, Tagged)).
93
94
%% Sends a `when_ready` request. Any `request` entry already present in
%% Arg is overwritten.
when_ready(Arg) ->
    send_request(Arg#{request => when_ready}).
100
101
%% Hands the request map to pgmp_statem:send_request/1. A `label`
%% supplied by the caller is kept; otherwise the label defaults to this
%% module's name.
send_request(Arg) ->
    Defaults = #{label => ?MODULE},
    %% maps:merge/2 prefers the second map, so Arg's own label wins.
    pgmp_statem:send_request(maps:merge(Defaults, Arg)).
107
108
%% gen_statem init callback. Traps exits, starts in the `unready` state
%% with an empty request-id collection, the supplied configuration and
%% empty metadata, and queues an internal `join` event.
init([Arg]) ->
    process_flag(trap_exit, true),
    Data = #{requests => gen_statem:reqids_new(),
             config => Arg,
             metadata => #{}},
    {ok, unready, Data, nei(join)}.
117
118
%% gen_statem callback mode: every event is dispatched through
%% handle_event/4 regardless of the current state.
callback_mode() ->
    handle_event_function.
121
122
%% gen_statem event handler (handle_event_function mode).
%%
%% Clause order matters: earlier clauses shadow later ones.
%%
%% * internal `join`          - joins the pg group [?MODULE, Publication].
%% * call `metadata`          - replies with the metadata map (state `ready` only).
%% * call `when_ready`        - replies ok when `ready`, otherwise postponed.
%% * call `begin_transaction`
%%   and `commit`             - acknowledged with ok, nothing else is done.
%% * call `insert`, `update`,
%%   `delete`, `truncate`     - applied when the relation is present in the
%%                              metadata; otherwise handed to backfill/2.
%% * call `snapshot`          - remembers the caller as `stream` and pushes
%%                              pgmp_rep_log_ets_snapshot as callback module.
%% * call `lsn`               - always replies <<"0/0">>.
%% * info                     - a 'DOWN' with reason shutdown stops the
%%                              process; any other message is treated as a
%%                              response to an outstanding request.
%% * anything else            - delegated to pgmp_rep_log_ets_common.
handle_event(internal,
             join,
             _,
             #{config := #{scope := Scope, publication := Publication}}) ->
    pg:join(Scope, [?MODULE, Publication], self()),
    keep_state_and_data;

handle_event({call, From}, {metadata, #{}}, ready, #{metadata := Metadata}) ->
    {keep_state_and_data, {reply, From, Metadata}};

handle_event({call, From}, when_ready, ready, _) ->
    {keep_state_and_data, {reply, From, ok}};

handle_event({call, _}, when_ready, _, _) ->
    %% Not ready yet: retried after the next state change.
    {keep_state_and_data, postpone};

handle_event({call, From}, {begin_transaction, _}, _, _) ->
    {keep_state_and_data, {reply, From, ok}};

handle_event({call, From}, {commit, _}, _, _) ->
    {keep_state_and_data, {reply, From, ok}};

handle_event({call, From},
             {insert,
              #{relation := #{namespace := Namespace, name := Name} = Relation,
                x_log := XLog,
                tuple := Tuple}},
             _,
             #{metadata := Metadata,
               config := #{scope := Scope,
                           publication := Publication}} = Data) ->
    case Metadata of
        #{{Namespace, Name} := #{keys := Positions}} ->
            insert_new(Scope, Publication, Namespace, Name, Tuple, Positions),
            {keep_state,
             Data#{metadata := metadata({Namespace, Name},
                                        x_log,
                                        XLog,
                                        Metadata)},
             {reply, From, ok}};

        _Otherwise ->
            backfill(Relation, Data)
    end;

handle_event({call, From},
             {update,
              #{relation := #{namespace := Namespace, name := Name} = Relation,
                x_log := XLog,
                tuple := Tuple}},
             _,
             #{metadata := Metadata,
               config := #{scope := Scope,
                           publication := Publication}} = Data) ->
    case Metadata of
        #{{Namespace, Name} := #{keys := Positions}} ->
            update(Scope, Publication, Namespace, Name, Tuple, Positions),
            {keep_state,
             Data#{metadata := metadata({Namespace, Name},
                                        x_log,
                                        XLog,
                                        Metadata)},
             {reply, From, ok}};

        _Otherwise ->
            backfill(Relation, Data)
    end;

handle_event({call, From},
             {delete,
              #{relation := #{namespace := Namespace, name := Name} = Relation,
                x_log := XLog,
                tuple := Tuple}},
             _,
             #{metadata := Metadata,
               config := #{scope := Scope,
                           publication := Publication}} = Data) ->
    case Metadata of
        #{{Namespace, Name} := #{keys := Keys}} ->
            delete(Scope, Publication, Namespace, Name, Tuple, Keys),
            {keep_state,
             Data#{metadata := metadata({Namespace, Name},
                                        x_log,
                                        XLog,
                                        Metadata)},
             {reply, From, ok}};

        _Otherwise ->
            backfill(Relation, Data)
    end;

handle_event({call, From},
             {truncate, #{relations := Relations}},
             _,
             #{metadata := Metadata,
               config := #{publication := Publication}} = Data) ->
    %% Relations that have no metadata entry yet.
    Unknown = lists:filter(
                fun
                    (#{namespace := Namespace, name := Name}) ->
                        not maps:is_key({Namespace, Name}, Metadata)
                end,
                Relations),

    case Unknown of
        [] ->
            lists:foreach(
              fun
                  (#{namespace := Namespace, name := Name}) ->
                      truncate(Publication, Namespace, Name)
              end,
              Relations),
            {keep_state_and_data, {reply, From, ok}};

        [Relation | _] ->
            %% Backfill one unknown relation at a time; the postponed
            %% truncate is re-evaluated afterwards.
            backfill(Relation, Data)
    end;

handle_event({call, Stream}, {snapshot, #{id := Id}}, _, Data) ->
    {keep_state,
     Data#{stream => Stream},
     [{push_callback_module, pgmp_rep_log_ets_snapshot},
      nei(begin_transaction),
      nei({set_transaction_snapshot, Id}),
      nei(sync_publication_tables)]};

handle_event({call, Stream}, {lsn, #{}}, _, _) ->
    {keep_state_and_data, {reply, Stream, <<"0/0">>}};

handle_event(info, {'DOWN', _, process, _, shutdown = Reason}, _, _) ->
    {stop, Reason};

handle_event(info, Msg, _, #{requests := Existing} = Data) ->
    %% NOTE(review): check_response/3 may also return no_request or
    %% no_reply; neither is matched here, so an info message that is
    %% not a response to an outstanding request raises case_clause.
    case gen_statem:check_response(Msg, Existing, true) of
        {{reply, Reply}, Label, Updated} ->
            {keep_state,
             Data#{requests := Updated},
             nei({response, #{label => Label, reply => Reply}})};

        {{error, {Reason, ServerRef}}, Label, UpdatedRequests} ->
            {stop,
             #{reason => Reason,
               server_ref => ServerRef,
               label => Label},
             Data#{requests := UpdatedRequests}}
    end;

handle_event(Type, Content, State, Data) ->
    pgmp_rep_log_ets_common:handle_event(Type, Content, State, Data).


%% Transition used when a change arrives for a relation that has no
%% metadata entry: move to `unready`, push pgmp_rep_log_ets_backfill as
%% the callback module, queue an internal {relation, Relation} event and
%% postpone the triggering call.
%%
%% gen_statem retries postponed events only after a state change, so the
%% state is explicitly moved to `unready`; staying in `ready` could leave
%% the postponed call unanswered.
%% NOTE(review): assumes the backfill module eventually returns to
%% `ready` - confirm against pgmp_rep_log_ets_backfill.
backfill(Relation, Data) ->
    {next_state,
     unready,
     Data,
     [{push_callback_module, pgmp_rep_log_ets_backfill},
      nei({relation, Relation}),
      postpone]}.
294
295
%% gen_statem terminate callback: leaves the pg group for this module
%% and publication that was joined on the internal `join` event.
terminate(_Reason, _State, #{config := Config}) ->
    #{scope := Scope, publication := Publication} = Config,
    Group = [?MODULE, Publication],
    pg:leave(Scope, Group, self()).
Line Hits Source