_site/cover/msc_mm.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 %% @doc Middleman.
17
18 -module(msc_mm).
19
20
21 -export([binlog_dump/1]).
22 -export([binlog_dump_gtid/1]).
23 -export([callback_mode/0]).
24 -export([execute/1]).
25 -export([handle_event/4]).
26 -export([init/1]).
27 -export([operator/1]).
28 -export([prepare/1]).
29 -export([query/1]).
30 -export([recv/1]).
31 -export([register_replica/1]).
32 -export([start_link/1]).
33 -export([stmt_close/1]).
34 -export([stmt_reset/1]).
35 -import(msc_statem, [nei/1]).
36 -import(msc_statem, [send_request/1]).
37 -include_lib("kernel/include/logger.hrl").
38
39
%% @doc Start and link the middleman state machine.
%% `Arg' is handed to `init/1' wrapped in a single element list; the
%% start options are taken from `envy_gen:options/1' for this module.
start_link(Arg) ->
    Options = envy_gen:options(?MODULE),
    gen_statem:start_link(?MODULE, [Arg], Options).
42
43
%% @doc Issue a `binlog_dump' request built from `Arg'.
%% The accepted keys and their defaults are listed by
%% `config(binlog_dump)'.
binlog_dump(Arg) ->
    send_request(Arg, binlog_dump).
46
47
%% @doc Issue a `binlog_dump_gtid' request built from `Arg'.
%% The accepted keys and their defaults are listed by
%% `config(binlog_dump_gtid)'.
binlog_dump_gtid(Arg) ->
    send_request(Arg, binlog_dump_gtid).
50
51
%% @doc Issue a `query' request built from `Arg'.
%% `Arg' must carry a `query' key; `parameter_count' and
%% `parameter_set' are optional (see `config(query)').
query(Arg) ->
    send_request(Arg, query).
54
55
%% @doc Issue a `prepare' request built from `Arg'.
%% `Arg' must carry a `query' key (see `config(prepare)').
prepare(Arg) ->
    send_request(Arg, prepare).
58
59
%% @doc Issue an `execute' request built from `Arg'.
%% `Arg' must carry a `statement_id' key; `parameters' defaults to
%% the empty list (see `config(execute)').
execute(Arg) ->
    send_request(Arg, execute).
62
63
%% @doc Issue a `register_replica' request built from `Arg'.
%% Every key is optional; the defaults are listed by
%% `config(register_replica)'.
register_replica(Arg) ->
    send_request(Arg, register_replica).
66
67
%% @doc Issue a `stmt_close' request built from `Arg'.
%% `Arg' must carry a `statement_id' key (see `config(stmt_close)').
stmt_close(Arg) ->
    send_request(Arg, stmt_close).
70
71
%% @doc Issue a `stmt_reset' request built from `Arg'.
%% `Arg' must carry a `statement_id' key (see `config(stmt_reset)').
stmt_reset(Arg) ->
    send_request(Arg, stmt_reset).
