summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/7374.misc1
-rw-r--r--synapse/federation/send_queue.py40
-rw-r--r--synapse/federation/sender/__init__.py12
-rw-r--r--synapse/federation/sender/per_destination_queue.py6
-rw-r--r--synapse/federation/sender/transaction_manager.py6
-rw-r--r--synapse/replication/tcp/resource.py2
-rw-r--r--synapse/replication/tcp/streams/_base.py3
-rw-r--r--synapse/replication/tcp/streams/federation.py30
-rw-r--r--tests/replication/tcp/streams/_base.py20
-rw-r--r--tests/replication/tcp/streams/test_federation.py81
-rw-r--r--tests/replication/tcp/streams/test_typing.py5
11 files changed, 158 insertions, 48 deletions
diff --git a/changelog.d/7374.misc b/changelog.d/7374.misc
new file mode 100644
index 0000000000..676f285377
--- /dev/null
+++ b/changelog.d/7374.misc
@@ -0,0 +1 @@
+Move catchup of replication streams logic to worker.
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index e1700ca8aa..6fbacf6a3e 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,6 +31,7 @@ Events are replicated via a separate events stream.
 
 import logging
 from collections import namedtuple
+from typing import List, Tuple
 
 from six import iteritems
 
@@ -69,7 +70,11 @@ class FederationRemoteSendQueue(object):
 
         self.edus = SortedDict()  # stream position -> Edu
 
+        # stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
         self.pos = 1
+
+        # map from stream ID to the time that stream entry was generated, so that we
+        # can clear out entries after a while
         self.pos_time = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
@@ -250,19 +255,23 @@ class FederationRemoteSendQueue(object):
         self._clear_queue_before_pos(token)
 
     async def get_replication_rows(
-        self, from_token, to_token, limit, federation_ack=None
-    ):
+        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
+    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
         """Get rows to be sent over federation between the two tokens
 
         Args:
-            from_token (int)
-            to_token(int)
-            limit (int)
-            federation_ack (int): Optional. The position where the worker is
-                explicitly acknowledged it has handled. Allows us to drop
-                data from before that point
+            instance_name: the name of the current process
+            from_token: the previous stream token: the starting point for fetching the
+                updates
+            to_token: the new stream token: the point to get updates up to
+            target_row_count: a target for the number of rows to be returned.
+
+        Returns: a triplet `(updates, new_last_token, limited)`, where:
+           * `updates` is a list of `(token, row)` entries.
+           * `new_last_token` is the new position in stream.
+           * `limited` is whether there are more updates to fetch.
         """
-        # TODO: Handle limit.
+        # TODO: Handle target_row_count.
 
         # To handle restarts where we wrap around
         if from_token > self.pos:
@@ -270,12 +279,7 @@ class FederationRemoteSendQueue(object):
 
         # list of tuple(int, BaseFederationRow), where the first is the position
         # of the federation stream.
-        rows = []
-
-        # There should be only one reader, so lets delete everything its
-        # acknowledged its seen.
-        if federation_ack:
-            self._clear_queue_before_pos(federation_ack)
+        rows = []  # type: List[Tuple[int, BaseFederationRow]]
 
         # Fetch changed presence
         i = self.presence_changed.bisect_right(from_token)
@@ -332,7 +336,11 @@ class FederationRemoteSendQueue(object):
         # Sort rows based on pos
         rows.sort()
 
-        return [(pos, row.TypeId, row.to_data()) for pos, row in rows]
+        return (
+            [(pos, (row.TypeId, row.to_data())) for pos, row in rows],
+            to_token,
+            False,
+        )
 
 
 class BaseFederationRow(object):
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index a477578e44..d473576902 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import logging
-from typing import Dict, Hashable, Iterable, List, Optional, Set
+from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
 
 from six import itervalues
 
@@ -498,14 +498,16 @@ class FederationSender(object):
 
         self._get_per_destination_queue(destination).attempt_new_transaction()
 
-    def get_current_token(self) -> int:
+    @staticmethod
+    def get_current_token() -> int:
         # Dummy implementation for case where federation sender isn't offloaded
         # to a worker.
         return 0
 
+    @staticmethod
     async def get_replication_rows(
-        self, from_token, to_token, limit, federation_ack=None
-    ):
+        instance_name: str, from_token: int, to_token: int, target_row_count: int
+    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
         # Dummy implementation for case where federation sender isn't offloaded
         # to a worker.
-        return []
+        return [], 0, False
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index e13cd20ffa..276a2b596f 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,11 +15,10 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import Dict, Hashable, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
 
 from prometheus_client import Counter
 
