1 |
|
%% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com> |
2 |
|
%% |
3 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
%% you may not use this file except in compliance with the License. |
5 |
|
%% You may obtain a copy of the License at |
6 |
|
%% |
7 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
%% |
9 |
|
%% Unless required by applicable law or agreed to in writing, software |
10 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 |
|
%% See the License for the specific language governing permissions and |
13 |
|
%% limitations under the License. |
14 |
|
|
15 |
|
|
16 |
|
-module(pgmp_connection). |
17 |
|
|
18 |
|
|
19 |
|
-export([bind/1]). |
20 |
|
-export([callback_mode/0]). |
21 |
|
-export([describe/1]). |
22 |
|
-export([execute/1]). |
23 |
|
-export([handle_event/4]). |
24 |
|
-export([init/1]). |
25 |
|
-export([join/1]). |
26 |
|
-export([parameters/1]). |
27 |
|
-export([parse/1]). |
28 |
|
-export([query/1]). |
29 |
|
-export([ready_for_query/1]). |
30 |
|
-export([server_ref/1]). |
31 |
|
-export([start_link/1]). |
32 |
|
-export([sync/1]). |
33 |
|
-import(pgmp_statem, [nei/1]). |
34 |
|
-import(pgmp_statem, [send_request/1]). |
35 |
|
-include_lib("kernel/include/logger.hrl"). |
36 |
|
|
37 |
|
|
38 |
|
%% Start the pool manager as a gen_statem registered locally under the
%% name that server_ref/1 derives from Arg. Start options are read via
%% envy_gen:options/1 for this module.
start_link(Arg) ->
    Name = {local, server_ref(Arg)},
    Options = envy_gen:options(?MODULE),
    gen_statem:start_link(Name, ?MODULE, [Arg], Options).
44 |
|
|
45 |
|
|
46 |
|
%% Derive the registered name of the pool manager for a database from
%% its application name (a binary) and this module's name.
-spec server_ref(pgmp_dbs_sup:db()) -> gen_statem:server_ref().

