diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 233cb33daf..a477578e44 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -499,4 +499,13 @@ class FederationSender(object):
self._get_per_destination_queue(destination).attempt_new_transaction()
def get_current_token(self) -> int:
+ # Dummy implementation for case where federation sender isn't offloaded
+ # to a worker.
return 0
+
+ async def get_replication_rows(
+ self, from_token, to_token, limit, federation_ack=None
+ ):
+ # Dummy implementation for case where federation sender isn't offloaded
+ # to a worker.
+ return []
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 28dbc6fcba..4613b2538c 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -21,6 +21,7 @@ from synapse.replication.http import (
membership,
register,
send_event,
+ streams,
)
REPLICATION_PREFIX = "/_synapse/replication"
@@ -38,3 +39,4 @@ class ReplicationRestResource(JsonResource):
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
+ streams.register_servlets(hs, self)
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
new file mode 100644
index 0000000000..3889278b2a
--- /dev/null
+++ b/synapse/replication/http/streams.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import parse_integer
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationGetStreamUpdates(ReplicationEndpoint):
+ """Fetches stream updates from a server. Used for streams not persisted to
+ the database, e.g. typing notifications.
+ """
+
+ NAME = "get_repl_stream_updates"
+ PATH_ARGS = ("stream_name",)
+ METHOD = "GET"
+
+ def __init__(self, hs):
+ super(ReplicationGetStreamUpdates, self).__init__(hs)
+
+ from synapse.replication.tcp.streams import STREAMS_MAP
+
+ self.streams = {stream.NAME: stream(hs) for stream in STREAMS_MAP.values()}
+
+ @staticmethod
+ def _serialize_payload(stream_name, from_token, upto_token, limit):
+ return {"from_token": from_token, "upto_token": upto_token, "limit": limit}
+
+ async def _handle_request(self, request, stream_name):
+ stream = self.streams.get(stream_name)
+ if stream is None:
+ raise SynapseError(400, "Unknown stream")
+
+ from_token = parse_integer(request, "from_token", required=True)
+ upto_token = parse_integer(request, "upto_token", required=True)
+ limit = parse_integer(request, "limit", required=True)
+
+ updates, upto_token, limited = await stream.get_updates_since(
+ from_token, upto_token, limit
+ )
+
+ return (
+ 200,
+ {"updates": updates, "upto_token": upto_token, "limited": limited},
+ )
+
+
+def register_servlets(hs, http_server):
+ ReplicationGetStreamUpdates(hs).register(http_server)
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 5f52264e84..c3b9a90ca5 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -25,6 +25,8 @@ Each stream is defined by the following information:
update_function: The function that returns a list of updates between two tokens
"""
+from typing import Dict, Type
+
from . import _base, events, federation
STREAMS_MAP = {
@@ -47,4 +49,4 @@ STREAMS_MAP = {
_base.GroupServerStream,
_base.UserSignatureStream,
)
-}
+} # type: Dict[str, Type[_base.Stream]]
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 99cef97532..6dea523f8c 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -20,6 +20,7 @@ from typing import Any, List, Optional, Tuple, Union
import attr
+from synapse.replication.http.streams import ReplicationGetStreamUpdates
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@@ -127,6 +128,10 @@ class Stream(object):
# The type of the row. Used by the default impl of parse_row.
ROW_TYPE = None # type: Any
+ # Whether the update function is only available on master. If True then
+ # calls to get updates are proxied to the master via a HTTP call.
+ _QUERY_MASTER = False
+
@classmethod
def parse_row(cls, row):
"""Parse a row received over replication
@@ -143,6 +148,11 @@ class Stream(object):
return cls.ROW_TYPE(*row)
def __init__(self, hs):
+ self._is_worker = hs.config.worker_app is not None
+
+ if self._QUERY_MASTER and self._is_worker:
+ self._replication_client = ReplicationGetStreamUpdates.make_client(hs)
+
# The token from which we last asked for updates
self.last_token = self.current_token()
@@ -191,14 +201,23 @@ class Stream(object):
if from_token == upto_token:
return [], upto_token, False
- limited = False
- rows = await self.update_function(from_token, upto_token, limit=limit)
- updates = [(row[0], row[1:]) for row in rows]
- if len(updates) == limit:
- upto_token = rows[-1][0]
- limited = True
-
- return updates, upto_token, limited
+ if self._is_worker and self._QUERY_MASTER:
+ result = await self._replication_client(
+ stream_name=self.NAME,
+ from_token=from_token,
+ upto_token=upto_token,
+ limit=limit,
+ )
+ return result["updates"], result["upto_token"], result["limited"]
+ else:
+ limited = False
+ rows = await self.update_function(from_token, upto_token, limit=limit)
+ updates = [(row[0], row[1:]) for row in rows]
+ if len(updates) == limit:
+ upto_token = rows[-1][0]
+ limited = True
+
+ return updates, upto_token, limited
def current_token(self):
"""Gets the current token of the underlying streams. Should be provided
@@ -239,13 +258,16 @@ class BackfillStream(Stream):
class PresenceStream(Stream):
NAME = "presence"
ROW_TYPE = PresenceStreamRow
+ _QUERY_MASTER = True
def __init__(self, hs):
store = hs.get_datastore()
presence_handler = hs.get_presence_handler()
self.current_token = store.get_current_presence_token # type: ignore
- self.update_function = presence_handler.get_all_presence_updates # type: ignore
+
+ if hs.config.worker_app is None:
+ self.update_function = presence_handler.get_all_presence_updates # type: ignore
super(PresenceStream, self).__init__(hs)
@@ -253,12 +275,15 @@ class PresenceStream(Stream):
class TypingStream(Stream):
NAME = "typing"
ROW_TYPE = TypingStreamRow
+ _QUERY_MASTER = True
def __init__(self, hs):
typing_handler = hs.get_typing_handler()
self.current_token = typing_handler.get_current_token # type: ignore
- self.update_function = typing_handler.get_all_typing_updates # type: ignore
+
+ if hs.config.worker_app is None:
+ self.update_function = typing_handler.get_all_typing_updates # type: ignore
super(TypingStream, self).__init__(hs)
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 615f3dc9ac..5d9e87188b 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,7 +15,9 @@
# limitations under the License.
from collections import namedtuple
-from ._base import Stream
+from twisted.internet import defer
+
+from synapse.replication.tcp.streams._base import Stream
FederationStreamRow = namedtuple(
"FederationStreamRow",
@@ -33,11 +35,18 @@ class FederationStream(Stream):
NAME = "federation"
ROW_TYPE = FederationStreamRow
+ _QUERY_MASTER = True
def __init__(self, hs):
- federation_sender = hs.get_federation_sender()
-
- self.current_token = federation_sender.get_current_token # type: ignore
- self.update_function = federation_sender.get_replication_rows # type: ignore
+ # Not all synapse instances will have a federation sender instance,
+ # whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
+ # so we stub the stream out when that is the case.
+ if hs.config.worker_app is None or hs.should_send_federation():
+ federation_sender = hs.get_federation_sender()
+ self.current_token = federation_sender.get_current_token # type: ignore
+ self.update_function = federation_sender.get_replication_rows # type: ignore
+ else:
+ self.current_token = lambda: 0 # type: ignore
+ self.update_function = lambda *args, **kwargs: defer.succeed([]) # type: ignore
super(FederationStream, self).__init__(hs)
|