_site/cover/mcd_tcp_connection.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(mcd_tcp_connection).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -export([init/1]).
22 -export([start/0]).
23 -export([start/1]).
24 -export([start_link/0]).
25 -export([start_link/1]).
26 -export([terminate/3]).
27 -import(mcd_statem, [nei/1]).
28 -include("mcd.hrl").
29 -include_lib("kernel/include/inet.hrl").
30 -include_lib("kernel/include/logger.hrl").
31
32
33 start() ->
34
:-(
?FUNCTION_NAME(#{}).
35
36
37 start(Arg) ->
38
:-(
gen_statem:start(?MODULE, [Arg], envy_gen:options(?MODULE)).
39
40
41 start_link() ->
42
:-(
?FUNCTION_NAME(#{}).
43
44
45 start_link(Arg) ->
46 11 gen_statem:start_link(?MODULE, [Arg], envy_gen:options(?MODULE)).
47
48
49 init([Arg]) ->
50 11 process_flag(trap_exit, true),
51 11 {ok,
52 ready,
53 #{arg => Arg,
54 partial => <<>>,
55 requests => gen_statem:reqids_new()},
56 nei(recv)}.
57
58
59 callback_mode() ->
60 11 handle_event_function.
61
62
63 handle_event(internal,
64 {callback, F, A},
65 _,
66 #{arg := #{callback := #{module := M}}}) ->
67 204 try apply(M, F, A) of
68 {continue, Actions} when is_list(Actions) ->
69 162 {keep_state_and_data, [nei(Action) || Action <- Actions]};
70
71 {continue, Action} ->
72 40 {keep_state_and_data, nei(Action)};
73
74 continue ->
75 1 keep_state_and_data;
76
77 stop ->
78
:-(
stop;
79
80 {stop, Reason} ->
81
:-(
{stop, Reason}
82 catch
83 error:badarg ->
84 1 {keep_state_and_data,
85 nei({encode, #{reason => <<"bad command line format">>,
86 command => client_error}})}
87 end;
88
89 handle_event(info,
90 {'$socket', Socket, select, Handle},
91 _,
92 #{arg := #{socket := Socket},
93 partial := Partial} = Data) ->
94 176 case socket:recv(Socket, 0, Handle) of
95 {ok, Received} ->
96 165 {keep_state,
97 Data#{partial := <<>>},
98 [nei({gauge,
99 #{name => bytes_read,
100 delta => iolist_size(Received)}}),
101 nei({telemetry,
102 recv,
103 #{bytes => iolist_size(Received)}}),
104 nei({recv, iolist_to_binary([Partial, Received])}),
105 nei(recv)]};
106
107 {select, {select_info, _, _}} ->
108
:-(
keep_state_and_data;
109
110 {error, econnreset} ->
111
:-(
stop;
112
113 {error, closed} ->
114 11 stop;
115
116 {error, Reason} ->
117
:-(
{stop, Reason}
118 end;
119
120 handle_event(internal, {recv, <<>>}, _, _) ->
121 191 keep_state_and_data;
122
123 handle_event(internal,
124 {recv,
125 <<Magic:8,
126 _Opcode:8,
127 _KeyLength:16,
128 _ExtraLength:8,
129 0:8,
130 _BucketOrStatus:16,
131 TotalBodyLength:32,
132 _Opaque:32,
133 _CAS:64,
134 Body:TotalBodyLength/bytes,
135 Remainder/bytes>> = Encoded},
136 _,
137 _) when Magic == ?REQUEST;
138 Magic == ?RESPONSE ->
139 15 {keep_state_and_data,
140 [nei({decode, <<(binary:part(Encoded, {0, 24}))/bytes, Body/bytes>>}),
141 nei({recv, Remainder})]};
142
143 handle_event(internal, {recv, <<"stats", _/bytes>> = Command}, _, _) ->
144
:-(
{keep_state_and_data, nei({decode, Command})};
145
146 handle_event(internal, {recv, <<"quit", _/bytes>> = Command}, _, _) ->
147
:-(
{keep_state_and_data, nei({decode, Command})};
148
149 handle_event(internal, {recv, <<"flush_all", _/bytes>> = Command}, _, _) ->
150 2 {keep_state_and_data, nei({decode, Command})};
151
152 handle_event(internal, {recv, <<"verbosity ", _/bytes>> = Command}, _, _) ->
153
:-(
{keep_state_and_data, nei({decode, Command})};
154
155 handle_event(internal, {recv, <<"incr ", _/bytes>> = Command}, _, _) ->
156 14 {keep_state_and_data, nei({decode, Command})};
157
158 handle_event(internal, {recv, <<"decr ", _/bytes>> = Command}, _, _) ->
159 9 {keep_state_and_data, nei({decode, Command})};
160
161 handle_event(internal, {recv, <<"set ", _/bytes>> = Command}, _, _) ->
162 39 {keep_state_and_data, nei({decode, Command})};
163
164 handle_event(internal, {recv, <<"append ", _/bytes>> = Command}, _, _) ->
165 1 {keep_state_and_data, nei({decode, Command})};
166
167 handle_event(internal, {recv, <<"prepend ", _/bytes>> = Command}, _, _) ->
168 1 {keep_state_and_data, nei({decode, Command})};
169
170 handle_event(internal, {recv, <<"cas ", _/bytes>> = Command}, _, _) ->
171 7 {keep_state_and_data, nei({decode, Command})};
172
173 handle_event(internal, {recv, <<"get ", _/bytes>> = Command}, _, _) ->
174 35 {keep_state_and_data, nei({decode, Command})};
175
176 handle_event(internal, {recv, <<"gets ", _/bytes>> = Command}, _, _) ->
177 9 {keep_state_and_data, nei({decode, Command})};
178
179 handle_event(internal, {recv, <<"add ", _/bytes>> = Command}, _, _) ->
180 8 {keep_state_and_data, nei({decode, Command})};
181
182 handle_event(internal, {recv, <<"replace ", _/bytes>> = Command}, _, _) ->
183 3 {keep_state_and_data, nei({decode, Command})};
184
185 handle_event(internal, {recv, <<"delete ", _/bytes>> = Command}, _, _) ->
186 7 {keep_state_and_data, nei({decode, Command})};
187
188 handle_event(internal, {recv, <<"gat ", _/bytes>> = Command}, _, _) ->
189 2 {keep_state_and_data, nei({decode, Command})};
190
191 handle_event(internal, {recv, <<"gats ", _/bytes>> = Command}, _, _) ->
192 1 {keep_state_and_data, nei({decode, Command})};
193
194 handle_event(internal, {recv, <<"touch ", _/bytes>> = Command}, _, _) ->
195 1 {keep_state_and_data, nei({decode, Command})};
196
197 handle_event(internal, {recv, <<"ma ", _/bytes>> = Command}, _, _) ->
198 16 {keep_state_and_data, nei({decode, Command})};
199
200 handle_event(internal, {recv, <<"md ", _/bytes>> = Command}, _, _) ->
201 2 {keep_state_and_data, nei({decode, Command})};
202
203 handle_event(internal, {recv, <<"me ", _/bytes>> = Command}, _, _) ->
204
:-(
{keep_state_and_data, nei({decode, Command})};
205
206 handle_event(internal, {recv, <<"mg ", _/bytes>> = Command}, _, _) ->
207 11 {keep_state_and_data, nei({decode, Command})};
208
209 handle_event(internal, {recv, <<"mn", _/bytes>> = Command}, _, _) ->
210 2 {keep_state_and_data, nei({decode, Command})};
211
212 handle_event(internal, {recv, <<"ms ", _/bytes>> = Command}, _, _) ->
213 19 {keep_state_and_data, nei({decode, Command})};
214
215 handle_event(internal, {recv, _}, _, _) ->
216
:-(
{keep_state_and_data, nei({encode, #{command => error}})};
217
218 handle_event(internal,
219 {decode = EventName, Encoded},
220 _,
221 #{arg := #{callback := #{data := CallbackData}},
222 partial := Partial} = Data) ->
223 204 try mcd_protocol:decode(Encoded) of
224 {Decoded, Remainder} ->
225 204 cmd_stats(Decoded),
226 204 {keep_state_and_data,
227 [nei({callback,
228 recv,
229 [#{message => Decoded,
230 data => CallbackData}]}),
231 nei({telemetry,
232 EventName,
233 #{count => 1},
234 #{message => Decoded}}),
235 nei({recv, Remainder})]};
236
237 partial ->
238
:-(
{keep_state, Data#{partial := [Partial, Encoded]}}
239 catch
240 error:badarg ->
241
:-(
?LOG_ERROR(#{encoded => Encoded}),
242
:-(
{keep_state_and_data,
243 nei({encode, #{reason => <<"bad command line format">>,
244 command => client_error}})}
245 end;
246
247 handle_event(internal,
248 {expire, _} = Expire,
249 _,
250 #{requests := Requests} = Data) ->
251 62 {keep_state,
252 Data#{requests := gen_statem:send_request(
253 mcd_reaper,
254 Expire,
255 expire,
256 Requests)}};
257
258 handle_event(internal,
259 {flush_all, _} = FlushAll,
260 _,
261 #{requests := Requests} = Data) ->
262 2 {keep_state,
263 Data#{requests := gen_statem:send_request(
264 mcd_reaper,
265 FlushAll,
266 flush_all,
267 Requests)}};
268
269 handle_event(internal, {encode, Command}, _, _) when is_atom(Command) ->
270 58 {keep_state_and_data, nei({encode, #{command => Command}})};
271
272 handle_event(internal, {encode = EventName, Decoded}, _, _) ->
273 227 {keep_state_and_data,
274 [nei({telemetry,
275 EventName,
276 #{count => 1},
277 #{message => Decoded}}),
278 nei({send, mcd_protocol:encode(Decoded)})]};
279
280 handle_event(internal,
281 {send = EventName, Encoded},
282 _,
283 #{arg := #{socket := Socket}}) ->
284 227 case socket:send(Socket, Encoded) of
285 ok ->
286 227 {keep_state_and_data,
287 [nei({gauge,
288 #{name => bytes_written,
289 delta => iolist_size(Encoded)}}),
290
291 nei({telemetry,
292 EventName,
293 #{bytes => iolist_size(Encoded)}})]};
294
295 {error, econnreset} ->
296
:-(
stop;
297
298 {error, closed} ->
299
:-(
stop;
300
301 {error, Reason} ->
302
:-(
{stop, Reason}
303 end;
304
305 handle_event(internal,
306 recv = EventName,
307 _,
308 #{arg := #{socket := Socket},
309 partial := Partial} = Data) ->
310 187 case socket:recv(Socket, 0, nowait) of
311 {ok, Received} ->
312 11 {keep_state,
313 Data#{partial := <<>>},
314 [nei({gauge,
315 #{name => bytes_read,
316 delta => iolist_size(Received)}}),
317 nei({telemetry,
318 EventName,
319 #{bytes => iolist_size(Received)}}),
320 nei({recv, iolist_to_binary([Partial, Received])}),
321 nei(recv)]};
322
323 {select, {select_info, _, _}} ->
324 176 keep_state_and_data;
325
326 {error, econnreset} ->
327
:-(
stop;
328
329 {error, closed} ->
330
:-(
stop;
331
332 {error, Reason} ->
333
:-(
{stop, Reason}
334 end;
335
336 handle_event(internal,
337 {telemetry, EventName, Measurements},
338 _,
339 _) ->
340 403 {keep_state_and_data,
341 nei({telemetry, EventName, Measurements, #{}})};
342
343 handle_event(internal,
344 {telemetry, EventName, Measurements, Metadata},
345 _,
346 #{arg := #{callback := #{module := M}}}) ->
347 834 ok = telemetry:execute(
348 [mcd, tcp, connection, EventName],
349 Measurements,
350 maps:merge(
351 #{callback => M},
352 Metadata)),
353 834 keep_state_and_data;
354
355 handle_event(internal, {gauge, Arg}, _, _) ->
356 403 mcd_stat:gauge(Arg),
357 403 keep_state_and_data;
358
359 handle_event(info, Msg, _, #{requests := Existing} = Data) ->
360 64 case gen_statem:check_response(Msg, Existing, true) of
361 {{reply, ok}, expire, Updated} ->
362 62 {keep_state, Data#{requests := Updated}};
363
364 {{reply, ok}, flush_all, Updated} ->
365 2 {keep_state, Data#{requests := Updated}};
366
367 no_request ->
368
:-(
?LOG_ERROR(#{msg => Msg, data => Data}),
369
:-(
keep_state_and_data;
370
371 no_reply ->
372
:-(
?LOG_ERROR(#{msg => Msg, data => Data}),
373
:-(
keep_state_and_data
374 end.
375
376
377 terminate(_Reason, _State, #{arg := #{socket := Socket}}) ->
378 11 _ = socket:close(Socket);
379
380 terminate(_Reason, _State, _Data) ->
381
:-(
ok.
382
383
384 cmd_stats(#{header := #{opcode := flush}}) ->
385 2 mcd_stat:gauge(cmd_flush);
386
387 cmd_stats(_) ->
388 202 ok.
Line Hits Source