_site/cover/pgec_replica_common.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgec_replica_common).
17
18
19 -export([handle_event/4]).
20 -export([metadata/4]).
21 -export([table_name/3]).
22 -import(pgec_statem, [nei/1]).
23 -include_lib("kernel/include/logger.hrl").
24
25
26 handle_event(internal,
27 {notify, Notification},
28 _,
29 #{config := #{scope := Scope,
30 publication := Publication}}) ->
31 8 {keep_state_and_data,
32 nei({notify,
33 pg:get_members(
34 Scope,
35 [pgec_replica,
36 Publication,
37 notifications]),
38 Notification})};
39
40 handle_event(internal, {notify, [], _}, _, _) ->
41 8 keep_state_and_data;
42
43 handle_event(internal,
44 {notify, [Recipient | Recipients], Arg},
45 _,
46 #{requests := Requests} = Data) ->
47
:-(
{keep_state,
48 Data#{requests := gen_statem:send_request(
49 Recipient,
50 {notify, Arg},
51 #{notify => Recipient},
52 Requests)},
53 nei({notify, Recipients, Arg})};
54
55 handle_event(internal,
56 {change,
57 #{namespace := Namespace,
58 action := Action,
59 row := Row,
60 name := Name}},
61 _,
62 #{metadata := Metadata,
63 config := #{scope := Scope,
64 publication := Publication}})
65 when is_map_key({Namespace, Name}, Metadata) ->
66 17 case pg:get_members(
67 Scope,
68 #{m => pgec_replica,
69 publication => Publication,
70 name => Name}) of
71
72 [] ->
73 17 keep_state_and_data;
74
75 Members ->
76
:-(
#{{Namespace, Name} := #{keys := Keys}} = Metadata,
77
78
:-(
ChangedKeys = key(Row, Keys),
79
:-(
Relation = table_name(Publication, Namespace, Name),
80
81
:-(
lists:foreach(
82 fun
83 (Member) ->
84
:-(
Member ! {notify,
85 #{publication => Publication,
86 namespace => Namespace,
87 name => Name,
88 relation => Relation,
89 keys => ChangedKeys,
90 action => Action}}
91 end,
92 Members),
93
:-(
keep_state_and_data
94 end;
95
96 handle_event(
97 internal,
98 {storage_request, #{action := Action} = Arg},
99 _,
100 #{storage := Storage, requests := Requests} = Data) ->
101 310 ?LOG_DEBUG(#{arg => Arg}),
102 310 {keep_state,
103 Data#{requests := pgec_storage:Action(
104 maps:merge(
105 #{server_ref => Storage,
106 label => Arg,
107 requests => Requests},
108 maps:without(
109 [action], Arg)))}};
110
111 handle_event(internal, storage, _, #{monitors := Monitors} = Data) ->
112 8 case pgec_sup:get_child(pgec_storage_sup, storage) of
113 {_, Storage, worker, _} when is_pid(Storage) ->
114 8 {keep_state,
115 Data#{storage => Storage,
116 monitors := Monitors#{Storage => erlang:monitor(
117 process,
118 Storage)}},
119 nei(available)};
120
121 {_, _, _, _} = Reason ->
122
:-(
{stop, Reason};
123
124 false ->
125
:-(
{stop, no_storage}
126 end;
127
128 handle_event(internal,
129 available,
130 _,
131 #{config := #{publication := Publication}}) ->
132 %% inform storage that this publication is available, but not
133 %% ready yet.
134 8 {keep_state_and_data,
135 nei({storage_request,
136 #{action => available,
137 publication => Publication}})};
138
139 handle_event(internal,
140 {response,
141 #{label := #{from := From},
142 reply := Reply}},
143 _,
144 _) ->
145
:-(
{keep_state_and_data, {reply, From, Reply}};
146
147 handle_event(internal,
148 {response,
149 #{label := #{action := Action},
150 reply := Reply}},
151 _,
152 _) when Action == table_map;
153 Action == delete;
154 Action == truncate;
155 Action == ready;
156 Action == available;
157 Action == position_update;
158 Action == write ->
159 310 {keep_state_and_data,
160 nei({telemetry,
161 storage,
162 #{count => 1},
163 #{action => Action, reply => Reply}})};
164
165 handle_event(info, Msg, _, #{requests := Existing} = Data) ->
166 301 case gen_server:check_response(Msg, Existing, true) of
167 {{reply, Reply}, Label, Updated} ->
168 301 {keep_state,
169 Data#{requests := Updated},
170 [nei({telemetry,
171 reqids_size,
172 #{value => gen_server:reqids_size(Updated)}}),
173 nei({response, #{label => Label, reply => Reply}})]};
174
175 {{error, {Reason, _}}, _, UpdatedRequests} ->
176
:-(
{stop, Reason, Data#{requests := UpdatedRequests}}
177 end;
178
179 handle_event(internal,
180 {telemetry, EventName, Measurements},
181 _,
182 _) ->
183 301 {keep_state_and_data,
184 nei({telemetry, EventName, Measurements, #{}})};
185
186 handle_event(internal,
187 {telemetry, EventName, Measurements, Metadata},
188 _,
189 _) when is_atom(EventName) ->
190 611 {keep_state_and_data,
191 nei({telemetry, [EventName], Measurements, Metadata})};
192
193 handle_event(internal,
194 {telemetry, EventName, Measurements, Metadata},
195 _,
196 _) ->
197 611 ok = telemetry:execute([pgec, replica] ++ EventName,
198 Measurements,
199 Metadata),
200 611 keep_state_and_data.
201
202
203 metadata({_Namespace, _Name} = Relation, Key, Value, Metadata) ->
204 29 case Metadata of
205 #{Relation := TMD} ->
206 21 Metadata#{Relation := TMD#{Key => Value}};
207
208 #{} ->
209 8 Metadata#{Relation => #{Key => Value}}
210 end.
211
212 key(Tuple, [Primary]) ->
213
:-(
[element(Primary, Tuple)];
214
215 key(Tuple, Composite) when length(Composite) > 1 ->
216
:-(
list_to_tuple([element(Position, Tuple) || Position <- Composite]).
217
218
219 table_name(Publication, Schema, Table) ->
220
:-(
pgmp_util:snake_case(
221 lists:filtermap(
222 fun
223 ([]) ->
224
:-(
false;
225
226 ([Value]) ->
227
:-(
{true, Value};
228
229 (_) ->
230
:-(
true
231 end,
232 [pgmp_config:rep_log_ets(prefix_table_name),
233
:-(
[binary_to_list(Publication) || pgmp_config:enabled(
234 rep_log_ets_pub_in_table_name)],
235
:-(
[binary_to_list(Schema) || pgmp_config:enabled(
236 rep_log_ets_schema_in_table_name)],
237 binary_to_list(Table)])).
Line Hits Source