%% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
14 |
|
|
15 |
|
|
16 |
|
-module(pgmp_mm_squery). |
17 |
|
|
18 |
|
|
19 |
|
-export([callback_mode/0]). |
20 |
|
-export([handle_event/4]). |
21 |
|
-export([terminate/3]). |
22 |
|
-import(pgmp_codec, [marshal/2]). |
23 |
|
-import(pgmp_codec, [size_inclusive/1]). |
24 |
|
-import(pgmp_data_row, [decode/3]). |
25 |
|
-import(pgmp_mm_common, [actions/3]). |
26 |
|
-import(pgmp_mm_common, [data/3]). |
27 |
|
-import(pgmp_mm_common, [field_names/1]). |
28 |
|
-import(pgmp_statem, [nei/1]). |
29 |
|
-include("pgmp_types.hrl"). |
30 |
|
-include_lib("kernel/include/logger.hrl"). |
31 |
|
|
32 |
|
|
33 |
|
%% https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.4 |
34 |
|
|
35 |
|
|
36 |
|
%% gen_statem callback mode: every event is routed through
%% handle_event/4, and state enter calls are requested as well.
-spec callback_mode() -> [handle_event_function | state_enter, ...].
callback_mode() ->
    [handle_event_function, state_enter].
38 |
|
|
39 |
|
|
40 |
|
%% Termination is not handled locally: the reason, the current state
%% and the state data are handed unchanged to pgmp_mm_common:terminate/3.
terminate(Why, StateName, StateData) ->
    pgmp_mm_common:terminate(Why, StateName, StateData).
42 |
|
|
43 |
|
|
44 |
|
%% Event handler for the simple query flow. Clauses are tried in the
%% order written; anything not matched here is delegated to
%% pgmp_mm_common:handle_event/4 by the final clause.

%% Bootstrap has completed. When a named_statements environment value
%% is present, iterate over it with pgmp_mm_equery pushed as the
%% callback module; otherwise nothing changes.
handle_event(internal, bootstrap_complete, Current, Data) ->
    case pgmp:get_env(named_statements) of
        undefined ->
            keep_state_and_data;

        {ok, Named} ->
            {next_state,
             {named_statements, Current},
             Data#{named => maps:iterator(Named)},
             [{push_callback_module, pgmp_mm_equery}, nei(next_named)]}
    end;

