_site/cover/msc_mm_binlog_dump.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15 %% @doc Middleman dealing with the binlog dump process.
16
17 -module(msc_mm_binlog_dump).
18
19
20 -export([callback_mode/0]).
21 -export([handle_event/4]).
22 -import(msc_statem, [nei/1]).
23 -include_lib("kernel/include/logger.hrl").
24
25
%% @doc gen_statem callback mode: every event, whatever the current
%% state, is dispatched to handle_event/4.
-spec callback_mode() -> handle_event_function.
callback_mode() -> handle_event_function.
28
29
%% @doc gen_statem event handler for the binlog dump middleman.
%%
%% Clauses, in matching order:
%% <ol>
%% <li>a `binlog_dump' / `binlog_dump_gtid' request call: installs the
%%     stream decoder and the request encoder, remembers the call back,
%%     and queues the packet for sending;</li>
%% <li>an error packet while dumping: replies `{error, Packet}' to the
%%     caller and stops normally;</li>
%% <li>a `format_description' log event while dumping: forwarded to the
%%     call back, and the caller is replied to with `ok';</li>
%% <li>a `table_map' log event: recorded under its table id, the decoder
%%     is rebuilt with the new mapping, and the event is forwarded;</li>
%% <li>a `gtid' log event: forwarded with the header's `server_id'
%%     merged into the event;</li>
%% <li>any other log event: forwarded unchanged;</li>
%% <li>an `ok' response labelled with this module: ignored;</li>
%% <li>an `{error, Reason}' response labelled with this module: stops
%%     with `Reason';</li>
%% <li>an eof packet: stops;</li>
%% <li>anything else: delegated to `msc_mm_common:handle_event/4'.</li>
%% </ol>
handle_event({call, From},
             {request, #{action := Action, call_back := CallBack} = Packet},
             {Action, From},
             #{client_flags := ClientFlags} = Data)
  when Action =:= binlog_dump orelse Action =:= binlog_dump_gtid ->
    %% No table_map event has been seen yet, so the mapping starts empty.
    Binlog = #{mapped => #{}},
    Decoder = stream_decoder(Binlog, ClientFlags),
    Encoder = msmp_codec:encode(request_encoder(Action)),
    %% `decoder' and `encoder' must already be present in Data (`:='),
    %% whereas `binlog_dump' and `call_back' may be added here (`=>').
    {keep_state,
     Data#{decoder := Decoder,
           binlog_dump => Binlog,
           call_back => CallBack,
           encoder := Encoder},
     %% The call back is local bookkeeping and is stripped from the
     %% packet before it is handed on for sending.
     msc_statem:nei({send,
                     #{packet => maps:remove(call_back, Packet),
                       sequence => 0}})};

handle_event(internal,
             {recv, #{packet := #{action := error} = Packet}},
             {Action, From},
             _Data)
  when Action =:= binlog_dump orelse Action =:= binlog_dump_gtid ->
    Reply = {reply, From, {error, maps:remove(action, Packet)}},
    {stop_and_reply, normal, Reply};

handle_event(internal,
             {recv, #{packet := #{header := #{event_type := format_description},
                                  event := Event,
                                  action := log_event}}},
             {Action, From},
             #{call_back := CallBack, requests := Requests} = Data)
  when Action =:= binlog_dump orelse Action =:= binlog_dump_gtid ->
    Pending = notify(CallBack, {format_description, Event}, Requests),
    {keep_state, Data#{requests := Pending}, {reply, From, ok}};

handle_event(internal,
             {recv, #{packet := #{header := #{event_type := table_map},
                                  event := #{table_id := TableId} = Event,
                                  action := log_event}}},
             _State,
             #{binlog_dump := #{mapped := Mapped} = Binlog,
               client_flags := ClientFlags,
               call_back := CallBack,
               requests := Requests} = Data) ->
    %% The table id becomes the key, so it is dropped from the stored
    %% value; the call back still receives the complete event.
    Definition = maps:remove(table_id, Event),
    Updated = Binlog#{mapped := Mapped#{TableId => Definition}},
    %% The decoder is built from the mapping, so it is rebuilt whenever
    %% the mapping changes.
    Decoder = stream_decoder(Updated, ClientFlags),
    {keep_state,
     Data#{decoder := Decoder,
           requests := notify(CallBack, {table_map, Event}, Requests),
           binlog_dump := Updated}};

handle_event(internal,
             {recv, #{packet := #{header := #{event_type := gtid,
                                              server_id := ServerId},
                                  event := Event,
                                  action := log_event}}},
             _State,
             #{call_back := CallBack, requests := Requests} = Data) ->
    Message = {gtid, Event#{server_id => ServerId}},
    {keep_state, Data#{requests := notify(CallBack, Message, Requests)}};

handle_event(internal,
             {recv, #{packet := #{header := #{event_type := EventType},
                                  event := Event,
                                  action := log_event}}},
             _State,
             #{call_back := CallBack, requests := Requests} = Data) ->
    Pending = notify(CallBack, {EventType, Event}, Requests),
    {keep_state, Data#{requests := Pending}};

handle_event(internal,
             {response, #{label := ?MODULE, reply := ok}},
             _State,
             _Data) ->
    keep_state_and_data;

handle_event(internal,
             {response, #{label := ?MODULE, reply := {error, Reason}}},
             _State,
             _Data) ->
    {stop, Reason};

handle_event(internal,
             {recv, #{packet := #{action := eof}}},
             _State,
             _Data) ->
    stop;

handle_event(Type, Content, State, Data) ->
    msc_mm_common:handle_event(Type, Content, State, Data).


%% Builds the decoder stored under `decoder' in the state data: a
%% msmp_codec decoder over three alternatives - a binlog network stream
%% event (decoded using the table mapping held in Binlog), an EOF
%% packet, or an error packet.
%% NOTE(review): presumably scran_branch:alt/1 tries the alternatives in
%% list order - confirm against scran.
stream_decoder(Binlog, ClientFlags) ->
    Alternatives = [msmp_binlog_network_stream:decode(Binlog),
                    msmp_packet_eof:decode(ClientFlags),
                    msmp_packet_error:decode(ClientFlags)],
    msmp_codec:decode(scran_branch:alt(Alternatives)).


%% Selects the encoder matching the kind of dump that was requested.
request_encoder(binlog_dump) ->
    msmp_binlog_dump:encode();
request_encoder(binlog_dump_gtid) ->
    msmp_binlog_dump_gtid:encode().


%% Sends Message to CallBack as an asynchronous gen_statem request
%% labelled with this module, returning Requests extended with the new
%% request id.
notify(CallBack, Message, Requests) ->
    gen_statem:send_request(CallBack, Message, ?MODULE, Requests).
Line Hits Source