summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/app/generic_worker.py7
-rw-r--r--synapse/federation/send_queue.py88
-rw-r--r--synapse/federation/sender/__init__.py116
-rw-r--r--synapse/replication/tcp/commands.py6
-rw-r--r--synapse/replication/tcp/streams/federation.py14
-rw-r--r--synapse/server.py4
6 files changed, 176 insertions, 59 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 6139881dbb..3df2aa5c2b 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -787,13 +787,6 @@ class FederationSenderHandler:
 
         self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
 
-    def on_start(self):
-        # There may be some events that are persisted but haven't been sent,
-        # so send them now.
-        self.federation_sender.notify_new_events(
-            self.store.get_room_max_stream_ordering()
-        )
-
     def wake_destination(self, server: str):
         self.federation_sender.wake_destination(server)
 
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 3e993b428b..0c18c49abb 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,25 +31,39 @@ Events are replicated via a separate events stream.
 
 import logging
 from collections import namedtuple
-from typing import Dict, List, Tuple, Type
+from typing import (
+    TYPE_CHECKING,
+    Dict,
+    Hashable,
+    Iterable,
+    List,
+    Optional,
+    Sized,
+    Tuple,
+    Type,
+)
 
 from sortedcontainers import SortedDict
 
-from twisted.internet import defer
-
 from synapse.api.presence import UserPresenceState
+from synapse.federation.sender import AbstractFederationSender, FederationSender
 from synapse.metrics import LaterGauge
+from synapse.replication.tcp.streams.federation import FederationStream
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
 from synapse.util.metrics import Measure
 
 from .units import Edu
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
-class FederationRemoteSendQueue:
+class FederationRemoteSendQueue(AbstractFederationSender):
     """A drop in replacement for FederationSender"""
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
@@ -58,7 +72,7 @@ class FederationRemoteSendQueue:
         # We may have multiple federation sender instances, so we need to track
         # their positions separately.
         self._sender_instances = hs.config.worker.federation_shard_config.instances
-        self._sender_positions = {}
+        self._sender_positions = {}  # type: Dict[str, int]
 
         # Pending presence map user_id -> UserPresenceState
         self.presence_map = {}  # type: Dict[str, UserPresenceState]
@@ -71,7 +85,7 @@ class FederationRemoteSendQueue:
         # Stream position -> (user_id, destinations)
         self.presence_destinations = (
             SortedDict()
-        )  # type: SortedDict[int, Tuple[str, List[str]]]
+        )  # type: SortedDict[int, Tuple[str, Iterable[str]]]
 
         # (destination, key) -> EDU
         self.keyed_edu = {}  # type: Dict[Tuple[str, tuple], Edu]
@@ -94,7 +108,7 @@ class FederationRemoteSendQueue:
         # we make a new function, so we need to make a new function so the inner
         # lambda binds to the queue rather than to the name of the queue which
         # changes. ARGH.
-        def register(name, queue):
+        def register(name: str, queue: Sized) -> None:
             LaterGauge(
                 "synapse_federation_send_queue_%s_size" % (queue_name,),
                 "",
@@ -115,13 +129,13 @@ class FederationRemoteSendQueue:
 
         self.clock.looping_call(self._clear_queue, 30 * 1000)
 
-    def _next_pos(self):
+    def _next_pos(self) -> int:
         pos = self.pos
         self.pos += 1
         self.pos_time[self.clock.time_msec()] = pos
         return pos
 
-    def _clear_queue(self):
+    def _clear_queue(self) -> None:
         """Clear the queues for anything older than N minutes"""
 
         FIVE_MINUTES_AGO = 5 * 60 * 1000
@@ -138,7 +152,7 @@ class FederationRemoteSendQueue:
 
         self._clear_queue_before_pos(position_to_delete)
 
-    def _clear_queue_before_pos(self, position_to_delete):
+    def _clear_queue_before_pos(self, position_to_delete: int) -> None:
         """Clear all the queues from before a given position"""
         with Measure(self.clock, "send_queue._clear"):
             # Delete things out of presence maps
@@ -188,13 +202,18 @@ class FederationRemoteSendQueue:
             for key in keys[:i]:
                 del self.edus[key]
 
-    def notify_new_events(self, max_token):
+    def notify_new_events(self, max_token: RoomStreamToken) -> None:
         """As per FederationSender"""
-        # We don't need to replicate this as it gets sent down a different
-        # stream.
-        pass
+        # This should never get called.
+        raise NotImplementedError()
 
-    def build_and_send_edu(self, destination, edu_type, content, key=None):
+    def build_and_send_edu(
+        self,
+        destination: str,
+        edu_type: str,
+        content: JsonDict,
+        key: Optional[Hashable] = None,
+    ) -> None:
         """As per FederationSender"""
         if destination == self.server_name:
             logger.info("Not sending EDU to ourselves")
@@ -218,38 +237,39 @@ class FederationRemoteSendQueue:
 
         self.notifier.on_new_replication_data()
 
-    def send_read_receipt(self, receipt):
+    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
         """As per FederationSender
 
         Args:
-            receipt (synapse.types.ReadReceipt):
+            receipt:
         """
         # nothing to do here: the replication listener will handle it.
-        return defer.succeed(None)
 
-    def send_presence(self, states):
+    def send_presence(self, states: List[UserPresenceState]) -> None:
         """As per FederationSender
 
         Args:
-            states (list(UserPresenceState))
+            states
         """
         pos = self._next_pos()
 
         # We only want to send presence for our own users, so lets always just
         # filter here just in case.
-        local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states))
+        local_states = [s for s in states if self.is_mine_id(s.user_id)]
 
         self.presence_map.update({state.user_id: state for state in local_states})
         self.presence_changed[pos] = [state.user_id for state in local_states]
 
         self.notifier.on_new_replication_data()
 
