summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-05-05 14:15:57 +0100
committerGitHub <noreply@github.com>2020-05-05 14:15:57 +0100
commitd5aa7d93ed1f7963524125d16ab640ebf6cb91c2 (patch)
tree20e85df13577cdd24effdc9038f3a2357a584dfc /synapse
parentAdd MultiWriterIdGenerator. (#7281) (diff)
downloadsynapse-d5aa7d93ed1f7963524125d16ab640ebf6cb91c2.tar.xz
Fix catchup-on-reconnect for the Federation Stream (#7374)
looks like we managed to break this during the refactorathon.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/send_queue.py40
-rw-r--r--synapse/federation/sender/__init__.py12
-rw-r--r--synapse/federation/sender/per_destination_queue.py6
-rw-r--r--synapse/federation/sender/transaction_manager.py6
-rw-r--r--synapse/replication/tcp/resource.py2
-rw-r--r--synapse/replication/tcp/streams/_base.py3
-rw-r--r--synapse/replication/tcp/streams/federation.py30
7 files changed, 63 insertions, 36 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index e1700ca8aa..6fbacf6a3e 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,6 +31,7 @@ Events are replicated via a separate events stream.
 
 import logging
 from collections import namedtuple
+from typing import List, Tuple
 
 from six import iteritems
 
@@ -69,7 +70,11 @@ class FederationRemoteSendQueue(object):
 
         self.edus = SortedDict()  # stream position -> Edu
 
+        # stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
         self.pos = 1
+
+        # map from stream ID to the time that stream entry was generated, so that we
+        # can clear out entries after a while
         self.pos_time = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
@@ -250,19 +255,23 @@ class FederationRemoteSendQueue(object):
         self._clear_queue_before_pos(token)
 
     async def get_replication_rows(
-        self, from_token, to_token, limit, federation_ack=None
-    ):
+        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
+    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
         """Get rows to be sent over federation between the two tokens
 
         Args:
-            from_token (int)
-            to_token(int)
-            limit (int)
-            federation_ack (int): Optional. The position where the worker is
-                explicitly acknowledged it has handled. Allows us to drop
-                data from before that point
+            instance_name: the name of the current process
+            from_token: the previous stream token: the starting point for fetching the
+                updates
+            to_token: the new stream token: the point to get updates up to
+            target_row_count: a target for the number of rows to be returned.
+
+        Returns: a triplet `(updates, new_last_token, limited)`, where:
+           * `updates` is a list of `(token, row)` entries.
+           * `new_last_token` is the new position in stream.
+           * `limited` is whether there are more updates to fetch.
         """
-        # TODO: Handle limit.
+        # TODO: Handle target_row_count.
 
         # To handle restarts where we wrap around
         if from_token > self.pos:
@@ -270,12 +279,7 @@ class FederationRemoteSendQueue(object):
 
         # list of tuple(int, BaseFederationRow), where the first is the position
         # of the federation stream.
-        rows = []
-
-        # There should be only one reader, so lets delete everything its
-        # acknowledged its seen.
-        if federation_ack:
-            self._clear_queue_before_pos(federation_ack)
+        rows = []  # type: List[Tuple[int, BaseFederationRow]]
 
         # Fetch changed presence
         i = self.presence_changed.bisect_right(from_token)
@@ -332,7 +336,11 @@ class FederationRemoteSendQueue(object):
         # Sort rows based on pos
         rows.sort()
 
-        return [(pos, row.TypeId, row.to_data()) for pos, row in rows]
+        return (
+            [(pos, (row.TypeId, row.to_data())) for pos, row in rows],
+            to_token,
+            False,
+        )
 
 
 class BaseFederationRow(object):
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index a477578e44..d473576902 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import logging
-from typing import Dict, Hashable, Iterable, List, Optional, Set
+from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
 
 from six import itervalues
 
@@ -498,14 +498,16 @@ class FederationSender(object):
 
         self._get_per_destination_queue(destination).attempt_new_transaction()
 
-    def get_current_token(self) -> int:
+    @staticmethod
+    def get_current_token() -> int:
         # Dummy implementation for case where federation sender isn't offloaded
         # to a worker.
         return 0
 
+    @staticmethod
     async def get_replication_rows(
-        self, from_token, to_token, limit, federation_ack=None
-    ):
+        instance_name: str, from_token: int, to_token: int, target_row_count: int
+    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
         # Dummy implementation for case where federation sender isn't offloaded
         # to a worker.
-        return []
+        return [], 0, False
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index e13cd20ffa..276a2b596f 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,11 +15,10 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import Dict, Hashable, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
 
 from prometheus_client import Counter
 
-import synapse.server
 from synapse.api.errors import (
     FederationDeniedError,
     HttpResponseException,
@@ -34,6 +33,9 @@ from synapse.storage.presence import UserPresenceState
 from synapse.types import ReadReceipt
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 
+if TYPE_CHECKING:
+    import synapse.server
+
 # This is defined in the Matrix spec and enforced by the receiver.
 MAX_EDUS_PER_TRANSACTION = 100
 
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 3c2a02a3b3..a2752a54a5 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -13,11 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import List
+from typing import TYPE_CHECKING, List
 
 from canonicaljson import json
 
-import synapse.server
 from synapse.api.errors import HttpResponseException
 from synapse.events import EventBase
 from synapse.federation.persistence import TransactionActions
@@ -31,6 +30,9 @@ from synapse.logging.opentracing import (
 )
 from synapse.util.metrics import measure_func
 
+if TYPE_CHECKING:
+    import synapse.server
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 33d2f589ac..b690abedad 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -80,7 +80,7 @@ class ReplicationStreamer(object):
             for stream in STREAMS_MAP.values():
                 if stream == FederationStream and hs.config.send_federation:
                     # We only support federation stream if federation sending
-                    # hase been disabled on the master.
+                    # has been disabled on the master.
                     continue
 
                 self.streams.append(stream(hs))
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index b0f87c365b..084604e8b0 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -104,7 +104,8 @@ class Stream(object):
         implemented by subclasses.
 
         current_token_function is called to get the current token of the underlying
-        stream.
+        stream. It is only meaningful on the process that is the source of the
+        replication stream (ie, usually the master).
 
         update_function is called to get updates for this stream between a pair of
         stream tokens. See the UpdateFunction type definition for more info.
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index e8bd52e389..b0505b8a2c 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 from collections import namedtuple
 
-from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function
+from synapse.replication.tcp.streams._base import Stream, make_http_update_function
 
 
 class FederationStream(Stream):
@@ -35,21 +35,33 @@ class FederationStream(Stream):
     ROW_TYPE = FederationStreamRow
 
     def __init__(self, hs):
-        # Not all synapse instances will have a federation sender instance,
-        # whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
-        # so we stub the stream out when that is the case.
-        if hs.config.worker_app is None or hs.should_send_federation():
+        if hs.config.worker_app is None:
+            # master process: get updates from the FederationRemoteSendQueue.
+            # (if the master is configured to send federation itself, federation_sender
+            # will be a real FederationSender, which has stubs for current_token and
+            # get_replication_rows.)
             federation_sender = hs.get_federation_sender()
             current_token = federation_sender.get_current_token
-            update_function = db_query_to_update_function(
-                federation_sender.get_replication_rows
-            )
+            update_function = federation_sender.get_replication_rows
+
+        elif hs.should_send_federation():
+            # federation sender: Query master process
+            update_function = make_http_update_function(hs, self.NAME)
+            current_token = self._stub_current_token
+
         else:
-            current_token = lambda: 0
+            # other worker: stub out the update function (we're not interested in
+            # any updates so when we get a POSITION we do nothing)
             update_function = self._stub_update_function
+            current_token = self._stub_current_token
 
         super().__init__(hs.get_instance_name(), current_token, update_function)
 
     @staticmethod
+    def _stub_current_token():
+        # dummy current-token method for use on workers
+        return 0
+
+    @staticmethod
     async def _stub_update_function(instance_name, from_token, upto_token, limit):
         return [], upto_token, False