/home/runner/work/mcd/mcd/_site/ct/ct_run.ct_mcd@fv-az773-648.2023-11-24_16.44.07/mcd_emulator_meta.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(mcd_emulator_meta).
17
18
19 -export([recv/1]).
20 -include_lib("kernel/include/logger.hrl").
21 -include_lib("stdlib/include/ms_transform.hrl").
22 -include("mcd_emulator.hrl").
23
%% no_op: nothing is read or written, the reply is always no_op_reply.
recv(#{message := #{meta := no_op}}) ->
    {continue, {encode, #{meta => no_op_reply}}};

%% debug: look Key up in Table and report its CAS (rendered as a string
%% under the "cas" label), or reply miss when no entry is stored.
recv(#{message := #{meta := debug, key := Key},
       data := #{table := Table}} = Arg) ->
    ?LOG_DEBUG(#{arg => Arg}),
    Reply = case ets:lookup(Table, Key) of
                [] ->
                    #{meta => miss};

                [#entry{cas = CAS}] ->
                    #{meta => debug_reply,
                      key => Key,
                      internal => [{"cas", integer_to_list(CAS)}]}
            end,
    {continue, {encode, Reply}};
42
%% delete with the invalidate flag: the entry is kept, but its CAS is
%% incremented and it is marked stale. Replies with the refreshed entry,
%% or not_found when nothing was replaced (or the entry has gone by the
%% time it is read back).
recv(#{message := #{meta := delete, key := Key},
       flags := #{invalidate := _},
       data := #{table := Table}} = Arg) ->
    MarkStale = ets:fun2ms(
                  fun
                      (#entry{key = Candidate, cas = CAS} = Found)
                        when Key == Candidate ->
                          Found#entry{cas = CAS + 1, stale = true}
                  end),

    case ets:select_replace(Table, MarkStale) of
        0 ->
            {continue, {encode, #{meta => not_found}}};

        1 ->
            case ets:lookup(Table, Key) of
                [] ->
                    {continue, {encode, #{meta => not_found}}};

                [#entry{} = Stale] ->
                    {continue, encode(head, Stale, Arg)}
            end
    end;
65
%% delete guarded by cas_expected: the entry is removed only while its CAS
%% still equals Expected. Outcomes:
%%   head      - the entry was deleted
%%   exists    - an entry is stored under Key but with a different CAS
%%   not_found - nothing is stored under Key
recv(#{message := #{meta := delete, key := Key},
       flags := #{cas_expected := Expected},
       data := #{table := Table}} = Arg) ->
    ?LOG_DEBUG(#{arg => Arg}),
    Outcome =
        case ets:lookup(Table, Key) of
            [] ->
                not_found;

            [#entry{cas = Expected}] ->
                %% Re-check key and CAS inside the match specification so
                %% that the comparison and the delete happen in one ETS
                %% operation.
                DeleteIfUnchanged =
                    ets:fun2ms(
                      fun
                          (#entry{key = Candidate, cas = Actual})
                            when Candidate == Key, Expected == Actual ->
                              true
                      end),
                case ets:select_delete(Table, DeleteIfUnchanged) of
                    1 ->
                        head;

                    0 ->
                        exists
                end;

            [#entry{}] ->
                exists
        end,
    {continue, {encode, #{meta => Outcome}}};
93
%% arithmetic guarded by cas_expected. The new value is computed with
%% update_op/2 and written with a select_replace that re-checks key, CAS
%% and data, incrementing the CAS on success. After a successful write the
%% request is re-dispatched as a get so that the reply is built from the
%% stored entry. A CAS mismatch replies exists (with the CAS zeroed), a
%% missing key replies not_found.
recv(#{message := #{meta := arithmetic, key := Key} = Message,
       flags := #{cas_expected := ExistingCAS},
       data := #{table := Table}} = Arg) ->
    case ets:lookup(Table, Key) of
        [] ->
            {continue, encode(not_found, #entry{}, Arg)};

        [#entry{data = ExistingValue, cas = ExistingCAS} = Entry] ->
            try
                NewValue = update_op(ExistingValue, Arg),

                Swap = ets:fun2ms(
                         fun
                             (#entry{key = FoundKey,
                                     cas = FoundCAS,
                                     data = FoundData} = Found)
                               when FoundKey == Key,
                                    FoundCAS == ExistingCAS,
                                    FoundData == ExistingValue ->
                                 Found#entry{data = NewValue,
                                             cas = FoundCAS + 1}
                         end),

                case ets:select_replace(Table, Swap) of
                    0 ->
                        %% The entry changed between lookup and replace.
                        {continue, encode(exists, Entry#entry{cas = 0}, Arg)};

                    1 ->
                        ?FUNCTION_NAME(Arg#{message := Message#{meta := get}})
                end
            catch
                error:badarg ->
                    {continue,
                     {encode,
                      #{command => client_error,
                        reason => "cannot increment or decrement "
                        "non-numeric value"}}}
            end;

        [#entry{} = Mismatch] ->
            {continue, encode(exists, Mismatch#entry{cas = 0}, Arg)}
    end;
136
%% arithmetic without a CAS guard.
%%
%% Fast path: ets:update_counter/3 applies delta/1 to the stored data. With
%% the noreply flag nothing is sent back, otherwise the request is
%% re-dispatched as a get so that the reply is built from the stored entry.
%%
%% update_counter raises badarg when the key is missing or the stored data
%% is not an integer. The fallback then:
%%   - converts binary or iolist data (the latter is what append/prepend
%%     leave behind) to an integer in place and retries;
%%   - replies client_error when the stored data is not numeric;
%%   - creates the entry when the vivify flag is present and retries as a
%%     get (or retries the arithmetic if another writer won the insert);
%%   - otherwise replies not_found, or nothing at all with noreply.
recv(#{message := #{meta := arithmetic, key := Key} = Message,
       flags := Flags,
       data := #{table := Table}} = Arg) ->
    try ets:update_counter(Table, Key, [delta(Arg)]) of
        [_] when is_map_key(noreply, Flags) ->
            continue;

        [_] ->
            ?FUNCTION_NAME(Arg#{message := Message#{meta := get}})

    catch
        error:badarg ->
            case ets:lookup(Table, Key) of
                [#entry{data = ExistingText}]
                  when is_binary(ExistingText); is_list(ExistingText) ->
                    %% Only the conversion is protected: a badarg raised
                    %% while retrying must not be reported as a
                    %% non-numeric value.
                    try binary_to_integer(iolist_to_binary(ExistingText)) of
                        ExistingTextAsInteger ->
                            ets:select_replace(
                              Table,
                              ets:fun2ms(
                                fun
                                    (#entry{key = FoundKey,
                                            data = FoundData} = Found)
                                      when FoundKey =:= Key,
                                           FoundData == ExistingText ->
                                        Found#entry{
                                          data = ExistingTextAsInteger}
                                end)),

                            ?FUNCTION_NAME(Arg)
                    catch
                        error:badarg ->
                            {continue,
                             {encode,
                              #{command => client_error,
                                reason => "cannot increment or decrement "
                                "non-numeric value"}}}
                    end;

                [#entry{data = Other}] when not is_integer(Other) ->
                    %% Data that is neither text nor an integer can never
                    %% be incremented; report it instead of crashing with
                    %% case_clause.
                    {continue,
                     {encode,
                      #{command => client_error,
                        reason => "cannot increment or decrement "
                        "non-numeric value"}}};

                [] when is_map_key(vivify, Flags) ->
                    case ets:insert_new(Table, entry_from(Arg)) of
                        true when is_map_key(noreply, Flags) ->
                            continue;

                        true ->
                            ?FUNCTION_NAME(
                               Arg#{message := Message#{meta := get}});

                        false ->
                            ?FUNCTION_NAME(Arg)
                    end;

                [] when is_map_key(noreply, Flags) ->
                    continue;

                [] ->
                    {continue, {encode, #{meta => not_found}}}
            end
    end;
196
197
%% set in add mode: store the entry only when the key is not yet present.
%%
%% The entry is built once and that same term is both inserted and encoded
%% in the reply, so the reply always describes exactly what was stored.
%%
%% When the key already exists the existing entry's accessed and touched
%% times are bumped and not_stored is returned with that entry. If the
%% entry disappears between the failed insert and the lookup, the add is
%% retried.
recv(#{message := #{meta := set,
                    data := _Data,
                    key := Key},
       flags := #{mode := add},
       data := #{table := Table}} = Arg) ->
    ?LOG_DEBUG(#{arg => Arg}),

    Entry = entry_from(Arg),

    case ets:insert_new(Table, Entry) of
        true ->
            {continue, encode(head, Entry, Arg)};

        false ->
            lru_bump_entry(Arg),
            case ets:lookup(Table, Key) of
                [#entry{} = Existing] ->
                    {continue, encode(not_stored, Existing, Arg)};

                [] ->
                    ?FUNCTION_NAME(Arg)
            end
    end;
221
%% set in append, prepend or replace mode: all three only act on an entry
%% that already exists, and all three increment its CAS.
%%   append  - stored data becomes [Stored, Data]
%%   prepend - stored data becomes [Data, Stored]
%%   replace - stored data becomes Data
%% When nothing is stored under Key the reply is not_stored for replace and
%% not_found for append/prepend.
recv(#{message := #{meta := set,
                    data := Data,
                    key := Key},
       flags := #{mode := Mode},
       data := #{table := Table}} = Arg) when Mode == append;
                                              Mode == prepend;
                                              Mode == replace ->

    Absent = case Mode of
                 replace ->
                     not_stored;

                 _ ->
                     not_found
             end,

    Rewrite = ets:fun2ms(
                fun
                    (#entry{key = Candidate,
                            cas = CAS,
                            data = Stored} = Existing)
                      when Candidate == Key, Mode == append ->
                        Existing#entry{cas = CAS + 1, data = [Stored, Data]};

                    (#entry{key = Candidate,
                            cas = CAS,
                            data = Stored} = Existing)
                      when Candidate == Key, Mode == prepend ->
                        Existing#entry{cas = CAS + 1, data = [Data, Stored]};

                    (#entry{key = Candidate,
                            cas = CAS} = Existing)
                      when Candidate == Key, Mode == replace ->
                        Existing#entry{cas = CAS + 1, data = Data}
                end),

    case ets:select_replace(Table, Rewrite) of
        0 ->
            {continue, {encode, #{meta => Absent}}};

        1 ->
            case ets:lookup(Table, Key) of
                [#entry{} = Entry] ->
                    {continue, encode(Entry, Arg)};

                [] ->
                    {continue, {encode, #{meta => Absent}}}
            end
    end;
270
%% set guarded by cas_expected: the data is replaced (and the CAS
%% incremented) only while the stored CAS still equals Expected.
%%   head      - stored; the reply carries the entry with CAS Expected + 1
%%   exists    - the CAS did not match; the reply carries a zeroed CAS
%%   not_found - nothing is stored under Key
recv(#{message := #{meta := set,
                    data := Data,
                    key := Key},
       flags := #{cas_expected := Expected,
                  mode := set},
       data := #{table := Table}} = Arg) ->
    ?LOG_DEBUG(#{arg => Arg}),

    case ets:lookup(Table, Key) of
        [] ->
            {continue, encode(not_found, #entry{}, Arg)};

        [#entry{cas = Expected} = Current] ->
            Store = ets:fun2ms(
                      fun
                          (#entry{key = Candidate, cas = Actual} = Existing)
                            when Candidate == Key, Expected == Actual ->
                              Existing#entry{cas = Actual + 1, data = Data}
                      end),

            case ets:select_replace(Table, Store) of
                1 ->
                    {continue,
                     encode(head, Current#entry{cas = Expected + 1}, Arg)};

                0 ->
                    %% The entry changed between lookup and replace.
                    {continue, encode(exists, Current#entry{cas = 0}, Arg)}
            end;

        [#entry{} = Other] ->
            {continue, encode(exists, Other#entry{cas = 0}, Arg)}
    end;
303
%% Unconditional set: insert the entry built from the request, overwriting
%% whatever is stored under the same key, and acknowledge with head.
recv(#{message := #{meta := set},
       flags := #{mode := set},
       data := #{table := Table}} = Arg) ->
    ?LOG_DEBUG(#{arg => Arg}),
    Entry = entry_from(Arg),
    ets:insert(Table, Entry),
    Reply = #{meta => head},
    {continue, {encode, Reply}};
310
311
%% get that asks for at least one of value, ttl or cas.
%%
%% update_entry/1 refreshes the access time (and applies any set_ttl flag)
%% and, through its boolean result, tells whether the key exists. The entry
%% is then read back once and encoded; it is not updated a second time, so
%% the reply reflects exactly what is stored in the table.
%%
%% When the key is missing and the vivify flag is present, an entry built
%% from the request is inserted and returned; if another writer wins that
%% insert the get is retried. Otherwise the reply is miss.
recv(#{message := #{meta := get, key := Key},
       flags := Flags,
       data := #{table := Table}} = Arg)
  when is_map_key(value, Flags);
       is_map_key(ttl, Flags);
       is_map_key(cas, Flags) ->
    ?LOG_DEBUG(#{arg => Arg}),

    case update_entry(Arg) of
        true ->
            case ets:lookup(Table, Key) of
                [#entry{} = Found] ->
                    {continue, encode(Found, Arg)};

                [] ->
                    %% Deleted between the update and the lookup.
                    {continue, {encode, #{meta => miss}}}
            end;

        false when is_map_key(vivify, Flags) ->
            Vivified = entry_from(Arg),
            case ets:insert_new(Table, Vivified) of
                true ->
                    {continue, encode(Vivified, Arg)};

                false ->
                    ?FUNCTION_NAME(Arg)
            end;

        false ->
            {continue, {encode, #{meta => miss}}}
    end;
344
%% get that asks for none of value, ttl or cas: only report whether the key
%% exists. update_entry/1 refreshes the entry and its boolean result picks
%% the reply: head when the key is present, miss otherwise.
recv(#{message := #{meta := get},
       flags := _} = Arg) ->
    ?LOG_DEBUG(#{arg => Arg}),
    Meta = case update_entry(Arg) of
               true ->
                   head;

               false ->
                   miss
           end,
    {continue, {encode, #{meta => Meta}}};
357
%% Entry point for a request that has not been normalised yet (Arg has no
%% top-level flags key). The flag list carried by the message is converted
%% to a map with mcd_meta_flags:as_map/2 and stored under flags; when the
%% list contains base64 the key is base64-decoded first. The request is
%% then dispatched again, this time reaching one of the clauses above.
recv(#{message := #{meta := Meta, key := Key, flags := Flags} = Message} = Arg)
  when not(is_map_key(flags, Arg)) ->
    ?LOG_DEBUG(#{arg => Arg}),

    Normalised = case lists:member(base64, Flags) of
                     true ->
                         Message#{key := base64:decode(Key)};

                     false ->
                         Message
                 end,

    ?FUNCTION_NAME(
       Arg#{message := Normalised,
            flags => mcd_meta_flags:as_map(Meta, Flags)}).
371
372
%% encode/2: choose the reply kind from the request flags - value when the
%% value flag is true, head otherwise - and delegate to encode/3.
encode(Entry, Arg) ->
    Meta = case Arg of
               #{flags := #{value := true}} ->
                   value;

               _ ->
                   head
           end,
    ?FUNCTION_NAME(Meta, Entry, Arg).
377
%% encode/3: seed the reply with the entry's data when the value flag is
%% true (and Entry is an #entry{} record), with an empty map otherwise,
%% then delegate to encode/4.
encode(Meta, Entry, Arg) ->
    Encoded = case {Entry, Arg} of
                  {#entry{data = Data}, #{flags := #{value := true}}} ->
                      #{data => Data};

                  _ ->
                      #{}
              end,
    ?FUNCTION_NAME(Meta, Entry, Arg, Encoded).
382
%% encode/4: build the list of actions for a reply. The first action
%% encodes Encoded extended with the reply kind (meta) and the reply flags
%% computed by reply_flags/2; it is followed by whatever expire/3 returns.
encode(Meta, Entry, Arg, Encoded) ->
    ?LOG_DEBUG(#{meta => Meta,
                 entry => Entry,
                 arg => Arg,
                 encoded => Encoded}),
    Reply = maps:merge(
              Encoded,
              #{meta => Meta, flags => reply_flags(Entry, Arg)}),
    Expiry = expire(Meta, Entry, Arg),
    [{encode, Reply} | Expiry].
393
394
%% expire/3: extra actions to append to a head or value reply when the
%% request carries a set_ttl flag.
%%
%% Returns [{expire, #{key => Key, seconds => TTL}}] when a key can be
%% found, [] otherwise (including for every other reply kind).
%%
%% The key is taken from the top level of Arg when present, which is the
%% only place the previous version looked. It is otherwise taken from the
%% message, which is where every other function in this module reads it.
%% NOTE(review): with the old pattern this function appears never to have
%% produced an expire action for requests shaped like the ones recv/1
%% handles - confirm that the caller is ready to receive them.
expire(Meta, _Entry, #{flags := #{set_ttl := TTL}} = Arg)
  when Meta =:= head; Meta =:= value ->
    case Arg of
        #{key := Key} ->
            [{expire, #{key => Key, seconds => TTL}}];

        #{message := #{key := Key}} ->
            [{expire, #{key => Key, seconds => TTL}}];

        #{} ->
            []
    end;

expire(_, _, _) ->
    [].
403
404
%% update_entry/1: refresh the entry stored under Key.
%%
%% The accessed field is always set to the current monotonic time. Every
%% {set_ttl, TTL} element of the message's flag list additionally sets the
%% expiry field to TTL and the touched field to the current monotonic time.
%%
%% Returns the result of ets:update_element/3: true when an entry with that
%% key exists, false otherwise.
update_entry(#{message := #{key := Key, flags := Flags},
               data := #{table := Table}}) ->
    Accessed = [{#entry.accessed, erlang:monotonic_time()}],

    WithTTL = fun
                  ({set_ttl, TTL}, Updates) ->
                      [{#entry.expiry, TTL},
                       {#entry.touched, erlang:monotonic_time()} | Updates];

                  (_, Updates) ->
                      Updates
              end,

    ets:update_element(Table, Key, lists:foldl(WithTTL, Accessed, Flags)).
421
%% lru_bump_entry/1: set both the accessed and the touched field of the
%% entry stored under Key to the current monotonic time. Returns the result
%% of ets:update_element/3 (false when no such entry exists).
lru_bump_entry(#{message := #{key := Key}, data := #{table := Table}}) ->
    Accessed = {#entry.accessed, erlang:monotonic_time()},
    Touched = {#entry.touched, erlang:monotonic_time()},
    ets:update_element(Table, Key, [Accessed, Touched]).
428
429
%% entry_from/1: build the #entry{} record described by a request.
%%
%% An arithmetic request that carries no data but has an initial flag is
%% first rewritten so that the initial value becomes its data.
%%
%% The record gets the message's key and, when present, its data. Each
%% {set_ttl, TTL} or {vivify, TTL} element of the message's flag list then
%% sets the expiry field; all other flags are ignored.
entry_from(#{message := #{meta := arithmetic} = Message,
             flags := #{initial := Initial}} = Arg)
  when not(is_map_key(data, Message)) ->
    ?FUNCTION_NAME(Arg#{message := Message#{data => Initial}});

entry_from(#{message := #{key := Key, flags := Flags} = Message}) ->
    Seed = case Message of
               #{data := Data} ->
                   #entry{key = Key, data = Data};

               #{} ->
                   #entry{key = Key}
           end,

    ApplyTTL = fun
                   ({set_ttl, TTL}, Entry) ->
                       Entry#entry{expiry = TTL};

                   ({vivify, TTL}, Entry) ->
                       Entry#entry{expiry = TTL};

                   (_, Entry) ->
                       Entry
               end,

    lists:foldl(ApplyTTL, Seed, Flags).
451
452
%% reply_flags/2: build the flag list returned to the client.
%%
%% The flag list of the request message is walked and every flag that
%% produces a reply item is translated by reply_flag/3; the items keep the
%% order of the request flags.
reply_flags(#entry{flags = Flags} = Entry,
            #{message := #{flags := MetaFlags}}) ->
    ?LOG_DEBUG(#{entry => Entry, flags => Flags}),
    lists:foldr(
      fun
          (MetaFlag, A) ->
              reply_flag(MetaFlag, Entry, MetaFlags) ++ A
      end,
      [],
      MetaFlags).


%% reply_flag/3: the reply items (zero or one) for a single request flag.
%%   key      - the entry key, base64-encoded when the request flags
%%              contain base64
%%   cas      - the entry CAS
%%   size     - the byte size of the data; integer data is measured in its
%%              textual form
%%   flags    - the flags stored with the entry
%%   accessed - the access time converted by mcd_time:monotonic_to_system/3
%%   ttl      - the expiry minus the seconds elapsed since the entry was
%%              touched, never below zero
%%   {opaque, _} is echoed back unchanged.
%% Flags whose entry field is undefined, and all other flags, yield nothing.
reply_flag(key, #entry{key = Key}, MetaFlags) when Key /= undefined ->
    [{key,
      case lists:member(base64, MetaFlags) of
          true ->
              base64:encode(Key);

          false ->
              Key
      end}];

reply_flag(cas, #entry{cas = CAS}, _) when CAS /= undefined ->
    [{cas, CAS}];

reply_flag(size, #entry{data = Data}, _) when is_integer(Data) ->
    [{size, iolist_size(integer_to_binary(Data))}];

reply_flag(size, #entry{data = Data}, _) when Data /= undefined ->
    [{size, iolist_size(Data)}];

reply_flag(flags, #entry{flags = Flags}, _) when Flags /= undefined ->
    [{flags, Flags}];

reply_flag(accessed, #entry{accessed = Accessed}, _) ->
    [{accessed,
      mcd_time:monotonic_to_system(
        Accessed,
        native,
        second)}];

reply_flag(ttl, #entry{expiry = TTL, touched = Touched}, _)
  when TTL /= undefined ->
    Elapsed = erlang:convert_time_unit(
                erlang:monotonic_time() - Touched,
                native,
                second),
    [{ttl, max(0, TTL - Elapsed)}];

reply_flag({opaque, _} = Opaque, _, _) ->
    [Opaque];

reply_flag(_, _, _) ->
    [].
509
510
%% delta/1: the update operation handed to ets:update_counter/3 for an
%% arithmetic request, as {Position, Increment, Threshold, SetValue} on the
%% data field. An increment adds Delta with mcd_util:max(uint64) as the
%% threshold; a decrement subtracts Delta with mcd_util:min(uint64) as the
%% threshold. In both cases the value is set to 0 once the threshold is
%% crossed.
delta(#{message := #{meta := arithmetic},
        flags := #{delta := Delta, mode := Mode}})
  when Mode =:= increment; Mode =:= decrement ->
    case Mode of
        increment ->
            {#entry.data, Delta, mcd_util:max(uint64), 0};

        decrement ->
            {#entry.data, -Delta, mcd_util:min(uint64), 0}
    end.
518
519
%% update_op/2: compute the value an arithmetic request produces from the
%% existing value. Binary values are converted with binary_to_integer/1
%% first (which raises badarg for non-numeric text). The signed step and
%% the threshold depend on the mode, mirroring delta/1; the actual
%% arithmetic is done by update_op/4 with 0 as the value to fall back to.
update_op(Existing, Arg) when is_binary(Existing) ->
    ?FUNCTION_NAME(binary_to_integer(Existing), Arg);

update_op(Existing,
          #{message := #{meta := arithmetic},
            flags := #{delta := Delta, mode := Mode}})
  when Mode =:= increment; Mode =:= decrement ->
    {Step, Threshold} = case Mode of
                            increment ->
                                {Delta, mcd_util:max(uint64)};

                            decrement ->
                                {-Delta, mcd_util:min(uint64)}
                        end,
    ?FUNCTION_NAME(Existing, Step, Threshold, 0).
532
533
%% update_op/4: add Delta to Current, falling back to SetValue once the
%% result crosses Threshold in the direction of travel - above it for a
%% non-negative Delta, below it for a negative one. A result that lands
%% exactly on Threshold is kept.
update_op(Current, Delta, Threshold, SetValue) ->
    case Current + Delta of
        Sum when Delta >= 0, Sum > Threshold ->
            SetValue;

        Sum when Delta < 0, Sum < Threshold ->
            SetValue;

        Sum ->
            Sum
    end.
Line Hits Source