diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
new file mode 100644
index 0000000000..a59ab01471
--- /dev/null
+++ b/synapse/replication/tcp/handler.py
@@ -0,0 +1,401 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""A replication client for use by synapse workers.
+"""
+
+import logging
+from typing import Any, Dict, List
+
+from prometheus_client import Counter
+
+from synapse.metrics import LaterGauge
+from synapse.replication.tcp.commands import (
+ ClearUserSyncsCommand,
+ Command,
+ FederationAckCommand,
+ InvalidateCacheCommand,
+ PositionCommand,
+ RdataCommand,
+ RemoteServerUpCommand,
+ RemovePusherCommand,
+ ReplicateCommand,
+ UserIpCommand,
+ UserSyncCommand,
+)
+from synapse.replication.tcp.streams import STREAMS_MAP, Stream
+
+logger = logging.getLogger(__name__)
+
+
+user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
+federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
+remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
+invalidate_cache_counter = Counter(
+ "synapse_replication_tcp_resource_invalidate_cache", ""
+)
+user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
+
+
+class ReplicationClientHandler:
+ """Handles incoming commands from replication.
+
+ Proxies data to `HomeServer.get_replication_data_handler()`.
+ """
+
+ def __init__(self, hs):
+ self.replication_data_handler = hs.get_replication_data_handler()
+ self.store = hs.get_datastore()
+ self.notifier = hs.get_notifier()
+ self.clock = hs.get_clock()
+ self.presence_handler = hs.get_presence_handler()
+ self.instance_id = hs.get_instance_id()
+
+ self.connections = []
+
+ self.streams = {
+ stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
+ } # type: Dict[str, Stream]
+
+ LaterGauge(
+ "synapse_replication_tcp_resource_total_connections",
+ "",
+ [],
+ lambda: len(self.connections),
+ )
+
+ LaterGauge(
+ "synapse_replication_tcp_resource_connections_per_stream",
+ "",
+ ["stream_name"],
+ lambda: {
+ (stream_name,): len(
+ [
+ conn
+ for conn in self.connections
+ if stream_name in conn.replication_streams
+ ]
+ )
+ for stream_name in self.streams
+ },
+ )
+
+ # Map of stream to batched updates. See RdataCommand for info on how
+ # batching works.
+ self.pending_batches = {} # type: Dict[str, List[Any]]
+
+ self.is_master = hs.config.worker_app is None
+
+ self.federation_sender = None
+ if self.is_master and not hs.config.send_federation:
+ self.federation_sender = hs.get_federation_sender()
+
+ self._server_notices_sender = None
+ if self.is_master:
+ self._server_notices_sender = hs.get_server_notices_sender()
+ self.notifier.add_remote_server_up_callback(self.send_remote_server_up)
+
+ def new_connection(self, connection):
+ self.connections.append(connection)
+
+ def lost_connection(self, connection):
+ try:
+ self.connections.remove(connection)
+ except ValueError:
+ pass
+
+ def connected(self) -> bool:
+ """Do we have any replication connections open?
+
+ Used to no-op if nothing is connected.
+ """
+ return bool(self.connections)
+
+ async def on_REPLICATE(self, cmd: ReplicateCommand):
+ # We only want to announce positions by the writer of the streams.
+ # Currently this is just the master process.
+ if not self.is_master:
+ return
+
+ if not self.connections:
+ raise Exception("Not connected")
+
+ for stream_name, stream in self.streams.items():
+ current_token = stream.current_token()
+ self.send_command(PositionCommand(stream_name, current_token))
+
+ async def on_USER_SYNC(self, cmd: UserSyncCommand):
+ user_sync_counter.inc()
+
+ if self.is_master:
+ await self.presence_handler.update_external_syncs_row(
+ cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
+ )
+
+ async def on_CLEAR_USER_SYNC(self, cmd: ClearUserSyncsCommand):
+ if self.is_master:
+ await self.presence_handler.update_external_syncs_clear(cmd.instance_id)
+
+ async def on_FEDERATION_ACK(self, cmd: FederationAckCommand):
+ federation_ack_counter.inc()
+
+ if self.federation_sender:
+ self.federation_sender.federation_ack(cmd.token)
+
+ async def on_REMOVE_PUSHER(self, cmd: RemovePusherCommand):
+ remove_pusher_counter.inc()
+
+ if self.is_master:
+ await self.store.delete_pusher_by_app_id_pushkey_user_id(
+ app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
+ )
+
+ self.notifier.on_new_replication_data()
+
+ async def on_INVALIDATE_CACHE(self, cmd: InvalidateCacheCommand):
+ invalidate_cache_counter.inc()
+
+ if self.is_master:
+ # We invalidate the cache locally, but then also stream that to other
+ # workers.
+ await self.store.invalidate_cache_and_stream(
+ cmd.cache_func, tuple(cmd.keys)
+ )
+
+ async def on_USER_IP(self, cmd: UserIpCommand):
+ user_ip_cache_counter.inc()
+
+ if self.is_master:
+ await self.store.insert_client_ip(
+ cmd.user_id,
+ cmd.access_token,
+ cmd.ip,
+ cmd.user_agent,
+ cmd.device_id,
+ cmd.last_seen,
+ )
+ await self._server_notices_sender.on_user_ip(cmd.user_id)
+
+ async def on_RDATA(self, cmd: RdataCommand):
+ stream_name = cmd.stream_name
+
+ try:
+ row = STREAMS_MAP[stream_name].parse_row(cmd.row)
+ except Exception:
+ logger.exception("[%s] Failed to parse RDATA: %r", stream_name, cmd.row)
+ raise
+
+ if cmd.token is None:
+ # I.e. this is part of a batch of updates for this stream. Batch
+ # until we get an update for the stream with a non None token
+ self.pending_batches.setdefault(stream_name, []).append(row)
+ else:
+ # Check if this is the last of a batch of updates
+ rows = self.pending_batches.pop(stream_name, [])
+ rows.append(row)
+ await self.on_rdata(stream_name, cmd.token, rows)
+
+ async def on_rdata(self, stream_name: str, token: int, rows: list):
+ """Called to handle a batch of replication data with a given stream token.
+
+ Args:
+ stream_name: name of the replication stream for this batch of rows
+ token: stream token for this batch of rows
+ rows: a list of Stream.ROW_TYPE objects as returned by
+ Stream.parse_row.
+ """
+ logger.debug("Received rdata %s -> %s", stream_name, token)
+ await self.replication_data_handler.on_rdata(stream_name, token, rows)
+
+ async def on_POSITION(self, cmd: PositionCommand):
+ stream = self.streams.get(cmd.stream_name)
+ if not stream:
+ logger.error("Got POSITION for unknown stream: %s", cmd.stream_name)
+ return
+
+ # Find where we previously streamed up to.
+ current_token = self.replication_data_handler.get_streams_to_replicate().get(
+ cmd.stream_name
+ )
+ if current_token is None:
+ logger.debug(
+ "Got POSITION for stream we're not subscribed to: %s", cmd.stream_name
+ )
+ return
+
+ # Fetch all updates between then and now.
+ limited = cmd.token != current_token
+ while limited:
+ updates, current_token, limited = await stream.get_updates_since(
+ current_token, cmd.token
+ )
+ if updates:
+ await self.on_rdata(
+ cmd.stream_name,
+ current_token,
+ [stream.parse_row(update[1]) for update in updates],
+ )
+
+ # We've now caught up to position sent to us, notify handler.
+ await self.replication_data_handler.on_position(cmd.stream_name, cmd.token)
+
+ # Handle any RDATA that came in while we were catching up.
+ rows = self.pending_batches.pop(cmd.stream_name, [])
+ if rows:
+ await self.on_rdata(cmd.stream_name, rows[-1].token, rows)
+
+ async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
+ """Called when get a new REMOTE_SERVER_UP command."""
+ if self.is_master:
+ self.notifier.notify_remote_server_up(cmd.server)
+
+ def get_currently_syncing_users(self):
+ """Get the list of currently syncing users (if any). This is called
+ when a connection has been established and we need to send the
+ currently syncing users.
+ """
+ return self.presence_handler.get_currently_syncing_users()
+
+ def send_command(self, cmd: Command):
+ """Send a command to master (when we get establish a connection if we
+ don't have one already.)
+ """
+ for conn in self.connections:
+ conn.send_command(cmd)
+
+ def send_federation_ack(self, token: int):
+ """Ack data for the federation stream. This allows the master to drop
+ data stored purely in memory.
+ """
+ self.send_command(FederationAckCommand(token))
+
+ def send_user_sync(
+ self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
+ ):
+ """Poke the master that a user has started/stopped syncing.
+ """
+ self.send_command(
+ UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
+ )
+
+ def send_remove_pusher(self, app_id: str, push_key: str, user_id: str):
+ """Poke the master to remove a pusher for a user
+ """
+ cmd = RemovePusherCommand(app_id, push_key, user_id)
+ self.send_command(cmd)
+
+ def send_invalidate_cache(self, cache_func: str, keys: tuple):
+ """Poke the master to invalidate a cache.
+ """
+ cmd = InvalidateCacheCommand(cache_func.__name__, keys)
+ self.send_command(cmd)
+
+ def send_user_ip(
+ self,
+ user_id: str,
+ access_token: str,
+ ip: str,
+ user_agent: str,
+ device_id: str,
+ last_seen: int,
+ ):
+ """Tell the master that the user made a request.
+ """
+ cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
+ self.send_command(cmd)
+
+ def send_remote_server_up(self, server: str):
+ self.send_command(RemoteServerUpCommand(server))
+
+ def stream_update(self, stream_name: str, token: str, data: Any):
+ """Called when a new update is available to stream to clients.
+
+ We need to check if the client is interested in the stream or not
+ """
+ self.send_command(RdataCommand(stream_name, token, data))
+
+
+class DummyReplicationDataHandler:
+ """A replication data handler that simply discards all data.
+ """
+
+ async def on_rdata(self, stream_name: str, token: int, rows: list):
+ """Called to handle a batch of replication data with a given stream token.
+
+ By default this just pokes the slave store. Can be overridden in subclasses to
+ handle more.
+
+ Args:
+ stream_name (str): name of the replication stream for this batch of rows
+ token (int): stream token for this batch of rows
+ rows (list): a list of Stream.ROW_TYPE objects as returned by
+ Stream.parse_row.
+ """
+ pass
+
+ def get_streams_to_replicate(self) -> Dict[str, int]:
+ """Called when a new connection has been established and we need to
+ subscribe to streams.
+
+ Returns:
+ map from stream name to the most recent update we have for
+ that stream (ie, the point we want to start replicating from)
+ """
+ return {}
+
+ async def on_position(self, stream_name: str, token: int):
+ pass
+
+
+class WorkerReplicationDataHandler:
+ """A replication data handler that calls slave data stores.
+ """
+
+ def __init__(self, store):
+ self.store = store
+
+ async def on_rdata(self, stream_name: str, token: int, rows: list):
+ """Called to handle a batch of replication data with a given stream token.
+
+ By default this just pokes the slave store. Can be overridden in subclasses to
+ handle more.
+
+ Args:
+ stream_name (str): name of the replication stream for this batch of rows
+ token (int): stream token for this batch of rows
+ rows (list): a list of Stream.ROW_TYPE objects as returned by
+ Stream.parse_row.
+ """
+ self.store.process_replication_rows(stream_name, token, rows)
+
+ def get_streams_to_replicate(self) -> Dict[str, int]:
+ """Called when a new connection has been established and we need to
+ subscribe to streams.
+
+ Returns:
+ map from stream name to the most recent update we have for
+ that stream (ie, the point we want to start replicating from)
+ """
+ args = self.store.stream_positions()
+ user_account_data = args.pop("user_account_data", None)
+ room_account_data = args.pop("room_account_data", None)
+ if user_account_data:
+ args["account_data"] = user_account_data
+ elif room_account_data:
+ args["account_data"] = room_account_data
+ return args
+
+ async def on_position(self, stream_name: str, token: int):
+ self.store.process_replication_rows(stream_name, token, [])
|