From cc6243b4c08bfae77c9ff29d23c40568ab284924 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 4 Nov 2019 12:40:18 +0000 Subject: document the REPLICATE command a bit better (#6305) since I found myself wonder how it works --- synapse/replication/tcp/client.py | 20 +++++++--- synapse/replication/tcp/protocol.py | 74 ++++++++++++++++++++++++++++++++++++- 2 files changed, 86 insertions(+), 8 deletions(-) (limited to 'synapse/replication/tcp') diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 563ce0fc53..fead78388c 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -16,10 +16,17 @@ """ import logging +from typing import Dict from twisted.internet import defer from twisted.internet.protocol import ReconnectingClientFactory +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.tcp.protocol import ( + AbstractReplicationClientHandler, + ClientReplicationStreamProtocol, +) + from .commands import ( FederationAckCommand, InvalidateCacheCommand, @@ -27,7 +34,6 @@ from .commands import ( UserIpCommand, UserSyncCommand, ) -from .protocol import ClientReplicationStreamProtocol logger = logging.getLogger(__name__) @@ -42,7 +48,7 @@ class ReplicationClientFactory(ReconnectingClientFactory): maxDelay = 30 # Try at least once every N seconds - def __init__(self, hs, client_name, handler): + def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler): self.client_name = client_name self.handler = handler self.server_name = hs.config.server_name @@ -68,13 +74,13 @@ class ReplicationClientFactory(ReconnectingClientFactory): ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) -class ReplicationClientHandler(object): +class ReplicationClientHandler(AbstractReplicationClientHandler): """A base handler that can be passed to the ReplicationClientFactory. By default proxies incoming replication data to the SlaveStore. """ - def __init__(self, store): + def __init__(self, store: BaseSlavedStore): self.store = store # The current connection. None if we are currently (re)connecting @@ -138,11 +144,13 @@ class ReplicationClientHandler(object): if d: d.callback(data) - def get_streams_to_replicate(self): + def get_streams_to_replicate(self) -> Dict[str, int]: """Called when a new connection has been established and we need to subscribe to streams. - Returns a dictionary of stream name to token. + Returns: + map from stream name to the most recent update we have for + that stream (ie, the point we want to start replicating from) """ args = self.store.stream_positions() user_account_data = args.pop("user_account_data", None) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index b64f3f44b5..afaf002fe6 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -48,7 +48,7 @@ indicate which side is sending, these are *not* included on the wire:: > ERROR server stopping * connection closed by server * """ - +import abc import fcntl import logging import struct @@ -65,6 +65,7 @@ from twisted.python.failure import Failure from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util import Clock from synapse.util.stringutils import random_string from .commands import ( @@ -558,11 +559,80 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): self.streamer.lost_connection(self) +class AbstractReplicationClientHandler(metaclass=abc.ABCMeta): + """ + The interface for the handler that should be passed to + ClientReplicationStreamProtocol + """ + + @abc.abstractmethod + def on_rdata(self, stream_name, token, rows): + """Called to handle a batch of replication data with a given stream token. + + Args: + stream_name (str): name of the replication stream for this batch of rows + token (int): stream token for this batch of rows + rows (list): a list of Stream.ROW_TYPE objects as returned by + Stream.parse_row. + + Returns: + Deferred|None + """ + raise NotImplementedError() + + @abc.abstractmethod + def on_position(self, stream_name, token): + """Called when we get new position data.""" + raise NotImplementedError() + + @abc.abstractmethod + def on_sync(self, data): + """Called when get a new SYNC command.""" + raise NotImplementedError() + + @abc.abstractmethod + def get_streams_to_replicate(self): + """Called when a new connection has been established and we need to + subscribe to streams. + + Returns: + map from stream name to the most recent update we have for + that stream (ie, the point we want to start replicating from) + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_currently_syncing_users(self): + """Get the list of currently syncing users (if any). This is called + when a connection has been established and we need to send the + currently syncing users.""" + raise NotImplementedError() + + @abc.abstractmethod + def update_connection(self, connection): + """Called when a connection has been established (or lost with None). + """ + raise NotImplementedError() + + @abc.abstractmethod + def finished_connecting(self): + """Called when we have successfully subscribed and caught up to all + streams we're interested in. + """ + raise NotImplementedError() + + class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS - def __init__(self, client_name, server_name, clock, handler): + def __init__( + self, + client_name: str, + server_name: str, + clock: Clock, + handler: AbstractReplicationClientHandler, + ): BaseReplicationStreamProtocol.__init__(self, clock) self.client_name = client_name -- cgit 1.5.1 From c4bdf2d7855dcddb7e294e9dba458884db2be7e0 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 8 Nov 2019 11:42:55 +0000 Subject: Remove content from being sent for account data rdata stream --- synapse/replication/tcp/streams/_base.py | 6 +++--- synapse/storage/data_stores/main/account_data.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/replication/tcp') diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 9e45429d49..dd87733842 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -89,7 +89,7 @@ TagAccountDataStreamRow = namedtuple( ) AccountDataStreamRow = namedtuple( "AccountDataStream", - ("user_id", "room_id", "data_type", "data"), # str # str # str # dict + ("user_id", "room_id", "data_type"), # str # str # str ) GroupsStreamRow = namedtuple( "GroupsStreamRow", @@ -421,8 +421,8 @@ class AccountDataStream(Stream): results = list(room_results) results.extend( - (stream_id, user_id, None, account_data_type, content) - for stream_id, user_id, account_data_type, content in global_results + (stream_id, user_id, None, account_data_type) + for stream_id, user_id, account_data_type in global_results ) return results diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py index 6afbfc0d74..22093484ed 100644 --- a/synapse/storage/data_stores/main/account_data.py +++ b/synapse/storage/data_stores/main/account_data.py @@ -184,14 +184,14 @@ class AccountDataWorkerStore(SQLBaseStore): current_id(int): The position to fetch up to. Returns: A deferred pair of lists of tuples of stream_id int, user_id string, - room_id string, type string, and content string. + room_id string, and type string. """ if last_room_id == current_id and last_global_id == current_id: return defer.succeed(([], [])) def get_updated_account_data_txn(txn): sql = ( - "SELECT stream_id, user_id, account_data_type, content" + "SELECT stream_id, user_id, account_data_type" " FROM account_data WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC LIMIT ?" ) @@ -199,7 +199,7 @@ class AccountDataWorkerStore(SQLBaseStore): global_results = txn.fetchall() sql = ( - "SELECT stream_id, user_id, room_id, account_data_type, content" + "SELECT stream_id, user_id, room_id, account_data_type" " FROM room_account_data WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC LIMIT ?" ) -- cgit 1.5.1 From cd96b4586f8dfa8a6857e1bee7de2bd9660238d5 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 8 Nov 2019 15:45:45 +0000 Subject: lint --- synapse/replication/tcp/streams/_base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse/replication/tcp') diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index dd87733842..8512923eae 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -88,8 +88,7 @@ TagAccountDataStreamRow = namedtuple( "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict ) AccountDataStreamRow = namedtuple( - "AccountDataStream", - ("user_id", "room_id", "data_type"), # str # str # str + "AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str ) GroupsStreamRow = namedtuple( "GroupsStreamRow", -- cgit 1.5.1