_site/cover/pgmp_mm_squery.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_mm_squery).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -export([terminate/3]).
22 -import(pgmp_codec, [marshal/2]).
23 -import(pgmp_codec, [size_inclusive/1]).
24 -import(pgmp_data_row, [decode/3]).
25 -import(pgmp_mm_common, [actions/3]).
26 -import(pgmp_mm_common, [data/3]).
27 -import(pgmp_mm_common, [field_names/1]).
28 -import(pgmp_statem, [nei/1]).
29 -include("pgmp_types.hrl").
30 -include_lib("kernel/include/logger.hrl").
31
32
33 %% https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.4
34
35
%% Callback mode for this state machine module: every event is routed
%% through handle_event/4 (handle_event_function), and state_enter is
%% requested as well. No state-enter clause is defined in this module;
%% such events would reach the final catch-all clause of handle_event/4,
%% which delegates to pgmp_mm_common.
36 callback_mode() ->
37 144 [handle_event_function, state_enter].
38
39
%% Termination callback. This module does no cleanup of its own: the
%% three arguments are passed unchanged to pgmp_mm_common:terminate/3
%% and that call's result is returned.
40 terminate(Reason, State, Data) ->
41 14 pgmp_mm_common:terminate(Reason, State, Data).
42
43
%% handle_event(EventType, EventContent, State, Data)
%%
%% Event handler for the simple query flow. Clauses are tried top to
%% bottom, so the more specific patterns deliberately precede the more
%% general ones. Anything not matched here is delegated to
%% pgmp_mm_common:handle_event/4 by the final clause.
%%
%% nei/1 is imported from pgmp_statem and its definition is not visible
%% here; its results are used in action position, so it presumably wraps
%% a term as an internal next_event action -- TODO confirm.
%% data/3, actions/3 and field_names/1 are imported from pgmp_mm_common;
%% decode/3 from pgmp_data_row; marshal/2 and size_inclusive/1 from
%% pgmp_codec.