74
75
%% @doc Issue an `operator' request built from `Arg'.
%% The request takes no parameters (`config(operator)' is empty); in
%% the `authenticated' state `handle_event/4' answers it with the
%% `operator' value held in the state data.
operator(Arg) ->
    send_request(Arg, operator).
78
79
%% Look up the parameter description for Action and hand over to
%% send_request/3.
send_request(Arg, Action) ->
    Config = config(Action),
    send_request(Arg, Action, Config).
82
83
%% Build the request term for Action from Arg and pass it on to
%% msc_statem:send_request/1.
%%
%% The parameters named in Config are gathered by args/3 into the
%% `request' entry as {request, Parameters}; the same keys are then
%% dropped from the top level of the outgoing map.
send_request(Arg, Action, Config) ->
    ConfigKeys = keys(Config),
    Request = {request, args(Action, Arg, Config)},
    Labelled = maybe_label(Arg#{request => Request}),
    msc_statem:send_request(maps:without(ConfigKeys, Labelled)).
90
91
%% Parameter description for each action.
%%
%% An entry of the form {Key, Default} is optional: args/3 falls back
%% to Default when Key is absent from the caller's map. A bare Key is
%% mandatory: args/3 raises arg_missing when it is absent.
config(binlog_dump) ->
    [{flags, 2}, {binlog_pos, 4}, {server_id, 200}, call_back, {binlog_filename, <<>>}];

config(binlog_dump_gtid) ->
    [{flags, 2}, {position, 4}, {server_id, 200}, call_back, {name, <<>>}, {gtids, []}];

config(query) ->
    [{parameter_count, 0},
     {parameter_set, 1},
     query];

config(prepare) ->
    [query];

config(execute) ->
    [{parameters, []}, statement_id];

config(stmt_close) ->
    [statement_id];

config(stmt_reset) ->
    [statement_id];

config(operator) ->
    [];

config(register_replica) ->
    [{port, 3306},
     {user, <<>>},
     {host, <<>>},
     {server_id, 200},
     {password, <<>>},
     {recovery_rank, <<0, 0, 0, 0>>},
     {master_id, 0}].
135
136
%% Names of the parameters described by Config, in order: the first
%% element of each {Key, Default} pair, or the entry itself when it is
%% a bare key.
keys([]) ->
    [];

keys([{Key, _Default} | Rest]) ->
    [Key | keys(Rest)];

keys([Key | Rest]) ->
    [Key | keys(Rest)].
147
148
%% Collect the parameters of a request into a map.
%%
%% The result starts as #{action => Action}. Each entry of Config then
%% adds one key: a {Name, Default} pair takes the value from Arg or
%% falls back to Default, while a bare Name must be present in Arg,
%% otherwise the call fails with error reason arg_missing.
args(Action, Arg, Config) ->
    Collect =
        fun
            ({Name, Default}, Acc) ->
                maps:put(Name, maps:get(Name, Arg, Default), Acc);

            (Name, Acc) ->
                case maps:find(Name, Arg) of
                    {ok, Supplied} ->
                        maps:put(Name, Supplied, Acc);

                    error ->
                        error(arg_missing, [Name])
                end
        end,
    lists:foldl(Collect, #{action => Action}, Config).
166
167
%% Add `label => ?MODULE' to a map that has a `requests' key but no
%% `label' key yet; any other term is returned unchanged.
maybe_label(Arg) when is_map_key(requests, Arg),
                      not is_map_key(label, Arg) ->
    Arg#{label => ?MODULE};

maybe_label(Arg) ->
    Arg.
176
177
%% @doc Forward a received message to the state machine.
%% The `message' value is moved into the `request' entry as
%% `{recv, Message}' and the `message' key is dropped before the map
%% is handed to `msc_statem:send_request/1'.
recv(#{message := Message} = Arg) ->
    Labelled = maybe_label(Arg#{request => {recv, Message}}),
    msc_statem:send_request(maps:without([message], Labelled)).
184
185
%% @doc gen_statem callback: set up the initial state and data.
%% The process traps exits and starts in the `unready' state. The
%% data holds an empty request id collection, the codec built for the
%% handshake exchange, an empty `prepared' map and the start argument
%% under `config'. The action returned by `msc_statem:nei(peer)' is
%% attached; presumably it queues the `internal, peer' event handled
%% by `handle_event/4' (confirm against msc_statem).
init([Arg]) ->
    process_flag(trap_exit, true),
    Requests = gen_statem:reqids_new(),
    Decoder = msmp_codec:decode(msmp_handshake:decode()),
    Encoder = msmp_codec:encode(msmp_handshake_response:encode()),
    Data = #{requests => Requests,
             decoder => Decoder,
             encoder => Encoder,
             prepared => #{},
             config => Arg},
    {ok, unready, Data, msc_statem:nei(peer)}.
198
199
%% @doc gen_statem callback: every event is dispatched through
%% `handle_event/4'.
callback_mode() ->
    handle_event_function.
202
203
%% @doc gen_statem callback.
%%
%% Clauses, in matching order:
%%
%% - `internal, peer': look up the `socket' child of the first entry
%% in this process' `$ancestors' via `msc_sup:get_child/2'. A worker
%% with a pid is stored under `socket' in the data; any other child
%% tuple, or `false', stops the state machine.
%%
%% - call `{request, #{action := operator}}' while `authenticated' and
%% with an `operator' entry in the data: reply with that entry.
%%
%% - any other call `{request, #{action := Action}}' while
%% `authenticated': move to state `{Action, From}', push the callback
%% module chosen by `action_callback_module/1' and postpone the event.
%%
%% - any other call `{request, _}': attach the action from
%% `msc_statem:nei(connect)' and postpone the request.
%%
%% - `internal, connect': ask `msc_socket:connect/1' to connect, keep
%% the returned request ids, arm a state timeout and push
%% `msc_mm_auth' as callback module.
%%
%% - everything else is delegated to `msc_mm_common:handle_event/4'.
handle_event(internal, peer, _State, Data) ->
    Parent = hd(get('$ancestors')),
    case msc_sup:get_child(Parent, socket) of
        {_, Socket, worker, _} when is_pid(Socket) ->
            {keep_state, maps:put(socket, Socket, Data)};

        {_, _, _, _} = Unexpected ->
            {stop, Unexpected};

        false ->
            {stop, peer_not_found}
    end;

handle_event({call, Caller},
             {request, #{action := operator}},
             authenticated,
             #{operator := Operator}) ->
    Reply = {reply, Caller, Operator},
    {keep_state_and_data, Reply};

handle_event({call, Caller},
             {request, #{action := Action}},
             authenticated,
             Data) ->
    CallbackModule = action_callback_module(Action),
    {next_state,
     {Action, Caller},
     Data,
     [{push_callback_module, CallbackModule}, postpone]};

handle_event({call, _Caller}, {request, _Request}, _State, _Data) ->
    {keep_state_and_data, [msc_statem:nei(connect), postpone]};

handle_event(internal,
             connect,
             _State,
             #{requests := Pending, socket := Socket} = Data) ->
    Connecting = msc_socket:connect(#{server_ref => Socket, requests => Pending}),
    Timeout = msc_config:timeout(connect),
    {keep_state,
     Data#{requests := Connecting},
     [{state_timeout, Timeout, connect},
      {push_callback_module, msc_mm_auth}]};

handle_event(Type, Content, State, Data) ->
    msc_mm_common:handle_event(Type, Content, State, Data).
251
252
%% Callback module that handles Action once the connection is
%% authenticated. The name is produced by msc_util:snake_case/1 from
%% this module's name and the action; binlog_dump_gtid shares the
%% module used for binlog_dump.
action_callback_module(binlog_dump_gtid) ->
    action_callback_module(binlog_dump);

action_callback_module(Action) ->
    msc_util:snake_case([?MODULE, Action]).
Line Hits Source