server_ref(#{application_name := ApplicationName}) ->
    Prefix = binary_to_list(ApplicationName),
    pgmp_util:snake_case([Prefix, ?MODULE]).
50 |
|
|
51 |
|
|
52 |
|
%% Hand a `query' over to pgmp_mm, addressed to the default pool manager
%% unless Arg already names a server_ref.
query(Arg) ->
    Request = default_server_ref(Arg),
    pgmp_mm:query(Request).
54 |
|
|
55 |
|
|
56 |
|
%% Hand a `parse' over to pgmp_mm, addressed to the default pool manager
%% unless Arg already names a server_ref.
parse(Arg) ->
    Request = default_server_ref(Arg),
    pgmp_mm:parse(Request).
58 |
|
|
59 |
|
|
60 |
|
%% Hand a `sync' over to pgmp_mm, addressed to the default pool manager
%% unless Arg already names a server_ref.
sync(Arg) ->
    Request = default_server_ref(Arg),
    pgmp_mm:sync(Request).
62 |
|
|
63 |
|
|
64 |
|
%% Hand a `bind' over to pgmp_mm, addressed to the default pool manager
%% unless Arg already names a server_ref.
bind(Arg) ->
    Request = default_server_ref(Arg),
    pgmp_mm:bind(Request).
66 |
|
|
67 |
|
|
68 |
|
%% Hand a `describe' over to pgmp_mm, addressed to the default pool
%% manager unless Arg already names a server_ref.
describe(Arg) ->
    Request = default_server_ref(Arg),
    pgmp_mm:describe(Request).
70 |
|
|
71 |
|
|
72 |
|
%% Hand an `execute' over to pgmp_mm, addressed to the default pool
%% manager unless Arg already names a server_ref.
execute(Arg) ->
    Request = default_server_ref(Arg),
    pgmp_mm:execute(Request).
74 |
|
|
75 |
|
|
76 |
|
%% Hand a `parameters' request over to pgmp_mm, addressed to the default
%% pool manager unless Arg already names a server_ref.
parameters(Arg) ->
    Request = default_server_ref(Arg),
    pgmp_mm:parameters(Request).
78 |
|
|
79 |
|
|
80 |
|
%% Send a `join' request to the pool manager. The mandatory `group' key
%% of Arg becomes the single request argument.
join(Arg) ->
    send_request(Arg, join, [group]).
82 |
|
|
83 |
|
|
84 |
|
%% Send a `ready_for_query' request to the pool manager. The mandatory
%% `state' key of Arg becomes the single request argument.
ready_for_query(Arg) ->
    send_request(Arg, ready_for_query, [state]).
86 |
|
|
87 |
|
|
88 |
|
%% Build an `{Action, Args}' request from Arg and pass it to
%% pgmp_statem:send_request/1.
%%
%% Config lists the keys of Arg that make up the request arguments, each
%% either a bare key (mandatory) or `{Key, Default}'. Those keys are
%% removed from the map that is finally sent. A default server_ref and,
%% where applicable, a label are filled in first.
send_request(Arg, Action, Config) ->
    Consumed = keys(Config),
    Request = {Action, args(Arg, Config)},
    Addressed = default_server_ref(Arg#{request => Request}),
    Labelled = maybe_label(Addressed),
    send_request(maps:without(Consumed, Labelled)).
95 |
|
|
96 |
|
|
97 |
|
%% Ensure Arg carries a `server_ref'.
%%
%% A server_ref supplied by the caller always wins, so in that case Arg
%% is returned untouched and the default application name is not looked
%% up at all. Otherwise the server_ref is the snake-cased combination of
%% the default application name and this module's name.
default_server_ref(#{server_ref := _} = Arg) ->
    Arg;

default_server_ref(Arg) ->
    Arg#{server_ref => pgmp_util:snake_case(
                         [binary_to_list(
                            pgmp_dbs_sup:default(
                              application_name)),
                          ?MODULE])}.
105 |
|
|
106 |
|
|
107 |
|
%% Label a request with this module's name when Arg carries a `requests'
%% collection but no `label' yet. Any other Arg is returned unchanged.
maybe_label(#{requests := _} = Arg) when not is_map_key(label, Arg) ->
    Arg#{label => ?MODULE};

maybe_label(Arg) ->
    Arg.
115 |
|
|
116 |
|
|
117 |
|
%% Extract the key names from a request configuration, whose entries are
%% either a bare key or a `{Key, Default}' pair.
keys(Config) ->
    lists:map(fun config_key/1, Config).

%% Key name of a single configuration entry.
config_key({Key, _Default}) ->
    Key;

config_key(Key) ->
    Key.
127 |
|
|
128 |
|
|
129 |
|
%% Look up each configured parameter in Arg, in Config order.
%%
%% A `{Parameter, Default}' entry falls back to Default when the key is
%% absent. A bare Parameter is mandatory, and maps:get/2 raises
%% `{badkey, Parameter}' when it is missing.
args(Arg, Config) ->
    lists:map(
      fun
          (Entry) ->
              config_arg(Entry, Arg)
      end,
      Config).

%% Value of a single configuration entry taken from Arg.
config_arg({Parameter, Default}, Arg) ->
    maps:get(Parameter, Arg, Default);

config_arg(Parameter, Arg) ->
    maps:get(Parameter, Arg).
139 |
|
|
140 |
|
|
141 |
|
%% gen_statem callback: every event, in every state, is dispatched to
%% handle_event/4.
callback_mode() ->
    handle_event_function.
143 |
|
|
144 |
|
|
145 |
|
%% gen_statem callback: start in state `drained' with empty bookkeeping.
%%
%% The database configuration in Arg is extended with a fresh request-id
%% collection, the configured pool maximum, and the empty `owners',
%% `reservations', `outstanding', `monitors' and `connections' maps.
%% Two internal events are queued: one to look up the pool supervisor
%% and one to emit a census.
init([Arg]) ->
    Data = Arg#{requests => gen_statem:reqids_new(),
                max => pgmp_config:pool(max),
                owners => #{},
                reservations => #{},
                outstanding => #{},
                monitors => #{},
                connections => #{}},
    Actions = [nei(pool_sup), nei(census)],
    {ok, drained, Data, Actions}.
156 |
|
|
157 |
|
%% gen_statem callback handling every event of the pool manager.
%%
%% States:
%%   available - at least one connection is expected to be `idle';
%%   drained   - no idle connection; requests from new owners are
%%               postponed until the state changes.
%%
%% Bookkeeping maps held in Data:
%%   connections  - Connection pid => connection state (idle, reserved,
%%                  limbo, or whatever ready_for_query reports);
%%   owners       - Owner pid => Connection reserved for that owner;
%%   reservations - Connection pid => Owner (inverse of owners);
%%   monitors     - Owner pid => monitor reference;
%%   outstanding  - Connection pid => callers awaiting a reply;
%%   requests     - gen_statem request-id collection of forwarded calls.
%%
%% Clause order is significant: more specific clauses come first.

%% A request from an owner that already holds a reservation: forward it
%% to the owner's connection and remember the caller.
handle_event({call, {Owner, _} = From},
             {request, _} = Request,
             _,
             #{owners := Owners,
               outstanding := Outstanding,
               requests := Requests} = Data)
  when is_map_key(Owner, Owners) ->
    #{Owner := Connection} = Owners,
    Inflight = maps:get(Connection, Outstanding, []),
    {keep_state,
     Data#{outstanding := Outstanding#{Connection => [From | Inflight]},
           requests := gen_statem:send_request(
                         Connection,
                         Request,
                         #{module => ?MODULE,
                           connection => Connection,
                           from => From},
                         Requests)},
     nei(census)};

