diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d46f4aaeb1..d473576902 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
from six import itervalues
@@ -21,7 +22,9 @@ from prometheus_client import Counter
from twisted.internet import defer
+import synapse
import synapse.metrics
+from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
@@ -38,7 +41,9 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.metrics import measure_func
+from synapse.storage.presence import UserPresenceState
+from synapse.types import ReadReceipt
+from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__)
@@ -49,12 +54,12 @@ sent_pdus_destination_dist_count = Counter(
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total",
- "" "Total number of PDUs queued for sending across all destinations",
+ "Total number of PDUs queued for sending across all destinations",
)
class FederationSender(object):
- def __init__(self, hs):
+ def __init__(self, hs: "synapse.server.HomeServer"):
self.hs = hs
self.server_name = hs.hostname
@@ -67,7 +72,7 @@ class FederationSender(object):
self._transaction_manager = TransactionManager(hs)
# map from destination to PerDestinationQueue
- self._per_destination_queues = {} # type: dict[str, PerDestinationQueue]
+ self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
@@ -83,7 +88,7 @@ class FederationSender(object):
# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
- self.pending_presence = {}
+ self.pending_presence = {} # type: Dict[str, UserPresenceState]
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
@@ -115,20 +120,17 @@ class FederationSender(object):
# and that there is a pending call to _flush_rrs_for_room in the system.
self._queues_awaiting_rr_flush_by_room = (
{}
- ) # type: dict[str, set[PerDestinationQueue]]
+ ) # type: Dict[str, Set[PerDestinationQueue]]
self._rr_txn_interval_per_room_ms = (
- 1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second
+ 1000.0 / hs.config.federation_rr_transactions_per_room_per_second
)
- def _get_per_destination_queue(self, destination):
+ def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
Args:
- destination (str): server_name of remote server
-
- Returns:
- PerDestinationQueue
+ destination: server_name of remote server
"""
queue = self._per_destination_queues.get(destination)
if not queue:
@@ -136,7 +138,7 @@ class FederationSender(object):
self._per_destination_queues[destination] = queue
return queue
- def notify_new_events(self, current_id):
+ def notify_new_events(self, current_id: int) -> None:
"""This gets called when we have some new events we might want to
send out to other servers.
"""
@@ -150,13 +152,12 @@ class FederationSender(object):
"process_event_queue_for_federation", self._process_event_queue_loop
)
- @defer.inlineCallbacks
- def _process_event_queue_loop(self):
+ async def _process_event_queue_loop(self) -> None:
try:
self._is_processing = True
while True:
- last_token = yield self.store.get_federation_out_pos("events")
- next_token, events = yield self.store.get_all_new_events_stream(
+ last_token = await self.store.get_federation_out_pos("events")
+ next_token, events = await self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
)
@@ -165,8 +166,7 @@ class FederationSender(object):
if not events and next_token >= self._last_poked_id:
break
- @defer.inlineCallbacks
- def handle_event(event):
+ async def handle_event(event: EventBase) -> None:
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender)
@@ -183,8 +183,8 @@ class FederationSender(object):
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
- destinations = yield self.state.get_current_hosts_in_room(
- event.room_id, latest_event_ids=event.prev_event_ids()
+ destinations = await self.state.get_hosts_in_room_at_events(
+ event.room_id, event_ids=event.prev_event_ids()
)
except Exception:
logger.exception(
@@ -205,16 +205,16 @@ class FederationSender(object):
self._send_pdu(event, destinations)
- @defer.inlineCallbacks
- def handle_room_events(events):
- for event in events:
- yield handle_event(event)
+ async def handle_room_events(events: Iterable[EventBase]) -> None:
+ with Measure(self.clock, "handle_room_events"):
+ for event in events:
+ await handle_event(event)
- events_by_room = {}
+ events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
- yield make_deferred_yieldable(
+ await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
@@ -224,11 +224,11 @@ class FederationSender(object):
)
)
- yield self.store.update_federation_out_pos("events", next_token)
+ await self.store.update_federation_out_pos("events", next_token)
if events:
now = self.clock.time_msec()
- ts = yield self.store.get_received_ts(events[-1].event_id)
+ ts = await self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.labels(
"federation_sender"
@@ -252,7 +252,7 @@ class FederationSender(object):
finally:
self._is_processing = False
- def _send_pdu(self, pdu, destinations):
+ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
@@ -274,11 +274,11 @@ class FederationSender(object):
self._get_per_destination_queue(destination).send_pdu(pdu, order)
@defer.inlineCallbacks
- def send_read_receipt(self, receipt):
+ def send_read_receipt(self, receipt: ReadReceipt):
"""Send a RR to any other servers in the room
Args:
- receipt (synapse.types.ReadReceipt): receipt to be sent
+ receipt: receipt to be sent
"""
# Some background on the rate-limiting going on here.
@@ -341,7 +341,7 @@ class FederationSender(object):
else:
queue.flush_read_receipts_for_room(room_id)
- def _schedule_rr_flush_for_room(self, room_id, n_domains):
+ def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
# that is going to cause approximately len(domains) transactions, so now back
# off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
@@ -350,7 +350,7 @@ class FederationSender(object):
self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
self._queues_awaiting_rr_flush_by_room[room_id] = set()
- def _flush_rrs_for_room(self, room_id):
+ def _flush_rrs_for_room(self, room_id: str) -> None:
queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
logger.debug("Flushing RRs in %s to %s", room_id, queues)
@@ -366,14 +366,11 @@ class FederationSender(object):
@preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
- def send_presence(self, states):
+ def send_presence(self, states: List[UserPresenceState]):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
-
- Args:
- states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
@@ -410,11 +407,10 @@ class FederationSender(object):
finally:
self._processing_pending_presence = False
- def send_presence_to_destinations(self, states, destinations):
+ def send_presence_to_destinations(
+ self, states: List[UserPresenceState], destinations: List[str]
+ ) -> None:
"""Send the given presence states to the given destinations.
-
- Args:
- states (list[UserPresenceState])
destinations (list[str])
"""
@@ -429,12 +425,9 @@ class FederationSender(object):
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
- def _process_presence_inner(self, states):
+ def _process_presence_inner(self, states: List[UserPresenceState]):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
-
- Args:
- states (list(UserPresenceState))
"""
hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
@@ -444,14 +437,20 @@ class FederationSender(object):
continue
self._get_per_destination_queue(destination).send_presence(states)
- def build_and_send_edu(self, destination, edu_type, content, key=None):
+ def build_and_send_edu(
+ self,
+ destination: str,
+ edu_type: str,
+ content: dict,
+ key: Optional[Hashable] = None,
+ ):
"""Construct an Edu object, and queue it for sending
Args:
- destination (str): name of server to send to
- edu_type (str): type of EDU to send
- content (dict): content of EDU
- key (Any|None): clobbering key for this edu
+ destination: name of server to send to
+ edu_type: type of EDU to send
+ content: content of EDU
+ key: clobbering key for this edu
"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
@@ -466,12 +465,12 @@ class FederationSender(object):
self.send_edu(edu, key)
- def send_edu(self, edu, key):
+ def send_edu(self, edu: Edu, key: Optional[Hashable]):
"""Queue an EDU for sending
Args:
- edu (Edu): edu to send
- key (Any|None): clobbering key for this edu
+ edu: edu to send
+ key: clobbering key for this edu
"""
queue = self._get_per_destination_queue(edu.destination)
if key:
@@ -479,12 +478,36 @@ class FederationSender(object):
else:
queue.send_edu(edu)
- def send_device_messages(self, destination):
+ def send_device_messages(self, destination: str):
if destination == self.server_name:
- logger.info("Not sending device update to ourselves")
+ logger.warning("Not sending device update to ourselves")
return
self._get_per_destination_queue(destination).attempt_new_transaction()
- def get_current_token(self):
+ def wake_destination(self, destination: str):
+ """Called when we want to retry sending transactions to a remote.
+
+ This is mainly useful if the remote server has been down and we think it
+ might have come back.
+ """
+
+ if destination == self.server_name:
+ logger.warning("Not waking up ourselves")
+ return
+
+ self._get_per_destination_queue(destination).attempt_new_transaction()
+
+ @staticmethod
+ def get_current_token() -> int:
+ # Dummy implementation for case where federation sender isn't offloaded
+ # to a worker.
return 0
+
+ @staticmethod
+ async def get_replication_rows(
+ instance_name: str, from_token: int, to_token: int, target_row_count: int
+ ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
+ # Dummy implementation for case where federation sender isn't offloaded
+ # to a worker.
+ return [], 0, False
|