_site/cover/pgec_storage_common.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgec_storage_common).
17
18
19 -export([bucket/1]).
20 -export([callback_mode/0]).
21 -export([handle_event/4]).
22 -export([key/2]).
23 -export([pt/1]).
24 -export([row/3]).
25 -export([value/2]).
26 -import(pgec_statem, [nei/1]).
27 -include("pgec_storage.hrl").
28 -include_lib("kernel/include/logger.hrl").
29 -include_lib("leveled/include/leveled.hrl").
30
31
%% gen_statem callback mode: all events, whatever the current state,
%% are routed through handle_event/4.
callback_mode() ->
    handle_event_function.
34
35
%% put: queue a `storage' event carrying a put request for the bucket,
%% followed by a `put' telemetry count. The key is serialised with
%% term_to_binary/1; the value is passed through untouched. The whole
%% request map, tagged with the action, travels along as the label.
%% NOTE(review): the request tuple presumably follows the leveled
%% bookie put protocol -- confirm against leveled.
handle_event(internal,
             {put, #{bucket := B, key := K, value := V} = Request},
             _State,
             _Data) ->
    Put = {put, B, term_to_binary(K), V, [], ?STD_TAG, infinity, false},
    {keep_state_and_data,
     [nei({storage,
           #{request => Put,
             label => storage_label(put, Request)}}),
      nei({telemetry, put, #{count => 1}, #{bucket => B}})]};
58
%% delete: queue, in order, a `cache_delete' event for the key, a
%% `storage' event whose put request carries the atom `delete' in the
%% value position, and a `delete' telemetry count for the bucket.
handle_event(internal,
             {delete, #{bucket := B, key := K} = Request},
             _State,
             _Data) ->
    Removal = {put, B, term_to_binary(K), delete, [], ?STD_TAG, infinity, false},
    {keep_state_and_data,
     [nei({cache_delete, Request}),
      nei({storage,
           #{request => Removal,
             label => storage_label(delete, Request)}}),
      nei({telemetry, delete, #{count => 1}, #{bucket => B}})]};
81
%% keys: queue a `storage' event asking for a key-list runner over the
%% bucket, followed by a `keys' telemetry count. The caller's fold
%% function is wrapped so that it receives each key decoded with
%% binary_to_term/1 rather than the stored binary form.
handle_event(internal,
             {keys,
              #{bucket := B,
                folder := UserFold,
                accumulator := Acc0} = Request},
             _State,
             _Data) ->
    DecodingFold =
        fun (FoldBucket, EncodedKey, Acc) ->
                UserFold(FoldBucket, binary_to_term(EncodedKey), Acc)
        end,
    Runner = {return_runner, {keylist, ?STD_TAG, B, {DecodingFold, Acc0}}},
    {keep_state_and_data,
     [nei({storage,
           #{request => Runner,
             label => storage_label(keys, Request)}}),
      nei({telemetry, keys, #{count => 1}, #{bucket => B}})]};
108
%% get: queue a `storage' event carrying a get request for the
%% serialised key, followed by a `get' telemetry count for the bucket.
handle_event(internal,
             {get, #{bucket := B, key := K} = Request},
             _State,
             _Data) ->
    Lookup = {get, B, term_to_binary(K), ?STD_TAG},
    {keep_state_and_data,
     [nei({storage,
           #{request => Lookup,
             label => storage_label(get, Request)}}),
      nei({telemetry, get, #{count => 1}, #{bucket => B}})]};
123
%% storage: send Request asynchronously to the storage server held in
%% the state data, registering it under Label in the collection of
%% outstanding requests. The reply arrives later as an info message.
handle_event(internal,
             {storage, #{request := Request, label := Label}},
             _State,
             #{requests := Pending, storage := Server} = Data) ->
    Outstanding = gen_server:send_request(Server, Request, Label, Pending),
    {keep_state, Data#{requests := Outstanding}};
134
%% An 'EXIT' message, from any process and for any reason, stops the
%% state machine.
handle_event(info, {'EXIT', _Pid, _Reason}, _State, _Data) ->
    stop;
137
%% Any other info message is checked against the collection of
%% outstanding storage requests:
%%
%% - a reply is removed from the collection (Delete = true) and
%%   forwarded as a `response' event together with the label it was
%%   sent with, after a telemetry event reporting how many requests
%%   remain outstanding;
%% - an error reply stops the state machine with the reported reason;
%% - a message that belongs to no outstanding request (`no_reply'), or
%%   that arrives while nothing is outstanding (`no_request'), is
%%   logged and dropped. Without this branch any stray message would
%%   crash the state machine with a `case_clause' error.
handle_event(info, Msg, _, #{requests := Existing} = Data) ->
    case gen_server:check_response(Msg, Existing, true) of
        {{reply, Reply}, Label, Updated} ->
            {keep_state,
             Data#{requests := Updated},
             [nei({telemetry,
                   reqids_size,
                   #{value => gen_server:reqids_size(Updated)}}),
              nei({response, #{label => Label, reply => Reply}})]};

        {{error, {Reason, _}}, _, UpdatedRequests} ->
            {stop, Reason, Data#{requests := UpdatedRequests}};

        Unmatched when Unmatched =:= no_request;
                       Unmatched =:= no_reply ->
            ?LOG_WARNING(#{unexpected_info => Msg,
                           check_response => Unmatched}),
            keep_state_and_data
    end;
151
%% cache_read: look the {Bucket, Key} pair up in the ETS cache.
%%
%% Hit: reply to the caller straight away, count the hit, and restart
%% the per-entry expiry timeout. Miss: count the miss and fall back to
%% a `get' event so the value is fetched from storage.
handle_event(internal,
             {cache_read, #{bucket := B, from := Caller, key := K}},
             _State,
             #{cache := Tab}) ->
    CacheKey = {B, K},
    case ets:lookup(Tab, CacheKey) of
        [#entry{value = Cached}] ->
            {keep_state_and_data,
             [{reply, Caller, {ok, Cached}},
              nei({telemetry,
                   cache,
                   #{count => 1},
                   #{action => hit, bucket => B}}),
              {{timeout, CacheKey}, pgec_config:timeout(expiry), expired}]};

        [] ->
            {keep_state_and_data,
             [nei({telemetry,
                   cache,
                   #{count => 1},
                   #{action => miss, bucket => B}}),
              nei({get, #{from => Caller, bucket => B, key => K}})]}
    end;
179
%% cache_write: store Value for {Bucket, Key} in the ETS cache.
%%
%% An existing entry has its value replaced in place (`update');
%% otherwise a new entry is inserted (`insert'). Either way the outcome
%% is counted through telemetry and the per-entry expiry timeout is
%% (re)started.
handle_event(internal,
             {cache_write, #{bucket := B, key := K, value := V}},
             _State,
             #{cache := Tab}) ->
    CacheKey = {B, K},
    Outcome =
        case ets:update_element(Tab, CacheKey, {#entry.value, V}) of
            true ->
                update;

            false ->
                %% Nothing to update, so the key must be free to insert.
                true = ets:insert_new(Tab, #entry{key = CacheKey, value = V}),
                insert
        end,
    {keep_state_and_data,
     [nei({telemetry,
           cache,
           #{count => 1},
           #{action => Outcome, bucket => B}}),
      {{timeout, CacheKey}, pgec_config:timeout(expiry), expired}]};
218
%% cache_delete: remove the {Bucket, Key} entry from the ETS cache,
%% count the deletion, and set the entry's named timeout to `infinity'
%% so that a pending expiry no longer fires.
handle_event(internal,
             {cache_delete, #{bucket := B, key := K}},
             _State,
             #{cache := Tab}) ->
    CacheKey = {B, K},
    ets:delete(Tab, CacheKey),
    {keep_state_and_data,
     [nei({telemetry,
           cache,
           #{count => 1},
           #{action => delete, bucket => B}}),
      {{timeout, CacheKey}, infinity, cancelled}]};
233
%% The named expiry timeout for a cache entry fired: evict the entry
%% and count the expiry against its bucket.
handle_event({timeout, {B, _} = CacheKey}, expired, _State, #{cache := Tab}) ->
    ets:delete(Tab, CacheKey),
    Expired = nei({telemetry,
                   cache,
                   #{count => 1},
                   #{action => expired, bucket => B}}),
    {keep_state_and_data, Expired};
242
%% Successful reply to a `get' request: pass the {ok, Value} reply on
%% to the original caller, count it, and populate the cache with the
%% value that was just read.
handle_event(internal,
             {response,
              #{reply := {ok, Found} = Reply,
                label := #{action := get,
                           bucket := B,
                           key := K,
                           from := Caller}}},
             _State,
             _Data) ->
    {keep_state_and_data,
     [{reply, Caller, Reply},
      nei({telemetry, get, #{count => 1}, #{bucket => B}}),
      nei({cache_write, #{bucket => B, key => K, value => Found}})]};
263
%% Deferred reply: storage answered with {async, Fun}. The fun is run
%% in this process and its result is sent to the original caller.
handle_event(internal,
             {response,
              #{reply := {async, Runner},
                label := #{from := Caller}} = Response},
             State,
             Data) ->
    ?LOG_DEBUG(#{response => Response,
                 state => State,
                 data => Data}),
    Result = Runner(),
    {keep_state_and_data, {reply, Caller, Result}};
274
%% Any other reply whose label names a caller is relayed to that
%% caller unchanged.
handle_event(internal,
             {response,
              #{reply := Reply,
                label := #{from := Caller}} = Response},
             State,
             Data) ->
    ?LOG_DEBUG(#{response => Response,
                 state => State,
                 data => Data}),
    Relay = {reply, Caller, Reply},
    {keep_state_and_data, Relay};
285
%% A response that matched none of the clauses above (for instance one
%% whose label has no `from') needs no reply: log it at debug level and
%% carry on.
handle_event(internal, {response, Response}, State, Data) ->
    ?LOG_DEBUG(#{response => Response,
                 state => State,
                 data => Data}),
    keep_state_and_data;
294
%% Telemetry without metadata: re-queue the event with an empty
%% metadata map so the four-element clauses handle it.
handle_event(internal, {telemetry, Name, Measurements}, _State, _Data) ->
    WithMetadata = {telemetry, Name, Measurements, #{}},
    {keep_state_and_data, nei(WithMetadata)};
301
%% Telemetry with a single atom as event name: wrap the atom in a list
%% and re-queue, so the final clause always receives a list suffix.
handle_event(internal,
             {telemetry, Name, Measurements, Metadata},
             _State,
             _Data)
  when is_atom(Name) ->
    Normalised = {telemetry, [Name], Measurements, Metadata},
    {keep_state_and_data, nei(Normalised)};
308
%% Emit the telemetry event. The event name is prefixed with
%% [pgec, storage]; the metadata is the event's own metadata layered
%% over the `operator' and `client_flags' entries of the state data
%% (when present), so event metadata wins on a key clash.
handle_event(internal,
             {telemetry, Suffix, Measurements, Metadata},
             _State,
             Data) ->
    Inherited = maps:with([operator, client_flags], Data),
    ok = telemetry:execute([pgec, storage | Suffix],
                           Measurements,
                           maps:merge(Inherited, Metadata)),
    keep_state_and_data.
319
320
%% Build the label attached to a storage request: the request detail
%% map with the action recorded under the `action' key.
storage_label(Action, Detail) ->
    maps:put(action, Action, Detail).
323
324
%% Reduce a metadata map to its {Publication, Table} pair.
pt(#{publication := Pub, table := Tab}) ->
    {Pub, Tab}.
327
328
%% Name of the storage bucket for a publication/table pair:
%% <<"pgec/", Publication, "/", Table>>. Both must be binaries.
bucket(#{publication := Pub, table := Tab}) ->
    <<"pgec/", Pub/binary, $/, Tab/binary>>.
331
332
%% The non-key part of a row: every column of Tuple whose position is
%% not listed in `keys', in column order. A single remaining column is
%% returned bare, several as a tuple (see pick/2).
value(Tuple, #{keys := Keys}) ->
    NonKeys = [Position
               || Position <- lists:seq(1, tuple_size(Tuple)),
                  not lists:member(Position, Keys)],
    pick(Tuple, NonKeys).
335
%% The key part of a row: the columns of Tuple at the positions listed
%% in `keys', in that order. A single key column is returned bare,
%% several as a tuple (see pick/2).
key(Tuple, #{keys := KeyPositions}) ->
    pick(Tuple, KeyPositions).
338
339
%% Select columns of Tuple by position. Exactly one position yields the
%% bare element; any other number of positions (including none) yields
%% a tuple of the selected elements in the order given.
pick(Tuple, [Only]) ->
    element(Only, Tuple);

pick(Tuple, Positions) ->
    list_to_tuple([element(Position, Tuple) || Position <- Positions]).
345
346
%% Rebuild a full row tuple from its stored key K and value V, placing
%% the key column(s) back at the positions listed in `keys'.
%%
%% Single key column:
%%   keys = [1], V a tuple       -> K followed by V's elements
%%   keys = [1], V anything else -> {K, V}
%%   keys = [N], V a tuple of N-1 elements -> V's elements, then K
%%   keys = [N], V a tuple, N =< size      -> K inserted at position N
%%   keys = [2], V not a tuple   -> {V, K}
%% Several key columns (K is a tuple): key and value columns are
%% interleaved by integrate/5; a non-tuple V counts as one column.
%%
%% NOTE(review): a tuple V is always treated as several columns, so a
%% single non-key column whose own value is a tuple cannot be told
%% apart from a multi-column value here -- confirm callers never store
%% such values.
row(K, V, #{keys := [1]} = Metadata)
  when is_tuple(V) ->
    ?LOG_DEBUG(#{k => K, v => V, metadata => Metadata}),
    erlang:insert_element(1, V, K);

row(K, V, #{keys := [1]} = Metadata) ->
    ?LOG_DEBUG(#{k => K, v => V, metadata => Metadata}),
    {K, V};

row(K, V, #{keys := [Last]} = Metadata)
  when is_tuple(V),
       tuple_size(V) == Last - 1 ->
    ?LOG_DEBUG(#{k => K, v => V, metadata => Metadata}),
    erlang:append_element(V, K);

row(K, V, #{keys := [N]} = Metadata)
  when is_tuple(V),
       N =< tuple_size(V) ->
    ?LOG_DEBUG(#{k => K, v => V, metadata => Metadata}),
    Leading = positions(V, lists:seq(1, N - 1)),
    Trailing = positions(V, lists:seq(N, tuple_size(V))),
    list_to_tuple(Leading ++ [K | Trailing]);

row(K, V, #{keys := [2]} = Metadata) ->
    ?LOG_DEBUG(#{k => K, v => V, metadata => Metadata}),
    {V, K};

row(K, V, #{keys := Keys} = Metadata)
  when is_tuple(K),
       is_tuple(V) ->
    ?LOG_DEBUG(#{k => K, v => V, metadata => Metadata}),
    KeyColumns = tuple_to_list(K),
    ValueColumns = tuple_to_list(V),
    list_to_tuple(integrate(KeyColumns, ValueColumns, Keys, 1, []));

row(K, V, #{keys := Keys} = Metadata)
  when is_tuple(K) ->
    ?LOG_DEBUG(#{k => K, v => V, metadata => Metadata}),
    KeyColumns = tuple_to_list(K),
    list_to_tuple(integrate(KeyColumns, [V], Keys, 1, [])).
396
397
%% Interleave key columns and value columns back into row order.
%%
%% Keys      - key column values still to place
%% Columns   - non-key column values still to place
%% Positions - row positions of the remaining keys
%% N         - the row position being filled next
%% Acc       - columns placed so far, most recent first
%%
%% While N is below the next key position a value column is taken;
%% otherwise the next key is placed. Once the keys run out, whatever
%% value columns remain are appended as they are.
%% NOTE(review): this looks like it relies on Positions being in
%% ascending order, matching the order of Keys -- confirm with callers.
integrate([], Remaining, _Positions, _N, Acc) ->
    lists:reverse(Acc, Remaining);

integrate(Keys, [Column | Columns], [Next | _] = Positions, N, Acc)
  when N < Next ->
    integrate(Keys, Columns, Positions, N + 1, [Column | Acc]);

integrate([Key | Keys], Columns, [_ | Positions], N, Acc) ->
    integrate(Keys, Columns, Positions, N + 1, [Key | Acc]).
427
428
%% The elements of Tuple at the given 1-based positions, as a list in
%% the order the positions are given.
positions(Tuple, Positions) ->
    lists:map(fun (Position) -> element(Position, Tuple) end, Positions).
Line Hits Source