summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-20 15:17:01 +0000
committerErik Johnston <erik@matrix.org>2020-03-20 15:31:47 +0000
commitba90596687986c28503dc77b6079bf45bd7f4eb9 (patch)
tree78bd4a004ef21382d6aad73e293106fa0879de41 /synapse/replication/tcp
parentDon't panic if streams get behind. (diff)
downloadsynapse-ba90596687986c28503dc77b6079bf45bd7f4eb9.tar.xz
Add ability to catchup on stream by talking to master.
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/streams/__init__.py4
-rw-r--r--synapse/replication/tcp/streams/_base.py45
-rw-r--r--synapse/replication/tcp/streams/federation.py19
3 files changed, 52 insertions, 16 deletions
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 5f52264e84..c3b9a90ca5 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -25,6 +25,8 @@ Each stream is defined by the following information:
     update_function:    The function that returns a list of updates between two tokens
 """
 
+from typing import Dict, Type
+
 from . import _base, events, federation
 
 STREAMS_MAP = {
@@ -47,4 +49,4 @@ STREAMS_MAP = {
         _base.GroupServerStream,
         _base.UserSignatureStream,
     )
-}
+}  # type: Dict[str, Type[_base.Stream]]
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 99cef97532..6dea523f8c 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -20,6 +20,7 @@ from typing import Any, List, Optional, Tuple, Union
 
 import attr
 
+from synapse.replication.http.streams import ReplicationGetStreamUpdates
 from synapse.types import JsonDict
 
 logger = logging.getLogger(__name__)
@@ -127,6 +128,10 @@ class Stream(object):
     # The type of the row. Used by the default impl of parse_row.
     ROW_TYPE = None  # type: Any
 
+    # Whether the update function is only available on master. If True then
+    # calls to get updates are proxied to the master via a HTTP call.
+    _QUERY_MASTER = False
+
     @classmethod
     def parse_row(cls, row):
         """Parse a row received over replication
@@ -143,6 +148,11 @@ class Stream(object):
         return cls.ROW_TYPE(*row)
 
     def __init__(self, hs):
+        self._is_worker = hs.config.worker_app is not None
+
+        if self._QUERY_MASTER and self._is_worker:
+            self._replication_client = ReplicationGetStreamUpdates.make_client(hs)
+
         # The token from which we last asked for updates
         self.last_token = self.current_token()
 
@@ -191,14 +201,23 @@ class Stream(object):
         if from_token == upto_token:
             return [], upto_token, False
 
-        limited = False
-        rows = await self.update_function(from_token, upto_token, limit=limit)
-        updates = [(row[0], row[1:]) for row in rows]
-        if len(updates) == limit:
-            upto_token = rows[-1][0]
-            limited = True
-
-        return updates, upto_token, limited
+        if self._is_worker and self._QUERY_MASTER:
+            result = await self._replication_client(
+                stream_name=self.NAME,
+                from_token=from_token,
+                upto_token=upto_token,
+                limit=limit,
+            )
+            return result["updates"], result["upto_token"], result["limited"]
+        else:
+            limited = False
+            rows = await self.update_function(from_token, upto_token, limit=limit)
+            updates = [(row[0], row[1:]) for row in rows]
+            if len(updates) == limit:
+                upto_token = rows[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
 
     def current_token(self):
         """Gets the current token of the underlying streams. Should be provided
@@ -239,13 +258,16 @@ class BackfillStream(Stream):
 class PresenceStream(Stream):
     NAME = "presence"
     ROW_TYPE = PresenceStreamRow
+    _QUERY_MASTER = True
 
     def __init__(self, hs):
         store = hs.get_datastore()
         presence_handler = hs.get_presence_handler()
 
         self.current_token = store.get_current_presence_token  # type: ignore
-        self.update_function = presence_handler.get_all_presence_updates  # type: ignore
+
+        if hs.config.worker_app is None:
+            self.update_function = presence_handler.get_all_presence_updates  # type: ignore
 
         super(PresenceStream, self).__init__(hs)
 
@@ -253,12 +275,15 @@ class PresenceStream(Stream):
 class TypingStream(Stream):
     NAME = "typing"
     ROW_TYPE = TypingStreamRow
+    _QUERY_MASTER = True
 
     def __init__(self, hs):
         typing_handler = hs.get_typing_handler()
 
         self.current_token = typing_handler.get_current_token  # type: ignore
-        self.update_function = typing_handler.get_all_typing_updates  # type: ignore
+
+        if hs.config.worker_app is None:
+            self.update_function = typing_handler.get_all_typing_updates  # type: ignore
 
         super(TypingStream, self).__init__(hs)
 
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 615f3dc9ac..5d9e87188b 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,7 +15,9 @@
 # limitations under the License.
 from collections import namedtuple
 
-from ._base import Stream
+from twisted.internet import defer
+
+from synapse.replication.tcp.streams._base import Stream
 
 FederationStreamRow = namedtuple(
     "FederationStreamRow",
@@ -33,11 +35,18 @@ class FederationStream(Stream):
 
     NAME = "federation"
     ROW_TYPE = FederationStreamRow
+    _QUERY_MASTER = True
 
     def __init__(self, hs):
-        federation_sender = hs.get_federation_sender()
-
-        self.current_token = federation_sender.get_current_token  # type: ignore
-        self.update_function = federation_sender.get_replication_rows  # type: ignore
+        # Not all synapse instances will have a federation sender instance,
+        # whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
+        # so we stub the stream out when that is the case.
+        if hs.config.worker_app is None or hs.should_send_federation():
+            federation_sender = hs.get_federation_sender()
+            self.current_token = federation_sender.get_current_token  # type: ignore
+            self.update_function = federation_sender.get_replication_rows  # type: ignore
+        else:
+            self.current_token = lambda: 0  # type: ignore
+            self.update_function = lambda *args, **kwargs: defer.succeed([])  # type: ignore
 
         super(FederationStream, self).__init__(hs)