summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
author Erik Johnston <erik@matrix.org> 2020-03-20 15:17:01 +0000
committer Erik Johnston <erik@matrix.org> 2020-03-20 15:31:47 +0000
commit ba90596687986c28503dc77b6079bf45bd7f4eb9 (patch)
tree 78bd4a004ef21382d6aad73e293106fa0879de41 /synapse/replication
parent Don't panic if streams get behind. (diff)
download synapse-ba90596687986c28503dc77b6079bf45bd7f4eb9.tar.xz
Add ability to catchup on stream by talking to master.
Diffstat (limited to 'synapse/replication')
-rw-r--r-- synapse/replication/http/__init__.py 2
-rw-r--r-- synapse/replication/http/streams.py 65
-rw-r--r-- synapse/replication/tcp/streams/__init__.py 4
-rw-r--r-- synapse/replication/tcp/streams/_base.py 45
-rw-r--r-- synapse/replication/tcp/streams/federation.py 19
5 files changed, 119 insertions, 16 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 28dbc6fcba..4613b2538c 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -21,6 +21,7 @@ from synapse.replication.http import (
     membership,
     register,
     send_event,
+    streams,
 )
 
 REPLICATION_PREFIX = "/_synapse/replication"
@@ -38,3 +39,4 @@ class ReplicationRestResource(JsonResource):
         login.register_servlets(hs, self)
         register.register_servlets(hs, self)
         devices.register_servlets(hs, self)
+        streams.register_servlets(hs, self)
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
new file mode 100644
index 0000000000..3889278b2a
--- /dev/null
+++ b/synapse/replication/http/streams.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import parse_integer
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationGetStreamUpdates(ReplicationEndpoint):
+    """Fetches stream updates from a server. Used for streams not persisted to
+    the database, e.g. typing notifications.
+    """
+
+    NAME = "get_repl_stream_updates"
+    PATH_ARGS = ("stream_name",)
+    METHOD = "GET"
+
+    def __init__(self, hs):
+        super(ReplicationGetStreamUpdates, self).__init__(hs)
+
+        from synapse.replication.tcp.streams import STREAMS_MAP
+
+        self.streams = {stream.NAME: stream(hs) for stream in STREAMS_MAP.values()}
+
+    @staticmethod
+    def _serialize_payload(stream_name, from_token, upto_token, limit):
+        return {"from_token": from_token, "upto_token": upto_token, "limit": limit}
+
+    async def _handle_request(self, request, stream_name):
+        stream = self.streams.get(stream_name)
+        if stream is None:
+            raise SynapseError(400, "Unknown stream")
+
+        from_token = parse_integer(request, "from_token", required=True)
+        upto_token = parse_integer(request, "upto_token", required=True)
+        limit = parse_integer(request, "limit", required=True)
+
+        updates, upto_token, limited = await stream.get_updates_since(
+            from_token, upto_token, limit
+        )
+
+        return (
+            200,
+            {"updates": updates, "upto_token": upto_token, "limited": limited},
+        )
+
+
+def register_servlets(hs, http_server):
+    ReplicationGetStreamUpdates(hs).register(http_server)
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 5f52264e84..c3b9a90ca5 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -25,6 +25,8 @@ Each stream is defined by the following information:
     update_function:    The function that returns a list of updates between two tokens
 """
 
+from typing import Dict, Type
+
 from . import _base, events, federation
 
 STREAMS_MAP = {
@@ -47,4 +49,4 @@ STREAMS_MAP = {
         _base.GroupServerStream,
         _base.UserSignatureStream,
     )
-}
+}  # type: Dict[str, Type[_base.Stream]]
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 99cef97532..6dea523f8c 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -20,6 +20,7 @@ from typing import Any, List, Optional, Tuple, Union
 
 import attr
 
+from synapse.replication.http.streams import ReplicationGetStreamUpdates
 from synapse.types import JsonDict
 
 logger = logging.getLogger(__name__)
@@ -127,6 +128,10 @@ class Stream(object):
     # The type of the row. Used by the default impl of parse_row.
     ROW_TYPE = None  # type: Any
 
+    # Whether the update function is only available on master. If True then
+    # calls to get updates are proxied to the master via a HTTP call.
+    _QUERY_MASTER = False
+
     @classmethod
     def parse_row(cls, row):
         """Parse a row received over replication