-import synapse.server
 from synapse.api.errors import (
     FederationDeniedError,
     HttpResponseException,
@@ -34,6 +33,9 @@ from synapse.storage.presence import UserPresenceState
 from synapse.types import ReadReceipt
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 
+if TYPE_CHECKING:
+    import synapse.server
+
 # This is defined in the Matrix spec and enforced by the receiver.
 MAX_EDUS_PER_TRANSACTION = 100
 
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 3c2a02a3b3..a2752a54a5 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -13,11 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import List
+from typing import TYPE_CHECKING, List
 
 from canonicaljson import json
 
-import synapse.server
 from synapse.api.errors import HttpResponseException
 from synapse.events import EventBase
 from synapse.federation.persistence import TransactionActions
@@ -31,6 +30,9 @@ from synapse.logging.opentracing import (
 )
 from synapse.util.metrics import measure_func
 
+if TYPE_CHECKING:
+    import synapse.server
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 33d2f589ac..b690abedad 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -80,7 +80,7 @@ class ReplicationStreamer(object):
             for stream in STREAMS_MAP.values():
                 if stream == FederationStream and hs.config.send_federation:
                     # We only support federation stream if federation sending
-                    # hase been disabled on the master.
+                    # has been disabled on the master.
                     continue
 
                 self.streams.append(stream(hs))
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index b0f87c365b..084604e8b0 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -104,7 +104,8 @@ class Stream(object):
         implemented by subclasses.
 
         current_token_function is called to get the current token of the underlying
-        stream.
+        stream. It is only meaningful on the process that is the source of the
+        replication stream (ie, usually the master).
 
         update_function is called to get updates for this stream between a pair of
         stream tokens. See the UpdateFunction type definition for more info.
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index e8bd52e389..b0505b8a2c 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 from collections import namedtuple
 
-from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function
+from synapse.replication.tcp.streams._base import Stream, make_http_update_function
 
 
 class FederationStream(Stream):
@@ -35,21 +35,33 @@ class FederationStream(Stream):
     ROW_TYPE = FederationStreamRow
 
     def __init__(self, hs):
-        # Not all synapse instances will have a federation sender instance,
-        # whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
-        # so we stub the stream out when that is the case.
-        if hs.config.worker_app is None or hs.should_send_federation():
+        if hs.config.worker_app is None:
+            # master process: get updates from the FederationRemoteSendQueue.
+            # (if the master is configured to send federation itself, federation_sender
+            # will be a real FederationSender, which has stubs for current_token and
+            # get_replication_rows.)
             federation_sender = hs.get_federation_sender()
             current_token = federation_sender.get_current_token
-            update_function = db_query_to_update_function(
-                federation_sender.get_replication_rows
-            )
+            update_function = federation_sender.get_replication_rows
+
+        elif hs.should_send_federation():
+            # federation sender: Query master process
+            update_function = make_http_update_function(hs, self.NAME)
+            current_token = self._stub_current_token
+
         else:
-            current_token = lambda: 0
+            # other worker: stub out the update function (we're not interested in
+            # any updates so when we get a POSITION we do nothing)
             update_function = self._stub_update_function
+            current_token = self._stub_current_token
 
         super().__init__(hs.get_instance_name(), current_token, update_function)
 
     @staticmethod
+    def _stub_current_token():
+        # dummy current-token method for use on workers
+        return 0
+
+    @staticmethod
     async def _stub_update_function(instance_name, from_token, upto_token, limit):
         return [], upto_token, False
diff --git a/tests/replication/tcp/streams/_base.py b/tests/replication/tcp/streams/_base.py
index 7b56d2028d..9d4f0bbe44 100644
--- a/tests/replication/tcp/streams/_base.py
+++ b/tests/replication/tcp/streams/_base.py
@@ -27,6 +27,7 @@ from synapse.app.generic_worker import (
     GenericWorkerServer,
 )
 from synapse.http.site import SynapseRequest
+from synapse.replication.http import streams
 from synapse.replication.tcp.handler import ReplicationCommandHandler
 from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
@@ -42,6 +43,10 @@ logger = logging.getLogger(__name__)
 class BaseStreamTestCase(unittest.HomeserverTestCase):
     """Base class for tests of the replication streams"""
 
+    servlets = [
+        streams.register_servlets,
+    ]
+
     def prepare(self, reactor, clock, hs):
         # build a replication server
         server_factory = ReplicationStreamProtocolFactory(hs)
@@ -49,17 +54,11 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
         self.server = server_factory.buildProtocol(None)
 
         # Make a new HomeServer object for the worker
-        config = self.default_config()
-        config["worker_app"] = "synapse.app.generic_worker"
-        config["worker_replication_host"] = "testserv"
-        config["worker_replication_http_port"] = "8765"
-
         self.reactor.lookups["testserv"] = "1.2.3.4"
-
         self.worker_hs = self.setup_test_homeserver(
             http_client=None,
             homeserverToUse=GenericWorkerServer,
-            config=config,
+            config=self._get_worker_hs_config(),
             reactor=self.reactor,
         )
 
@@ -78,6 +77,13 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
         self._client_transport = None
         self._server_transport = None
 
+    def _get_worker_hs_config(self) -> dict:
+        config = self.default_config()
+        config["worker_app"] = "synapse.app.generic_worker"
+        config["worker_replication_host"] = "testserv"
+        config["worker_replication_http_port"] = "8765"
+        return config
+
     def _build_replication_data_handler(self):
         return TestReplicationDataHandler(self.worker_hs)
 
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
new file mode 100644
index 0000000000..eea4565da3
--- /dev/null
+++ b/tests/replication/tcp/streams/test_federation.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.federation.send_queue import EduRow
+from synapse.replication.tcp.streams.federation import FederationStream
+
+from tests.replication.tcp.streams._base import BaseStreamTestCase
+
+
+class FederationStreamTestCase(BaseStreamTestCase):
+    def _get_worker_hs_config(self) -> dict:
+        # enable federation sending on the worker
+        config = super()._get_worker_hs_config()
+        # TODO: make it so we don't need both of these
+        config["send_federation"] = True
+        config["worker_app"] = "synapse.app.federation_sender"
+        return config
+
+    def test_catchup(self):
+        """Basic test of catchup on reconnect
+
+        Makes sure that updates sent while we are offline are received later.
+        """
+        fed_sender = self.hs.get_federation_sender()
+        received_rows = self.test_handler.received_rdata_rows
+
+        fed_sender.build_and_send_edu("testdest", "m.test_edu", {"a": "b"})
+
+        self.reconnect()
+        self.reactor.advance(0)
+
+        # check we're testing what we think we are: no rows should yet have been
+        # received
+        self.assertEqual(received_rows, [])
+
+        # We should now see an attempt to connect to the master
+        request = self.handle_http_replication_attempt()
+        self.assert_request_is_get_repl_stream_updates(request, "federation")
+
+        # we should have received an update row
+        stream_name, token, row = received_rows.pop()
+        self.assertEqual(stream_name, "federation")
+        self.assertIsInstance(row, FederationStream.FederationStreamRow)
+        self.assertEqual(row.type, EduRow.TypeId)
+        edurow = EduRow.from_data(row.data)
+        self.assertEqual(edurow.edu.edu_type, "m.test_edu")
+        self.assertEqual(edurow.edu.origin, self.hs.hostname)
+        self.assertEqual(edurow.edu.destination, "testdest")
+        self.assertEqual(edurow.edu.content, {"a": "b"})
+
+        self.assertEqual(received_rows, [])
+
+        # additional updates should be transferred without an HTTP hit
+        fed_sender.build_and_send_edu("testdest", "m.test1", {"c": "d"})
+        self.reactor.advance(0)
+        # there should be no http hit
+        self.assertEqual(len(self.reactor.tcpClients), 0)
+        # ... but we should have a row
+        self.assertEqual(len(received_rows), 1)
+
+        stream_name, token, row = received_rows.pop()
+        self.assertEqual(stream_name, "federation")
+        self.assertIsInstance(row, FederationStream.FederationStreamRow)
+        self.assertEqual(row.type, EduRow.TypeId)
+        edurow = EduRow.from_data(row.data)
+        self.assertEqual(edurow.edu.edu_type, "m.test1")
+        self.assertEqual(edurow.edu.origin, self.hs.hostname)
+        self.assertEqual(edurow.edu.destination, "testdest")
+        self.assertEqual(edurow.edu.content, {"c": "d"})
diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py
index d25a7b194e..125c63dab5 100644
--- a/tests/replication/tcp/streams/test_typing.py
+++ b/tests/replication/tcp/streams/test_typing.py
@@ -15,7 +15,6 @@
 from mock import Mock
 
 from synapse.handlers.typing import RoomMember
-from synapse.replication.http import streams
 from synapse.replication.tcp.streams import TypingStream
 
 from tests.replication.tcp.streams._base import BaseStreamTestCase
@@ -24,10 +23,6 @@ USER_ID = "@feeling:blue"
 
 
 class TypingStreamTestCase(BaseStreamTestCase):
-    servlets = [
-        streams.register_servlets,
-    ]
-
     def _build_replication_data_handler(self):
         return Mock(wraps=super()._build_replication_data_handler())