summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py196
1 files changed, 196 insertions, 0 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
new file mode 100644
index 0000000000..90fb6c1336
--- /dev/null
+++ b/synapse/replication/tcp/client.py
@@ -0,0 +1,196 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""A replication client for use by synapse workers.
+"""
+
+from twisted.internet import reactor, defer
+from twisted.internet.protocol import ReconnectingClientFactory
+
+from .commands import (
+    FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand,
+)
+from .protocol import ClientReplicationStreamProtocol
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationClientFactory(ReconnectingClientFactory):
+    """Factory for building connections to the master. Will reconnect if the
+    connection is lost.
+
+    Accepts a handler that will be called when new data is available or data
+    is required.
+    """
+    maxDelay = 5  # Try at least once every N seconds
+
+    def __init__(self, hs, client_name, handler):
+        self.client_name = client_name
+        self.handler = handler
+        self.server_name = hs.config.server_name
+        self._clock = hs.get_clock()  # As self.clock is defined in super class
+
+        reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying)
+
+    def startedConnecting(self, connector):
+        logger.info("Connecting to replication: %r", connector.getDestination())
+
+    def buildProtocol(self, addr):
+        logger.info("Connected to replication: %r", addr)
+        self.resetDelay()
+        return ClientReplicationStreamProtocol(
+            self.client_name, self.server_name, self._clock, self.handler
+        )
+
+    def clientConnectionLost(self, connector, reason):
+        logger.error("Lost replication conn: %r", reason)
+        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
+
+    def clientConnectionFailed(self, connector, reason):
+        logger.error("Failed to connect to replication: %r", reason)
+        ReconnectingClientFactory.clientConnectionFailed(
+            self, connector, reason
+        )
+
+
+class ReplicationClientHandler(object):
+    """A base handler that can be passed to the ReplicationClientFactory.
+
+    By default proxies incoming replication data to the SlaveStore.
+    """
+    def __init__(self, store):
+        self.store = store
+
+        # The current connection. None if we are currently (re)connecting
+        self.connection = None
+
+        # Any pending commands to be sent once a new connection has been
+        # established
+        self.pending_commands = []
+
+        # Map from string -> deferred, to wake up when receiveing a SYNC with
+        # the given string.
+        # Used for tests.
+        self.awaiting_syncs = {}
+
+    def start_replication(self, hs):
+        """Helper method to start a replication connection to the remote server
+        using TCP.
+        """
+        client_name = hs.config.worker_name
+        factory = ReplicationClientFactory(hs, client_name, self)
+        host = hs.config.worker_replication_host
+        port = hs.config.worker_replication_port
+        reactor.connectTCP(host, port, factory)
+
+    def on_rdata(self, stream_name, token, rows):
+        """Called when we get new replication data. By default this just pokes
+        the slave store.
+
+        Can be overriden in subclasses to handle more.
+        """
+        logger.info("Received rdata %s -> %s", stream_name, token)
+        self.store.process_replication_rows(stream_name, token, rows)
+
+    def on_position(self, stream_name, token):
+        """Called when we get new position data. By default this just pokes
+        the slave store.
+
+        Can be overriden in subclasses to handle more.
+        """
+        self.store.process_replication_rows(stream_name, token, [])
+
+    def on_sync(self, data):
+        """When we received a SYNC we wake up any deferreds that were waiting
+        for the sync with the given data.
+
+        Used by tests.
+        """
+        d = self.awaiting_syncs.pop(data, None)
+        if d:
+            d.callback(data)
+
+    def get_streams_to_replicate(self):
+        """Called when a new connection has been established and we need to
+        subscribe to streams.
+
+        Returns a dictionary of stream name to token.
+        """
+        args = self.store.stream_positions()
+        user_account_data = args.pop("user_account_data", None)
+        room_account_data = args.pop("room_account_data", None)
+        if user_account_data:
+            args["account_data"] = user_account_data
+        elif room_account_data:
+            args["account_data"] = room_account_data
+        return args
+
+    def get_currently_syncing_users(self):
+        """Get the list of currently syncing users (if any). This is called
+        when a connection has been established and we need to send the
+        currently syncing users. (Overriden by the synchrotron's only)
+        """
+        return []
+
+    def send_command(self, cmd):
+        """Send a command to master (when we get establish a connection if we
+        don't have one already.)
+        """
+        if self.connection:
+            self.connection.send_command(cmd)
+        else:
+            logger.warn("Queuing command as not connected: %r", cmd.NAME)
+            self.pending_commands.append(cmd)
+
+    def send_federation_ack(self, token):
+        """Ack data for the federation stream. This allows the master to drop
+        data stored purely in memory.
+        """
+        self.send_command(FederationAckCommand(token))
+
+    def send_user_sync(self, user_id, is_syncing, last_sync_ms):
+        """Poke the master that a user has started/stopped syncing.
+        """
+        self.send_command(UserSyncCommand(user_id, is_syncing, last_sync_ms))
+
+    def send_remove_pusher(self, app_id, push_key, user_id):
+        """Poke the master to remove a pusher for a user
+        """
+        cmd = RemovePusherCommand(app_id, push_key, user_id)
+        self.send_command(cmd)
+
+    def send_invalidate_cache(self, cache_func, keys):
+        """Poke the master to invalidate a cache.
+        """
+        cmd = InvalidateCacheCommand(cache_func.__name__, keys)
+        self.send_command(cmd)
+
+    def await_sync(self, data):
+        """Returns a deferred that is resolved when we receive a SYNC command
+        with given data.
+
+        Used by tests.
+        """
+        return self.awaiting_syncs.setdefault(data, defer.Deferred())
+
+    def update_connection(self, connection):
+        """Called when a connection has been established (or lost with None).
+        """
+        self.connection = connection
+        if connection:
+            for cmd in self.pending_commands:
+                connection.send_command(cmd)
+            self.pending_commands = []