_site/cover/pgmp_mm_rep_phy.COVER.html

1 %% Copyright (c) 2022 Peter Morgan <peter.james.morgan@gmail.com>
2 %%
3 %% Licensed under the Apache License, Version 2.0 (the "License");
4 %% you may not use this file except in compliance with the License.
5 %% You may obtain a copy of the License at
6 %%
7 %% http://www.apache.org/licenses/LICENSE-2.0
8 %%
9 %% Unless required by applicable law or agreed to in writing, software
10 %% distributed under the License is distributed on an "AS IS" BASIS,
11 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 %% See the License for the specific language governing permissions and
13 %% limitations under the License.
14
15
16 -module(pgmp_mm_rep_phy).
17
18
19 -export([callback_mode/0]).
20 -export([handle_event/4]).
21 -export([terminate/3]).
22 -import(pgmp_codec, [marshal/2]).
23 -import(pgmp_codec, [size_inclusive/1]).
24 -import(pgmp_data_row, [decode/3]).
25 -import(pgmp_statem, [nei/1]).
26 -include_lib("kernel/include/logger.hrl").
27
28
29 callback_mode() ->
30
:-(
[handle_event_function, state_enter].
31
32
33 terminate(Reason, State, Data) ->
34
:-(
pgmp_mm_common:terminate(Reason, State, Data).
35
36
37 handle_event(internal, {recv, {copy_both_response, _}}, _, _) ->
38
:-(
keep_state_and_data;
39
40 handle_event(internal, {recv, {copy_data, _}}, _, _) ->
41
:-(
keep_state_and_data;
42
43 handle_event(internal,
44 {response, #{label := pgmp_types, reply := ready}},
45 waiting_for_types,
46 #{types_ready := Missing} = Data) when Missing == false ->
47
:-(
{next_state,
48 identify_system,
49 Data#{types_ready => not(Missing)},
50 nei(identify_system)};
51
52 handle_event(internal,
53 bootstrap_complete,
54 _,
55 #{types_ready := false} = Data) ->
56
:-(
{next_state, waiting_for_types, Data};
57
58 handle_event(internal,
59 bootstrap_complete,
60 _,
61 #{types_ready := true} = Data) ->
62
:-(
{next_state,
63 identify_system,
64 Data,
65 nei(identify_system)};
66
67 handle_event(internal, identify_system, _, _) ->
68
:-(
{keep_state_and_data, nei({query, [<<"IDENTIFY_SYSTEM">>]})};
69
70 handle_event(internal,
71 {recv, {command_complete, Command}},
72 _,
73 Data) when Command == identify_system;
74 Command == create_replication_slot ->
75
:-(
{keep_state, maps:without([columns], Data)};
76
77 handle_event(internal,
78 {recv, {ready_for_query, idle}},
79 identify_system,
80 Data) ->
81
:-(
{next_state, replication_slot, Data, nei(create_replication_slot)};
82
83 handle_event(internal,
84 {recv, {ready_for_query, idle}},
85 replication_slot,
86 Data) ->
87
:-(
{next_state, replication, Data, nei(start_replication)};
88
89 handle_event(internal, create_replication_slot = Command, _, _) ->
90
:-(
{keep_state_and_data, nei({Command, <<"abc">>})};
91
92 handle_event(internal, {create_replication_slot, SlotName}, _, _) ->
93
:-(
{keep_state_and_data,
94 nei({query,
95 [iolist_to_binary(
96 io_lib:format(
97 <<"CREATE_REPLICATION_SLOT ~s TEMPORARY PHYSICAL">>,
98 [SlotName]))]})};
99
100 handle_event(internal,
101 start_replication,
102 _,
103 #{identify_system := #{<<"xlogpos">> := Location},
104 replication_slot := #{<<"slot_name">> := SlotName}}) ->
105
:-(
{keep_state_and_data,
106 nei({query,
107 [iolist_to_binary(
108 io_lib:format(
109 <<"START_REPLICATION SLOT ~s PHYSICAL ~s">>,
110 [SlotName, Location]))]})};
111
112 handle_event(internal,
113 {recv, {ready_for_query, idle}},
114 ignore_for_the_moment,
115 #{identify_system := #{<<"timeline">> := Timeline}} = Data) ->
116
:-(
{next_state,
117 timeline_history,
118 Data,
119 nei({query,
120 [iolist_to_binary(
121 io_lib:format(
122 <<"TIMELINE_HISTORY ~b">>,
123 [Timeline]))]})};
124
125 handle_event(internal, {recv, {row_description, Columns}}, _, Data) ->
126
:-(
{keep_state, Data#{columns => Columns}};
127
128 handle_event(internal,
129 {recv, {data_row, Values}},
130 State,
131 #{parameters := Parameters,
132 config := Config,
133 columns := Columns} = Data) ->
134
:-(
{keep_state,
135 maps:put(State,
136 lists:foldl(
137 fun
138 ({#{field_name := FieldName}, Value}, A) ->
139
:-(
A#{FieldName => Value}
140 end,
141 maps:get(State, Data, #{}),
142 lists:zip(Columns,
143 decode(
144 Parameters,
145 lists:zip(Columns, Values),
146 pgmp_types:cache(Config)))),
147 Data)};
148
149 handle_event(internal, {query, [SQL]}, _, _) ->
150
:-(
{keep_state_and_data,
151 nei({send, ["Q", size_inclusive([marshal(string, SQL)])]})};
152
153 handle_event(EventType, EventContent, State, Data) ->
154
:-(
pgmp_mm_common:handle_event(EventType,
155 EventContent,
156 State,
157 Data).
Line Hits Source