-    def send_presence_to_destinations(self, states, destinations):
+    def send_presence_to_destinations(
+        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
+    ) -> None:
         """As per FederationSender
 
         Args:
-            states (list[UserPresenceState])
-            destinations (list[str])
+            states
+            destinations
         """
         for state in states:
             pos = self._next_pos()
@@ -258,15 +278,18 @@ class FederationRemoteSendQueue:
 
         self.notifier.on_new_replication_data()
 
-    def send_device_messages(self, destination):
+    def send_device_messages(self, destination: str) -> None:
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
         # stream.
 
-    def get_current_token(self):
+    def wake_destination(self, server: str) -> None:
+        pass
+
+    def get_current_token(self) -> int:
         return self.pos - 1
 
-    def federation_ack(self, instance_name, token):
+    def federation_ack(self, instance_name: str, token: int) -> None:
         if self._sender_instances:
             # If we have configured multiple federation sender instances we need
             # to track their positions separately, and only clear the queue up
@@ -504,13 +527,16 @@ ParsedFederationStreamData = namedtuple(
 )
 
 
-def process_rows_for_federation(transaction_queue, rows):
+def process_rows_for_federation(
+    transaction_queue: FederationSender,
+    rows: List[FederationStream.FederationStreamRow],
+) -> None:
     """Parse a list of rows from the federation stream and put them in the
     transaction queue ready for sending to the relevant homeservers.
 
     Args:
-        transaction_queue (FederationSender)
-        rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
+        transaction_queue
+        rows
     """
 
     # The federation stream contains a bunch of different types of
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 24ebc4b803..8babb1ebbe 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -13,14 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import abc
 import logging
-from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
 
 from prometheus_client import Counter
 
 from twisted.internet import defer
 
-import synapse
 import synapse.metrics
 from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
