summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
blob: 700ae7915852d2bd535ba417a251f18e5ddd3555 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A replication client for use by synapse workers.
"""

import logging
from typing import TYPE_CHECKING, Dict

from twisted.internet.protocol import ReconnectingClientFactory

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol

if TYPE_CHECKING:
    from synapse.server import HomeServer
    from synapse.replication.tcp.handler import ReplicationCommandHandler

logger = logging.getLogger(__name__)


class ReplicationClientFactory(ReconnectingClientFactory):
    """Factory for building connections to the master. Will reconnect if the
    connection is lost.

    Accepts a handler that is passed to `ClientReplicationStreamProtocol`.
    """

    initialDelay = 0.1
    maxDelay = 1  # Try at least once every N seconds

    def __init__(
        self,
        hs: "HomeServer",
        client_name: str,
        command_handler: "ReplicationCommandHandler",
    ):
        self.client_name = client_name
        self.command_handler = command_handler
        self.server_name = hs.config.server_name
        self.hs = hs
        self._clock = hs.get_clock()  # As self.clock is defined in super class

        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)

    def startedConnecting(self, connector):
        logger.info("Connecting to replication: %r", connector.getDestination())

    def buildProtocol(self, addr):
        logger.info("Connected to replication: %r", addr)
        return ClientReplicationStreamProtocol(
            self.hs,
            self.client_name,
            self.server_name,
            self._clock,
            self.command_handler,
        )

    def clientConnectionLost(self, connector, reason):
        logger.error("Lost replication conn: %r", reason)
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)

    def clientConnectionFailed(self, connector, reason):
        logger.error("Failed to connect to replication: %r", reason)
        ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)


class ReplicationDataHandler:
    """Handles incoming stream updates from replication.

    This instance notifies the slave data store about updates. Can be subclassed
    to handle updates in additional ways.
    """

    def __init__(self, store: BaseSlavedStore):
        self.store = store

    async def on_rdata(self, stream_name: str, token: int, rows: list):
        """Called to handle a batch of replication data with a given stream token.

        By default this just pokes the slave store. Can be overridden in subclasses to
        handle more.

        Args:
            stream_name (str): name of the replication stream for this batch of rows
            token (int): stream token for this batch of rows
            rows (list): a list of Stream.ROW_TYPE objects as returned by
                Stream.parse_row.
        """
        self.store.process_replication_rows(stream_name, token, rows)

    def get_streams_to_replicate(self) -> Dict[str, int]:
        """Called when a new connection has been established and we need to
        subscribe to streams.

        Returns:
            map from stream name to the most recent update we have for
            that stream (ie, the point we want to start replicating from)
        """
        args = self.store.stream_positions()
        user_account_data = args.pop("user_account_data", None)
        room_account_data = args.pop("room_account_data", None)
        if user_account_data:
            args["account_data"] = user_account_data
        elif room_account_data:
            args["account_data"] = room_account_data
        return args

    async def on_position(self, stream_name: str, token: int):
        self.store.process_replication_rows(stream_name, token, [])

    def on_remote_server_up(self, server: str):
        """Called when get a new REMOTE_SERVER_UP command."""