diff options
author | Erik Johnston <erik@matrix.org> | 2020-03-20 15:17:01 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-03-20 15:31:47 +0000 |
commit | ba90596687986c28503dc77b6079bf45bd7f4eb9 (patch) | |
tree | 78bd4a004ef21382d6aad73e293106fa0879de41 /synapse/replication/tcp | |
parent | Don't panic if streams get behind. (diff) | |
download | synapse-ba90596687986c28503dc77b6079bf45bd7f4eb9.tar.xz |
Add ability to catchup on stream by talking to master.
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/streams/__init__.py | 4 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 45 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/federation.py | 19 |
3 files changed, 52 insertions, 16 deletions
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 5f52264e84..c3b9a90ca5 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -25,6 +25,8 @@ Each stream is defined by the following information: update_function: The function that returns a list of updates between two tokens """ +from typing import Dict, Type + from . import _base, events, federation STREAMS_MAP = { @@ -47,4 +49,4 @@ STREAMS_MAP = { _base.GroupServerStream, _base.UserSignatureStream, ) -} +} # type: Dict[str, Type[_base.Stream]] diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 99cef97532..6dea523f8c 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -20,6 +20,7 @@ from typing import Any, List, Optional, Tuple, Union import attr +from synapse.replication.http.streams import ReplicationGetStreamUpdates from synapse.types import JsonDict logger = logging.getLogger(__name__) @@ -127,6 +128,10 @@ class Stream(object): # The type of the row. Used by the default impl of parse_row. ROW_TYPE = None # type: Any + # Whether the update function is only available on master. If True then + # calls to get updates are proxied to the master via a HTTP call. + _QUERY_MASTER = False + @classmethod def parse_row(cls, row): """Parse a row received over replication @@ -143,6 +148,11 @@ class Stream(object): return cls.ROW_TYPE(*row) def __init__(self, hs): + self._is_worker = hs.config.worker_app is not None + + if self._QUERY_MASTER and self._is_worker: + self._replication_client = ReplicationGetStreamUpdates.make_client(hs) + # The token from which we last asked for updates self.last_token = self.current_token() @@ -191,14 +201,23 @@ class Stream(object): if from_token == upto_token: return [], upto_token, False - limited = False - rows = await self.update_function(from_token, upto_token, limit=limit) - updates = [(row[0], row[1:]) for row in rows] - if len(updates) == limit: - upto_token = rows[-1][0] - limited = True - - return updates, upto_token, limited + if self._is_worker and self._QUERY_MASTER: + result = await self._replication_client( + stream_name=self.NAME, + from_token=from_token, + upto_token=upto_token, + limit=limit, + ) + return result["updates"], result["upto_token"], result["limited"] + else: + limited = False + rows = await self.update_function(from_token, upto_token, limit=limit) + updates = [(row[0], row[1:]) for row in rows] + if len(updates) == limit: + upto_token = rows[-1][0] + limited = True + + return updates, upto_token, limited def current_token(self): """Gets the current token of the underlying streams. Should be provided @@ -239,13 +258,16 @@ class BackfillStream(Stream): class PresenceStream(Stream): NAME = "presence" ROW_TYPE = PresenceStreamRow + _QUERY_MASTER = True def __init__(self, hs): store = hs.get_datastore() presence_handler = hs.get_presence_handler() self.current_token = store.get_current_presence_token # type: ignore - self.update_function = presence_handler.get_all_presence_updates # type: ignore + + if hs.config.worker_app is None: + self.update_function = presence_handler.get_all_presence_updates # type: ignore super(PresenceStream, self).__init__(hs) @@ -253,12 +275,15 @@ class PresenceStream(Stream): class TypingStream(Stream): NAME = "typing" ROW_TYPE = TypingStreamRow + _QUERY_MASTER = True def __init__(self, hs): typing_handler = hs.get_typing_handler() self.current_token = typing_handler.get_current_token # type: ignore - self.update_function = typing_handler.get_all_typing_updates # type: ignore + + if hs.config.worker_app is None: + self.update_function = typing_handler.get_all_typing_updates # type: ignore super(TypingStream, self).__init__(hs) diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py index 615f3dc9ac..5d9e87188b 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py @@ -15,7 +15,9 @@ # limitations under the License. from collections import namedtuple -from ._base import Stream +from twisted.internet import defer + +from synapse.replication.tcp.streams._base import Stream FederationStreamRow = namedtuple( "FederationStreamRow", @@ -33,11 +35,18 @@ class FederationStream(Stream): NAME = "federation" ROW_TYPE = FederationStreamRow + _QUERY_MASTER = True def __init__(self, hs): - federation_sender = hs.get_federation_sender() - - self.current_token = federation_sender.get_current_token # type: ignore - self.update_function = federation_sender.get_replication_rows # type: ignore + # Not all synapse instances will have a federation sender instance, + # whether that's a `FederationSender` or a `FederationRemoteSendQueue`, + # so we stub the stream out when that is the case. + if hs.config.worker_app is None or hs.should_send_federation(): + federation_sender = hs.get_federation_sender() + self.current_token = federation_sender.get_current_token # type: ignore + self.update_function = federation_sender.get_replication_rows # type: ignore + else: + self.current_token = lambda: 0 # type: ignore + self.update_function = lambda *args, **kwargs: defer.succeed([]) # type: ignore super(FederationStream, self).__init__(hs) |