%% A request from a new owner while a connection is available: reserve
%% an idle connection for the owner, monitor the owner and forward the
%% request. Taking the last idle connection moves the pool to `drained'.
handle_event({call, {Owner, _} = From},
             {request, _} = Request,
             available,
             #{connections := Connections,
               monitors := Monitors,
               outstanding := Outstanding,
               requests := Requests,
               reservations := Reservations,
               owners := Owners} = Data) ->

    [Connection | _] = Idle = maps:keys(
                                maps:filter(
                                  connection_state_is(idle),
                                  Connections)),

    NextState = case Idle of
                    [Connection] ->
                        drained;

                    [Connection | _] ->
                        available
                end,

    {next_state,
     NextState,
     Data#{connections := Connections#{Connection := reserved},
           outstanding := Outstanding#{Connection => [From]},
           monitors := Monitors#{Owner => erlang:monitor(process, Owner)},
           requests := gen_statem:send_request(
                         Connection,
                         Request,
                         #{module => ?MODULE,
                           connection => Connection,
                           from => From},
                         Requests),
           reservations := Reservations#{Connection => Owner},
           owners := Owners#{Owner => Connection}},
     nei(census)};

%% A request from a new owner while drained, with the combined size of
%% `connections' and `owners' still below `max': start another pool
%% child and postpone the request.
handle_event({call, _},
             {request, _},
             drained,
             #{max := Max,
               owners := Owners,
               pool_sup := PoolSup,
               connections := Connections})
  when map_size(Connections) + map_size(Owners) < Max ->
    {ok, _} = pgmp_pool_sup:start_child(PoolSup),
    {keep_state_and_data,
     [postpone,
      nei({telemetry, new, #{count => 1}})]};

%% A request from a new owner while drained and at capacity: postpone.
handle_event({call, _}, {request, _}, drained, _) ->
    {keep_state_and_data,
     [postpone,
      nei({telemetry, postponed, #{count => 1}})]};

%% A connection joins a drained pool: monitor it, record it as idle and
%% become available.
handle_event({call, {Connection, _} = From},
             {join, [_Group]},
             drained,
             #{connections := Connections} = Data) ->
    _ = erlang:monitor(process, Connection),
    {next_state,
     available,
     Data#{connections := Connections#{Connection => idle}},
     [{reply, From, ok}, nei(census)]};

%% A connection reports idle while callers still await replies from it:
%% acknowledge only and leave the reservation in place.
handle_event({call, {Connection, _} = From},
             {ready_for_query, [idle]},
             _,
             #{outstanding := Outstanding})
  when is_map_key(Connection, Outstanding) ->
    {keep_state_and_data, {reply, From, ok}};

%% A reserved connection reports idle while drained: release the
%% reservation, stop monitoring the owner and become available.
handle_event({call, {Connection, _} = From},
             {ready_for_query, [idle = State]},
             drained,
             #{connections := Connections,
               reservations := Reservations,
               monitors := Monitors,
               owners := Owners} = Data)
  when is_map_key(Connection, Reservations) ->
    #{Connection := Owner} = Reservations,
    #{Owner := Monitor} = Monitors,
    true = erlang:demonitor(Monitor, [flush]),
    {next_state,
     available,
     Data#{connections := Connections#{Connection => State},
           owners := maps:without([Owner], Owners),
           monitors := maps:without([Owner], Monitors),
           reservations := maps:without([Connection], Reservations)},
     [{reply, From, ok}, nei(census)]};

%% A connection joins in any other state: monitor it and record it as
%% idle without changing state.
handle_event({call, {Connection, _} = From},
             {join, [_Group]},
             _,
             #{connections := Connections} = Data) ->
    _ = erlang:monitor(process, Connection),
    {keep_state,
     Data#{connections := Connections#{Connection => idle}},
     [{reply, From, ok}, nei(census)]};

%% A reserved connection reports idle in any other state: release the
%% reservation and stop monitoring the owner.
handle_event({call, {Connection, _} = From},
             {ready_for_query, [idle = State]},
             _,
             #{connections := Connections,
               reservations := Reservations,
               monitors := Monitors,
               owners := Owners} = Data)
  when is_map_key(Connection, Reservations) ->
    #{Connection := Owner} = Reservations,
    #{Owner := Monitor} = Monitors,
    true = erlang:demonitor(Monitor, [flush]),
    {keep_state,
     Data#{connections := Connections#{Connection => State},
           owners := maps:without([Owner], Owners),
           monitors := maps:without([Owner], Monitors),
           reservations := maps:without([Connection], Reservations)},
     [{reply, From, ok}, nei(census)]};

%% Any other ready_for_query report: just record the reported state.
handle_event({call, {Connection, _} = From},
             {ready_for_query, [State]},
             _,
             #{connections := Connections} = Data) ->
    {keep_state,
     Data#{connections := Connections#{Connection => State}},
     [{reply, From, ok}, nei(census)]};


%% Queued by init/1: look up the `pool_sup' child of our parent, the
%% first entry of the '$ancestors' process dictionary value.
handle_event(internal, pool_sup, _, Data) ->
    {keep_state,
     Data#{pool_sup => pgmp_sup:get_child_pid(
                         hd(get('$ancestors')),
                         pool_sup)}};