@@ -143,6 +148,11 @@ class Stream(object):
         return cls.ROW_TYPE(*row)
 
     def __init__(self, hs):
+        self._is_worker = hs.config.worker_app is not None
+
+        if self._QUERY_MASTER and self._is_worker:
+            self._replication_client = ReplicationGetStreamUpdates.make_client(hs)
+
         # The token from which we last asked for updates
         self.last_token = self.current_token()
 
@@ -191,14 +201,23 @@ class Stream(object):
         if from_token == upto_token:
             return [], upto_token, False
 
-        limited = False
-        rows = await self.update_function(from_token, upto_token, limit=limit)
-        updates = [(row[0], row[1:]) for row in rows]
-        if len(updates) == limit:
-            upto_token = rows[-1][0]
-            limited = True
-
-        return updates, upto_token, limited
+        if self._is_worker and self._QUERY_MASTER:
+            result = await self._replication_client(
+                stream_name=self.NAME,
+                from_token=from_token,
+                upto_token=upto_token,
+                limit=limit,
+            )
+            return result["updates"], result["upto_token"], result["limited"]
+        else:
+            limited = False
+            rows = await self.update_function(from_token, upto_token, limit=limit)
+            updates = [(row[0], row[1:]) for row in rows]
+            if len(updates) == limit:
+                upto_token = rows[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
 
     def current_token(self):
         """Gets the current token of the underlying streams. Should be provided
@@ -239,13 +258,16 @@ class BackfillStream(Stream):
 class PresenceStream(Stream):
     NAME = "presence"
     ROW_TYPE = PresenceStreamRow
+    _QUERY_MASTER = True
 
     def __init__(self, hs):
         store = hs.get_datastore()
         presence_handler = hs.get_presence_handler()
 
         self.current_token = store.get_current_presence_token  # type: ignore
-        self.update_function = presence_handler.get_all_presence_updates  # type: ignore
+
+        if hs.config.worker_app is None:
+            self.update_function = presence_handler.get_all_presence_updates  # type: ignore
 
         super(PresenceStream, self).__init__(hs)
 
@@ -253,12 +275,15 @@ class PresenceStream(Stream):
 class TypingStream(Stream):
     NAME = "typing"
     ROW_TYPE = TypingStreamRow
+    _QUERY_MASTER = True
 
     def __init__(self, hs):
         typing_handler = hs.get_typing_handler()
 
         self.current_token = typing_handler.get_current_token  # type: ignore
-        self.update_function = typing_handler.get_all_typing_updates  # type: ignore
+
+        if hs.config.worker_app is None:
+            self.update_function = typing_handler.get_all_typing_updates  # type: ignore
 
         super(TypingStream, self).__init__(hs)
 
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 615f3dc9ac..5d9e87188b 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,7 +15,9 @@
 # limitations under the License.
 from collections import namedtuple
 
-from ._base import Stream
+from twisted.internet import defer
+
+from synapse.replication.tcp.streams._base import Stream
 
 FederationStreamRow = namedtuple(
     "FederationStreamRow",
@@ -33,11 +35,18 @@ class FederationStream(Stream):
 
     NAME = "federation"
     ROW_TYPE = FederationStreamRow
+    _QUERY_MASTER = True
 
     def __init__(self, hs):
-        federation_sender = hs.get_federation_sender()
-
-        self.current_token = federation_sender.get_current_token  # type: ignore
-        self.update_function = federation_sender.get_replication_rows  # type: ignore
+        # Not all synapse instances will have a federation sender instance,
+        # whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
+        # so we stub the stream out when that is the case.
+        if hs.config.worker_app is None or hs.should_send_federation():
+            federation_sender = hs.get_federation_sender()
+            self.current_token = federation_sender.get_current_token  # type: ignore
+            self.update_function = federation_sender.get_replication_rows  # type: ignore
+        else:
+            self.current_token = lambda: 0  # type: ignore
+            self.update_function = lambda *args, **kwargs: defer.succeed([])  # type: ignore
 
         super(FederationStream, self).__init__(hs)