%% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
14 |
|
|
15 |
|
|
16 |
|
-module(pgec_telemetry_pgmp_metrics). |
17 |
|
|
18 |
|
|
19 |
|
-export([handle/4]). |
20 |
|
-include_lib("kernel/include/logger.hrl"). |
21 |
|
|
22 |
|
|
23 |
|
%% Telemetry handler for pgmp events. Each clause recognises one shape
%% of event (name, measurements, metadata) and forwards it as counter
%% or gauge updates; clauses are tried top to bottom, so the specific
%% middleman and socket events come before the generic ones.
%%

%% Replication events from the pgmp middleman. Requires the WAL
%% measurements and the identify_system metadata to be present.
%%
handle([pgmp, mm, rep, _] = EventName,
       #{wal := WAL} = Measurements,
       #{identify_system := #{<<"dbname">> := DBName,
                              <<"systemid">> := SystemId}} = Metadata,
       Config) ->
    ?LOG_DEBUG(pgmp_event_report(EventName, Measurements, Metadata, Config)),

    %% The first three elements of the event name, fixed by the head.
    Base = [pgmp, mm, rep],

    %% The publication is optional in the metadata; dbname and
    %% systemid are always part of the label.
    Publication = maps:with([publication], Metadata),
    Label = Publication#{dbname => DBName, systemid => SystemId},

    %% One gauge per entry of the WAL measurements.
    %%
    ToGauge =
        fun
            (WALMetricName, Value, Gauges) ->
                [#{name => pgec_util:snake_case(Base ++ [wal, WALMetricName]),
                   label => Label,
                   value => Value} | Gauges]
        end,
    metrics:gauge(maps:fold(ToGauge, [], WAL)),

    %% One counter per processed replication event. A truncate names
    %% a list of relations rather than a single one, which is dealt
    %% with by mm_rep_count/5.
    %%
    metrics:counter(
      mm_rep_count(EventName, Measurements, Metadata, Config, Label));

%% End of a parse or query span from the pgmp middleman, labelled
%% with the SQL that was processed.
%%
handle([pgmp, mm, Action, stop] = EventName,
       #{duration := Duration} = Measurements,
       #{args := #{sql := SQL}} = Metadata,
       Config)
  when Action =:= parse orelse Action =:= query ->
    ?LOG_DEBUG(pgmp_event_report(EventName, Measurements, Metadata, Config)),

    Base = [pgmp, mm, Action],
    Label = #{sql => SQL},

    %% Number of times the action ran, and its cumulative duration.
    %%
    metrics:counter(
      [#{name => pgec_util:snake_case(Base ++ [count]),
         label => Label,
         delta => 1},

       #{name => pgec_util:snake_case(Base ++ [duration, ms]),
         label => Label,
         delta => native_to_ms(Duration)}]);

%% End of an execute span from the pgmp middleman that reports the
%% number of rows returned.
%%
handle([pgmp, mm, execute, stop] = EventName,
       #{duration := Duration, rows := Rows} = Measurements,
       Metadata,
       Config) ->
    ?LOG_DEBUG(pgmp_event_report(EventName, Measurements, Metadata, Config)),

    Base = [pgmp, mm, execute],

    %% The command is used as the label when the metadata has one.
    Label = maps:with([command], Metadata),

    metrics:counter(
      [#{name => pgec_util:snake_case(Base ++ [count]),
         label => Label,
         delta => 1},

       #{name => pgec_util:snake_case(Base ++ [rows]),
         label => Label,
         delta => Rows},

       #{name => pgec_util:snake_case(Base ++ [duration, ms]),
         label => Label,
         delta => native_to_ms(Duration)}]);

%% End of a bind or describe span, or of an execute span that did not
%% match the clause above: unlabelled count and cumulative duration.
%%
handle([pgmp, mm, Action, stop] = EventName,
       #{duration := Duration} = Measurements,
       Metadata,
       Config)
  when Action =:= bind orelse
       Action =:= describe orelse
       Action =:= execute ->
    ?LOG_DEBUG(pgmp_event_report(EventName, Measurements, Metadata, Config)),

    Base = [pgmp, mm, Action],

    metrics:counter(
      [#{name => pgec_util:snake_case(Base ++ [count]), delta => 1},

       #{name => pgec_util:snake_case(Base ++ [duration, ms]),
         delta => native_to_ms(Duration)}]);