@@ -40,9 +40,12 @@ from synapse.metrics import (
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt, RoomStreamToken
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
 from synapse.util.metrics import Measure, measure_func
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 sent_pdus_destination_dist_count = Counter(
@@ -65,8 +68,91 @@ CATCH_UP_STARTUP_DELAY_SEC = 15
 CATCH_UP_STARTUP_INTERVAL_SEC = 5
 
 
-class FederationSender:
-    def __init__(self, hs: "synapse.server.HomeServer"):
+class AbstractFederationSender(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def notify_new_events(self, max_token: RoomStreamToken) -> None:
+        """This gets called when we have some new events we might want to
+        send out to other servers.
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
+        """Send a RR to any other servers in the room
+
+        Args:
+            receipt: receipt to be sent
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def send_presence(self, states: List[UserPresenceState]) -> None:
+        """Send the new presence states to the appropriate destinations.
+
+        This actually queues up the presence states ready for sending and
+        triggers a background task to process them and send out the transactions.
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def send_presence_to_destinations(
+        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
+    ) -> None:
+        """Send the given presence states to the given destinations.
+
+        Args:
+            destinations:
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def build_and_send_edu(
+        self,
+        destination: str,
+        edu_type: str,
+        content: JsonDict,
+        key: Optional[Hashable] = None,
+    ) -> None:
+        """Construct an Edu object, and queue it for sending
+
+        Args:
+            destination: name of server to send to
+            edu_type: type of EDU to send
+            content: content of EDU
+            key: clobbering key for this edu
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def send_device_messages(self, destination: str) -> None:
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def wake_destination(self, destination: str) -> None:
+        """Called when we want to retry sending transactions to a remote.
+
+        This is mainly useful if the remote server has been down and we think it
+        might have come back.
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def get_current_token(self) -> int:
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def federation_ack(self, instance_name: str, token: int) -> None:
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    async def get_replication_rows(
+        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
+    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
+        raise NotImplementedError()
+
+
+class FederationSender(AbstractFederationSender):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.server_name = hs.hostname
 
@@ -432,7 +518,7 @@ class FederationSender:
             queue.flush_read_receipts_for_room(room_id)
 
     @preserve_fn  # the caller should not yield on this
-    async def send_presence(self, states: List[UserPresenceState]):
+    async def send_presence(self, states: List[UserPresenceState]) -> None:
         """Send the new presence states to the appropriate destinations.
 
         This actually queues up the presence states ready for sending and
@@ -494,7 +580,7 @@ class FederationSender:
             self._get_per_destination_queue(destination).send_presence(states)
 
     @measure_func("txnqueue._process_presence")
-    async def _process_presence_inner(self, states: List[UserPresenceState]):
+    async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
         """Given a list of states populate self.pending_presence_by_dest and
         poke to send a new transaction to each destination
         """
@@ -516,9 +602,9 @@ class FederationSender:
         self,
         destination: str,
         edu_type: str,
-        content: dict,
+        content: JsonDict,
         key: Optional[Hashable] = None,
-    ):
+    ) -> None:
         """Construct an Edu object, and queue it for sending
 
         Args:
@@ -545,7 +631,7 @@ class FederationSender:
 
         self.send_edu(edu, key)
 
-    def send_edu(self, edu: Edu, key: Optional[Hashable]):
+    def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
         """Queue an EDU for sending
 
         Args:
@@ -563,7 +649,7 @@ class FederationSender:
         else:
             queue.send_edu(edu)
 
-    def send_device_messages(self, destination: str):
+    def send_device_messages(self, destination: str) -> None:
         if destination == self.server_name:
             logger.warning("Not sending device update to ourselves")
             return
@@ -575,7 +661,7 @@ class FederationSender:
 
         self._get_per_destination_queue(destination).attempt_new_transaction()
 
-    def wake_destination(self, destination: str):
+    def wake_destination(self, destination: str) -> None:
         """Called when we want to retry sending transactions to a remote.
 
         This is mainly useful if the remote server has been down and we think it
@@ -599,6 +685,10 @@ class FederationSender:
         # to a worker.
         return 0
 
+    def federation_ack(self, instance_name: str, token: int) -> None:
+        # It is not expected that this gets called on FederationSender.
+        raise NotImplementedError()
+
     @staticmethod
     async def get_replication_rows(
         instance_name: str, from_token: int, to_token: int, target_row_count: int
@@ -607,7 +697,7 @@ class FederationSender:
         # to a worker.
         return [], 0, False
 
-    async def _wake_destinations_needing_catchup(self):
+    async def _wake_destinations_needing_catchup(self) -> None:
         """
         Wakes up destinations that need catch-up and are not currently being
         backed off from.
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index bb447f75b4..8abed1f52d 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -312,16 +312,16 @@ class FederationAckCommand(Command):
 
     NAME = "FEDERATION_ACK"
 
-    def __init__(self, instance_name, token):
+    def __init__(self, instance_name: str, token: int):
         self.instance_name = instance_name
         self.token = token
 
     @classmethod
-    def from_line(cls, line):
+    def from_line(cls, line: str) -> "FederationAckCommand":
         instance_name, token = line.split(" ")
         return cls(instance_name, int(token))
 
-    def to_line(self):
+    def to_line(self) -> str:
         return "%s %s" % (self.instance_name, self.token)
 
 
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 9bcd13b009..9bb8e9e177 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from collections import namedtuple
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
 
 from synapse.replication.tcp.streams._base import (
     Stream,
@@ -21,6 +22,9 @@ from synapse.replication.tcp.streams._base import (
     make_http_update_function,
 )
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class FederationStream(Stream):
     """Data to be sent over federation. Only available when master has federation
@@ -38,7 +42,7 @@ class FederationStream(Stream):
     NAME = "federation"
     ROW_TYPE = FederationStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         if hs.config.worker_app is None:
             # master process: get updates from the FederationRemoteSendQueue.
             # (if the master is configured to send federation itself, federation_sender
@@ -48,7 +52,9 @@ class FederationStream(Stream):
             current_token = current_token_without_instance(
                 federation_sender.get_current_token
             )
-            update_function = federation_sender.get_replication_rows
+            update_function = (
+                federation_sender.get_replication_rows
+            )  # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
 
         elif hs.should_send_federation():
             # federation sender: Query master process
@@ -69,5 +75,7 @@ class FederationStream(Stream):
         return 0
 
     @staticmethod
-    async def _stub_update_function(instance_name, from_token, upto_token, limit):
+    async def _stub_update_function(
+        instance_name: str, from_token: int, upto_token: int, limit: int
+    ) -> Tuple[list, int, bool]:
         return [], upto_token, False
diff --git a/synapse/server.py b/synapse/server.py
index 5e787e2281..e85b9391fa 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -60,7 +60,7 @@ from synapse.federation.federation_server import (
     FederationServer,
 )
 from synapse.federation.send_queue import FederationRemoteSendQueue
-from synapse.federation.sender import FederationSender
+from synapse.federation.sender import AbstractFederationSender, FederationSender
 from synapse.federation.transport.client import TransportLayerClient
 from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
 from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler
@@ -571,7 +571,7 @@ class HomeServer(metaclass=abc.ABCMeta):
         return TransportLayerClient(self)
 
     @cache_in_self
-    def get_federation_sender(self):
+    def get_federation_sender(self) -> AbstractFederationSender:
         if self.should_send_federation():
             return FederationSender(self)
         elif not self.config.worker_app: