_site/cover/msc_binlog_ets.COVER.html

1 %% Copyright (c) 2023 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15 %% @doc An example binlog dump receiver using ETS.
16
17 -module(msc_binlog_ets).
18
19
20 -export([callback_mode/0]).
21 -export([handle_event/4]).
22 -export([init/1]).
23 -export([start/0]).
24
25
26 start() ->
27
:-(
gen_statem:start({local, ?MODULE}, ?MODULE, [], []).
28
29
30 callback_mode() ->
31
:-(
handle_event_function.
32
33
34 init([]) ->
35
:-(
process_flag(trap_exit, true),
36
:-(
{ok,
37 ready,
38 #{mapped => #{},
39 requests => gen_statem:reqids_new()}}.
40
41
42 handle_event(
43 {call, From},
44 {table_map, #{table_id := TableId} = Mapping},
45 _,
46 #{mapped := Mapped} = Data) when is_map_key(TableId, Mapped) ->
47
:-(
{keep_state,
48 Data#{mapped := Mapped#{TableId => Mapping}},
49 {reply, From, ok}};
50
51 handle_event(
52 {call, From},
53 {table_map, #{table_id := TableId} = Mapping},
54 _,
55 #{mapped := Mapped} = Data) ->
56
:-(
_ = new_table(Mapping),
57
:-(
{keep_state,
58 Data#{mapped := Mapped#{TableId => Mapping}},
59 {reply, From, ok}};
60
61 handle_event(
62 {call, From},
63 {Event, #{rows := Rows, table_id := TableId}},
64 _,
65 #{mapped := Mapped}) when (Event == write_rows orelse
66 Event == write_rows_compressed_v1 orelse
67 Event == write_rows_v1) andalso
68 is_map_key(TableId, Mapped) ->
69
:-(
#{TableId := Mapping} = Mapped,
70
:-(
ets:insert(table_name(Mapping),
71
:-(
[insert_or_update_tuple(Row, Mapping) || Row <- Rows]),
72
:-(
{keep_state_and_data, {reply, From, ok}};
73
74 handle_event({call, From}, _Event, _State, _Data) ->
75
:-(
{keep_state_and_data, {reply, From, ok}}.
76
77
78 new_table(Mapping) ->
79
:-(
try
80
:-(
ets:new(table_name(Mapping),
81 [{keypos, keypos(Mapping)}, protected, named_table])
82 catch
83 error:badarg ->
84
:-(
ok
85 end.
86
87
88 table_name(#{table := Table, database := Database}) ->
89
:-(
binary_to_atom(<<Database/bytes, "_", Table/bytes>>).
90
91
92 keypos(#{metadata := #{simple_primary_key := [Key]}}) ->
93
:-(
Key + 1;
94
95 keypos(#{metadata := #{simple_primary_key := _}}) ->
96
:-(
1.
97
98
99 insert_or_update_tuple(
100 Tuple,
101 #{metadata := #{simple_primary_key := [_]}}) ->
102
:-(
Tuple;
103
104 insert_or_update_tuple(
105 Tuple,
106 #{metadata := #{simple_primary_key := Composite}} = Mapping) ->
107
:-(
list_to_tuple(
108 [key(Tuple, Mapping) |
109 lists:filtermap(
110 fun
111 ({Position, Value}) ->
112
:-(
case lists:member(Position, Composite) of
113 true ->
114
:-(
false;
115
116 false ->
117
:-(
{true, Value}
118 end
119 end,
120 lists:zip(
121 lists:seq(0, tuple_size(Tuple) - 1),
122 tuple_to_list(Tuple)))]).
123
124
125 key(Tuples, KeyPositions) when is_list(Tuples) ->
126
:-(
[?FUNCTION_NAME(Tuple, KeyPositions) || Tuple <- Tuples];
127
128 key(Tuple, #{metadata := #{simple_primary_key := [Primary]}}) ->
129
:-(
element(Primary + 1, Tuple);
130
131 key(Tuple, #{metadata := #{simple_primary_key := Composite}}) ->
132
:-(
list_to_tuple([element(Position + 1, Tuple) || Position <- Composite]).
Line Hits Source