summary refs log tree commit diff
path: root/synapse/federation/sender/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender/__init__.py')
-rw-r--r--synapse/federation/sender/__init__.py116
1 files changed, 103 insertions, 13 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 24ebc4b803..8babb1ebbe 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -13,14 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import abc
 import logging
-from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
 
 from prometheus_client import Counter
 
 from twisted.internet import defer
 
-import synapse
 import synapse.metrics
 from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
@@ -40,9 +40,12 @@ from synapse.metrics import (
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt, RoomStreamToken
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
 from synapse.util.metrics import Measure, measure_func
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 sent_pdus_destination_dist_count = Counter(
@@ -65,8 +68,91 @@ CATCH_UP_STARTUP_DELAY_SEC = 15
 CATCH_UP_STARTUP_INTERVAL_SEC = 5
 
 
-class FederationSender:
-    def __init__(self, hs: "synapse.server.HomeServer"):
+class AbstractFederationSender(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def notify_new_events(self, max_token: RoomStreamToken) -> None:
+        """This gets called when we have some new events we might want to
+        send out to other servers.
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
+        """Send a RR to any other servers in the room
+
+        Args:
+            receipt: receipt to be sent
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def send_presence(self, states: List[UserPresenceState]) -> None:
+        """Send the new presence states to the appropriate destinations.
+
+        This actually queues up the presence states ready for sending and
+        triggers a background task to process them and send out the transactions.
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def send_presence_to_destinations(
+        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
+    ) -> None:
+        """Send the given presence states to the given destinations.
+
+        Args:
+            destinations:
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def build_and_send_edu(
+        self,
+        destination: str,
+        edu_type: str,
+        content: JsonDict,
+        key: Optional[Hashable] = None,
+    ) -> None:
+        """Construct an Edu object, and queue it for sending
+
+        Args:
+            destination: name of server to send to
+            edu_type: type of EDU to send
+            content: content of EDU
+            key: clobbering key for this edu
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def send_device_messages(self, destination: str) -> None:
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def wake_destination(self, destination: str) -> None:
+        """Called when we want to retry sending transactions to a remote.
+
+        This is mainly useful if the remote server has been down and we think it
+        might have come back.
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def get_current_token(self) -> int:
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def federation_ack(self, instance_name: str, token: int) -> None:
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    async def get_replication_rows(
+        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
+    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
+        raise NotImplementedError()
+
+
+class FederationSender(AbstractFederationSender):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.server_name = hs.hostname
 
@@ -432,7 +518,7 @@ class FederationSender:
             queue.flush_read_receipts_for_room(room_id)
 
     @preserve_fn  # the caller should not yield on this
-    async def send_presence(self, states: List[UserPresenceState]):
+    async def send_presence(self, states: List[UserPresenceState]) -> None:
         """Send the new presence states to the appropriate destinations.
 
         This actually queues up the presence states ready for sending and
@@ -494,7 +580,7 @@ class FederationSender:
             self._get_per_destination_queue(destination).send_presence(states)
 
     @measure_func("txnqueue._process_presence")
-    async def _process_presence_inner(self, states: List[UserPresenceState]):
+    async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
         """Given a list of states populate self.pending_presence_by_dest and
         poke to send a new transaction to each destination
         """
@@ -516,9 +602,9 @@ class FederationSender:
         self,
         destination: str,
         edu_type: str,
-        content: dict,
+        content: JsonDict,
         key: Optional[Hashable] = None,
-    ):
+    ) -> None:
         """Construct an Edu object, and queue it for sending
 
         Args:
@@ -545,7 +631,7 @@ class FederationSender:
 
         self.send_edu(edu, key)
 
-    def send_edu(self, edu: Edu, key: Optional[Hashable]):
+    def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
         """Queue an EDU for sending
 
         Args:
@@ -563,7 +649,7 @@ class FederationSender:
         else:
             queue.send_edu(edu)
 
-    def send_device_messages(self, destination: str):
+    def send_device_messages(self, destination: str) -> None:
         if destination == self.server_name:
             logger.warning("Not sending device update to ourselves")
             return
@@ -575,7 +661,7 @@ class FederationSender:
 
         self._get_per_destination_queue(destination).attempt_new_transaction()
 
-    def wake_destination(self, destination: str):
+    def wake_destination(self, destination: str) -> None:
         """Called when we want to retry sending transactions to a remote.
 
         This is mainly useful if the remote server has been down and we think it
@@ -599,6 +685,10 @@ class FederationSender:
         # to a worker.
         return 0
 
+    def federation_ack(self, instance_name: str, token: int) -> None:
+        # It is not expected that this gets called on FederationSender.
+        raise NotImplementedError()
+
     @staticmethod
     async def get_replication_rows(
         instance_name: str, from_token: int, to_token: int, target_row_count: int
@@ -607,7 +697,7 @@ class FederationSender:
         # to a worker.
         return [], 0, False
 
-    async def _wake_destinations_needing_catchup(self):
+    async def _wake_destinations_needing_catchup(self) -> None:
         """
         Wakes up destinations that need catch-up and are not currently being
         backed off from.