summary refs log tree commit diff
path: root/synapse/federation/send_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r--synapse/federation/send_queue.py141
1 files changed, 62 insertions, 79 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 454456a52d..52f4f54215 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,11 +31,14 @@ Events are replicated via a separate events stream.
 
 import logging
 from collections import namedtuple
+from typing import Dict, List, Tuple, Type
 
 from six import iteritems
 
 from sortedcontainers import SortedDict
 
+from twisted.internet import defer
+
 from synapse.metrics import LaterGauge
 from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
@@ -54,23 +57,35 @@ class FederationRemoteSendQueue(object):
         self.notifier = hs.get_notifier()
         self.is_mine_id = hs.is_mine_id
 
-        self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
-        self.presence_changed = SortedDict()  # Stream position -> list[user_id]
+        # Pending presence map user_id -> UserPresenceState
+        self.presence_map = {}  # type: Dict[str, UserPresenceState]
+
+        # Stream position -> list[user_id]
+        self.presence_changed = SortedDict()  # type: SortedDict[int, List[str]]
 
         # Stores the destinations we need to explicitly send presence to about a
         # given user.
         # Stream position -> (user_id, destinations)
-        self.presence_destinations = SortedDict()
+        self.presence_destinations = (
+            SortedDict()
+        )  # type: SortedDict[int, Tuple[str, List[str]]]
 
-        self.keyed_edu = {}  # (destination, key) -> EDU
-        self.keyed_edu_changed = SortedDict()  # stream position -> (destination, key)
+        # (destination, key) -> EDU
+        self.keyed_edu = {}  # type: Dict[Tuple[str, tuple], Edu]
 
-        self.edus = SortedDict()  # stream position -> Edu
+        # stream position -> (destination, key)
+        self.keyed_edu_changed = (
+            SortedDict()
+        )  # type: SortedDict[int, Tuple[str, tuple]]
 
-        self.device_messages = SortedDict()  # stream position -> destination
+        self.edus = SortedDict()  # type: SortedDict[int, Edu]
 
+        # stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
         self.pos = 1
-        self.pos_time = SortedDict()
+
+        # map from stream ID to the time that stream entry was generated, so that we
+        # can clear out entries after a while
+        self.pos_time = SortedDict()  # type: SortedDict[int, int]
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
         # we make a new function, so we need to make a new function so the inner
@@ -90,7 +105,6 @@ class FederationRemoteSendQueue(object):
             "keyed_edu",
             "keyed_edu_changed",
             "edus",
-            "device_messages",
             "pos_time",
             "presence_destinations",
         ]:
@@ -130,9 +144,9 @@ class FederationRemoteSendQueue(object):
             for key in keys[:i]:
                 del self.presence_changed[key]
 
-            user_ids = set(
+            user_ids = {
                 user_id for uids in self.presence_changed.values() for user_id in uids
-            )
+            }
 
             keys = self.presence_destinations.keys()
             i = self.presence_destinations.bisect_left(position_to_delete)
@@ -159,8 +173,10 @@ class FederationRemoteSendQueue(object):
             for edu_key in self.keyed_edu_changed.values():
                 live_keys.add(edu_key)
 
-            to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
-            for edu_key in to_del:
+            keys_to_del = [
+                edu_key for edu_key in self.keyed_edu if edu_key not in live_keys
+            ]
+            for edu_key in keys_to_del:
                 del self.keyed_edu[edu_key]
 
             # Delete things out of edu map
@@ -169,12 +185,6 @@ class FederationRemoteSendQueue(object):
             for key in keys[:i]:
                 del self.edus[key]
 
-            # Delete things out of device map
-            keys = self.device_messages.keys()
-            i = self.device_messages.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.device_messages[key]
-
     def notify_new_events(self, current_id):
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
@@ -212,7 +222,7 @@ class FederationRemoteSendQueue(object):
             receipt (synapse.types.ReadReceipt):
         """
         # nothing to do here: the replication listener will handle it.
-        pass
+        return defer.succeed(None)
 
     def send_presence(self, states):
         """As per FederationSender
@@ -247,9 +257,8 @@ class FederationRemoteSendQueue(object):
 
     def send_device_messages(self, destination):
         """As per FederationSender"""
-        pos = self._next_pos()
-        self.device_messages[pos] = destination
-        self.notifier.on_new_replication_data()
+        # We don't need to replicate this as it gets sent down a different
+        # stream.
 
     def get_current_token(self):
         return self.pos - 1
@@ -257,18 +266,24 @@ class FederationRemoteSendQueue(object):
     def federation_ack(self, token):
         self._clear_queue_before_pos(token)
 
