_site/cover/pgmp_rep_log_ets_common.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_rep_log_ets_common).
17
18
19 -export([delete/6]).
20 -export([handle_event/4]).
21 -export([insert_new/6]).
22 -export([insert_or_update_tuple/2]).
23 -export([metadata/4]).
24 -export([new_table/4]).
25 -export([table_name/3]).
26 -export([truncate/3]).
27 -export([update/6]).
28 -import(pgmp_statem, [nei/1]).
29
30
31 new_table(Publication, Namespace, Name, Keys) ->
32 5 ets:new(
33 table_name(Publication, Namespace, Name),
34 [{keypos, keypos(Keys)}, protected, named_table]).
35
36
37 keypos([Primary]) ->
38 4 Primary;
39
40 keypos(L) when length(L) > 1 ->
41 1 1.
42
43
44 metadata({_Namespace, _Name} = Relation, Key, Value, Metadata) ->
45 86 case Metadata of
46 #{Relation := TMD} ->
47 81 Metadata#{Relation := TMD#{Key => Value}};
48
49 #{} ->
50 5 Metadata#{Relation => #{Key => Value}}
51 end.
52
53
54 insert_new(Scope,
55 Publication,
56 Schema,
57 Table,
58 Tuples,
59 KeyPositions) when is_list(Tuples) ->
60 58 true = ets:insert_new(
61 table_name(Publication, Schema, Table),
62 205 [insert_or_update_tuple(Tuple, KeyPositions) || Tuple <- Tuples]),
63 58 notify(Scope,
64 Publication,
65 Schema,
66 Table,
67 ?FUNCTION_NAME,
68 Tuples,
69 KeyPositions),
70 58 ok;
71
72 insert_new(Scope, Publication, Schema, Table, Tuple, Keys) when is_tuple(Tuple) ->
73 55 ?FUNCTION_NAME(Scope, Publication, Schema, Table, [Tuple], Keys).
74
75
76 update(Scope, Publication, Schema, Table, Tuples, KeyPositions) when is_list(Tuples) ->
77 4 ets:insert(
78 table_name(Publication, Schema, Table),
79 4 [insert_or_update_tuple(Tuple, KeyPositions) || Tuple <- Tuples]),
80 4 notify(Scope, Publication, Schema, Table, ?FUNCTION_NAME, Tuples, KeyPositions),
81 4 ok;
82
83 update(Scope, Publication, Schema, Table, Tuple, Keys) when is_tuple(Tuple) ->
84 4 ?FUNCTION_NAME(Scope, Publication, Schema, Table, [Tuple], Keys).
85
86
87 delete(Scope, Publication, Schema, Table, Tuple, KeyPositions) ->
88 4 ets:delete(
89 table_name(Publication, Schema, Table),
90 key(Tuple, KeyPositions)),
91 4 notify(Scope,
92 Publication,
93 Schema,
94 Table,
95 ?FUNCTION_NAME,
96 [Tuple],
97 KeyPositions),
98 4 ok.
99
100
101 notify(Scope, Publication, Namespace, Name, Action, Tuples, KeyPositions) ->
102 66 notify_members(
103 pg:get_members(
104 Scope,
105 #{m => pgmp_rep_log_ets,
106 publication => Publication,
107 name => Name}),
108 Publication,
109 Namespace,
110 Name,
111 Action,
112 Tuples,
113 KeyPositions).
114
115
116 notify_members([],
117 _Publication,
118 _Namespace,
119 _Name,
120 _Action,
121 _Tuples,
122 _KeyPositions) ->
123 66 ok;
124
125 notify_members(Members,
126 Publication,
127 Namespace,
128 Name,
129 Action,
130 Tuples,
131 KeyPositions) ->
132
:-(
ChangedKeys = key(Tuples, KeyPositions),
133
:-(
Relation = table_name(Publication, Namespace, Name),
134
135
:-(
lists:foreach(
136 fun
137 (Member) ->
138
:-(
Member ! {notify,
139 #{publication => Publication,
140 namespace => Namespace,
141 name => Name,
142 relation => Relation,
143 keys => ChangedKeys,
144 action => Action}}
145 end,
146 Members).
147
148
149 key(Tuples, KeyPositions) when is_list(Tuples) ->
150
:-(
[?FUNCTION_NAME(Tuple, KeyPositions) || Tuple <- Tuples];
151
152 key(Tuple, [Primary]) ->
153 3 element(Primary, Tuple);
154
155 key(Tuple, Composite) when length(Composite) > 1 ->
156 53 list_to_tuple([element(Position, Tuple) || Position <- Composite]).
157
158
159 truncate(Publication, Schema, Table) ->
160 2 ets:delete_all_objects(
161 table_name(Publication, Schema, Table)).
162
163
164 insert_or_update_tuple(Tuple, [_]) ->
165 157 Tuple;
166
167 insert_or_update_tuple(Tuple, Composite) ->
168 52 list_to_tuple(
169 [key(Tuple, Composite) |
170 lists:filtermap(
171 fun
172 ({Position, Value}) ->
173 156 case lists:member(Position, Composite) of
174 true ->
175 104 false;
176
177 false ->
178 52 {true, Value}
179 end
180 end,
181 lists:zip(
182 lists:seq(1, tuple_size(Tuple)),
183 tuple_to_list(Tuple)))]).
184
185
186 table_name(Publication, Schema, Table) ->
187 176 pgmp_util:snake_case(
188 lists:filtermap(
189 fun
190 ([]) ->
191 528 false;
192
193 ([Value]) ->
194
:-(
{true, Value};
195
196 (_) ->
197 176 true
198 end,
199 [pgmp_config:rep_log_ets(prefix_table_name),
200
:-(
[binary_to_list(Publication) || pgmp_config:enabled(
201 rep_log_ets_pub_in_table_name)],
202
:-(
[binary_to_list(Schema) || pgmp_config:enabled(
203 rep_log_ets_schema_in_table_name)],
204 binary_to_list(Table)])).
205
206 handle_event(internal,
207 {notify, Notification},
208 _,
209 #{config := #{scope := Scope,
210 publication := Publication}}) ->
211 5 {keep_state_and_data,
212 nei({notify,
213 pg:get_members(
214 Scope,
215 [pgmp_rep_log_ets,
216 Publication,
217 notifications]),
218 Notification})};
219
220 handle_event(internal, {notify, [], _}, _, _) ->
221 5 keep_state_and_data;
222
223 handle_event(internal,
224 {notify, [Recipient | Recipients], Arg},
225 _,
226 #{requests := Requests} = Data) ->
227
:-(
{keep_state,
228 Data#{requests := gen_statem:send_request(
229 Recipient,
230 {notify, Arg},
231 #{notify => Recipient},
232 Requests)},
233 nei({notify, Recipients, Arg})}.
Line Hits Source