diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 24ebc4b803..8babb1ebbe 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -13,14 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import abc
import logging
-from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
from prometheus_client import Counter
from twisted.internet import defer
-import synapse
import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
@@ -40,9 +40,12 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt, RoomStreamToken
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
@@ -65,8 +68,91 @@ CATCH_UP_STARTUP_DELAY_SEC = 15
CATCH_UP_STARTUP_INTERVAL_SEC = 5
-class FederationSender:
- def __init__(self, hs: "synapse.server.HomeServer"):
+class AbstractFederationSender(metaclass=abc.ABCMeta):
+ @abc.abstractmethod
+ def notify_new_events(self, max_token: RoomStreamToken) -> None:
+ """This gets called when we have some new events we might want to
+ send out to other servers.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
+ """Send a RR to any other servers in the room
+
+ Args:
+ receipt: receipt to be sent
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_presence(self, states: List[UserPresenceState]) -> None:
+ """Send the new presence states to the appropriate destinations.
+
+ This actually queues up the presence states ready for sending and
+ triggers a background task to process them and send out the transactions.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_presence_to_destinations(
+ self, states: Iterable[UserPresenceState], destinations: Iterable[str]
+ ) -> None:
+ """Send the given presence states to the given destinations.
+
+ Args:
+ destinations:
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def build_and_send_edu(
+ self,
+ destination: str,
+ edu_type: str,
+ content: JsonDict,
+ key: Optional[Hashable] = None,
+ ) -> None:
+ """Construct an Edu object, and queue it for sending
+
+ Args:
+ destination: name of server to send to
+ edu_type: type of EDU to send
+ content: content of EDU
+ key: clobbering key for this edu
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_device_messages(self, destination: str) -> None:
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def wake_destination(self, destination: str) -> None:
+ """Called when we want to retry sending transactions to a remote.
+
+ This is mainly useful if the remote server has been down and we think it
+ might have come back.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def get_current_token(self) -> int:
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def federation_ack(self, instance_name: str, token: int) -> None:
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ async def get_replication_rows(
+ self, instance_name: str, from_token: int, to_token: int, target_row_count: int
+ ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
+ raise NotImplementedError()
+
+
+class FederationSender(AbstractFederationSender):
+ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.server_name = hs.hostname
@@ -432,7 +518,7 @@ class FederationSender:
queue.flush_read_receipts_for_room(room_id)
@preserve_fn # the caller should not yield on this
- async def send_presence(self, states: List[UserPresenceState]):
+ async def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
@@ -494,7 +580,7 @@ class FederationSender:
self._get_per_destination_queue(destination).send_presence(states)
@measure_func("txnqueue._process_presence")
- async def _process_presence_inner(self, states: List[UserPresenceState]):
+ async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
@@ -516,9 +602,9 @@ class FederationSender:
self,
destination: str,
edu_type: str,
- content: dict,
+ content: JsonDict,
key: Optional[Hashable] = None,
- ):
+ ) -> None:
"""Construct an Edu object, and queue it for sending
Args:
@@ -545,7 +631,7 @@ class FederationSender:
self.send_edu(edu, key)
- def send_edu(self, edu: Edu, key: Optional[Hashable]):
+ def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
"""Queue an EDU for sending
Args:
@@ -563,7 +649,7 @@ class FederationSender:
else:
queue.send_edu(edu)
- def send_device_messages(self, destination: str):
+ def send_device_messages(self, destination: str) -> None:
if destination == self.server_name:
logger.warning("Not sending device update to ourselves")
return
@@ -575,7 +661,7 @@ class FederationSender:
self._get_per_destination_queue(destination).attempt_new_transaction()
- def wake_destination(self, destination: str):
+ def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote.
This is mainly useful if the remote server has been down and we think it
@@ -599,6 +685,10 @@ class FederationSender:
# to a worker.
return 0
+ def federation_ack(self, instance_name: str, token: int) -> None:
+ # It is not expected that this gets called on FederationSender.
+ raise NotImplementedError()
+
@staticmethod
async def get_replication_rows(
instance_name: str, from_token: int, to_token: int, target_row_count: int
@@ -607,7 +697,7 @@ class FederationSender:
# to a worker.
return [], 0, False
- async def _wake_destinations_needing_catchup(self):
+ async def _wake_destinations_needing_catchup(self) -> None:
"""
Wakes up destinations that need catch-up and are not currently being
backed off from.
|