%% The start of a span from the pgmp middleman is ignored.
%%
handle([pgmp, mm, _Action, start], _Measurements, _Metadata, _Config) ->
    ok;

%% Tagged messages on the pgmp socket: a message count and the
%% cumulative bytes, labelled by tag when the metadata has one.
%%
handle([pgmp, socket, tag_msg] = EventName,
       #{bytes := Bytes, count := N} = Measurements,
       Metadata,
       Config) ->
    ?LOG_DEBUG(pgmp_event_report(EventName, Measurements, Metadata, Config)),

    Label = maps:with([tag], Metadata),

    metrics:counter(
      [#{name => pgec_util:snake_case(EventName ++ [count]),
         label => Label,
         delta => N},

       #{name => pgec_util:snake_case(EventName ++ [bytes]),
         label => Label,
         delta => Bytes}]);

%% Any other event whose measurements carry a number of bytes.
%%
handle(EventName, #{bytes := N} = Measurements, Metadata, Config) ->
    ?LOG_DEBUG(pgmp_event_report(EventName, Measurements, Metadata, Config)),
    metrics:counter(
      #{name => pgec_util:snake_case(EventName ++ [bytes]),
        delta => N});

%% Any other event whose measurements carry a count.
%%
handle(EventName, #{count := N} = Measurements, Metadata, Config) ->
    ?LOG_DEBUG(pgmp_event_report(EventName, Measurements, Metadata, Config)),
    metrics:counter(
      #{name => pgec_util:snake_case(EventName ++ [count]),
        delta => N});

%% Any other event whose measurements carry a size, kept as a gauge.
%%
handle(EventName, #{size := N} = Measurements, Metadata, Config) ->
    ?LOG_DEBUG(pgmp_event_report(EventName, Measurements, Metadata, Config)),
    metrics:gauge(
      #{name => pgec_util:snake_case(EventName ++ [size]),
        value => N});

%% Everything else is only logged, at info level, so that telemetry
%% without a matching clause above is visible.
%%
handle(EventName, Measurements, Metadata, Config) ->
    ?LOG_INFO(pgmp_event_report(EventName, Measurements, Metadata, Config)).


%% The log report shared by every clause of handle/4.
%%
pgmp_event_report(EventName, Measurements, Metadata, Config) ->
    #{event_name => EventName,
      measurements => Measurements,
      metadata => Metadata,
      config => Config}.


%% Convert a span duration from native time units to milliseconds.
%%
native_to_ms(Duration) ->
    erlang:convert_time_unit(Duration, native, millisecond).
229 |
|
|
230 |
|
|
231 |
|
%% Build the counter update(s) for a replication event: a list with
%% one entry per relation for a truncate, otherwise a single entry.
%%

%% A truncate names a list of relations, each of which gets its own
%% counter entry with the relation added to the label.
%%
mm_rep_count(EventName, #{count := N}, #{relations := Relations}, _Config, Label) ->
    lists:map(
      fun
          (Relation) ->
              mm_rep_counter(
                EventName,
                Label#{relation => relation(Relation)},
                N)
      end,
      Relations);

%% Every other action applies to one relation only, which is added to
%% the label.
%%
mm_rep_count(EventName, #{count := N}, #{relation := Relation}, _Config, Label) ->
    mm_rep_counter(EventName, Label#{relation => relation(Relation)}, N);

%% Without any relation in the metadata the label is used as given.
%%
mm_rep_count(EventName, #{count := N}, _Metadata, _Config, Label) ->
    mm_rep_counter(EventName, Label, N).


%% A single counter entry named after the event with a count suffix.
%%
mm_rep_counter(EventName, Label, Delta) ->
    #{name => pgec_util:snake_case(EventName ++ [count]),
      label => Label,
      delta => Delta}.
256 |
|
|
257 |
|
|
258 |
|
%% Qualified name of a relation as a binary: the namespace and the
%% name joined by a dot.
%%
relation(#{namespace := Namespace, name := Name}) ->
    Qualified = [Namespace, $., Name],
    erlang:iolist_to_binary(Qualified).