_site/cover/pgmp_connection.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_connection).
17
18
19 -export([bind/1]).
20 -export([callback_mode/0]).
21 -export([describe/1]).
22 -export([execute/1]).
23 -export([handle_event/4]).
24 -export([init/1]).
25 -export([join/1]).
26 -export([parameters/1]).
27 -export([parse/1]).
28 -export([query/1]).
29 -export([ready_for_query/1]).
30 -export([server_ref/1]).
31 -export([start_link/1]).
32 -export([sync/1]).
33 -import(pgmp_statem, [nei/1]).
34 -import(pgmp_statem, [send_request/1]).
35 -include_lib("kernel/include/logger.hrl").
36
37
%% @doc Starts this state machine linked to the caller.
%% The process is registered locally under the name computed by
%% server_ref/1 from Arg. Arg is handed to init/1 wrapped in a
%% single-element list, and the start options come from
%% envy_gen:options/1 for this module.
38 start_link(Arg) ->
39 10 gen_statem:start_link(
40 {local, server_ref(Arg)},
41 ?MODULE,
42 [Arg],
43 envy_gen:options(?MODULE)).
44
45
%% @doc Computes the server reference for the database described by the
%% argument map. The `application_name' value must be a binary (it is
%% converted with binary_to_list/1) and is combined with this module's
%% name by pgmp_util:snake_case/1; the exact shape of the result is
%% decided by that helper, which is not defined in this file.
46 -spec server_ref(pgmp_dbs_sup:db()) -> gen_statem:server_ref().
47
48 server_ref(#{application_name := ApplicationName}) ->
49 427 pgmp_util:snake_case([binary_to_list(ApplicationName), ?MODULE]).
50
51
%% @doc Delegates to pgmp_mm:query/1 (?FUNCTION_NAME expands to `query'),
%% after default_server_ref/1 has supplied a `server_ref' entry when Arg
%% does not already carry one.
52 query(Arg) ->
53 205 pgmp_mm:?FUNCTION_NAME(default_server_ref(Arg)).
54
55
%% @doc Delegates to pgmp_mm:parse/1 (?FUNCTION_NAME expands to `parse'),
%% after default_server_ref/1 has supplied a `server_ref' entry when Arg
%% does not already carry one.
56 parse(Arg) ->
57 3803 pgmp_mm:?FUNCTION_NAME(default_server_ref(Arg)).
58
59
%% @doc Delegates to pgmp_mm:sync/1 (?FUNCTION_NAME expands to `sync'),
%% after default_server_ref/1 has supplied a `server_ref' entry when Arg
%% does not already carry one.
60 sync(Arg) ->
61 2 pgmp_mm:?FUNCTION_NAME(default_server_ref(Arg)).
62
63
%% @doc Delegates to pgmp_mm:bind/1 (?FUNCTION_NAME expands to `bind'),
%% after default_server_ref/1 has supplied a `server_ref' entry when Arg
%% does not already carry one.
64 bind(Arg) ->
65 3953 pgmp_mm:?FUNCTION_NAME(default_server_ref(Arg)).
66
67
%% @doc Delegates to pgmp_mm:describe/1 (?FUNCTION_NAME expands to
%% `describe'), after default_server_ref/1 has supplied a `server_ref'
%% entry when Arg does not already carry one.
68 describe(Arg) ->
69 5 pgmp_mm:?FUNCTION_NAME(default_server_ref(Arg)).
70
71
%% @doc Delegates to pgmp_mm:execute/1 (?FUNCTION_NAME expands to
%% `execute'), after default_server_ref/1 has supplied a `server_ref'
%% entry when Arg does not already carry one.
72 execute(Arg) ->
73 3951 pgmp_mm:?FUNCTION_NAME(default_server_ref(Arg)).
74
75
%% @doc Delegates to pgmp_mm:parameters/1 (?FUNCTION_NAME expands to
%% `parameters'), after default_server_ref/1 has supplied a `server_ref'
%% entry when Arg does not already carry one.
76 parameters(Arg) ->
77 1 pgmp_mm:?FUNCTION_NAME(default_server_ref(Arg)).
78
79
%% @doc Sends a `{join, [Group]}' request through the local
%% send_request/3, where Group is the mandatory `group' entry of Arg
%% (args/2 reads it with maps:get/2, so a missing key raises).
%% The `group' key is removed from the map before it is sent.
80 join(Arg) ->
81 61 send_request(Arg, ?FUNCTION_NAME, [group]).
82
83
%% @doc Sends a `{ready_for_query, [State]}' request through the local
%% send_request/3, where State is the mandatory `state' entry of Arg
%% (args/2 reads it with maps:get/2, so a missing key raises).
%% The `state' key is removed from the map before it is sent.
84 ready_for_query(Arg) ->
85 281 send_request(Arg, ?FUNCTION_NAME, [state]).
86
87
%% Builds a request map from Arg and passes it to the imported
%% pgmp_statem:send_request/1. Working from the innermost call outwards:
%%   1. `request' is set to {Action, Values}, with Values extracted from
%%      Arg by args/2 according to Config;
%%   2. default_server_ref/1 supplies `server_ref' when Arg lacks one;
%%   3. maybe_label/1 adds a default `label' when the map has `requests'
%%      but no `label';
%%   4. the parameter keys named in Config are removed, so they travel
%%      only inside the `request' tuple.
%% Config is a list whose elements are either Key or {Key, Default}.
88 send_request(Arg, Action, Config) ->
89 342 send_request(
90 maps:without(
91 keys(Config),
92 maybe_label(
93 default_server_ref(
94 Arg#{request => {Action, args(Arg, Config)}})))).
95
96
%% Returns Arg with a `server_ref' entry guaranteed to be present.
%% The default is derived from pgmp_dbs_sup:default(application_name) in
%% the same way server_ref/1 derives a name. Arg is the second argument
%% of maps:merge/2, so a `server_ref' already present in Arg wins.
%% Note that the default is computed on every call, even when it ends up
%% being overridden.
97 default_server_ref(Arg) ->
98 12262 maps:merge(
99 #{server_ref => pgmp_util:snake_case(
100 [binary_to_list(
101 pgmp_dbs_sup:default(
102 application_name)),
103 ?MODULE])},
104 Arg).
105
106
%% Ensures a map that carries a `requests' entry also has a `label':
%%   - `requests' and `label' both present: returned unchanged;
%%   - `requests' present without `label': `label' is set to this
%%     module's name;
%%   - no `requests' entry: returned unchanged.
%% Clause order matters: the first clause must precede the second,
%% otherwise an existing label would be overwritten.
107 maybe_label(#{requests := _, label := _} = Arg) ->
108
:-(
Arg;
109
110 maybe_label(#{requests := _} = Arg) ->
111 342 Arg#{label => ?MODULE};
112
113 maybe_label(Arg) ->
114
:-(
Arg.
115
116
%% Extracts the parameter names from a Config list, preserving order.
%% Each element is either {Key, Default}, which yields Key, or a bare
%% Key, which is returned as is. The tuple clause comes first so that a
%% two-element tuple is never treated as a bare key.
117 keys(Config) ->
118 342 lists:map(
119 fun
120 ({Key, _}) ->
121
:-(
Key;
122
123 (Key) ->
124 342 Key
125 end,
126 Config).
127
128
%% Looks up, in Config order, the value of each configured parameter in
%% the map Arg and returns the values as a list.
%%   - {Parameter, Default}: optional, Default is used when absent;
%%   - Parameter: mandatory, maps:get/2 raises when the key is absent.
129 args(Arg, Config) ->
130 342 lists:map(
131 fun
132 ({Parameter, Default}) ->
133
:-(
maps:get(Parameter, Arg, Default);
134
135 (Parameter) ->
136 342 maps:get(Parameter, Arg)
137 end,
138 Config).
139
140
%% gen_statem callback: every event, in every state, is dispatched to
%% handle_event/4.
141 callback_mode() ->
142 10 handle_event_function.
143
144
%% gen_statem callback. Starts in state `drained' with the start argument
%% map extended by the pool bookkeeping:
%%   requests     - request id collection for calls forwarded to connections
%%   max          - value of pgmp_config:pool(max)
%%   owners       - Owner => Connection
%%   reservations - Connection => Owner
%%   outstanding  - Connection => list of callers awaiting a reply
%%   monitors     - Owner => monitor reference
%%   connections  - Connection => state (idle, reserved, limbo, or the
%%                  state last reported through ready_for_query)
%% The two actions are built by pgmp_statem:nei/1 (not shown here;
%% presumably it builds internal next_event actions) and are handled by
%% the `pool_sup' and `census' clauses of handle_event/4.
145 init([Arg]) ->
146 10 {ok,
147 drained,
148 Arg#{requests => gen_statem:reqids_new(),
149 max => pgmp_config:pool(max),
150 owners => #{},
151 reservations => #{},
152 outstanding => #{},
153 monitors => #{},
154 connections => #{}},
155 [nei(pool_sup), nei(census)]}.
156
%% gen_statem callback handling every event of the pool. The states used
%% are `available' and `drained'. Clause order is significant: the more
%% specific clauses (guards, literal states) come before the general ones.
%%
%% Request from a caller (Owner) that already owns a connection, in any
%% state: the request is forwarded to that connection. The caller is
%% added to the connection's outstanding list and the asynchronous
%% request is stored in the request id collection, labelled with the
%% connection and the caller so the reply can be routed back later.
157 handle_event({call, {Owner, _} = From},
158 {request, _} = Request,
159 _,
160 #{owners := Owners,
161 outstanding := Outstanding,
162 requests := Requests} = Data)
163 when is_map_key(Owner, Owners) ->
164 11756 #{Owner := Connection} = Owners,
165 11756 Inflight = maps:get(Connection, Outstanding, []),
166 11756 {keep_state,
167 Data#{outstanding := Outstanding#{Connection => [From | Inflight]},
168 requests := gen_statem:send_request(
169 Connection,
170 Request,
171 #{module => ?MODULE,
172 connection => Connection,
173 from => From},
174 Requests)},
175 nei(census)};
176
%% Request in state `available' from a caller without a connection (the
%% previous clause did not match): the first idle connection is reserved
%% for the caller, the caller is monitored and the request is forwarded.
%% The next state is `drained' when that was the only idle connection,
%% otherwise `available'.
%% NOTE(review): the match on [Connection | _] raises badmatch if no
%% connection is idle; this relies on `available' implying at least one
%% idle connection.
177 handle_event({call, {Owner, _} = From},
178 {request, _} = Request,
179 available,
180 #{connections := Connections,
181 monitors := Monitors,
182 outstanding := Outstanding,
183 requests := Requests,
184 reservations := Reservations,
185 owners := Owners} = Data) ->
186
187 164 [Connection | _] = Idle = maps:keys(
188 maps:filter(
189 connection_state_is(idle),
190 Connections)),
191
192 164 NextState = case Idle of
193 [Connection] ->
194 73 drained;
195
196 [Connection | _] ->
197 91 available
198 end,
199
200 164 {next_state,
201 NextState,
202 Data#{connections := Connections#{Connection := reserved},
203 outstanding := Outstanding#{Connection => [From]},
204 monitors := Monitors#{Owner => erlang:monitor(process, Owner)},
205 requests := gen_statem:send_request(
206 Connection,
207 Request,
208 #{module => ?MODULE,
209 connection => Connection,
210 from => From},
211 Requests),
212 reservations := Reservations#{Connection => Owner},
213 owners := Owners#{Owner => Connection}},
214 nei(census)};
215
%% Request in state `drained' while the number of connections plus the
%% number of owners is below `max': a new pool child is started and the
%% request is postponed (gen_statem retries postponed events after a
%% state change). A `new' telemetry event is emitted.
216 handle_event({call, _},
217 {request, _},
218 drained,
219 #{max := Max,
220 owners := Owners,
221 pool_sup := PoolSup,
222 connections := Connections})
223 when map_size(Connections) + map_size(Owners) < Max ->
224 61 {ok, _} = pgmp_pool_sup:start_child(PoolSup),
225 61 {keep_state_and_data,
226 [postpone,
227 nei({telemetry, new, #{count => 1}})]};
228
%% Request in state `drained' when the limit above is reached: the
%% request is only postponed and a `postponed' telemetry event emitted.
229 handle_event({call, _}, {request, _}, drained, _) ->
230
:-(
{keep_state_and_data,
231 [postpone,
232 nei({telemetry, postponed, #{count => 1}})]};
233
%% A connection process joins while the pool is `drained': the calling
%% process is monitored, recorded as idle, and the pool becomes
%% `available'. The group argument is not used here.
234 handle_event({call, {Connection, _} = From},
235 {join, [_Group]},
236 drained,
237 #{connections := Connections} = Data) ->
238 61 _ = erlang:monitor(process, Connection),
239 61 {next_state,
240 available,
241 Data#{connections := Connections#{Connection => idle}},
242 [{reply, From, ok}, nei(census)]};
243
%% A connection reports `idle' while it still has outstanding callers:
%% acknowledged without touching any bookkeeping, so the reservation is
%% kept.
244 handle_event({call, {Connection, _} = From},
245 {ready_for_query, [idle]},
246 _,
247 #{outstanding := Outstanding})
248 when is_map_key(Connection, Outstanding) ->
249 71 {keep_state_and_data, {reply, From, ok}};
250
%% A reserved connection reports `idle' while the pool is `drained':
%% the reservation is released (the owner is demonitored with flush and
%% the owner, monitor and reservation entries are removed), the
%% connection is recorded as idle and the pool becomes `available'.
251 handle_event({call, {Connection, _} = From},
252 {ready_for_query, [idle = State]},
253 drained,
254 #{connections := Connections,
255 reservations := Reservations,
256 monitors := Monitors,
257 owners := Owners} = Data)
258 when is_map_key(Connection, Reservations) ->
259 20 #{Connection := Owner} = Reservations,
260 20 #{Owner := Monitor} = Monitors,
261 20 true = erlang:demonitor(Monitor, [flush]),
262 20 {next_state,
263 available,
264 Data#{connections := Connections#{Connection => State},
265 owners := maps:without([Owner], Owners),
266 monitors := maps:without([Owner], Monitors),
267 reservations := maps:without([Connection], Reservations)},
268 [{reply, From, ok}, nei(census)]};
269
%% A connection process joins in any state other than `drained': same
%% bookkeeping as the `drained' join clause, without a state change.
270 handle_event({call, {Connection, _} = From},
271 {join, [_Group]},
272 _,
273 #{connections := Connections} = Data) ->
274
:-(
_ = erlang:monitor(process, Connection),
275
:-(
{keep_state,
276 Data#{connections := Connections#{Connection => idle}},
277 [{reply, From, ok}, nei(census)]};
278
%% A reserved connection reports `idle' in any state other than
%% `drained': the reservation is released as in the `drained' clause,
%% but the pool state is kept.
279 handle_event({call, {Connection, _} = From},
280 {ready_for_query, [idle = State]},
281 _,
282 #{connections := Connections,
283 reservations := Reservations,
284 monitors := Monitors,
285 owners := Owners} = Data)
286 when is_map_key(Connection, Reservations) ->
287 96 #{Connection := Owner} = Reservations,
288 96 #{Owner := Monitor} = Monitors,
289 96 true = erlang:demonitor(Monitor, [flush]),
290 96 {keep_state,
291 Data#{connections := Connections#{Connection => State},
292 owners := maps:without([Owner], Owners),
293 monitors := maps:without([Owner], Monitors),
294 reservations := maps:without([Connection], Reservations)},
295 [{reply, From, ok}, nei(census)]};
296
%% Any other ready_for_query report: the reported state is recorded
%% against the connection and the call acknowledged.
297 handle_event({call, {Connection, _} = From},
298 {ready_for_query, [State]},
299 _,
300 #{connections := Connections} = Data) ->
301 94 {keep_state,
302 Data#{connections := Connections#{Connection => State}},
303 [{reply, From, ok}, nei(census)]};
304
305
%% Internal event queued by init/1: stores the pid returned by
%% pgmp_sup:get_child_pid/2 for the `pool_sup' child of the first entry
%% in the process dictionary's '$ancestors' list (presumably the
%% supervisor that started this process).
306 handle_event(internal, pool_sup, _, Data) ->
307 10 {keep_state,
308 Data#{pool_sup => pgmp_sup:get_child_pid(
309 hd(get('$ancestors')),
310 pool_sup)}};
311
%% Response `ok' to a request whose label `from' is a 'DOWN' tuple, for
%% an owner that is still registered: asserts that the owner maps to the
%% labelled connection and removes the owner's monitor, reservation and
%% owner entries.
%% NOTE(review): neither send_request call in this function builds a
%% label whose `from' is a 'DOWN' tuple, so this clause and the two that
%% follow look unreachable from the code visible here - confirm.
312 handle_event(internal,
313 {response,
314 #{label := #{module := ?MODULE,
315 connection := Connection,
316 from := {'DOWN', _Monitor, process, Owner, _Info}},
317 reply := ok}},
318 _,
319 #{owners := Owners,
320 reservations := Reservations,
321 monitors := Monitors} = Data)
322 when is_map_key(Owner, Owners) ->
323
:-(
#{Owner := Connection} = Owners,
324
:-(
{keep_state,
325 Data#{monitors := maps:without([Owner], Monitors),
326 reservations := maps:without([Connection], Reservations),
327 owners := maps:without([Owner], Owners)},
328 nei(census)};
329
%% As above, for an owner that is no longer registered: only the monitor
%% and reservation entries are removed.
330 handle_event(internal,
331 {response,
332 #{label := #{module := ?MODULE,
333 connection := Connection,
334 from := {'DOWN', _Monitor, process, Owner, _Info}},
335 reply := ok}},
336 _,
337 #{monitors := Monitors,
338 reservations := Reservations} = Data) ->
339
:-(
{keep_state,
340 Data#{monitors := maps:without([Owner], Monitors),
341 reservations := maps:without([Connection], Reservations)},
342 nei(census)};
343
%% Response {ready_for_query, in_tx_block} to a 'DOWN'-labelled request
%% for a known, reserved connection with nothing outstanding: the
%% connection is stopped and marked `limbo', and the owner and
%% reservation entries are removed.
344 handle_event(internal,
345 {response,
346 #{label := #{module := ?MODULE,
347 connection := Connection,
348 from := {'DOWN', _Monitor, process, Owner, _Info}},
349 reply := {ready_for_query, in_tx_block}}},
350 _,
351 #{outstanding := Outstanding,
352 owners := Owners,
353 reservations := Reservations,
354 connections := Connections} = Data)
355 when not(is_map_key(Connection, Outstanding)),
356 is_map_key(Connection, Connections),
357 is_map_key(Connection, Reservations),
358 is_map_key(Owner, Owners) ->
359
:-(
ok = gen_statem:stop(Connection),
360
:-(
{keep_state,
361 Data#{owners := maps:without([Owner], Owners),
362 reservations := maps:without([Connection], Reservations),
363 connections := Connections#{Connection => limbo}},
364 nei(census)};
365
%% Response to a forwarded request: the caller is removed from the
%% connection's outstanding list (the entry is dropped once the list is
%% empty) and the connection's reply is relayed to the caller.
%% The match on Outstanding raises badmatch if the connection has no
%% outstanding entry.
366 handle_event(internal,
367 {response,
368 #{label := #{module := ?MODULE,
369 connection := Connection,
370 from := From},
371 reply := Reply}},
372 _,
373 #{outstanding := Outstanding} = Data) ->
374 11920 #{Connection := Inflight} = Outstanding,
375 11920 case lists:delete(From, Inflight) of
376 [] ->
377 11920 {keep_state,
378 Data#{outstanding := maps:without([Connection], Outstanding)},
379 [{reply, From, Reply}, nei(census)]};
380
381 Remaining ->
382
:-(
{keep_state,
383 Data#{outstanding := Outstanding#{Connection := Remaining}},
384 [{reply, From, Reply}, nei(census)]}
385 end;
386
%% A monitored owner that still holds a connection has gone down: its
%% connection is stopped and marked `limbo', and the owner's monitor,
%% owner and reservation entries are removed. The connection's own 'DOWN'
%% message is handled by the connection clauses further below.
387 handle_event(info,
388 {'DOWN', _Monitor, process, Owner, _Info},
389 _,
390 #{monitors := Monitors,
391 reservations := Reservations,
392 connections := Connections,
393 owners := Owners} = Data)
394 when is_map_key(Owner, Monitors),
395 is_map_key(Owner, Owners) ->
396 48 #{Owner := Connection} = Owners,
397 48 ok = gen_statem:stop(Connection),
398 48 {keep_state,
399 Data#{monitors := maps:without([Owner], Monitors),
400 owners := maps:without([Owner], Owners),
401 reservations := maps:without([Connection], Reservations),
402 connections := Connections#{Connection => limbo}},
403 nei(census)};
404
%% A monitored owner without an owners entry has gone down: only the
%% monitor entry is removed.
405 handle_event(info,
406 {'DOWN', _Monitor, process, Owner, _Info},
407 _,
408 #{monitors := Monitors} = Data)
409 when is_map_key(Owner, Monitors) ->
410
:-(
{keep_state,
411 Data#{monitors := maps:without([Owner], Monitors)},
412 nei(census)};
413
%% A reserved connection has gone down while the pool is `available' and
%% holds more than one connection: the owner is demonitored and all
%% bookkeeping for the pair is removed; the state is kept.
414 handle_event(info,
415 {'DOWN', _Monitor, process, Connection, _Info},
416 available,
417 #{connections := Connections,
418 reservations := Reservations,
419 monitors := Monitors,
420 owners := Owners} = Data)
421 when is_map_key(Connection, Connections),
422 is_map_key(Connection, Reservations),
423 map_size(Connections) > 1 ->
424
:-(
#{Connection := Owner} = Reservations,
425
:-(
#{Owner := Monitor} = Monitors,
426
:-(
true = erlang:demonitor(Monitor, [flush]),
427
:-(
{keep_state,
428 Data#{connections := maps:without([Connection], Connections),
429 owners := maps:without([Owner], Owners),
430 monitors := maps:without([Owner], Monitors),
431 reservations := maps:without([Connection], Reservations)},
432 nei(census)};
433
%% A reserved connection has gone down in any other situation: same
%% clean-up, and the pool moves to `drained'.
434 handle_event(info,
435 {'DOWN', _Monitor, process, Connection, _Info},
436 _,
437 #{connections := Connections,
438 reservations := Reservations,
439 monitors := Monitors,
440 owners := Owners} = Data)
441 when is_map_key(Connection, Connections),
442 is_map_key(Connection, Reservations) ->
443
:-(
#{Connection := Owner} = Reservations,
444
:-(
#{Owner := Monitor} = Monitors,
445
:-(
true = erlang:demonitor(Monitor, [flush]),
446
:-(
{next_state,
447 drained,
448 Data#{connections := maps:without([Connection], Connections),
449 owners := maps:without([Owner], Owners),
450 monitors := maps:without([Owner], Monitors),
451 reservations := maps:without([Connection], Reservations)},
452 nei(census)};
453
%% An unreserved connection has gone down while the pool is `available'
%% and holds more than one connection: it is removed; the state is kept.
%% NOTE(review): the guard counts all connections, not only idle ones,
%% so the pool can stay `available' with no idle connection left -
%% confirm that this is intended.
454 handle_event(info,
455 {'DOWN', _Monitor, process, Connection, _Info},
456 available,
457 #{connections := Connections} = Data)
458 when is_map_key(Connection, Connections),
459 map_size(Connections) > 1 ->
460 4 {keep_state,
461 Data#{connections := maps:without([Connection], Connections)},
462 nei(census)};
463
%% An unreserved connection has gone down in any other situation: it is
%% removed and the pool moves to `drained'.
464 handle_event(info,
465 {'DOWN', _Monitor, process, Connection, _Info},
466 _,
467 #{connections := Connections} = Data)
468 when is_map_key(Connection, Connections) ->
469 44 {next_state,
470 drained,
471 Data#{connections := maps:without([Connection], Connections)},
472 nei(census)};
473
%% Any other message is checked against the request id collection; the
%% `true' argument removes a matched request from the collection.
%%   - a reply is re-dispatched as an internal `response' event carrying
%%     the label and the reply;
%%   - an error labelled `pgmp_socket' with reason noproc or normal stops
%%     this process with reason normal;
%%   - an error labelled by this module with reason normal or shutdown
%%     stops this process with that reason;
%%   - any other error stops this process with a map describing it;
%%   - anything else (the message did not match a stored request) is
%%     logged as an error and otherwise ignored.
474 handle_event(info, Msg, _, #{requests := Existing} = Data) ->
475 11920 case gen_statem:check_response(Msg, Existing, true) of
476 {{reply, Reply}, Label, Updated} ->
477 11920 {keep_state,
478 Data#{requests := Updated},
479 nei({response, #{label => Label, reply => Reply}})};
480
481 {{error, {Reason, _}}, pgmp_socket, UpdatedRequests}
482 when Reason == noproc; Reason == normal ->
483
:-(
{stop,
484 normal,
485 Data#{requests := UpdatedRequests}};
486
487 {{error, {Reason, _}}, #{module := ?MODULE}, UpdatedRequests}
488 when Reason == normal; Reason == shutdown ->
489
:-(
{stop,
490 Reason,
491 Data#{requests := UpdatedRequests}};
492
493 {{error, {Reason, ServerRef}}, Label, UpdatedRequests} ->
494
:-(
{stop,
495 #{reason => Reason,
496 server_ref => ServerRef,
497 label => Label},
498 Data#{requests := UpdatedRequests}};
499
500 Otherwise ->
501
:-(
?LOG_ERROR(#{reason => Otherwise, msg => Msg, data => Data}),
502
:-(
keep_state_and_data
503 end;
504
%% Census: requests a size report for each of the five bookkeeping maps
%% and emits a `pool' telemetry event carrying the configured maximum.
505 handle_event(internal, census, _, #{max := Max}) ->
506 24217 {keep_state_and_data,
507 [nei({census,
508 [owners,
509 reservations,
510 outstanding,
511 monitors,
512 connections]}),
513 nei({telemetry, pool, #{size => Max}})]};
514
%% Emits one telemetry event per named map, with the map's size as the
%% measurement.
515 handle_event(internal, {census, Names}, _, Data) ->
516 24217 {keep_state_and_data,
517 lists:map(
518 fun
519 (Name) ->
520 121085 nei({telemetry, Name, #{size => maps:size(maps:get(Name, Data))}})
521 end,
522 Names)};
523
%% Telemetry normalisation, step 1: an event without metadata is
%% re-issued with an empty metadata map.
524 handle_event(internal,
525 {telemetry, EventName, Measurements},
526 _,
527 _) ->
528 145363 {keep_state_and_data,
529 nei({telemetry, EventName, Measurements, #{}})};
530
%% Step 2: an atom event name is wrapped in a list and re-issued.
531 handle_event(internal,
532 {telemetry, EventName, Measurements, Metadata},
533 _,
534 _) when is_atom(EventName) ->
535 145363 {keep_state_and_data,
536 nei({telemetry, [EventName], Measurements, Metadata})};
537
%% Step 3: the event is executed with its name prefixed by
%% [pgmp, connection] and the current pool state added to the metadata.
%% EventName must be a list here because it is appended with ++.
538 handle_event(internal,
539 {telemetry, EventName, Measurements, Metadata},
540 State,
541 _) ->
542 145363 ok = telemetry:execute(
543 [pgmp, connection] ++ EventName,
544 Measurements,
545 Metadata#{state => State}),
546 145363 keep_state_and_data.
547
548
%% Returns a two-argument predicate suitable for maps:filter/2 that
%% ignores the key and is true when the value equals Desired
%% (compared with ==).
549 connection_state_is(Desired) ->
550 164 fun
551 (_, Current) ->
552 265 Current == Desired
553 end.
Line Hits Source