-    def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
+    async def get_replication_rows(
+        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
+    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
         """Get rows to be sent over federation between the two tokens
 
         Args:
-            from_token (int)
-            to_token(int)
-            limit (int)
-            federation_ack (int): Optional. The position where the worker is
-                explicitly acknowledged it has handled. Allows us to drop
-                data from before that point
+            instance_name: the name of the current process
+            from_token: the previous stream token: the starting point for fetching the
+                updates
+            to_token: the new stream token: the point to get updates up to
+            target_row_count: a target for the number of rows to be returned.
+
+        Returns: a triplet `(updates, new_last_token, limited)`, where:
+           * `updates` is a list of `(token, row)` entries.
+           * `new_last_token` is the new position in stream.
+           * `limited` is whether there are more updates to fetch.
         """
-        # TODO: Handle limit.
+        # TODO: Handle target_row_count.
 
         # To handle restarts where we wrap around
         if from_token > self.pos:
@@ -276,12 +291,7 @@ class FederationRemoteSendQueue(object):
 
         # list of tuple(int, BaseFederationRow), where the first is the position
         # of the federation stream.
-        rows = []
-
-        # There should be only one reader, so lets delete everything its
-        # acknowledged its seen.
-        if federation_ack:
-            self._clear_queue_before_pos(federation_ack)
+        rows = []  # type: List[Tuple[int, BaseFederationRow]]
 
         # Fetch changed presence
         i = self.presence_changed.bisect_right(from_token)
@@ -335,18 +345,14 @@ class FederationRemoteSendQueue(object):
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
-        # Fetch changed device messages
-        i = self.device_messages.bisect_right(from_token)
-        j = self.device_messages.bisect_right(to_token) + 1
-        device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
-
-        for (destination, pos) in iteritems(device_messages):
-            rows.append((pos, DeviceRow(destination=destination)))
-
         # Sort rows based on pos
         rows.sort()
 
-        return [(pos, row.TypeId, row.to_data()) for pos, row in rows]
+        return (
+            [(pos, (row.TypeId, row.to_data())) for pos, row in rows],
+            to_token,
+            False,
+        )
 
 
 class BaseFederationRow(object):
@@ -355,7 +361,7 @@ class BaseFederationRow(object):
     Specifies how to identify, serialize and deserialize the different types.
     """
 
-    TypeId = None  # Unique string that ids the type. Must be overriden in sub classes.
+    TypeId = ""  # Unique string that ids the type. Must be overriden in sub classes.
 
     @staticmethod
     def from_data(data):
@@ -468,29 +474,14 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
         buff.edus.setdefault(self.edu.destination, []).append(self.edu)
 
 
-class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ("destination",))):  # str
-    """Streams the fact that either a) there is pending to device messages for
-    users on the remote, or b) a local users device has changed and needs to
-    be sent to the remote.
-    """
+_rowtypes = (
+    PresenceRow,
+    PresenceDestinationsRow,
+    KeyedEduRow,
+    EduRow,
+)  # type: Tuple[Type[BaseFederationRow], ...]
 
-    TypeId = "d"
-
-    @staticmethod
-    def from_data(data):
-        return DeviceRow(destination=data["destination"])
-
-    def to_data(self):
-        return {"destination": self.destination}
-
-    def add_to_buffer(self, buff):
-        buff.device_destinations.add(self.destination)
-
-
-TypeToRow = {
-    Row.TypeId: Row
-    for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow)
-}
+TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
 
 
 ParsedFederationStreamData = namedtuple(
@@ -500,7 +491,6 @@ ParsedFederationStreamData = namedtuple(
         "presence_destinations",  # list of tuples of UserPresenceState and destinations
         "keyed_edus",  # dict of destination -> { key -> Edu }
         "edus",  # dict of destination -> [Edu]
-        "device_destinations",  # set of destinations
     ),
 )
 
@@ -511,7 +501,7 @@ def process_rows_for_federation(transaction_queue, rows):
 
     Args:
         transaction_queue (FederationSender)
-        rows (list(synapse.replication.tcp.streams.FederationStreamRow))
+        rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
     """
 
     # The federation stream contains a bunch of different types of
@@ -519,11 +509,7 @@ def process_rows_for_federation(transaction_queue, rows):
     # them into the appropriate collection and then send them off.
 
     buff = ParsedFederationStreamData(
-        presence=[],
-        presence_destinations=[],
-        keyed_edus={},
-        edus={},
-        device_destinations=set(),
+        presence=[], presence_destinations=[], keyed_edus={}, edus={},
     )
 
     # Parse the rows in the stream and add to the buffer
@@ -551,6 +537,3 @@ def process_rows_for_federation(transaction_queue, rows):
     for destination, edu_list in iteritems(buff.edus):
         for edu in edu_list:
             transaction_queue.send_edu(edu, None)
-
-    for destination in buff.device_destinations:
-        transaction_queue.send_device_messages(destination)