_site/cover/pgec_storage.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgec_storage).
17
18
19 -export([available/1]).
20 -export([callback_mode/0]).
21 -export([delete/1]).
22 -export([handle_event/4]).
23 -export([init/1]).
24 -export([keys/1]).
25 -export([metadata/1]).
26 -export([position/1]).
27 -export([position_update/1]).
28 -export([read/1]).
29 -export([ready/1]).
30 -export([start_link/0]).
31 -export([table_map/1]).
32 -export([truncate/1]).
33 -export([write/1]).
34 -import(pgec_statem, [nei/1]).
35 -import(pgec_storage_common, [bucket/1]).
36 -import(pgec_storage_common, [key/2]).
37 -import(pgec_storage_common, [pt/1]).
38 -import(pgec_storage_common, [value/2]).
39 -import(pgmp_statem, [send_request/1]).
40 -include("pgec_storage.hrl").
41 -include_lib("kernel/include/logger.hrl").
42 -include_lib("leveled/include/leveled.hrl").
43
44
45 start_link() ->
46 8 gen_statem:start_link(
47 {local, ?MODULE},
48 ?MODULE,
49 [],
50 envy_gen:options(?MODULE)).
51
52
53 keys(Arg) ->
54 34 send_request(Arg, ?FUNCTION_NAME).
55
56
57 position(Arg) ->
58
:-(
send_request(Arg, ?FUNCTION_NAME).
59
60
61 position_update(Arg) ->
62 18 send_request(Arg, ?FUNCTION_NAME).
63
64
65 read(Arg) ->
66 97 send_request(Arg, ?FUNCTION_NAME).
67
68
69 delete(Arg) ->
70 4 send_request(Arg, ?FUNCTION_NAME).
71
72
73 write(Arg) ->
74 263 send_request(Arg, ?FUNCTION_NAME).
75
76
77 table_map(Arg) ->
78 8 send_request(Arg, ?FUNCTION_NAME).
79
80
81 truncate(Arg) ->
82 1 send_request(Arg, ?FUNCTION_NAME).
83
84
85 metadata(Arg) ->
86 72 send_request(Arg, ?FUNCTION_NAME).
87
88
89 ready(Arg) ->
90 8 send_request(Arg, ?FUNCTION_NAME).
91
92
93 available(Arg) ->
94 8 send_request(Arg, ?FUNCTION_NAME).
95
96
97 send_request(Arg, Action) ->
98 513 ?FUNCTION_NAME(Arg, Action, config(Action)).
99
100
101 send_request(Arg, Action, Config) ->
102 513 send_request(
103 maps:without(
104 args(Config),
105 maybe_label(
106 Arg#{request => {request, args(Action, Arg, Config)}}))).
107
108 config(keys) ->
109 34 [publication,
110 table,
111 {folder,
112 fun
113 (_Bucket, Key, A) ->
114 1441 [Key | A]
115 end},
116 {accumulator, []}];
117
118 config(position_update) ->
119 18 [publication, position];
120
121 config(Action) when Action == write;
122 Action == delete ->
123 267 [publication, table, row];
124
125 config(read) ->
126 97 [publication, table, key];
127
128 config(Action) when Action == metadata;
129 Action == truncate ->
130 73 [publication, table];
131
132 config(table_map) ->
133 8 [publication,
134 table,
135 schema,
136 keys,
137 columns,
138 oids];
139
140 config(Action) when Action == position;
141 Action == available;
142 Action == ready ->
143 16 [publication].
144
145
146 args(Config) ->
147 513 lists:map(
148 fun
149 ({Key, _}) ->
150 68 Key;
151
152 (Key) ->
153 1406 Key
154 end,
155 Config).
156
157
158 args(Action, Arg, Config) ->
159 513 lists:foldl(
160 fun
161 ({Parameter, Default}, A) ->
162 68 A#{Parameter => maps:get(Parameter, Arg, Default)};
163
164 (Parameter, A) ->
165 1406 case maps:find(Parameter, Arg) of
166 {ok, Value} ->
167 1406 A#{Parameter => Value};
168
169 error ->
170
:-(
error(arg_missing, [Parameter])
171 end
172 end,
173 #{action => Action},
174 Config).
175
176
177 maybe_label(#{requests := _, label := _} = Arg) ->
178 310 Arg;
179
180 maybe_label(#{requests := _} = Arg) ->
181
:-(
Arg#{label => ?MODULE};
182
183 maybe_label(Arg) ->
184 203 Arg.
185
186 callback_mode() ->
187 9 handle_event_function.
188
189
190 init([]) ->
191 8 process_flag(trap_exit, true),
192 8 {ok,
193 {ready, ordsets:new()},
194 #{requests => gen_server:reqids_new(),
195 cache => ets:new(cache, [{keypos, 2}]),
196 available => ordsets:new(),
197 mappings => ets:new(?MODULE, [protected])},
198 nei(leveled)}.
199
200
201 handle_event(internal, leveled, _, Data) ->
202 8 case pgec_sup:get_child(hd(get('$ancestors')), leveled) of
203 {_, PID, worker, _} when is_pid(PID) ->
204 8 {keep_state, Data#{storage => PID}};
205
206 {_, _, _, _} = Reason ->
207
:-(
{stop, Reason};
208
209 false ->
210
:-(
{stop, no_storage}
211 end;
212
213 handle_event({call, From},
214 {request,
215 #{action := position_update,
216 position := Position}},
217 _,
218 _) ->
219 18 {keep_state_and_data,
220 nei({put,
221 #{from => From,
222 bucket => <<"pgec">>,
223 key => position,
224 value => Position}})};
225
226 handle_event({call, From},
227 {request, #{action := position}},
228 _,
229 _) ->
230
:-(
{keep_state_and_data,
231 nei({get,
232 #{from => From,
233 bucket => <<"pgec">>,
234 key => position}})};
235
236 handle_event({call, From},
237 {request,
238 #{action := ready = Action,
239 publication := Publication}},
240 {ready, Ready},
241 Data) ->
242 8 ?LOG_INFO(#{Action => Publication}),
243 8 {next_state,
244 {ready, ordsets:add_element(Publication, Ready)},
245 Data,
246 {reply, From, ok}};
247
248 handle_event({call, From},
249 {request,
250 #{action := available = Action,
251 publication := Publication}},
252 {ready, _},
253 #{available := Available} = Data) ->
254 8 ?LOG_INFO(#{Action => Publication}),
255 8 {keep_state,
256 Data#{available := ordsets:add_element(Publication, Available)},
257 {reply, From, ok}};
258
259 handle_event({call, From},
260 {request,
261 #{action := read = Action,
262 publication := Publication,
263 key := Key} = Detail},
264 {ready, Ready},
265 #{available := Available}) ->
266
267 97 ?LOG_DEBUG(#{Action => Publication,
268 97 ready => Ready}),
269
270 97 case ordsets:is_element(Publication, Ready) of
271 true ->
272 97 {keep_state_and_data,
273 nei({cache_read,
274 #{bucket => bucket(Detail),
275 from => From,
276 key => Key}})};
277
278 false ->
279
:-(
case ordsets:is_element(Publication, Available) of
280 true ->
281
:-(
{keep_state_and_data, postpone};
282
283 false ->
284
:-(
{keep_state_and_data, {reply, From, not_found}}
285 end
286 end;
287
288 handle_event({call, From},
289 {request,
290 #{action := table_map} = Mapping},
291 _,
292 #{mappings := Mappings}) ->
293 8 NonValue = [action, publication, table],
294 8 Key = pt(Mapping),
295 8 Value = maps:without(NonValue, Mapping),
296 8 ets:insert(Mappings, {Key, Value}),
297 8 {keep_state_and_data,
298 nei({put,
299 #{from => From,
300 bucket => <<"pgec/mapping">>,
301 key => Key,
302 value => Value}})};
303
304 handle_event({call, From},
305 {request,
306 #{action := metadata = Action,
307 publication := Publication} = Metadata},
308 {ready, Ready},
309 #{available := Available}) ->
310
311 72 ?LOG_DEBUG(#{Action => Metadata, ready => Ready}),
312
313 72 case ordsets:is_element(Publication, Ready) of
314 true ->
315 65 {keep_state_and_data,
316 nei({cache_read,
317 #{from => From,
318 bucket => <<"pgec/mapping">>,
319 key => pt(Metadata)}})};
320
321 false ->
322 7 case ordsets:is_element(Publication, Available) of
323 true ->
324 %% we know about this publication, but it is not
325 %% ready yet.
326
:-(
{keep_state_and_data, postpone};
327
328 false ->
329 %% we do not know about this publication.
330 7 {keep_state_and_data, {reply, From, not_found}}
331 end
332 end;
333
334 handle_event({call, From},
335 {request,
336 #{action := truncate = Action} = Detail},
337 {ready, _} = State,
338 Data) ->
339 1 {next_state,
340 {Action, make_ref()},
341 Data#{previous => State},
342 [{push_callback_module, pgec_storage_truncate},
343 nei({truncate, #{bucket => bucket(Detail)}}),
344 {reply, From, ok}]};
345
346 handle_event({call, From},
347 {request,
348 #{action := write,
349 row := Row} = Detail},
350 {ready, _} = State,
351 #{mappings := Mappings} = Data) ->
352 263 case ets:lookup(Mappings, pt(Detail)) of
353 [] ->
354
:-(
{next_state,
355 unready,
356 Data#{previous => State},
357 [{push_callback_module, pgec_storage_backfill},
358 nei({missing, Detail}),
359 postpone]};
360
361 [{_, Mapping}] ->
362 263 BKV = #{bucket => bucket(Detail),
363 key => key(Row, Mapping),
364 value => value(Row, Mapping)},
365
366 263 {keep_state_and_data,
367 [nei({cache_write, BKV}),
368 nei({put, BKV#{from => From}})]}
369 end;
370
371 handle_event({call, From},
372 {request,
373 #{action := delete,
374 row := Row} = Detail},
375 {ready, _} = State,
376 #{mappings := Mappings} = Data) ->
377 4 case ets:lookup(Mappings, pt(Detail)) of
378 [] ->
379
:-(
{next_state,
380 unready,
381 Data#{previous => State},
382 [{push_callback_module, pgec_storage_backfill},
383 nei({missing, Detail}),
384 postpone]};
385
386 [{_, Mapping}] ->
387 4 BK = #{bucket => bucket(Detail),
388 key => key(Row, Mapping)},
389 4 {keep_state_and_data,
390 [nei({cache_delete, BK}),
391 nei({delete, BK#{from => From}})]}
392 end;
393
394 handle_event({call, From},
395 {request, #{action := keys} = Detail},
396 _,
397 _) ->
398 34 {keep_state_and_data,
399 nei({keys,
400 maps:merge(
401 #{from => From,
402 bucket => bucket(Detail)},
403 maps:with([folder, accumulator], Detail))})};
404
405 handle_event(EventType, EventContent, State, Data) ->
406 5246 pgec_storage_common:handle_event(EventType,
407 EventContent,
408 State,
409 Data).
Line Hits Source