diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 454456a52d..52f4f54215 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,11 +31,14 @@ Events are replicated via a separate events stream.
import logging
from collections import namedtuple
+from typing import Dict, List, Tuple, Type
from six import iteritems
from sortedcontainers import SortedDict
+from twisted.internet import defer
+
from synapse.metrics import LaterGauge
from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
@@ -54,23 +57,35 @@ class FederationRemoteSendQueue(object):
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
- self.presence_map = {} # Pending presence map user_id -> UserPresenceState
- self.presence_changed = SortedDict() # Stream position -> list[user_id]
+ # Pending presence map user_id -> UserPresenceState
+ self.presence_map = {} # type: Dict[str, UserPresenceState]
+
+ # Stream position -> list[user_id]
+ self.presence_changed = SortedDict() # type: SortedDict[int, List[str]]
# Stores the destinations we need to explicitly send presence to about a
# given user.
# Stream position -> (user_id, destinations)
- self.presence_destinations = SortedDict()
+ self.presence_destinations = (
+ SortedDict()
+ ) # type: SortedDict[int, Tuple[str, List[str]]]
- self.keyed_edu = {} # (destination, key) -> EDU
- self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
+ # (destination, key) -> EDU
+ self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
- self.edus = SortedDict() # stream position -> Edu
+ # stream position -> (destination, key)
+ self.keyed_edu_changed = (
+ SortedDict()
+ ) # type: SortedDict[int, Tuple[str, tuple]]
- self.device_messages = SortedDict() # stream position -> destination
+ self.edus = SortedDict() # type: SortedDict[int, Edu]
+ # stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
self.pos = 1
- self.pos_time = SortedDict()
+
+ # map from stream ID to the time that stream entry was generated, so that we
+ # can clear out entries after a while
+ self.pos_time = SortedDict() # type: SortedDict[int, int]
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
@@ -90,7 +105,6 @@ class FederationRemoteSendQueue(object):
"keyed_edu",
"keyed_edu_changed",
"edus",
- "device_messages",
"pos_time",
"presence_destinations",
]:
@@ -130,9 +144,9 @@ class FederationRemoteSendQueue(object):
for key in keys[:i]:
del self.presence_changed[key]
- user_ids = set(
+ user_ids = {
user_id for uids in self.presence_changed.values() for user_id in uids
- )
+ }
keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)
@@ -159,8 +173,10 @@ class FederationRemoteSendQueue(object):
for edu_key in self.keyed_edu_changed.values():
live_keys.add(edu_key)
- to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
- for edu_key in to_del:
+ keys_to_del = [
+ edu_key for edu_key in self.keyed_edu if edu_key not in live_keys
+ ]
+ for edu_key in keys_to_del:
del self.keyed_edu[edu_key]
# Delete things out of edu map
@@ -169,12 +185,6 @@ class FederationRemoteSendQueue(object):
for key in keys[:i]:
del self.edus[key]
- # Delete things out of device map
- keys = self.device_messages.keys()
- i = self.device_messages.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.device_messages[key]
-
def notify_new_events(self, current_id):
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
@@ -212,7 +222,7 @@ class FederationRemoteSendQueue(object):
receipt (synapse.types.ReadReceipt):
"""
# nothing to do here: the replication listener will handle it.
- pass
+ return defer.succeed(None)
def send_presence(self, states):
"""As per FederationSender
@@ -247,9 +257,8 @@ class FederationRemoteSendQueue(object):
def send_device_messages(self, destination):
"""As per FederationSender"""
- pos = self._next_pos()
- self.device_messages[pos] = destination
- self.notifier.on_new_replication_data()
+ # We don't need to replicate this as it gets sent down a different
+ # stream.
def get_current_token(self):
return self.pos - 1
@@ -257,18 +266,24 @@ class FederationRemoteSendQueue(object):
def federation_ack(self, token):
self._clear_queue_before_pos(token)
- def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
+ async def get_replication_rows(
+ self, instance_name: str, from_token: int, to_token: int, target_row_count: int
+ ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
"""Get rows to be sent over federation between the two tokens
Args:
- from_token (int)
- to_token(int)
- limit (int)
- federation_ack (int): Optional. The position where the worker is
- explicitly acknowledged it has handled. Allows us to drop
- data from before that point
+ instance_name: the name of the current process
+ from_token: the previous stream token: the starting point for fetching the
+ updates
+ to_token: the new stream token: the point to get updates up to
+ target_row_count: a target for the number of rows to be returned.
+
+ Returns: a triplet `(updates, new_last_token, limited)`, where:
+ * `updates` is a list of `(token, row)` entries.
+ * `new_last_token` is the new position in stream.
+ * `limited` is whether there are more updates to fetch.
"""
- # TODO: Handle limit.
+ # TODO: Handle target_row_count.
# To handle restarts where we wrap around
if from_token > self.pos:
@@ -276,12 +291,7 @@ class FederationRemoteSendQueue(object):
# list of tuple(int, BaseFederationRow), where the first is the position
# of the federation stream.
- rows = []
-
- # There should be only one reader, so lets delete everything its
- # acknowledged its seen.
- if federation_ack:
- self._clear_queue_before_pos(federation_ack)
+ rows = [] # type: List[Tuple[int, BaseFederationRow]]
# Fetch changed presence
i = self.presence_changed.bisect_right(from_token)
@@ -335,18 +345,14 @@ class FederationRemoteSendQueue(object):
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
- # Fetch changed device messages
- i = self.device_messages.bisect_right(from_token)
- j = self.device_messages.bisect_right(to_token) + 1
- device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
-
- for (destination, pos) in iteritems(device_messages):
- rows.append((pos, DeviceRow(destination=destination)))
-
# Sort rows based on pos
rows.sort()
- return [(pos, row.TypeId, row.to_data()) for pos, row in rows]
+ return (
+ [(pos, (row.TypeId, row.to_data())) for pos, row in rows],
+ to_token,
+ False,
+ )
class BaseFederationRow(object):
@@ -355,7 +361,7 @@ class BaseFederationRow(object):
Specifies how to identify, serialize and deserialize the different types.
"""
- TypeId = None # Unique string that ids the type. Must be overriden in sub classes.
+ TypeId = "" # Unique string that ids the type. Must be overriden in sub classes.
@staticmethod
def from_data(data):
@@ -468,29 +474,14 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
-class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ("destination",))): # str
- """Streams the fact that either a) there is pending to device messages for
- users on the remote, or b) a local users device has changed and needs to
- be sent to the remote.
- """
+_rowtypes = (
+ PresenceRow,
+ PresenceDestinationsRow,
+ KeyedEduRow,
+ EduRow,
+) # type: Tuple[Type[BaseFederationRow], ...]
- TypeId = "d"
-
- @staticmethod
- def from_data(data):
- return DeviceRow(destination=data["destination"])
-
- def to_data(self):
- return {"destination": self.destination}
-
- def add_to_buffer(self, buff):
- buff.device_destinations.add(self.destination)
-
-
-TypeToRow = {
- Row.TypeId: Row
- for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow)
-}
+TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
ParsedFederationStreamData = namedtuple(
@@ -500,7 +491,6 @@ ParsedFederationStreamData = namedtuple(
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
- "device_destinations", # set of destinations
),
)
@@ -511,7 +501,7 @@ def process_rows_for_federation(transaction_queue, rows):
Args:
transaction_queue (FederationSender)
- rows (list(synapse.replication.tcp.streams.FederationStreamRow))
+ rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
"""
# The federation stream contains a bunch of different types of
@@ -519,11 +509,7 @@ def process_rows_for_federation(transaction_queue, rows):
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
- presence=[],
- presence_destinations=[],
- keyed_edus={},
- edus={},
- device_destinations=set(),
+ presence=[], presence_destinations=[], keyed_edus={}, edus={},
)
# Parse the rows in the stream and add to the buffer
@@ -551,6 +537,3 @@ def process_rows_for_federation(transaction_queue, rows):
for destination, edu_list in iteritems(buff.edus):
for edu in edu_list:
transaction_queue.send_edu(edu, None)
-
- for destination in buff.device_destinations:
- transaction_queue.send_device_messages(destination)
|