%% Bootstrap has completed: when the named_statements environment value
%% is set, move to {named_statements, State}, store an iterator over the
%% statements under the `named' key, push pgmp_mm_equery as callback
%% module and queue next_named. When it is undefined nothing changes.
44 handle_event(internal, bootstrap_complete, State, Data) ->
45 61 case pgmp:get_env(named_statements) of
46 {ok, NamedStatements} ->
47
:-(
{next_state,
48 {named_statements, State},
49 Data#{named => maps:iterator(NamedStatements)},
50 [{push_callback_module, pgmp_mm_equery}, nei(next_named)]};
51
52 undefined ->
53 61 keep_state_and_data
54 end;
55
%% ready_for_query received while in the `sync' state: the received
%% message becomes the new state, is sent to the caller as the reply,
%% and the span for the sync action is stopped.
56 handle_event(internal,
57 {recv, {ready_for_query, _} = TM},
58 sync = Action,
59 #{from := From} = Data) ->
60 %% sync is different, unlike the other requests ready_for_query is
61 %% the reply, as well as the marker for being ready for the next
62 %% query.
63
:-(
{next_state,
64 TM,
65 Data,
66 [nei({span_stop, Action}), {reply, From, TM}]};
67
%% ready_for_query received in any other state: the received message
%% simply becomes the new state; no reply is sent from this clause.
68 handle_event(internal, {recv, {ready_for_query, _} = TM}, _, Data) ->
69 205 {next_state, TM, Data};
70
%% A `query' request whose only argument is <<?TYPE_SQL>> is let
%% through even though types_ready is false. It must precede the
%% postponing clause below, which would otherwise match it.
%% NOTE(review): ?TYPE_SQL is not defined in this listing (presumably
%% in pgmp_types.hrl) and looks like the query that loads the type
%% information -- confirm.
71 handle_event(
72 {call, _} = Call,
73 {request, #{action := query = Action, args := [<<?TYPE_SQL>>]} = Arg},
74 {ready_for_query, _},
75 #{types_ready := false} = Data) ->
76 10 {next_state,
77 Action,
78 data(Call, Arg, Data),
79 actions(Call, Arg, Data)};
80
%% Any other protocol request that arrives before the types are ready:
%% move to {waiting_for_types, ReadyForQuery}, remembering the current
%% ready_for_query state, and postpone the request.
81 handle_event({call, _},
82 {request, #{action := Action}},
83 {ready_for_query, _} = ReadyForQuery,
84 #{types_ready := false} = Data)
85 when Action == parse;
86 Action == query;
87 Action == describe;
88 Action == bind;
89 Action == sync;
90 Action == execute ->
91 9 {next_state, {waiting_for_types, ReadyForQuery}, Data, postpone};
92
%% parse/describe/bind/execute requests while ready for query: the
%% action name becomes the next state and pgmp_mm_equery is pushed as
%% the callback module ahead of the actions computed by actions/3.
93 handle_event({call, _} = Call,
94 {request, #{action := Action} = Arg},
95 {ready_for_query, _},
96 Data)
97 when Action == parse;
98 Action == describe;
99 Action == bind;
100 Action == execute ->
101 130 {next_state,
102 Action,
103 data(Call, Arg, Data),
104 [{push_callback_module, pgmp_mm_equery} | actions(Call, Arg, Data)]};
105
%% query/sync requests while ready for query: the action name becomes
%% the next state and this module stays the callback module.
106 handle_event({call, _} = Call,
107 {request, #{action := Action} = Arg},
108 {ready_for_query, _},
109 Data)
110 when Action == query; Action == sync ->
111 117 {next_state,
112 Action,
113 data(Call, Arg, Data),
114 actions(Call, Arg, Data)};
115
%% `parameters' request while ready for query: answered directly from
%% the `parameters' value held in Data, without a state change. The
%% `requests' value is replaced by the result of
%% pgmp_connection:ready_for_query/1.
116 handle_event({call, From},
117 {request, #{action := parameters}},
118 {ready_for_query, State},
119 #{requests := Requests,
120 config := Config,
121 parameters := Parameters} = Data) ->
122 1 {keep_state,
123 Data#{requests => pgmp_connection:ready_for_query(
124 #{state => State,
125 server_ref => pgmp_connection:server_ref(Config),
126 requests => Requests})},
127 {reply, From, Parameters}};
128
%% Any request not matched by the clauses above (for example one that
%% arrives outside a ready_for_query state) is postponed.
129 handle_event({call, _}, {request, _}, _, _) ->
130 9 {keep_state_and_data, postpone};
131
%% pgmp_types reports `ready' while waiting_for_types: return to the
%% remembered ready_for_query state and mark the types as ready.
132 handle_event(internal,
133 {response, #{label := pgmp_types, reply := ready}},
134 {waiting_for_types, ReadyForQuery},
135 #{types_ready := false} = Data) ->
136 9 {next_state, ReadyForQuery, Data#{types_ready := true}};
137
%% pgmp_types reports `ready' in any other state: only the flag is
%% updated, the state is kept.
138 handle_event(internal,
139 {response, #{label := pgmp_types, reply := ready}},
140 _,
141 #{types_ready := false} = Data) ->
142 10 {keep_state, Data#{types_ready := true}};
143
%% Internal {sync, _}: queue a send of an "S" message with an empty
%% payload, framed by size_inclusive/1.
144 handle_event(internal, {sync, _}, _, _) ->
145
:-(
{keep_state_and_data, nei({send, ["S", size_inclusive([])]})};
146
%% Internal {query, [SQL]}: queue a send of a "Q" message whose payload
%% is SQL marshalled as a string, framed by size_inclusive/1.
147 handle_event(internal, {query, [SQL]}, _, _) ->
148 205 {keep_state_and_data,
149 nei({send, ["Q", size_inclusive([marshal(string, SQL)])]})};
150
%% command_complete carrying an atom command and an integer row count,
%% with a span present in Data: the command is recorded in the span
%% metadata and the row count in the span measurements, then telemetry,
%% process and complete are queued in that order.
151 handle_event(internal,
152 {recv = EventName,
153 {command_complete = Tag, {Command, Rows}} = TM},
154 query,
155 #{span := #{metadata := Metadata,
156 measurements := Measurements} = Span} = Data)
157 when is_atom(Command),
158 is_integer(Rows) ->
159 12 ?LOG_DEBUG(#{tm => TM}),
160 12 {keep_state,
161 Data#{span := Span#{metadata := Metadata#{command => Command},
162 measurements := Measurements#{rows => Rows}}},
163 [nei({telemetry,
164 EventName,
165 #{count => 1, rows => Rows},
166 #{tag => Tag, command => Command}}),
167 nei({process, TM}),
168 nei(complete)]};
169
%% error_response with a span present in Data: the message is converted
%% by pgmp_error_notice_fields:map/1. Tag is already bound, so the
%% match also asserts that the result is tagged error_response. The
%% detail is stored in the span metadata under that tag.
170 handle_event(internal,
171 {recv = EventName, {error_response = Tag, _} = TM},
172 query,
173 #{span := #{metadata := Metadata} = Span} = Data) ->
174 1 {Tag, Detail} = pgmp_error_notice_fields:map(TM),
175 1 {keep_state,
176 Data#{span := Span#{metadata := Metadata#{Tag => Detail}}},
177 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
178 nei({process, TM}),
179 nei(complete)]};
180
%% Generic end-of-query messages. empty_query_response always lands
%% here; error_response and command_complete only when the two more
%% specific clauses above did not match (e.g. Data has no span, or the
%% command_complete payload is not {atom, integer}). Data is unchanged.
181 handle_event(internal,
182 {recv = EventName, {Tag, _} = TM},
183 query,
184 _)
185 when Tag == empty_query_response;
186 Tag == error_response;
187 Tag == command_complete ->
188 192 ?LOG_DEBUG(#{tm => TM}),
189 192 {keep_state_and_data,
190 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
191 nei({process, TM}),
192 nei(complete)]};
193
%% copy_out_response: telemetry plus process of the tagged response.
%% `complete' is not queued here.
194 handle_event(internal,
195 {recv = EventName, {copy_out_response = Tag, Response} = TM},
196 query,
197 _) ->
198
:-(
?LOG_DEBUG(#{tm => TM}),
199
:-(
{keep_state_and_data,
200 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
201 nei({process, {Tag, Response}})]};
202
%% copy_data: telemetry plus process of the tagged row. `complete' is
%% not queued here.
203 handle_event(internal,
204 {recv = EventName, {copy_data = Tag, Row} = TM},
205 query,
206 _) ->
207
:-(
?LOG_DEBUG(#{tm => TM}),
208
:-(
{keep_state_and_data,
209 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
210 nei({process, {Tag, Row}})]};
211
%% copy_done: telemetry only; nothing is passed to process.
212 handle_event(internal,
213 {recv = EventName, {copy_done = Tag, _} = TM},
214 query,
215 _) ->
216
:-(
?LOG_DEBUG(#{tm => TM}),
217
:-(
{keep_state_and_data,
218 nei({telemetry, EventName, #{count => 1}, #{tag => Tag}})};
219
%% row_description: the column types are kept in Data under `types'
%% (read again by the data_row clauses below) and the field names
%% derived from them are passed to process.
220 handle_event(internal,
221 {recv = EventName,
222 {row_description = Tag, Types}},
223 query,
224 Data) ->
225 12 {keep_state,
226 Data#{types => Types},
227 [nei({telemetry, EventName, #{count => 1}, #{tag => Tag}}),
228 nei({process, {Tag, field_names(Types)}})]};
229
%% data_row once types are ready: the columns are zipped with the
%% stored types and decoded using pgmp_types:cache(Config) as the third
%% argument. lists:zip/2 requires both lists to have the same length.
230 handle_event(internal,
231 {recv = EventName, {data_row = Tag, Columns}},
232 query,
233 #{parameters := Parameters,
234 config := Config,
235 types_ready := true,
236 types := Types}) ->
237 2 {keep_state_and_data,
238 [nei({telemetry,
239 EventName,
240 #{count => 1},
241 #{tag => Tag, types_ready => true}}),
242
243 nei({process,
244 {Tag,
245 decode(Parameters,
246 lists:zip(Types, Columns),
247 pgmp_types:cache(Config))}})]};
248
%% data_row before types are ready: same as the clause above, except
%% that an empty map is given to decode/3 in place of the type cache.
249 handle_event(internal,
250 {recv = EventName, {data_row = Tag, Columns}},
251 query,
252 #{parameters := Parameters,
253 types_ready := false,
254 types := Types}) ->
255 6130 {keep_state_and_data,
256 [nei({telemetry,
257 EventName,
258 #{count => 1},
259 #{tag => Tag, types_ready => false}}),
260
261 nei({process, {Tag, decode(Parameters, lists:zip(Types, Columns), #{})}})]};
262
%% complete: reply to the caller with the accumulated replies in
%% reversed order, remove the per-request keys (args, from, replies,
%% types) from Data and stop the span for the current state. The state
%% itself is kept.
263 handle_event(internal,
264 complete,
265 Action,
266 #{replies := Replies, from := From} = Data) ->
267 205 {keep_state,
268 maps:without([args, from, replies, types], Data),
269 [{reply, From, lists:reverse(Replies)},
270 nei({span_stop, Action})]};
271
%% Catch-all: everything not handled above is delegated unchanged to
%% pgmp_mm_common:handle_event/4.
272 handle_event(EventType, EventContent, State, Data) ->
273 28798 pgmp_mm_common:handle_event(EventType,
274 EventContent,
275 State,
276 Data).
Line Hits Source