_site/cover/pgmp_socket.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_socket).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -export([init/1]).
22 -export([send/1]).
23 -export([start_link/1]).
24 -export([terminate/3]).
25 -export([upgrade/1]).
26 -import(pgmp_statem, [nei/1]).
27 -include_lib("kernel/include/inet.hrl").
28 -include_lib("kernel/include/logger.hrl").
29
30
%% Start a linked gen_statem running this module.
%%
%% Arg is handed to init/1 wrapped in a single-element list; the
%% gen_statem start options are looked up via envy_gen:options/1.
start_link(Arg) ->
    Options = envy_gen:options(?MODULE),
    gen_statem:start_link(?MODULE, [Arg], Options).
33
34
%% Issue a `send' request described by the map Arg.
%%
%% The `data' key of Arg is moved into the request payload by
%% send_request/2,3 (see config(send)).
send(Arg) ->
    send_request(Arg, send).
37
38
%% Issue an `upgrade' request described by the map Arg.
%%
%% The `config' key of Arg is moved into the request payload by
%% send_request/2,3 (see config(upgrade)).
upgrade(Arg) ->
    send_request(Arg, upgrade).
41
42
%% Look up the parameter list for Action and delegate to send_request/3.
send_request(Arg, Action) ->
    Config = config(Action),
    send_request(Arg, Action, Config).
