_site/cover/mcd_client.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(mcd_client).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -export([init/1]).
22 -export([send/1]).
23 -export([start/0]).
24 -export([start/1]).
25 -export([start_link/0]).
26 -export([start_link/1]).
27 -import(mcd_statem, [nei/1]).
28 -include("mcd.hrl").
29 -include_lib("kernel/include/inet.hrl").
30 -include_lib("kernel/include/logger.hrl").
31
32
33 start() ->
34 11 ?FUNCTION_NAME(#{}).
35
36
37 start(Arg) ->
38 11 gen_statem:start(?MODULE, [Arg], []).
39
40
41 start_link() ->
42
:-(
?FUNCTION_NAME(#{}).
43
44
45 start_link(Arg) ->
46
:-(
gen_statem:start_link(?MODULE, [Arg], envy_gen:options(?MODULE)).
47
48
49 send(#{data := Data} = Arg) ->
50 176 send_request(
51 maps:without(
52 [data],
53 Arg#{request => {?FUNCTION_NAME, Data}})).
54
55
56 send_request(#{label := _} = Arg) ->
57
:-(
mcd_statem:send_request(Arg);
58
59 send_request(Arg) ->
60 176 mcd_statem:send_request(Arg#{label => ?MODULE}).
61
62
63 init([Arg]) ->
64 11 process_flag(trap_exit, true),
65 11 {ok, disconnected, Arg#{requests => gen_statem:reqids_new()}}.
66
67
68 callback_mode() ->
69 11 handle_event_function.
70
71
72 handle_event({call, _}, {send, _}, disconnected, Data) ->
73 11 {next_state, connecting, Data, [postpone, nei(open)]};
74
75 handle_event({call, _}, {send, _}, connecting, _) ->
76
:-(
{keep_state_and_data, postpone};
77
78 handle_event({call, _}, {send, _}, {busy, _}, _) ->
79
:-(
{keep_state_and_data, postpone};
80
81 handle_event({call, From}, {send, _} = Send, connected, Data) ->
82 176 {next_state,
83 {busy, From},
84 Data#{replies => []},
85 nei(Send)};
86
87 handle_event(internal, open = EventName, connecting, Data) ->
88 11 case socket:open(inet, stream, default) of
89 {ok, Socket} ->
90 11 {keep_state,
91 Data#{socket => Socket},
92 [nei({telemetry, EventName, #{count => 1}}),
93 nei(connect)]};
94
95 {error, Reason} ->
96
:-(
{stop, Reason}
97 end;
98
99 handle_event(internal, {send, #{} = Decoded}, {busy, _}, _) ->
100 159 {keep_state_and_data, nei({send, [Decoded]})};
101
102 handle_event(internal,
103 {send = EventName, Decoded},
104 {busy, _},
105 #{socket := Socket}) ->
106 176 Encoded = mcd_protocol:encode(Decoded),
107 176 case socket:send(Socket, Encoded) of
108 ok ->
109 176 {keep_state_and_data,
110 [nei({telemetry,
111 EventName,
112 #{count => 1,
113 bytes => iolist_size(Encoded)},
114 #{messages => Decoded}}),
115 nei({reply_expected, mcd_protocol:reply_expected(Decoded)})]};
116
117 {error, Reason} ->
118
:-(
{stop, Reason}
119 end;
120
121 handle_event(internal, {reply_expected, []}, {busy, From}, Data) ->
122
:-(
{next_state,
123 connected,
124 maps:without([replies], Data),
125 {reply, From, ok}};
126
127 handle_event(internal, {reply_expected, ReplyExpected}, {busy, _}, Data) ->
128 176 {keep_state, Data#{reply_expected => ReplyExpected}};
129
130 handle_event(internal,
131 recv,
132 _,
133 #{socket := Socket, partial := Partial} = Data) ->
134 221 case socket:recv(Socket, 0, nowait) of
135 {ok, Received} ->
136
:-(
{keep_state,
137 Data#{partial := <<>>},
138 [nei({telemetry, recv, #{bytes => iolist_size(Received)}}),
139 nei({recv, iolist_to_binary([Partial, Received])}),
140 nei(recv)]};
141
142 {select, {select_info, _, _}} ->
143 221 keep_state_and_data;
144
145 {error, Reason} ->
146
:-(
{stop, Reason}
147 end;
148
149 handle_event(info,
150 {'$socket', Socket, select, Handle},
151 _,
152 #{socket := Socket, partial := Partial} = Data) ->
153 210 case socket:recv(Socket, 0, Handle) of
154 {ok, Received} ->
155 210 {keep_state,
156 Data#{partial := <<>>},
157 [nei({telemetry, recv, #{bytes => iolist_size(Received)}}),
158 nei({recv, iolist_to_binary([Partial, Received])}),
159 nei(recv)]};
160
161 {select, {select_info, _, _}} ->
162
:-(
keep_state_and_data;
163
164 {error, Reason} ->
165
:-(
{stop, Reason}
166 end;
167
168 handle_event(info, Msg, _, #{requests := Existing} = Data) ->
169
:-(
case gen_statem:check_response(Msg, Existing, true) of
170 {{reply, Reply}, Label, Updated} ->
171
:-(
{keep_state,
172 Data#{requests := Updated},
173 nei({response, #{label => Label, reply => Reply}})};
174
175 {{error, {Reason, ServerRef}}, Label, UpdatedRequests} ->
176
:-(
{stop,
177 #{reason => Reason,
178 server_ref => ServerRef,
179 label => Label},
180 Data#{requests := UpdatedRequests}}
181 end;
182
183 handle_event(internal,
184 {recv,
185 <<Magic:8,
186 _Opcode:8,
187 _KeyLength:16,
188 _ExtraLength:8,
189 0:8,
190 _BucketOrStatus:16,
191 TotalBodyLength:32,
192 _Opaque:32,
193 _CAS:64,
194 Body:TotalBodyLength/bytes,
195 Remainder/bytes>> = Encoded},
196 {busy, _},
197 _) when Magic == ?REQUEST;
198 Magic == ?RESPONSE ->
199 15 {keep_state_and_data,
200 [nei({decode, <<(binary:part(Encoded, {0, 24}))/bytes, Body/bytes>>}),
201 nei({recv, Remainder})]};
202
203 handle_event(internal, {recv, <<"stats", _/bytes>> = Command}, _, _) ->
204
:-(
{keep_state_and_data, nei({decode, Command})};
205
206 handle_event(internal, {recv, <<"quit", _/bytes>> = Command}, _, _) ->
207
:-(
{keep_state_and_data, nei({decode, Command})};
208
209 handle_event(internal, {recv, <<"flush_all", _/bytes>> = Command}, _, _) ->
210
:-(
{keep_state_and_data, nei({decode, Command})};
211
212 handle_event(internal, {recv, <<"verbosity ", _/bytes>> = Command}, _, _) ->
213
:-(
{keep_state_and_data, nei({decode, Command})};
214
215 handle_event(internal, {recv, <<"incr ", _/bytes>> = Command}, _, _) ->
216
:-(
{keep_state_and_data, nei({decode, Command})};
217
218 handle_event(internal, {recv, <<"decr ", _/bytes>> = Command}, _, _) ->
219
:-(
{keep_state_and_data, nei({decode, Command})};
220
221 handle_event(internal, {recv, <<"set ", _/bytes>> = Command}, _, _) ->
222
:-(
{keep_state_and_data, nei({decode, Command})};
223
224 handle_event(internal, {recv, <<"append ", _/bytes>> = Command}, _, _) ->
225
:-(
{keep_state_and_data, nei({decode, Command})};
226
227 handle_event(internal, {recv, <<"prepend ", _/bytes>> = Command}, _, _) ->
228
:-(
{keep_state_and_data, nei({decode, Command})};
229
230 handle_event(internal, {recv, <<"cas ", _/bytes>> = Command}, _, _) ->
231
:-(
{keep_state_and_data, nei({decode, Command})};
232
233 handle_event(internal, {recv, <<"get ", _/bytes>> = Command}, _, _) ->
234
:-(
{keep_state_and_data, nei({decode, Command})};
235
236 handle_event(internal, {recv, <<"gets ", _/bytes>> = Command}, _, _) ->
237
:-(
{keep_state_and_data, nei({decode, Command})};
238
239 handle_event(internal, {recv, <<"add ", _/bytes>> = Command}, _, _) ->
240
:-(
{keep_state_and_data, nei({decode, Command})};
241
242 handle_event(internal, {recv, <<"replace ", _/bytes>> = Command}, _, _) ->
243
:-(
{keep_state_and_data, nei({decode, Command})};
244
245 handle_event(internal, {recv, <<"delete ", _/bytes>> = Command}, _, _) ->
246
:-(
{keep_state_and_data, nei({decode, Command})};
247
248 handle_event(internal, {recv, <<"gat ", _/bytes>> = Command}, _, _) ->
249
:-(
{keep_state_and_data, nei({decode, Command})};
250
251 handle_event(internal, {recv, <<"gats ", _/bytes>> = Command}, _, _) ->
252
:-(
{keep_state_and_data, nei({decode, Command})};
253
254 handle_event(internal, {recv, <<"touch ", _/bytes>> = Command}, _, _) ->
255
:-(
{keep_state_and_data, nei({decode, Command})};
256
257 handle_event(internal, {recv, <<"EN", _/bytes>> = Command}, _, _) ->
258 40 {keep_state_and_data, nei({decode, Command})};
259
260 handle_event(internal, {recv, <<"EX", _/bytes>> = Command}, _, _) ->
261 6 {keep_state_and_data, nei({decode, Command})};
262
263 handle_event(internal, {recv, <<"HD", _/bytes>> = Command}, _, _) ->
264 20 {keep_state_and_data, nei({decode, Command})};
265
266 handle_event(internal, {recv, <<"NF", _/bytes>> = Command}, _, _) ->
267 2 {keep_state_and_data, nei({decode, Command})};
268
269 handle_event(internal, {recv, <<"NOT_FOUND", _/bytes>> = Command}, _, _) ->
270 4 {keep_state_and_data, nei({decode, Command})};
271
272 handle_event(internal, {recv, <<"NOT_STORED", _/bytes>> = Command}, _, _) ->
273 2 {keep_state_and_data, nei({decode, Command})};
274
275 handle_event(internal, {recv, <<"OK", _/bytes>> = Command}, _, _) ->
276 2 {keep_state_and_data, nei({decode, Command})};
277
278 handle_event(internal, {recv, <<"SERVER_ERROR", _/bytes>> = Command}, _, _) ->
279
:-(
{keep_state_and_data, nei({decode, Command})};
280
281 handle_event(internal, {recv, <<"CLIENT_ERROR", _/bytes>> = Command}, _, _) ->
282 4 {keep_state_and_data, nei({decode, Command})};
283
284 handle_event(internal, {recv, <<"DELETED", _/bytes>> = Command}, _, _) ->
285 3 {keep_state_and_data, nei({decode, Command})};
286
287 handle_event(internal, {recv, <<"END", _/bytes>> = Command}, _, _) ->
288
:-(
{keep_state_and_data, nei({decode, Command})};
289
290 handle_event(internal, {recv, <<"ERROR", _/bytes>> = Command}, _, _) ->
291
:-(
{keep_state_and_data, nei({decode, Command})};
292
293 handle_event(internal, {recv, <<"EXISTS", _/bytes>> = Command}, _, _) ->
294
:-(
{keep_state_and_data, nei({decode, Command})};
295
296 handle_event(internal, {recv, <<"STAT ", _/bytes>> = Command}, _, _) ->
297
:-(
{keep_state_and_data, nei({decode, Command})};
298
299 handle_event(internal, {recv, <<"STORED", _/bytes>> = Command}, _, _) ->
300 38 {keep_state_and_data, nei({decode, Command})};
301
302 handle_event(internal, {recv, <<"TOUCHED", _/bytes>> = Command}, _, _) ->
303
:-(
{keep_state_and_data, nei({decode, Command})};
304
305 handle_event(internal, {recv, <<"VALUE ", _/bytes>> = Command}, _, _) ->
306 39 {keep_state_and_data, nei({decode, Command})};
307
308 handle_event(internal, {recv, <<"NS", _/bytes>> = Command}, _, _) ->
309 2 {keep_state_and_data, nei({decode, Command})};
310
311 handle_event(internal, {recv, <<"VA", _/bytes>> = Command}, _, _) ->
312 18 {keep_state_and_data, nei({decode, Command})};
313
314 handle_event(internal, {recv, <<"ma", _/bytes>> = Command}, _, _) ->
315
:-(
{keep_state_and_data, nei({decode, Command})};
316
317 handle_event(internal, {recv, <<"md ", _/bytes>> = Command}, _, _) ->
318
:-(
{keep_state_and_data, nei({decode, Command})};
319
320 handle_event(internal, {recv, <<"me ", _/bytes>> = Command}, _, _) ->
321
:-(
{keep_state_and_data, nei({decode, Command})};
322
323 handle_event(internal, {recv, <<"ME", _/bytes>> = Command}, _, _) ->
324
:-(
{keep_state_and_data, nei({decode, Command})};
325
326 handle_event(internal, {recv, <<"mg", _/bytes>> = Command}, _, _) ->
327
:-(
{keep_state_and_data, nei({decode, Command})};
328
329 handle_event(internal, {recv, <<"mn", _/bytes>> = Command}, _, _) ->
330
:-(
{keep_state_and_data, nei({decode, Command})};
331
332 handle_event(internal, {recv, <<"MN", _/bytes>> = Command}, _, _) ->
333 2 {keep_state_and_data, nei({decode, Command})};
334
335 handle_event(internal, {recv, <<"ms", _/bytes>> = Command}, _, _) ->
336
:-(
{keep_state_and_data, nei({decode, Command})};
337
338 handle_event(internal, {recv, <<>>}, _, _) ->
339 15 keep_state_and_data;
340
341 handle_event(internal, {recv, Command}, _, _) ->
342 13 {keep_state_and_data, nei({decode, Command})};
343
344 handle_event(internal, {decode, <<>>}, _, _) ->
345 34 keep_state_and_data;
346
347 handle_event(internal, {decode, Encoded}, _, _) ->
348 223 {keep_state_and_data, nei({message, mcd_protocol:decode(Encoded)})};
349
350 handle_event(internal,
351 {message, #{header := #{opcode := stat,
352 status := no_error},
353 key := _,
354 value := _} = Decoded},
355 {busy, _},
356 #{replies := Replies} = Data) ->
357
:-(
{keep_state,
358 Data#{replies => [Decoded | Replies]},
359 nei({telemetry, message, #{count => 1}, #{message => Decoded}})};
360
361 handle_event(internal,
362 {message, #{header := #{opcode := stat,
363 status := no_error},
364 key := _, value := _} = Decoded},
365 {busy, _},
366 Data) ->
367
:-(
{keep_state,
368 Data#{replies => [Decoded]},
369 nei({telemetry, message, #{count => 1}, #{message => Decoded}})};
370
371 handle_event(internal,
372 {message, #{header := #{opcode := stat,
373 status := no_error}} = Decoded},
374 {busy, From},
375 #{replies := Replies} = Data) ->
376
:-(
{next_state,
377 connected,
378 maps:without([replies, reply_expected], Data),
379 [{reply, From, lists:reverse([Decoded | Replies])},
380 nei({telemetry, message, #{count => 1}, #{message => Decoded}})]};
381
382
383 handle_event(internal,
384 {message, {#{command := value} = Decoded, Encoded}},
385 {busy, _},
386 #{replies := Replies} = Data) ->
387 42 {keep_state,
388 Data#{replies := [Decoded | Replies]},
389 [nei({telemetry, message, #{count => 1}, #{message => Decoded}}),
390 nei({decode, Encoded})]};
391
392 handle_event(internal,
393 {message, {Decoded, <<>>}},
394 {busy, From},
395 #{replies := [], reply_expected := [_]} = Data) ->
396 137 {next_state,
397 connected,
398 maps:without([replies, reply_expected], Data),
399 [{reply, From, Decoded},
400 nei({telemetry, message, #{count => 1}, #{message => Decoded}})]};
401
402 handle_event(internal,
403 {message, {Decoded, <<>>}},
404 {busy, From},
405 #{replies := Replies, reply_expected := [_]} = Data) ->
406 39 {next_state,
407 connected,
408 maps:without([replies, reply_expected], Data),
409 [{reply, From, lists:reverse([Decoded | Replies])},
410 nei({telemetry, message, #{count => 1}, #{message => Decoded}})]};
411
412 handle_event(internal,
413 {message, {Decoded, Encoded}},
414 {busy, _},
415 #{replies := Replies, reply_expected := [_ | T]} = Data) ->
416 5 {keep_state,
417 Data#{replies := [Decoded | Replies], reply_expected := T},
418 [nei({telemetry, message, #{count => 1}, #{message => Decoded}}),
419 nei({decode, Encoded})]};
420
421 handle_event(internal,
422 {telemetry, EventName, Measurements},
423 _,
424 _) ->
425 221 {keep_state_and_data,
426 nei({telemetry, EventName, Measurements, #{}})};
427
428 handle_event(internal,
429 {telemetry, EventName, Measurements, Metadata},
430 _,
431 Data) ->
432 631 ok = telemetry:execute(
433 [mcd, client, EventName],
434 Measurements,
435 maps:merge(
436 maps:with([socket], Data),
437 Metadata)),
438 631 keep_state_and_data;
439
440 handle_event(internal, connect, connecting, _) ->
441 11 {keep_state_and_data,
442 nei({connect,
443 #{family => inet,
444 port => mcd_config:memcached(port),
445 addr => addr()}})};
446
447 handle_event(internal,
448 {connect = EventName, Arg},
449 connecting,
450 #{socket := Socket} = Data) ->
451 11 case socket:connect(Socket, Arg) of
452 ok ->
453 11 {next_state,
454 connected,
455 Data#{partial => <<>>},
456 [nei({telemetry, EventName, #{count => 1}, Arg}),
457 nei(recv)]};
458
459 {error, Reason} ->
460
:-(
{stop, Reason}
461 end.
462
463
464 addr() ->
465 11 ?FUNCTION_NAME(mcd_config:memcached(hostname)).
466
467
468 addr(Hostname) ->
469 11 {ok, #hostent{h_addr_list = Addresses}} = inet:gethostbyname(Hostname),
470 11 pick_one(Addresses).
471
472
473 pick_one(L) ->
474 11 lists:nth(rand:uniform(length(L)), L).
Line Hits Source