_site/cover/mcd_tcp_listener.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(mcd_tcp_listener).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -export([init/1]).
22 -export([start/0]).
23 -export([start/1]).
24 -export([start_link/0]).
25 -export([start_link/1]).
26 -import(mcd_statem, [nei/1]).
27 -include("mcd.hrl").
28 -include_lib("kernel/include/inet.hrl").
29 -include_lib("kernel/include/logger.hrl").
30
31
%% @doc Start an unlinked listener using an empty argument map.
%%
%% Equivalent to `start(#{})'.
%%
%% NOTE(review): the `callback_init' clause of handle_event/4 matches on
%% `arg := #{callback := Module}'. An empty map carries no `callback'
%% key, so a listener started this way looks like it would fail with
%% `function_clause' right after init/1 returns - confirm whether a
%% default callback is intended.
start() ->
    start(#{}).
34
35
%% @doc Start a listener that is not linked to the calling process.
%%
%% `Arg' reaches init/1 wrapped in a one-element list. The gen_statem
%% start options are whatever `envy_gen:options/1' returns for this
%% module.
start(Arg) ->
    Options = envy_gen:options(?MODULE),
    gen_statem:start(?MODULE, [Arg], Options).
38
39
%% @doc Start a linked listener using an empty argument map.
%%
%% Equivalent to `start_link(#{})'.
%%
%% NOTE(review): the `callback_init' clause of handle_event/4 matches on
%% `arg := #{callback := Module}'. An empty map carries no `callback'
%% key, so a listener started this way looks like it would fail with
%% `function_clause' right after init/1 returns - confirm whether a
%% default callback is intended.
start_link() ->
    start_link(#{}).
42
43
%% @doc Start a listener linked to the calling process.
%%
%% `Arg' reaches init/1 wrapped in a one-element list. The gen_statem
%% start options are whatever `envy_gen:options/1' returns for this
%% module.
start_link(Arg) ->
    Options = envy_gen:options(?MODULE),
    gen_statem:start_link(?MODULE, [Arg], Options).
46
47
%% @doc gen_statem initialisation callback.
%%
%% Traps exits, then enters the `unready' state with a data map holding
%% the caller supplied `Arg' and an empty request-id collection. Two
%% actions are queued through nei/1 (imported from mcd_statem;
%% presumably it builds a `next_event' internal action - confirm):
%% `callback_init' first, then `open'.
init([Arg]) ->
    process_flag(trap_exit, true),
    Data = #{arg => Arg, requests => gen_statem:reqids_new()},
    Actions = [nei(callback_init), nei(open)],
    {ok, unready, Data, Actions}.
54
55
%% @doc gen_statem callback mode.
%%
%% Every event, in every state, is dispatched to handle_event/4.
callback_mode() ->
    handle_event_function.
58
59
%% @doc gen_statem event handler for the TCP listener.
%%
%% The state name is ignored by every clause; all dispatch is on the
%% event type and content. Start-up is driven by a chain of internal
%% events, each clause queueing the next step through nei/1 (imported
%% from mcd_statem; presumably it builds a `next_event' internal
%% action - confirm):
%%
%%   callback_init -> open -> setopt(reuseaddr) -> bind -> {bind, Addr}
%%                 -> listen -> accept
%%
%% Once listening, `accept' is re-queued after every accepted
%% connection, and `$socket' select messages resume an accept that
%% could not complete immediately.
%%
%% Any `{error, Reason}' from a socket call stops the process with
%% `Reason'.

%% Initialise the callback module and hand its data to mcd_reaper.
%% The request is sent asynchronously with the label `reaper'; the reply
%% is collected by the final `info' clause below.
handle_event(internal,
             callback_init,
             _,
             #{requests := Requests,
               arg := #{callback := Module}} = Data) ->
    case Module:init([]) of
        {ok, CallbackData} ->
            {keep_state,
             Data#{callback_data => CallbackData,
                   requests := gen_statem:send_request(
                                 mcd_reaper,
                                 {callback, #{data => CallbackData}},
                                 reaper,
                                 Requests)}};

        stop ->
            stop;

        {stop, Reason} ->
            {stop, Reason}
    end;

%% A connection was accepted: start a connection process for it and then
%% make that process the socket's controlling process.
handle_event(internal,
             {connect = EventName, Socket},
             _,
             #{callback_data := CallbackData,
               arg := #{callback := Module}}) ->
    {ok, Child} = mcd_tcp_connection_sup:start_child(
                    #{socket => Socket,
                      callback => #{data => CallbackData,
                                    module => Module}}),
    {keep_state_and_data,
     [nei({setopt,
           #{socket => Socket,
             level => otp,
             option => controlling_process,
             value => Child}}),

      nei({telemetry, EventName, #{count => 1}})]};

%% Select notification for our listening socket: retry the accept that
%% previously returned a select handle.
handle_event(info,
             {'$socket', Listener, select = EventName, Handle},
             _,
             #{socket := Listener}) ->
    case socket:accept(Listener, Handle) of
        {ok, Connected} ->
            {keep_state_and_data,
             [nei({telemetry, EventName, #{count => 1}}),
              nei({connect, Connected}),
              nei(accept)]};

        {select, {select_info, _, _}} ->
            %% Still nothing to accept; wait for the next notification.
            keep_state_and_data;

        {error, Reason} ->
            {stop, Reason}
    end;

%% Non-blocking accept. Either a connection is available right away, or
%% a `$socket' select message will arrive later (handled above).
handle_event(internal, accept = EventName, _, #{socket := Listener}) ->
    case socket:accept(Listener, nowait) of
        {ok, Connected} ->
            {keep_state_and_data,
             [nei({telemetry, EventName, #{count => 1}}),
              nei({connect, Connected}),
              nei(accept)]};

        {select, {select_info, _, _}} ->
            keep_state_and_data;

        {error, Reason} ->
            {stop, Reason}
    end;

%% Put the bound socket into listening mode with the configured backlog,
%% then start accepting.
handle_event(internal, listen = EventName, _, #{socket := Listener}) ->
    case socket:listen(Listener, mcd_config:socket(backlog)) of
        ok ->
            {keep_state_and_data,
             [nei({telemetry, EventName, #{count => 1}}), nei(accept)]};

        {error, Reason} ->
            {stop, Reason}
    end;

%% Open the IPv4 TCP listening socket and remember it under `socket'.
%% Follow-up events set `reuseaddr' and then bind.
handle_event(internal, open = EventName, _, Data) ->
    case socket:open(inet, stream, tcp) of
        {ok, Listener} ->
            {keep_state,
             Data#{socket => Listener},
             [nei({telemetry,
                   EventName,
                   #{count => 1}}),

              nei({setopt,
                   #{socket => Listener,
                     level => socket,
                     option => reuseaddr,
                     value => true}}),

              nei(bind)]};

        {error, Reason} ->
            {stop, Reason}
    end;

%% Generic socket option setter, used both for the listening socket
%% (`reuseaddr') and for accepted sockets (`controlling_process').
handle_event(
  internal,
  {setopt,
   #{socket := Socket, level := Level, option := Option, value := Value}},
  _,
  _) ->
    case socket:setopt(Socket, {Level, Option}, Value) of
        ok ->
            keep_state_and_data;

        {error, Reason} ->
            {stop, Reason}
    end;

%% Build the address to bind to (any IPv4 address, configured memcached
%% port) and queue the actual bind.
handle_event(internal, bind = EventName, _, _) ->
    {keep_state_and_data,
     nei({EventName,
          #{family => inet,
            port => mcd_config:memcached(port),
            addr => any}})};

%% Bind the listening socket to `Addr'. On success move to the `ready'
%% state and queue the listen step; the address is attached to the
%% telemetry event as metadata.
handle_event(internal,
             {bind = EventName, Addr},
             _,
             #{socket := Listener} = Data) ->
    case socket:bind(Listener, Addr) of
        ok ->
            {next_state,
             ready,
             Data,
             [nei({telemetry, EventName, #{count => 1}, Addr}),
              nei(listen)]};

        {error, Reason} ->
            {stop, Reason}
    end;

%% Telemetry without metadata: re-queue with an empty metadata map.
handle_event(internal,
             {telemetry, EventName, Measurements},
             _,
             _) ->
    {keep_state_and_data,
     nei({telemetry, EventName, Measurements, #{}})};

%% Emit a telemetry event named [mcd, tcp, listener, EventName]. The
%% listening socket, when present in the data, is merged into the
%% metadata; keys in `Metadata' take precedence over it.
handle_event(internal,
             {telemetry, EventName, Measurements, Metadata},
             _,
             Data) ->
    ok = telemetry:execute(
           [mcd, tcp, listener, EventName],
           Measurements,
           maps:merge(
             maps:with([socket], Data),
             Metadata)),
    keep_state_and_data;

%% Any other message: check whether it answers the outstanding
%% mcd_reaper request sent from the `callback_init' clause.
handle_event(info, Msg, _, #{requests := Existing} = Data) ->
    case gen_statem:check_response(Msg, Existing, true) of
        {{reply, ok}, reaper, Updated} ->
            {keep_state, Data#{requests := Updated}};

        {{error, {Reason, _ServerRef}}, reaper, _Updated} ->
            %% mcd_reaper was unavailable or died before replying. Stop
            %% with an explicit reason rather than an opaque case_clause.
            {stop, {reaper, Reason}};

        Unmatched when Unmatched =:= no_request; Unmatched =:= no_reply ->
            %% Not a response to any of our requests: log it and carry on.
            ?LOG_ERROR(#{msg => Msg, data => Data}),
            keep_state_and_data
    end.
Line Hits Source