%% An `ok' response whose label `from' is a 'DOWN' tuple for an owner
%% that still holds a reservation: drop the owner's bookkeeping.
handle_event(internal,
             {response,
              #{label := #{module := ?MODULE,
                           connection := Connection,
                           from := {'DOWN', _Monitor, process, Owner, _Info}},
                reply := ok}},
             _,
             #{owners := Owners,
               reservations := Reservations,
               monitors := Monitors} = Data)
  when is_map_key(Owner, Owners) ->
    #{Owner := Connection} = Owners,
    {keep_state,
     Data#{monitors := maps:without([Owner], Monitors),
           reservations := maps:without([Connection], Reservations),
           owners := maps:without([Owner], Owners)},
     nei(census)};

%% As above, but the owner no longer holds a reservation.
handle_event(internal,
             {response,
              #{label := #{module := ?MODULE,
                           connection := Connection,
                           from := {'DOWN', _Monitor, process, Owner, _Info}},
                reply := ok}},
             _,
             #{monitors := Monitors,
               reservations := Reservations} = Data) ->
    {keep_state,
     Data#{monitors := maps:without([Owner], Monitors),
           reservations := maps:without([Connection], Reservations)},
     nei(census)};

%% A 'DOWN'-labelled response reporting the connection inside a
%% transaction block, with nothing outstanding on it: stop the
%% connection and mark it `limbo'.
handle_event(internal,
             {response,
              #{label := #{module := ?MODULE,
                           connection := Connection,
                           from := {'DOWN', _Monitor, process, Owner, _Info}},
                reply := {ready_for_query, in_tx_block}}},
             _,
             #{outstanding := Outstanding,
               owners := Owners,
               reservations := Reservations,
               connections := Connections} = Data)
  when not(is_map_key(Connection, Outstanding)),
       is_map_key(Connection, Connections),
       is_map_key(Connection, Reservations),
       is_map_key(Owner, Owners) ->
    ok = gen_statem:stop(Connection),
    {keep_state,
     Data#{owners := maps:without([Owner], Owners),
           reservations := maps:without([Connection], Reservations),
           connections := Connections#{Connection => limbo}},
     nei(census)};

%% The response to a forwarded request: relay it to the original caller
%% and remove that caller from the connection's outstanding list,
%% dropping the entry once the list is empty.
handle_event(internal,
             {response,
              #{label := #{module := ?MODULE,
                           connection := Connection,
                           from := From},
                reply := Reply}},
             _,
             #{outstanding := Outstanding} = Data) ->
    #{Connection := Inflight} = Outstanding,
    case lists:delete(From, Inflight) of
        [] ->
            {keep_state,
             Data#{outstanding := maps:without([Connection], Outstanding)},
             [{reply, From, Reply}, nei(census)]};

        Remaining ->
            {keep_state,
             Data#{outstanding := Outstanding#{Connection := Remaining}},
             [{reply, From, Reply}, nei(census)]}
    end;

%% A monitored owner holding a reservation went down: stop its
%% connection, mark the connection `limbo' and forget the owner.
handle_event(info,
             {'DOWN', _Monitor, process, Owner, _Info},
             _,
             #{monitors := Monitors,
               reservations := Reservations,
               connections := Connections,
               owners := Owners} = Data)
  when is_map_key(Owner, Monitors),
       is_map_key(Owner, Owners) ->
    #{Owner := Connection} = Owners,
    ok = gen_statem:stop(Connection),
    {keep_state,
     Data#{monitors := maps:without([Owner], Monitors),
           owners := maps:without([Owner], Owners),
           reservations := maps:without([Connection], Reservations),
           connections := Connections#{Connection => limbo}},
     nei(census)};

