_site/cover/msc_socket.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15 %% @doc Responsible for dealing with a TCP socket, backing off on
16 %% failure, upgrading to TLS and sending raw protocol to a middleman.
17
18 -module(msc_socket).
19
20
21 -export([callback_mode/0]).
22 -export([connect/1]).
23 -export([handle_event/4]).
24 -export([init/1]).
25 -export([send/1]).
26 -export([start/1]).
27 -export([start_link/1]).
28 -export([terminate/3]).
29 -export([upgrade/1]).
30 -import(msc_statem, [nei/1]).
31 -include_lib("kernel/include/inet.hrl").
32 -include_lib("kernel/include/logger.hrl").
33
34
35 start_link(Arg) ->
36 2 gen_statem:start_link(?MODULE, [Arg], envy_gen:options(?MODULE)).
37
38
39 start(Arg) ->
40
:-(
gen_statem:start(?MODULE, [Arg], [{debug, [trace]}]).
41
42
43 connect(Arg) ->
44 2 send_request(Arg, ?FUNCTION_NAME).
45
46 send(Arg) ->
47 14 send_request(Arg, ?FUNCTION_NAME).
48
49 upgrade(Arg) ->
50 2 send_request(Arg, ?FUNCTION_NAME).
51
52
53 send_request(Arg, Action) ->
54 18 ?FUNCTION_NAME(Arg, Action, config(Action)).
55
56
57 send_request(Arg, Action, Config) ->
58 18 send_request(
59 maps:without(
60 keys(Config),
61 maybe_label(
62 Arg#{request => {request, args(Action, Arg, Config)}}))).
63
64
65 maybe_label(#{requests := _, label := _} = Arg) ->
66 16 Arg;
67
68 maybe_label(#{requests := _} = Arg) ->
69 2 Arg#{label => ?MODULE};
70
71 maybe_label(Arg) ->
72
:-(
Arg.
73
74
75 config(send) ->
76 14 [message];
77
78 config(upgrade) ->
79 2 [];
80
81 config(connect) ->
82 2 [].
83
84
85 keys(Config) ->
86 18 lists:map(
87 fun
88 ({Key, _}) ->
89
:-(
Key;
90
91 (Key) ->
92 14 Key
93 end,
94 Config).
95
96
97 args(Action, Arg, Config) ->
98 18 lists:foldl(
99 fun
100 ({Parameter, Default}, A) ->
101
:-(
A#{Parameter => maps:get(Parameter, Arg, Default)};
102
103 (Parameter, A) ->
104 14 A#{Parameter => maps:get(Parameter, Arg)}
105 end,
106 #{action => Action},
107 Config).
108
109
110 send_request(#{label := _} = Arg) ->
111 18 msc_statem:send_request(Arg);
112
113 send_request(Arg) ->
114
:-(
msc_statem:send_request(Arg#{label => ?MODULE}).
115
116
117 init([Arg]) ->
118 2 process_flag(trap_exit, true),
119 2 {ok,
120 disconnected,
121 #{requests => gen_statem:reqids_new(),
122 config => Arg,
123 telemetry => #{}},
124 nei(peer)}.
125
126
127 callback_mode() ->
128 2 handle_event_function.
129
130
131 handle_event(internal, peer, _, Data) ->
132 2 case msc_sup:get_child(hd(get('$ancestors')), mm) of
133 {_, PID, worker, _} when is_pid(PID) ->
134 2 {keep_state, Data#{peer => PID}};
135
136 {_, _, _, _} = Reason ->
137
:-(
{stop, Reason};
138
139 false ->
140
:-(
{stop, peer_not_found}
141 end;
142
143 handle_event({call, {Peer, _} = From},
144 {request,
145 #{action := send,
146 message := Message}},
147 connected,
148 #{peer := Peer}) ->
149 14 {keep_state_and_data,
150 [nei({send, Message}), {reply, From, ok}]};
151
152 handle_event({call, {Peer, _} = From},
153 {request, #{action := upgrade}},
154 connected,
155 #{peer := Peer}) ->
156 2 {keep_state_and_data, nei({upgrade, From})};
157
158 handle_event({call, {Peer, _} = From},
159 {request, #{action := connect}},
160 _,
161 Data) ->
162 2 {keep_state, Data#{peer => Peer}, [{reply, From, ok}, nei(connect)]};
163
164 handle_event(internal,
165 {telemetry, EventName, Measurements},
166 _,
167 _) ->
168 38 {keep_state_and_data,
169 nei({telemetry, EventName, Measurements, #{}})};
170
171 handle_event(internal,
172 {telemetry, EventName, Measurements, Metadata},
173 _,
174 Data) ->
175 51 ok = telemetry:execute(
176 [msc, socket, EventName],
177 Measurements,
178 maps:merge(
179 maps:with([peer, socket, telemetry], Data),
180 Metadata)),
181 51 keep_state_and_data;
182
183 handle_event(internal,
184 {error, #{event := EventName, reason := Reason}},
185 _,
186 Data) ->
187
:-(
{next_state,
188 limbo,
189 Data,
190 [nei({telemetry,
191 error,
192 #{count => 1},
193 #{event => EventName, reason => Reason}}),
194
195 {state_timeout,
196 timer:seconds(
197 backoff:rand_increment(
198 msc_config:backoff(rand_increment))),
199 {backoff, #{action => EventName, reason => Reason}}}]};
200
201 handle_event(internal,
202 {send, #{packet := _, sequence := _} = Decoded},
203 _,
204 #{encoder := Encoder}) ->
205
:-(
{keep_state_and_data, nei({send, Encoder(Decoded)})};
206
207 handle_event(internal, {send, <<>>}, _, _) ->
208
:-(
keep_state_and_data;
209
210 handle_event(internal, {send = EventName, Data}, _, #{tls := TLS}) ->
211 12 case ssl:send(TLS, Data) of
212 ok ->
213 12 {keep_state_and_data,
214 nei({telemetry,
215 EventName,
216 #{bytes => iolist_size(Data)}})};
217
218 {error, Reason} ->
219
:-(
{keep_state_and_data,
220 nei({error, #{reason => Reason, event => EventName}})}
221 end;
222
223 handle_event(internal, {send = EventName, Data}, _, #{socket := Socket}) ->
224 2 case gen_tcp:send(Socket, Data) of
225 ok ->
226 2 {keep_state_and_data,
227 nei({telemetry,
228 EventName,
229 #{bytes => iolist_size(Data)}})};
230
231 {error, Reason} ->
232
:-(
{keep_state_and_data,
233 nei({error, #{reason => Reason, event => EventName}})}
234 end;
235
236 handle_event(info,
237 {tcp_closed, Socket},
238 _,
239 #{socket := Socket}) ->
240
:-(
stop;
241
242 handle_event(info,
243 {tcp, Socket, Received},
244 _,
245 #{socket := Socket, partial := Partial} = Data) ->
246 2 {keep_state,
247 Data#{partial := <<>>},
248 [nei({telemetry,
249 recv,
250 #{bytes => iolist_size(Received)},
251 #{}}),
252 nei({recv, iolist_to_binary([Partial, Received])})]};
253
254 handle_event(info,
255 {ssl_error, TLS, {tls_alert, _} = Reason},
256 _,
257 #{tls := TLS}) ->
258
:-(
{stop, Reason};
259
260 handle_event(info,
261 {ssl_closed, TLS},
262 _,
263 #{tls := TLS}) ->
264
:-(
stop;
265
266 handle_event(info,
267 {ssl, TLS, Received},
268 _,
269 #{tls := TLS, partial := Partial} = Data) ->
270 11 {keep_state,
271 Data#{partial := <<>>},
272 [nei({telemetry,
273 recv,
274 #{bytes => iolist_size(Received)},
275 #{}}),
276 nei({recv, iolist_to_binary([Partial, Received])})]};
277
278 handle_event(info, {'DOWN', _, process, Peer, noproc}, _, #{peer := Peer}) ->
279
:-(
stop;
280
281 handle_event(info, Msg, _, #{requests := Existing} = Data) ->
282 22 case gen_statem:check_response(Msg, Existing, true) of
283 {{reply, Reply}, Label, Updated} ->
284 22 {keep_state,
285 Data#{requests := Updated},
286 [nei({telemetry,
287 reqids_size,
288 #{value => gen_statem:reqids_size(Updated)}}),
289 nei({response, #{label => Label, reply => Reply}})]};
290
291 {{error, {Reason, _}}, _, UpdatedRequests} ->
292
:-(
{stop, Reason, Data#{requests := UpdatedRequests}}
293 end;
294
295 handle_event(internal, {response, #{label := msc_mm, reply := ok}}, _, _) ->
296 22 keep_state_and_data;
297
298 handle_event(
299 internal,
300 {recv,
301 <<Length:24/little, Sequence:8, Packet:Length/bytes, Remainder/bytes>>},
302 _,
303 #{peer := Peer,
304 requests := Requests} = Data) ->
305 22 {keep_state,
306 Data#{requests := msc_mm:recv(
307 #{server_ref => Peer,
308 message => <<Length:24/little,
309 Sequence:8,
310 Packet:Length/bytes>>,
311 requests => Requests})},
312 nei({recv, Remainder})};
313
314 handle_event(internal, {recv, <<>>}, _, _) ->
315 13 keep_state_and_data;
316
317 handle_event(internal, {recv, Partial}, _, #{partial := <<>>} = Data) ->
318
:-(
{keep_state, Data#{partial := Partial}};
319
320 handle_event(internal,
321 connect = EventName,
322 _,
323 #{config := #{host := Host, port := Port}}) ->
324 2 case addr(Host) of
325 {ok, Addr} ->
326 2 {keep_state_and_data,
327 nei({connect,
328 #{family => inet,
329 port => Port,
330 addr => Addr}})};
331
332 {error, Reason} ->
333
:-(
{keep_state_and_data,
334 nei({error, #{reason => Reason, event => EventName}})}
335 end;
336
337 handle_event(internal,
338 {connect = EventName,
339 #{family := Family, port := Port, addr := Addr} = SockAddr},
340 _,
341 #{telemetry := Telemetry} = Data) ->
342
343 2 case gen_tcp:connect(SockAddr,
344 [{mode, binary}],
345 msc_config:timeout(EventName)) of
346 {ok, Socket} ->
347 2 {next_state,
348 connected,
349 Data#{partial => <<>>,
350 socket => Socket,
351 telemetry :=
352 Telemetry#{net => #{peer => #{name => Addr,
353 port => Port},
354 sock => #{family => Family}}}},
355 nei({telemetry, EventName, #{count => 1}})};
356
357 {error, Reason} ->
358
:-(
{keep_state_and_data,
359 nei({error, #{reason => Reason, event => EventName}})}
360 end;
361
362 handle_event(internal,
363 {upgrade, From},
364 _,
365 #{socket := Socket} = Data) ->
366 2 case ssl:connect(Socket, [{verify, verify_none}]) of
367 {ok, TLS} ->
368 2 {keep_state, Data#{tls => TLS}, {reply, From, ok}};
369
370 {error, Reason} = Error ->
371
:-(
{stop_and_reply, Reason, {reply, From, Error}}
372 end;
373
374 handle_event(state_timeout, {backoff, _}, limbo, _) ->
375
:-(
stop.
376
377
378 terminate(_Reason, _State, #{tls := TLS, socket := Socket}) ->
379
:-(
_ = ssl:close(TLS),
380
:-(
_ = gen_tcp:close(Socket);
381
382 terminate(_Reason, _State, #{socket := Socket}) ->
383
:-(
_ = gen_tcp:close(Socket);
384
385 terminate(_Reason, _State, _Data) ->
386
:-(
ok.
387
388
389 addr(Hostname) ->
390 2 case inet:gethostbyname(binary_to_list(Hostname)) of
391 {ok, #hostent{h_addr_list = Addresses}} ->
392 2 {ok, pick_one(Addresses)};
393
394 {error, _} = Error ->
395
:-(
Error
396 end.
397
398
399 pick_one(L) ->
400 2 lists:nth(rand:uniform(length(L)), L).
Line Hits Source