diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 155161685d..952ad39f8c 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -18,8 +18,6 @@ from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set,
from prometheus_client import Counter
-from twisted.internet import defer
-
import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
@@ -27,11 +25,7 @@ from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.handlers.presence import get_interested_remotes
-from synapse.logging.context import (
- make_deferred_yieldable,
- preserve_fn,
- run_in_background,
-)
+from synapse.logging.context import preserve_fn
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
@@ -39,7 +33,7 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
if TYPE_CHECKING:
@@ -276,15 +270,27 @@ class FederationSender(AbstractFederationSender):
if not events and next_token >= self._last_poked_id:
break
- async def handle_event(event: EventBase) -> None:
+ async def get_destinations_for_event(
+ event: EventBase,
+ ) -> Collection[str]:
+ """Computes the destinations to which this event must be sent.
+
+ This returns an empty tuple when there are no destinations to send to,
+ or if this event is not from this homeserver and it is not sending
+ it on behalf of another server.
+
+ Will also filter out destinations which this sender is not responsible for,
+ if multiple federation senders exist.
+ """
+
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
- return
+ return ()
if not event.internal_metadata.should_proactively_send():
- return
+ return ()
destinations = None # type: Optional[Set[str]]
if not event.prev_event_ids():
@@ -319,7 +325,7 @@ class FederationSender(AbstractFederationSender):
"Failed to calculate hosts in room for event: %s",
event.event_id,
)
- return
+ return ()
destinations = {
d
@@ -329,17 +335,15 @@ class FederationSender(AbstractFederationSender):
)
}
+ destinations.discard(self.server_name)
+
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
destinations.discard(send_on_behalf_of)
- logger.debug("Sending %s to %r", event, destinations)
-
if destinations:
- await self._send_pdu(event, destinations)
-
now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
@@ -347,24 +351,29 @@ class FederationSender(AbstractFederationSender):
"federation_sender"
).observe((now - ts) / 1000)
- async def handle_room_events(events: Iterable[EventBase]) -> None:
- with Measure(self.clock, "handle_room_events"):
- for event in events:
- await handle_event(event)
-
- events_by_room = {} # type: Dict[str, List[EventBase]]
- for event in events:
- events_by_room.setdefault(event.room_id, []).append(event)
-
- await make_deferred_yieldable(
- defer.gatherResults(
- [
- run_in_background(handle_room_events, evs)
- for evs in events_by_room.values()
- ],
- consumeErrors=True,
- )
- )
+ return destinations
+ return ()
+
+ async def get_federatable_events_and_destinations(
+ events: Iterable[EventBase],
+ ) -> List[Tuple[EventBase, Collection[str]]]:
+ with Measure(self.clock, "get_destinations_for_events"):
+ # Fetch federation destinations per event,
+ # skip if get_destinations_for_event returns an empty collection,
+ # return list of event->destinations pairs.
+ return [
+ (event, dests)
+ for (event, dests) in [
+ (event, await get_destinations_for_event(event))
+ for event in events
+ ]
+ if dests
+ ]
+
+ events_and_dests = await get_federatable_events_and_destinations(events)
+
+ # Send corresponding events to each destination queue
+ await self._distribute_events(events_and_dests)
await self.store.update_federation_out_pos("events", next_token)
@@ -382,7 +391,7 @@ class FederationSender(AbstractFederationSender):
events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels("federation_sender").inc(
- len(events_by_room)
+ len({event.room_id for event in events})
)
event_processing_loop_counter.labels("federation_sender").inc()
@@ -394,34 +403,53 @@ class FederationSender(AbstractFederationSender):
finally:
self._is_processing = False
- async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
- # We loop through all destinations to see whether we already have
- # a transaction in progress. If we do, stick it in the pending_pdus
- # table and we'll get back to it later.
+ async def _distribute_events(
+ self,
+ events_and_dests: Iterable[Tuple[EventBase, Collection[str]]],
+ ) -> None:
+ """Distribute events to the respective per_destination queues.
- destinations = set(destinations)
- destinations.discard(self.server_name)
- logger.debug("Sending to: %s", str(destinations))
+ Also persists last-seen per-room stream_ordering to 'destination_rooms'.
- if not destinations:
- return
+ Args:
+ events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]).
+ Every event is paired with its intended destinations (in federation).
+ """
+ # Tuples of room_id + destination to their max-seen stream_ordering
+ room_with_dest_stream_ordering = {} # type: Dict[Tuple[str, str], int]
- sent_pdus_destination_dist_total.inc(len(destinations))
- sent_pdus_destination_dist_count.inc()
+ # List of events to send to each destination
+ events_by_dest = {} # type: Dict[str, List[EventBase]]
- assert pdu.internal_metadata.stream_ordering
+ # For each event-destinations pair...
+ for event, destinations in events_and_dests:
- # track the fact that we have a PDU for these destinations,
- # to allow us to perform catch-up later on if the remote is unreachable
- # for a while.
- await self.store.store_destination_rooms_entries(
- destinations,
- pdu.room_id,
- pdu.internal_metadata.stream_ordering,
+ # (we got this from the database, it's filled)
+ assert event.internal_metadata.stream_ordering
+
+ sent_pdus_destination_dist_total.inc(len(destinations))
+ sent_pdus_destination_dist_count.inc()
+
+ # ...iterate over those destinations..
+ for destination in destinations:
+ # ...update their stream-ordering...
+ room_with_dest_stream_ordering[(event.room_id, destination)] = max(
+ event.internal_metadata.stream_ordering,
+ room_with_dest_stream_ordering.get((event.room_id, destination), 0),
+ )
+
+ # ...and add the event to each destination queue.
+ events_by_dest.setdefault(destination, []).append(event)
+
+ # Bulk-store destination_rooms stream_ids
+ await self.store.bulk_store_destination_rooms_entries(
+ room_with_dest_stream_ordering
)
- for destination in destinations:
- self._get_per_destination_queue(destination).send_pdu(pdu)
+ for destination, pdus in events_by_dest.items():
+ logger.debug("Sending %d pdus to %s", len(pdus), destination)
+
+ self._get_per_destination_queue(destination).send_pdus(pdus)
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3b053ebcfb..3bb66bce32 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -154,19 +154,22 @@ class PerDestinationQueue:
+ len(self._pending_edus_keyed)
)
- def send_pdu(self, pdu: EventBase) -> None:
- """Add a PDU to the queue, and start the transmission loop if necessary
+ def send_pdus(self, pdus: Iterable[EventBase]) -> None:
+ """Add PDUs to the queue, and start the transmission loop if necessary
Args:
- pdu: pdu to send
+ pdus: pdus to send
"""
if not self._catching_up or self._last_successful_stream_ordering is None:
# only enqueue the PDU if we are not catching up (False) or do not
# yet know if we have anything to catch up (None)
- self._pending_pdus.append(pdu)
+ self._pending_pdus.extend(pdus)
else:
- assert pdu.internal_metadata.stream_ordering
- self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
+ self._catchup_last_skipped = max(
+ pdu.internal_metadata.stream_ordering
+ for pdu in pdus
+ if pdu.internal_metadata.stream_ordering is not None
+ )
self.attempt_new_transaction()
|