%% A monitored owner without a reservation went down: forget the
%% monitor.
handle_event(info,
             {'DOWN', _Monitor, process, Owner, _Info},
             _,
             #{monitors := Monitors} = Data)
  when is_map_key(Owner, Monitors) ->
    {keep_state,
     Data#{monitors := maps:without([Owner], Monitors)},
     nei(census)};

%% A reserved connection went down while available and other
%% connections remain: drop it together with its owner's bookkeeping.
handle_event(info,
             {'DOWN', _Monitor, process, Connection, _Info},
             available,
             #{connections := Connections,
               reservations := Reservations,
               monitors := Monitors,
               owners := Owners} = Data)
  when is_map_key(Connection, Connections),
       is_map_key(Connection, Reservations),
       map_size(Connections) > 1 ->
    #{Connection := Owner} = Reservations,
    #{Owner := Monitor} = Monitors,
    true = erlang:demonitor(Monitor, [flush]),
    {keep_state,
     Data#{connections := maps:without([Connection], Connections),
           owners := maps:without([Owner], Owners),
           monitors := maps:without([Owner], Monitors),
           reservations := maps:without([Connection], Reservations)},
     nei(census)};

%% A reserved connection went down in any other situation: as above,
%% and the pool becomes drained.
handle_event(info,
             {'DOWN', _Monitor, process, Connection, _Info},
             _,
             #{connections := Connections,
               reservations := Reservations,
               monitors := Monitors,
               owners := Owners} = Data)
  when is_map_key(Connection, Connections),
       is_map_key(Connection, Reservations) ->
    #{Connection := Owner} = Reservations,
    #{Owner := Monitor} = Monitors,
    true = erlang:demonitor(Monitor, [flush]),
    {next_state,
     drained,
     Data#{connections := maps:without([Connection], Connections),
           owners := maps:without([Owner], Owners),
           monitors := maps:without([Owner], Monitors),
           reservations := maps:without([Connection], Reservations)},
     nei(census)};

%% An unreserved connection went down while available. The departing
%% connection may have been the only idle one, so the next state is
%% decided by the idle connections that remain. Staying `available'
%% with none idle would fail the `[Connection | _]' match in the
%% request clause above on the next request from a new owner.
handle_event(info,
             {'DOWN', _Monitor, process, Connection, _Info},
             available,
             #{connections := Connections} = Data)
  when is_map_key(Connection, Connections) ->
    Remaining = maps:without([Connection], Connections),
    case map_size(maps:filter(connection_state_is(idle), Remaining)) of
        0 ->
            {next_state,
             drained,
             Data#{connections := Remaining},
             nei(census)};

        _ ->
            {keep_state,
             Data#{connections := Remaining},
             nei(census)}
    end;

%% An unreserved connection went down in any other state: drop it and
%% be drained.
handle_event(info,
             {'DOWN', _Monitor, process, Connection, _Info},
             _,
             #{connections := Connections} = Data)
  when is_map_key(Connection, Connections) ->
    {next_state,
     drained,
     Data#{connections := maps:without([Connection], Connections)},
     nei(census)};

%% Any other message: test it against the request-id collection (the
%% matched request id is deleted from the collection). A reply becomes
%% an internal `response' event. An error stops this process. Anything
%% that is not a response to a known request is logged.
handle_event(info, Msg, _, #{requests := Existing} = Data) ->
    case gen_statem:check_response(Msg, Existing, true) of
        {{reply, Reply}, Label, Updated} ->
            {keep_state,
             Data#{requests := Updated},
             nei({response, #{label => Label, reply => Reply}})};

        {{error, {Reason, _}}, pgmp_socket, UpdatedRequests}
          when Reason == noproc; Reason == normal ->
            {stop,
             normal,
             Data#{requests := UpdatedRequests}};

        {{error, {Reason, _}}, #{module := ?MODULE}, UpdatedRequests}
          when Reason == normal; Reason == shutdown ->
            {stop,
             Reason,
             Data#{requests := UpdatedRequests}};

        {{error, {Reason, ServerRef}}, Label, UpdatedRequests} ->
            {stop,
             #{reason => Reason,
               server_ref => ServerRef,
               label => Label},
             Data#{requests := UpdatedRequests}};

        Otherwise ->
            ?LOG_ERROR(#{reason => Otherwise, msg => Msg, data => Data}),
            keep_state_and_data
    end;

%% Census: queue a size measurement for each bookkeeping map, plus the
%% configured pool maximum.
handle_event(internal, census, _, #{max := Max}) ->
    {keep_state_and_data,
     [nei({census,
           [owners,
            reservations,
            outstanding,
            monitors,
            connections]}),
      nei({telemetry, pool, #{size => Max}})]};

%% Queue one telemetry event per named map in Data, carrying its size.
handle_event(internal, {census, Names}, _, Data) ->
    {keep_state_and_data,
     lists:map(
       fun
           (Name) ->
               nei({telemetry, Name, #{size => maps:size(maps:get(Name, Data))}})
       end,
       Names)};

%% Telemetry without metadata: re-queue with empty metadata.
handle_event(internal,
             {telemetry, EventName, Measurements},
             _,
             _) ->
    {keep_state_and_data,
     nei({telemetry, EventName, Measurements, #{}})};

%% Telemetry with an atom event name: re-queue with the name wrapped in
%% a list.
handle_event(internal,
             {telemetry, EventName, Measurements, Metadata},
             _,
             _) when is_atom(EventName) ->
    {keep_state_and_data,
     nei({telemetry, [EventName], Measurements, Metadata})};

%% Telemetry with a list event name: emit it under the
%% `[pgmp, connection]' prefix, adding the current state to the
%% metadata.
handle_event(internal,
             {telemetry, EventName, Measurements, Metadata},
             State,
             _) ->
    ok = telemetry:execute(
           [pgmp, connection] ++ EventName,
           Measurements,
           Metadata#{state => State}),
    keep_state_and_data.
547 |
|
|
548 |
|
|
549 |
|
%% Return a two-argument predicate, suitable for maps:filter/2 over the
%% `connections' map, that is true for entries whose value equals
%% Desired.
connection_state_is(Desired) ->
    fun (_Connection, State) -> State == Desired end.