diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 3e993b428b..0c18c49abb 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,25 +31,39 @@ Events are replicated via a separate events stream.
import logging
from collections import namedtuple
-from typing import Dict, List, Tuple, Type
+from typing import (
+ TYPE_CHECKING,
+ Dict,
+ Hashable,
+ Iterable,
+ List,
+ Optional,
+ Sized,
+ Tuple,
+ Type,
+)
from sortedcontainers import SortedDict
-from twisted.internet import defer
-
from synapse.api.presence import UserPresenceState
+from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.metrics import LaterGauge
+from synapse.replication.tcp.streams.federation import FederationStream
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure
from .units import Edu
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
-class FederationRemoteSendQueue:
+class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
@@ -58,7 +72,7 @@ class FederationRemoteSendQueue:
# We may have multiple federation sender instances, so we need to track
# their positions separately.
self._sender_instances = hs.config.worker.federation_shard_config.instances
- self._sender_positions = {}
+ self._sender_positions = {} # type: Dict[str, int]
# Pending presence map user_id -> UserPresenceState
self.presence_map = {} # type: Dict[str, UserPresenceState]
@@ -71,7 +85,7 @@ class FederationRemoteSendQueue:
# Stream position -> (user_id, destinations)
self.presence_destinations = (
SortedDict()
- ) # type: SortedDict[int, Tuple[str, List[str]]]
+ ) # type: SortedDict[int, Tuple[str, Iterable[str]]]
# (destination, key) -> EDU
self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
@@ -94,7 +108,7 @@ class FederationRemoteSendQueue:
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
- def register(name, queue):
+ def register(name: str, queue: Sized) -> None:
LaterGauge(
"synapse_federation_send_queue_%s_size" % (queue_name,),
"",
@@ -115,13 +129,13 @@ class FederationRemoteSendQueue:
self.clock.looping_call(self._clear_queue, 30 * 1000)
- def _next_pos(self):
+ def _next_pos(self) -> int:
pos = self.pos
self.pos += 1
self.pos_time[self.clock.time_msec()] = pos
return pos
- def _clear_queue(self):
+ def _clear_queue(self) -> None:
"""Clear the queues for anything older than N minutes"""
FIVE_MINUTES_AGO = 5 * 60 * 1000
@@ -138,7 +152,7 @@ class FederationRemoteSendQueue:
self._clear_queue_before_pos(position_to_delete)
- def _clear_queue_before_pos(self, position_to_delete):
+ def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
@@ -188,13 +202,18 @@ class FederationRemoteSendQueue:
for key in keys[:i]:
del self.edus[key]
- def notify_new_events(self, max_token):
+ def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""As per FederationSender"""
- # We don't need to replicate this as it gets sent down a different
- # stream.
- pass
+ # This should never get called.
+ raise NotImplementedError()
- def build_and_send_edu(self, destination, edu_type, content, key=None):
+ def build_and_send_edu(
+ self,
+ destination: str,
+ edu_type: str,
+ content: JsonDict,
+ key: Optional[Hashable] = None,
+ ) -> None:
"""As per FederationSender"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
@@ -218,38 +237,39 @@ class FederationRemoteSendQueue:
self.notifier.on_new_replication_data()
- def send_read_receipt(self, receipt):
+ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""As per FederationSender
Args:
- receipt (synapse.types.ReadReceipt):
+ receipt:
"""
# nothing to do here: the replication listener will handle it.
- return defer.succeed(None)
- def send_presence(self, states):
+ def send_presence(self, states: List[UserPresenceState]) -> None:
"""As per FederationSender
Args:
- states (list(UserPresenceState))
+ states
"""
pos = self._next_pos()
# We only want to send presence for our own users, so lets always just
# filter here just in case.
- local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states))
+ local_states = [s for s in states if self.is_mine_id(s.user_id)]
self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]
self.notifier.on_new_replication_data()
- def send_presence_to_destinations(self, states, destinations):
+ def send_presence_to_destinations(
+ self, states: Iterable[UserPresenceState], destinations: Iterable[str]
+ ) -> None:
"""As per FederationSender
Args:
- states (list[UserPresenceState])
- destinations (list[str])
+ states
+ destinations
"""
for state in states:
pos = self._next_pos()
@@ -258,15 +278,18 @@ class FederationRemoteSendQueue:
self.notifier.on_new_replication_data()
- def send_device_messages(self, destination):
+ def send_device_messages(self, destination: str) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
- def get_current_token(self):
+ def wake_destination(self, server: str) -> None:
+ pass
+
+ def get_current_token(self) -> int:
return self.pos - 1
- def federation_ack(self, instance_name, token):
+ def federation_ack(self, instance_name: str, token: int) -> None:
if self._sender_instances:
# If we have configured multiple federation sender instances we need
# to track their positions separately, and only clear the queue up
@@ -504,13 +527,16 @@ ParsedFederationStreamData = namedtuple(
)
-def process_rows_for_federation(transaction_queue, rows):
+def process_rows_for_federation(
+ transaction_queue: FederationSender,
+ rows: List[FederationStream.FederationStreamRow],
+) -> None:
"""Parse a list of rows from the federation stream and put them in the
transaction queue ready for sending to the relevant homeservers.
Args:
- transaction_queue (FederationSender)
- rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
+ transaction_queue
+ rows
"""
# The federation stream contains a bunch of different types of
|