45
46
%% Build the `{request, Args}' payload for Action from Arg, attach it
%% under the `request' key, make sure a label is present, and strip the
%% parameters named in Config from the outer map before handing the
%% result to send_request/1.
send_request(Arg, Action, Config) ->
    Request = {request, args(Action, Arg, Config)},
    Labelled = maybe_label(Arg#{request => Request}),
    send_request(maps:without(keys(Config), Labelled)).
53
54
%% Default the `label' of a request map to this module's name.
%%
%% Only maps that carry a `requests' key are touched; an existing
%% `label' always wins because Arg is the right-hand side of the merge.
%% Anything else is returned unchanged.
maybe_label(#{requests := _} = Arg) ->
    maps:merge(#{label => ?MODULE}, Arg);

maybe_label(Arg) ->
    Arg.
63
64
%% Parameter specification for each supported action.
%%
%% Each element is either a bare key (required in the caller's map) or
%% a `{Key, Default}' pair, as consumed by keys/1 and args/3.
config(send) -> [data];
config(upgrade) -> [config].
70
71
%% Extract the parameter names from a parameter specification, in order.
keys(Config) ->
    lists:map(fun config_key/1, Config).


%% Name of a single parameter specification entry: the first element of
%% a `{Key, Default}' pair, or the entry itself when it is a bare key.
config_key({Key, _Default}) ->
    Key;

config_key(Key) ->
    Key.
82
83
%% Build the request argument map for Action.
%%
%% Starts from `#{action => Action}' and copies every parameter listed
%% in Config out of Arg. A `{Name, Fallback}' entry uses Fallback when
%% Arg lacks Name; a bare Name is mandatory and maps:get/2 raises
%% `{badkey, Name}' when it is missing.
args(Action, Arg, Config) ->
    Collect =
        fun
            ({Name, Fallback}, Acc) ->
                Acc#{Name => maps:get(Name, Arg, Fallback)};

            (Name, Acc) ->
                Acc#{Name => maps:get(Name, Arg)}
        end,
    lists:foldl(Collect, #{action => Action}, Config).
95
96
%% Forward a fully built request map to pgmp_statem:send_request/1,
%% labelling it with this module's name when the caller supplied no
%% `label'.
send_request(Arg) ->
    Labelled =
        case Arg of
            #{label := _} ->
                Arg;

            _ ->
                Arg#{label => ?MODULE}
        end,
    pgmp_statem:send_request(Labelled).
102
103
%% gen_statem init callback.
%%
%% Traps exits and starts in the `disconnected' state. The initial data
%% holds an empty request-id collection, the caller-supplied
%% configuration, and the static telemetry metadata.
init([Arg]) ->
    process_flag(trap_exit, true),
    Data = #{requests => gen_statem:reqids_new(),
             config => Arg,
             telemetry => #{db => #{system => postgresql}}},
    {ok, disconnected, Data}.
111
112
%% gen_statem callback: every event is dispatched to handle_event/4.
callback_mode() -> handle_event_function.
115
116
%% gen_statem event handler (handle_event_function callback mode).
%%
%% States used by the clauses below: `disconnected' (set by init/1),
%% `connected' (after a successful TCP connect) and `limbo' (after an
%% internal error, until the backoff timer stops the process).
%%
%% Data keys read or written here: `config', `requests', `telemetry',
%% `peer', `socket', `tls' and `partial'.

%% A `send' request from the process recorded as peer: queue the write
%% as an internal event and acknowledge the call immediately.
handle_event({call, {Peer, _} = From},
             {request, #{action := send, data := Message}},
             connected,
             #{peer := Peer}) ->
    {keep_state_and_data,
     [nei({send, Message}), {reply, From, ok}]};

%% An `upgrade' request from the peer: upgrade the TCP socket to TLS.
%% NOTE(review): `verify_none' disables certificate verification and the
%% `config' carried by the request is not consulted here - confirm that
%% this is intended.
handle_event({call, {Peer, _} = From},
             {request, #{action := upgrade}},
             connected,
             #{socket := Socket, peer := Peer} = Data) ->
    case ssl:connect(Socket, [{verify, verify_none}]) of
        {ok, TLS} ->
            {keep_state, Data#{tls => TLS}, {reply, From, ok}};

        {error, Reason} = Error ->
            {stop_and_reply, Reason, {reply, From, Error}}
    end;

%% Any other request: remember the caller as peer, postpone the request
%% and start connecting.
handle_event({call, {Peer, _}}, {request, _}, _, Data) ->
    {keep_state,
     Data#{peer => Peer},
     [postpone, nei(connect)]};

%% Telemetry without metadata: re-issue with empty metadata.
handle_event(internal,
             {telemetry, EventName, Measurements},
             _,
             _) ->
    {keep_state_and_data,
     nei({telemetry, EventName, Measurements, #{}})};

%% Emit a `[pgmp, socket, EventName]' telemetry event. The metadata is
%% the `peer', `socket' and `telemetry' entries of Data (when present)
%% overlaid with the event specific Metadata.
handle_event(internal,
             {telemetry, EventName, Measurements, Metadata},
             _,
             Data) ->
    ok = telemetry:execute(
           [pgmp, socket, EventName],
           Measurements,
           maps:merge(
             maps:with([peer, socket, telemetry], Data),
             Metadata)),
    keep_state_and_data;

%% An internal error: report it through telemetry, move to `limbo' and
%% arm the backoff state timeout.
handle_event(internal,
             {error, #{event := EventName, reason := Reason}},
             _,
             Data) ->
    {next_state,
     limbo,
     Data,
     [nei({telemetry,
           error,
           #{count => 1},
           #{event => EventName, reason => Reason}}),

      {state_timeout,
       timer:seconds(
         backoff:rand_increment(
           pgmp_config:backoff(rand_increment))),
       {backoff, #{action => EventName, reason => Reason}}}]};

%% Write to the TLS session when one exists. This clause must precede
%% the plain TCP one because `socket' is still present after an upgrade.
handle_event(internal, {send = EventName, Data}, _, #{tls := TLS}) ->
    case ssl:send(TLS, Data) of
        ok ->
            {keep_state_and_data,
             nei({telemetry,
                  EventName,
                  #{bytes => iolist_size(Data)}})};

        {error, Reason} ->
            {keep_state_and_data,
             nei({error, #{reason => Reason, event => EventName}})}
    end;

%% Write to the plain TCP socket.
handle_event(internal, {send = EventName, Data}, _, #{socket := Socket}) ->
    case gen_tcp:send(Socket, Data) of
        ok ->
            {keep_state_and_data,
             nei({telemetry,
                  EventName,
                  #{bytes => iolist_size(Data)}})};

        {error, Reason} ->
            {keep_state_and_data,
             nei({error, #{reason => Reason, event => EventName}})}
    end;

%% The TCP socket was closed.
handle_event(info,
             {tcp_closed, Socket},
             _,
             #{socket := Socket}) ->
    stop;

%% Bytes from the TCP socket: prepend any buffered partial frame, clear
%% the buffer and hand the result to the `recv' framing clauses.
handle_event(info,
             {tcp, Socket, Received},
             _,
             #{socket := Socket, partial := Partial} = Data) ->
    {keep_state,
     Data#{partial := <<>>},
     [nei({telemetry,
           recv,
           #{bytes => iolist_size(Received)},
           #{}}),
      nei({recv, iolist_to_binary([Partial, Received])})]};

%% A TLS alert on our session: stop with the alert as reason.
handle_event(info,
             {ssl_error, TLS, {tls_alert, _} = Reason},
             _,
             #{tls := TLS}) ->
    {stop, Reason};

%% The TLS session was closed.
handle_event(info,
             {ssl_closed, TLS},
             _,
             #{tls := TLS}) ->
    stop;

%% Bytes from the TLS session: same buffering as for plain TCP.
handle_event(info,
             {ssl, TLS, Received},
             _,
             #{tls := TLS, partial := Partial} = Data) ->
    {keep_state,
     Data#{partial := <<>>},
     [nei({telemetry,
           recv,
           #{bytes => iolist_size(Received)},
           #{}}),
      nei({recv, iolist_to_binary([Partial, Received])})]};

%% A monitor on the peer reported `noproc'.
handle_event(info, {'DOWN', _, process, Peer, noproc}, _, #{peer := Peer}) ->
    stop;

%% Any other message is treated as a possible reply to one of the
%% outstanding requests.
%% NOTE(review): gen_statem:check_response/3 can also return `no_request'
%% or `no_reply'; neither is matched, so an unrelated message ends the
%% process with `case_clause' - confirm this is the intended behaviour.
handle_event(info, Msg, _, #{requests := Existing} = Data) ->
    case gen_statem:check_response(Msg, Existing, true) of
        {{reply, Reply}, Label, Updated} ->
            {keep_state,
             Data#{requests := Updated},
             nei({response, #{label => Label, reply => Reply}})};

        {{error, {Reason, _}}, _, UpdatedRequests} ->
            {stop, Reason, Data#{requests := UpdatedRequests}}
    end;

%% pgmp_mm acknowledged a forwarded message: nothing further to do.
handle_event(internal, {response, #{label := pgmp_mm, reply := ok}}, _, _) ->
    keep_state_and_data;

%% A complete frame: one tag byte, a 32 bit length that counts itself
%% (hence `Length - 4' payload bytes), then whatever follows. Dispatch
%% the frame and continue framing the remainder.
handle_event(internal,
             {recv,
              <<Tag:1/signed-bytes,
                Length:32/signed,
                Message:(Length - 4)/bytes, Remainder/bytes>>},
             _,
             _) ->
    {keep_state_and_data,
     [nei({tag_msg, Tag, Message}), nei({recv, Remainder})]};

%% A lone "S" or "N" byte is forwarded to pgmp_mm tagged `ssl'.
%% NOTE(review): this presumably is the server's answer to an SSL
%% request; a regular frame split right after an "S"/"N" tag byte would
%% also land here - confirm that cannot happen.
handle_event(internal,
             {recv, Message},
             _,
             #{peer := Peer, requests := Requests} = Data)
  when Message == <<"S">>;
       Message == <<"N">> ->
    {keep_state,
     Data#{requests := pgmp_mm:recv(
                         #{server_ref => Peer,
                           tag => ssl,
                           message => Message,
                           requests => Requests})},
     nei({telemetry,
          ssl,
          #{count => 1, bytes => iolist_size(Message)},
          #{tag => ssl}})};

%% Nothing left to frame.
handle_event(internal, {recv, <<>>}, _, _) ->
    keep_state_and_data;

%% An incomplete frame: buffer it until more bytes arrive.
handle_event(internal, {recv, Partial}, _, #{partial := <<>>} = Data) ->
    {keep_state, Data#{partial := Partial}};

%% Forward a framed message to pgmp_mm under its backend tag name and
%% keep the updated request-id collection.
handle_event(internal,
             {tag_msg = EventName, Tag, Message},
             _,
             #{peer := Peer, requests := Requests} = Data) ->
    {keep_state,
     Data#{requests := pgmp_mm:recv(
                         #{server_ref => Peer,
                           tag => pgmp_message_tags:name(backend, Tag),
                           message => Message,
                           requests => Requests})},
     nei({telemetry,
          EventName,
          #{count => 1, bytes => iolist_size(Message)},
          #{tag => pgmp_message_tags:name(backend, Tag)}})};

%% Resolve the configured host and continue with the socket address.
%% addr/1 returns `{error, Reason}' when resolution fails; that is
%% routed through the common error path instead of being passed to
%% gen_tcp:connect/2 as if it were an address.
handle_event(internal,
             connect = EventName,
             _,
             #{config := #{host := Host, port := Port}}) ->
    case addr(Host) of
        {error, Reason} ->
            {keep_state_and_data,
             nei({error, #{reason => Reason, event => EventName}})};

        Addr ->
            {keep_state_and_data,
             nei({connect,
                  #{family => inet,
                    port => Port,
                    addr => Addr}})}
    end;

%% Open the TCP connection in binary mode. On success record the socket,
%% an empty receive buffer and the network details for telemetry, then
%% enter `connected'.
handle_event(internal,
             {connect = EventName,
              #{family := Family, port := Port, addr := Addr} = SockAddr},
             _,
             #{telemetry := Telemetry} = Data) ->
    case gen_tcp:connect(SockAddr, [{mode, binary}]) of
        {ok, Socket} ->
            {next_state,
             connected,
             Data#{partial => <<>>,
                   socket => Socket,
                   telemetry :=
                       Telemetry#{net => #{peer => #{name => Addr,
                                                     port => Port},
                                           sock => #{family => Family}}}},
             nei({telemetry, EventName, #{count => 1}})};

        {error, Reason} ->
            {keep_state_and_data,
             nei({error, #{reason => Reason, event => EventName}})}
    end;

%% The backoff period after an error has elapsed: stop.
handle_event(state_timeout, {backoff, _}, limbo, _) ->
    stop.
349
350
%% gen_statem terminate callback: release the connection, if any.
%%
%% Once the connection has been upgraded the state holds both `tls' and
%% `socket'; the TLS session is then the one to close, so that the
%% shutdown goes through the ssl application rather than closing the
%% raw socket underneath it. Close errors are ignored on purpose: the
%% process is going away regardless.
terminate(_Reason, _State, #{tls := TLS}) ->
    _ = ssl:close(TLS),
    ok;

terminate(_Reason, _State, #{socket := Socket}) ->
    _ = gen_tcp:close(Socket),
    ok;

terminate(_Reason, _State, _Data) ->
    ok.
356
357
%% Resolve Hostname (a binary) and return one of its addresses, chosen
%% at random, or the `{error, Reason}' tuple from inet:gethostbyname/1
%% when resolution fails.
addr(Hostname) ->
    Name = binary_to_list(Hostname),
    case inet:gethostbyname(Name) of
        {error, _} = Error ->
            Error;

        {ok, #hostent{h_addr_list = Addresses}} ->
            pick_one(Addresses)
    end.
366
367
%% Return a uniformly random element of the non-empty list L.
pick_one(L) ->
    Index = rand:uniform(length(L)),
    lists:nth(Index, L).
Line Hits Source