%% sync differs from the other requests: ready_for_query is itself the
%% reply to the caller, as well as the marker that the next query may
%% be issued.
handle_event(internal,
             {recv, {ready_for_query, _} = Ready},
             sync = Action,
             #{from := Caller} = Data) ->
    Actions = [nei({span_stop, Action}), {reply, Caller, Ready}],
    {next_state, Ready, Data, Actions};

%% In every other state ready_for_query simply becomes the new state.
handle_event(internal, {recv, {ready_for_query, _} = Ready}, _, Data) ->
    {next_state, Ready, Data};

%% A query whose only argument is the ?TYPE_SQL text is let through
%% while types are not yet ready.
%% NOTE(review): presumably this is the query that loads the type
%% information itself -- confirm against pgmp_types.
handle_event({call, _} = Call,
             {request,
              #{action := query = Action,
                args := [<<?TYPE_SQL>>]} = Request},
             {ready_for_query, _},
             #{types_ready := false} = Data) ->
    Updated = data(Call, Request, Data),
    {next_state, Action, Updated, actions(Call, Request, Data)};

%% Any other request arriving before types are ready is postponed, and
%% the ready_for_query state is parked inside waiting_for_types.
handle_event({call, _},
             {request, #{action := Action}},
             {ready_for_query, _} = Ready,
             #{types_ready := false} = Data)
  when Action =:= parse orelse
       Action =:= query orelse
       Action =:= describe orelse
       Action =:= bind orelse
       Action =:= sync orelse
       Action =:= execute ->
    {next_state, {waiting_for_types, Ready}, Data, postpone};

%% parse, describe, bind and execute push pgmp_mm_equery as the
%% callback module before the request's own actions.
handle_event({call, _} = Call,
             {request, #{action := Action} = Request},
             {ready_for_query, _},
             Data)
  when Action =:= parse orelse
       Action =:= describe orelse
       Action =:= bind orelse
       Action =:= execute ->
    Updated = data(Call, Request, Data),
    Pending = actions(Call, Request, Data),
    {next_state,
     Action,
     Updated,
     [{push_callback_module, pgmp_mm_equery} | Pending]};

%% query and sync are handled by this module; the action name becomes
%% the next state.
handle_event({call, _} = Call,
             {request, #{action := Action} = Request},
             {ready_for_query, _},
             Data)
  when Action =:= query orelse Action =:= sync ->
    Updated = data(Call, Request, Data),
    {next_state, Action, Updated, actions(Call, Request, Data)};

%% A parameters request is answered straight from the state data, with
%% the outstanding requests refreshed via pgmp_connection.
handle_event({call, Caller},
             {request, #{action := parameters}},
             {ready_for_query, TxState},
             #{requests := Requests,
               config := Config,
               parameters := Parameters} = Data) ->
    ServerRef = pgmp_connection:server_ref(Config),
    Refreshed = pgmp_connection:ready_for_query(
                  #{state => TxState,
                    server_ref => ServerRef,
                    requests => Requests}),
    {keep_state,
     Data#{requests => Refreshed},
     {reply, Caller, Parameters}};

%% Any request that no earlier clause accepted is postponed.
handle_event({call, _}, {request, _}, _, _) ->
    {keep_state_and_data, postpone};

%% Types became ready while waiting: return to the parked
%% ready_for_query state.
handle_event(internal,
             {response, #{label := pgmp_types, reply := ready}},
             {waiting_for_types, Ready},
             #{types_ready := false} = Data) ->
    {next_state, Ready, Data#{types_ready := true}};

%% Types became ready in any other state: only record the fact.
handle_event(internal,
             {response, #{label := pgmp_types, reply := ready}},
             _,
             #{types_ready := false} = Data) ->
    {keep_state, Data#{types_ready := true}};

%% Emit the "S" message with an empty payload.
handle_event(internal, {sync, _}, _, _) ->
    Frame = ["S", size_inclusive([])],
    {keep_state_and_data, nei({send, Frame})};

%% Emit the "Q" message carrying the SQL text marshalled as a string.
handle_event(internal, {query, [SQL]}, _, _) ->
    Frame = ["Q", size_inclusive([marshal(string, SQL)])],
    {keep_state_and_data, nei({send, Frame})};

%% command_complete with an atom command and an integer row count, and
%% a span in the state data: record both in the span before the message
%% is processed and the request completed.
handle_event(internal,
             {recv = EventName,
              {command_complete = Tag, {Command, Rows}} = Message},
             query,
             #{span := #{metadata := Metadata,
                         measurements := Measurements} = Span} = Data)
  when is_atom(Command), is_integer(Rows) ->
    ?LOG_DEBUG(#{tm => Message}),
    Recorded = Span#{metadata := Metadata#{command => Command},
                     measurements := Measurements#{rows => Rows}},
    {keep_state,
     Data#{span := Recorded},
     [nei({telemetry,
           EventName,
           #{count => 1, rows => Rows},
           #{tag => Tag, command => Command}}),
      nei({process, Message}),
      nei(complete)]};

%% error_response with a span in the state data: the mapped error
%% detail is stored in the span metadata under the error_response key.
handle_event(internal,
             {recv = EventName, {error_response = Tag, _} = Message},
             query,
             #{span := #{metadata := Metadata} = Span} = Data) ->
    %% Tag is already bound, so this match also asserts that the mapped
    %% result is tagged error_response.
    {Tag, Detail} = pgmp_error_notice_fields:map(Message),
    Recorded = Span#{metadata := Metadata#{Tag => Detail}},
    {keep_state,
     Data#{span := Recorded},
     [telemetry_count(EventName, #{tag => Tag}),
      nei({process, Message}),
      nei(complete)]};

%% Completion messages that the two more specific clauses above did not
%% take: process the message and complete the request.
handle_event(internal,
             {recv = EventName, {Tag, _} = Message},
             query,
             _)
  when Tag =:= empty_query_response orelse
       Tag =:= error_response orelse
       Tag =:= command_complete ->
    ?LOG_DEBUG(#{tm => Message}),
    {keep_state_and_data,
     [telemetry_count(EventName, #{tag => Tag}),
      nei({process, Message}),
      nei(complete)]};

%% copy_out_response is passed on for processing as received.
handle_event(internal,
             {recv = EventName, {copy_out_response = Tag, _} = Message},
             query,
             _) ->
    ?LOG_DEBUG(#{tm => Message}),
    {keep_state_and_data,
     [telemetry_count(EventName, #{tag => Tag}),
      nei({process, Message})]};

%% copy_data is passed on for processing as received.
handle_event(internal,
             {recv = EventName, {copy_data = Tag, _} = Message},
             query,
             _) ->
    ?LOG_DEBUG(#{tm => Message}),
    {keep_state_and_data,
     [telemetry_count(EventName, #{tag => Tag}),
      nei({process, Message})]};

%% copy_done only produces telemetry; there is nothing to process.
handle_event(internal,
             {recv = EventName, {copy_done = Tag, _} = Message},
             query,
             _) ->
    ?LOG_DEBUG(#{tm => Message}),
    {keep_state_and_data, telemetry_count(EventName, #{tag => Tag})};

%% row_description: keep the column types in the state data for the
%% data rows that follow, and process the field names.
handle_event(internal,
             {recv = EventName, {row_description = Tag, Types}},
             query,
             Data) ->
    {keep_state,
     Data#{types => Types},
     [telemetry_count(EventName, #{tag => Tag}),
      nei({process, {Tag, field_names(Types)}})]};

%% data_row once types are ready: decode with the type cache obtained
%% from pgmp_types:cache/1.
handle_event(internal,
             {recv = EventName, {data_row = Tag, Columns}},
             query,
             #{parameters := Parameters,
               config := Config,
               types_ready := true,
               types := Types}) ->
    Telemetry = telemetry_count(EventName,
                                #{tag => Tag, types_ready => true}),
    Typed = lists:zip(Types, Columns),
    Row = decode(Parameters, Typed, pgmp_types:cache(Config)),
    {keep_state_and_data, [Telemetry, nei({process, {Tag, Row}})]};

%% data_row before types are ready: decode with an empty map in place
%% of the type cache.
handle_event(internal,
             {recv = EventName, {data_row = Tag, Columns}},
             query,
             #{parameters := Parameters,
               types_ready := false,
               types := Types}) ->
    Telemetry = telemetry_count(EventName,
                                #{tag => Tag, types_ready => false}),
    Row = decode(Parameters, lists:zip(Types, Columns), #{}),
    {keep_state_and_data, [Telemetry, nei({process, {Tag, Row}})]};

%% The request is complete: reply with the replies reversed, drop the
%% per-request keys from the state data and stop the span.
handle_event(internal,
             complete,
             Action,
             #{replies := Replies, from := Caller} = Data) ->
    Reply = {reply, Caller, lists:reverse(Replies)},
    Remaining = maps:without([args, from, replies, types], Data),
    {keep_state, Remaining, [Reply, nei({span_stop, Action})]};

%% Everything else is handled by the common message machine.
handle_event(EventType, EventContent, State, Data) ->
    pgmp_mm_common:handle_event(EventType, EventContent, State, Data).


%% Builds the internal telemetry event for EventName that carries a
%% count of one together with the supplied metadata.
telemetry_count(EventName, Metadata) ->
    nei({telemetry, EventName, #{count => 1}, Metadata}).