summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-05-05 14:15:57 +0100
committerGitHub <noreply@github.com>2020-05-05 14:15:57 +0100
commitd5aa7d93ed1f7963524125d16ab640ebf6cb91c2 (patch)
tree20e85df13577cdd24effdc9038f3a2357a584dfc /synapse/federation
parentAdd MultiWriterIdGenerator. (#7281) (diff)
downloadsynapse-d5aa7d93ed1f7963524125d16ab640ebf6cb91c2.tar.xz
Fix catchup-on-reconnect for the Federation Stream (#7374)
looks like we managed to break this during the refactorathon.
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/send_queue.py40
-rw-r--r--synapse/federation/sender/__init__.py12
-rw-r--r--synapse/federation/sender/per_destination_queue.py6
-rw-r--r--synapse/federation/sender/transaction_manager.py6
4 files changed, 39 insertions, 25 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index e1700ca8aa..6fbacf6a3e 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,6 +31,7 @@ Events are replicated via a separate events stream.
 
 import logging
 from collections import namedtuple
+from typing import List, Tuple
 
 from six import iteritems
 
@@ -69,7 +70,11 @@ class FederationRemoteSendQueue(object):
 
         self.edus = SortedDict()  # stream position -> Edu
 
+        # stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
         self.pos = 1
+
+        # map from stream ID to the time that stream entry was generated, so that we
+        # can clear out entries after a while
         self.pos_time = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
@@ -250,19 +255,23 @@ class FederationRemoteSendQueue(object):
         self._clear_queue_before_pos(token)
 
     async def get_replication_rows(
-        self, from_token, to_token, limit, federation_ack=None
-    ):
+        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
+    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
         """Get rows to be sent over federation between the two tokens
 
         Args:
-            from_token (int)
-            to_token(int)
-            limit (int)
-            federation_ack (int): Optional. The position where the worker is
-                explicitly acknowledged it has handled. Allows us to drop
-                data from before that point
+            instance_name: the name of the current process
+            from_token: the previous stream token: the starting point for fetching the
+                updates
+            to_token: the new stream token: the point to get updates up to
+            target_row_count: a target for the number of rows to be returned.
+
+        Returns: a triplet `(updates, new_last_token, limited)`, where:
+           * `updates` is a list of `(token, row)` entries.
+           * `new_last_token` is the new position in stream.
+           * `limited` is whether there are more updates to fetch.
         """
-        # TODO: Handle limit.
+        # TODO: Handle target_row_count.
 
         # To handle restarts where we wrap around
         if from_token > self.pos:
@@ -270,12 +279,7 @@ class FederationRemoteSendQueue(object):
 
         # list of tuple(int, BaseFederationRow), where the first is the position
         # of the federation stream.
-        rows = []
-
-        # There should be only one reader, so lets delete everything its
-        # acknowledged its seen.
-        if federation_ack:
-            self._clear_queue_before_pos(federation_ack)
+        rows = []  # type: List[Tuple[int, BaseFederationRow]]
 
         # Fetch changed presence
         i = self.presence_changed.bisect_right(from_token)
@@ -332,7 +336,11 @@ class FederationRemoteSendQueue(object):
         # Sort rows based on pos
         rows.sort()
 
-        return [(pos, row.TypeId, row.to_data()) for pos, row in rows]
+        return (
+            [(pos, (row.TypeId, row.to_data())) for pos, row in rows],
+            to_token,
+            False,
+        )
 
 
 class BaseFederationRow(object):
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index a477578e44..d473576902 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import logging
-from typing import Dict, Hashable, Iterable, List, Optional, Set
+from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
 
 from six import itervalues
 
@@ -498,14 +498,16 @@ class FederationSender(object):
 
         self._get_per_destination_queue(destination).attempt_new_transaction()
 
-    def get_current_token(self) -> int:
+    @staticmethod
+    def get_current_token() -> int:
         # Dummy implementation for case where federation sender isn't offloaded
         # to a worker.
         return 0
 
+    @staticmethod
     async def get_replication_rows(
-        self, from_token, to_token, limit, federation_ack=None
-    ):
+        instance_name: str, from_token: int, to_token: int, target_row_count: int
+    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
         # Dummy implementation for case where federation sender isn't offloaded
         # to a worker.
-        return []
+        return [], 0, False
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index e13cd20ffa..276a2b596f 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,11 +15,10 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import Dict, Hashable, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
 
 from prometheus_client import Counter
 
-import synapse.server
 from synapse.api.errors import (
     FederationDeniedError,
     HttpResponseException,
@@ -34,6 +33,9 @@ from synapse.storage.presence import UserPresenceState
 from synapse.types import ReadReceipt
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 
+if TYPE_CHECKING:
+    import synapse.server
+
 # This is defined in the Matrix spec and enforced by the receiver.
 MAX_EDUS_PER_TRANSACTION = 100
 
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 3c2a02a3b3..a2752a54a5 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -13,11 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import List
+from typing import TYPE_CHECKING, List
 
 from canonicaljson import json
 
-import synapse.server
 from synapse.api.errors import HttpResponseException
 from synapse.events import EventBase
 from synapse.federation.persistence import TransactionActions
@@ -31,6 +30,9 @@ from synapse.logging.opentracing import (
 )
 from synapse.util.metrics import measure_func
 
+if TYPE_CHECKING:
+    import synapse.server
+
 logger = logging.getLogger(__name__)