summary refs log tree commit diff
path: root/synapse/federation/sender
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r--synapse/federation/sender/__init__.py140
-rw-r--r--synapse/federation/sender/per_destination_queue.py15
2 files changed, 93 insertions, 62 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 155161685d..952ad39f8c 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -18,8 +18,6 @@ from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set,
 
 from prometheus_client import Counter
 
-from twisted.internet import defer
-
 import synapse.metrics
 from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
@@ -27,11 +25,7 @@ from synapse.federation.sender.per_destination_queue import PerDestinationQueue
 from synapse.federation.sender.transaction_manager import TransactionManager
 from synapse.federation.units import Edu
 from synapse.handlers.presence import get_interested_remotes
-from synapse.logging.context import (
-    make_deferred_yieldable,
-    preserve_fn,
-    run_in_background,
-)
+from synapse.logging.context import preserve_fn
 from synapse.metrics import (
     LaterGauge,
     event_processing_loop_counter,
@@ -39,7 +33,7 @@ from synapse.metrics import (
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
 from synapse.util.metrics import Measure, measure_func
 
 if TYPE_CHECKING:
@@ -276,15 +270,27 @@ class FederationSender(AbstractFederationSender):
                 if not events and next_token >= self._last_poked_id:
                     break
 
-                async def handle_event(event: EventBase) -> None:
+                async def get_destinations_for_event(
+                    event: EventBase,
+                ) -> Collection[str]:
+                    """Computes the destinations to which this event must be sent.
+
+                    This returns an empty tuple when there are no destinations to send to,
+                    or if this event is not from this homeserver and it is not sending
+                    it on behalf of another server.
+
+                    Will also filter out destinations which this sender is not responsible for,
+                    if multiple federation senders exist.
+                    """
+
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                     is_mine = self.is_mine_id(event.sender)
                     if not is_mine and send_on_behalf_of is None:
-                        return
+                        return ()
 
                     if not event.internal_metadata.should_proactively_send():
-                        return
+                        return ()
 
                     destinations = None  # type: Optional[Set[str]]
                     if not event.prev_event_ids():
@@ -319,7 +325,7 @@ class FederationSender(AbstractFederationSender):
                                 "Failed to calculate hosts in room for event: %s",
                                 event.event_id,
                             )
-                            return
+                            return ()
 
                     destinations = {
                         d
@@ -329,17 +335,15 @@ class FederationSender(AbstractFederationSender):
                         )
                     }
 
+                    destinations.discard(self.server_name)
+
                     if send_on_behalf_of is not None:
                         # If we are sending the event on behalf of another server
                         # then it already has the event and there is no reason to
                         # send the event to it.
                         destinations.discard(send_on_behalf_of)
 
-                    logger.debug("Sending %s to %r", event, destinations)
-
                     if destinations:
-                        await self._send_pdu(event, destinations)
-
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
 
@@ -347,24 +351,29 @@ class FederationSender(AbstractFederationSender):
                             "federation_sender"
                         ).observe((now - ts) / 1000)
 
-                async def handle_room_events(events: Iterable[EventBase]) -> None:
-                    with Measure(self.clock, "handle_room_events"):
-                        for event in events:
-                            await handle_event(event)
-
-                events_by_room = {}  # type: Dict[str, List[EventBase]]
-                for event in events:
-                    events_by_room.setdefault(event.room_id, []).append(event)
-
-                await make_deferred_yieldable(
-                    defer.gatherResults(
-                        [
-                            run_in_background(handle_room_events, evs)
-                            for evs in events_by_room.values()
-                        ],
-                        consumeErrors=True,
-                    )
-                )
+                        return destinations
+                    return ()
+
+                async def get_federatable_events_and_destinations(
+                    events: Iterable[EventBase],
+                ) -> List[Tuple[EventBase, Collection[str]]]:
+                    with Measure(self.clock, "get_destinations_for_events"):
+                        # Fetch federation destinations per event,
+                        # skip if get_destinations_for_event returns an empty collection,
+                        # return list of event->destinations pairs.
+                        return [
+                            (event, dests)
+                            for (event, dests) in [
+                                (event, await get_destinations_for_event(event))
+                                for event in events
+                            ]
+                            if dests
+                        ]
+
+                events_and_dests = await get_federatable_events_and_destinations(events)
+
+                # Send corresponding events to each destination queue
+                await self._distribute_events(events_and_dests)
 
                 await self.store.update_federation_out_pos("events", next_token)
 
@@ -382,7 +391,7 @@ class FederationSender(AbstractFederationSender):
                     events_processed_counter.inc(len(events))
 
                     event_processing_loop_room_count.labels("federation_sender").inc(
-                        len(events_by_room)
+                        len({event.room_id for event in events})
                     )
 
                 event_processing_loop_counter.labels("federation_sender").inc()
@@ -394,34 +403,53 @@ class FederationSender(AbstractFederationSender):
         finally:
             self._is_processing = False
 
-    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
-        # We loop through all destinations to see whether we already have
-        # a transaction in progress. If we do, stick it in the pending_pdus
-        # table and we'll get back to it later.
+    async def _distribute_events(
+        self,
+        events_and_dests: Iterable[Tuple[EventBase, Collection[str]]],
+    ) -> None:
+        """Distribute events to the respective per_destination queues.
 
-        destinations = set(destinations)
-        destinations.discard(self.server_name)
-        logger.debug("Sending to: %s", str(destinations))
+        Also persists last-seen per-room stream_ordering to 'destination_rooms'.
 
-        if not destinations:
-            return
+        Args:
+            events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]).
+                              Every event is paired with its intended destinations (in federation).
+        """
+        # Tuples of room_id + destination to their max-seen stream_ordering
+        room_with_dest_stream_ordering = {}  # type: Dict[Tuple[str, str], int]
 
-        sent_pdus_destination_dist_total.inc(len(destinations))
-        sent_pdus_destination_dist_count.inc()
+        # List of events to send to each destination
+        events_by_dest = {}  # type: Dict[str, List[EventBase]]
 
-        assert pdu.internal_metadata.stream_ordering
+        # For each event-destinations pair...
+        for event, destinations in events_and_dests:
 
-        # track the fact that we have a PDU for these destinations,
-        # to allow us to perform catch-up later on if the remote is unreachable
-        # for a while.
-        await self.store.store_destination_rooms_entries(
-            destinations,
-            pdu.room_id,
-            pdu.internal_metadata.stream_ordering,
+            # (we got this from the database, it's filled)
+            assert event.internal_metadata.stream_ordering
+
+            sent_pdus_destination_dist_total.inc(len(destinations))
+            sent_pdus_destination_dist_count.inc()
+
+            # ...iterate over those destinations..
+            for destination in destinations:
+                # ...update their stream-ordering...
+                room_with_dest_stream_ordering[(event.room_id, destination)] = max(
+                    event.internal_metadata.stream_ordering,
+                    room_with_dest_stream_ordering.get((event.room_id, destination), 0),
+                )
+
+                # ...and add the event to each destination queue.
+                events_by_dest.setdefault(destination, []).append(event)
+
+        # Bulk-store destination_rooms stream_ids
+        await self.store.bulk_store_destination_rooms_entries(
+            room_with_dest_stream_ordering
         )
 
-        for destination in destinations:
-            self._get_per_destination_queue(destination).send_pdu(pdu)
+        for destination, pdus in events_by_dest.items():
+            logger.debug("Sending %d pdus to %s", len(pdus), destination)
+
+            self._get_per_destination_queue(destination).send_pdus(pdus)
 
     async def send_read_receipt(self, receipt: ReadReceipt) -> None:
         """Send a RR to any other servers in the room
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3b053ebcfb..3bb66bce32 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -154,19 +154,22 @@ class PerDestinationQueue:
             + len(self._pending_edus_keyed)
         )
 
-    def send_pdu(self, pdu: EventBase) -> None:
-        """Add a PDU to the queue, and start the transmission loop if necessary
+    def send_pdus(self, pdus: Iterable[EventBase]) -> None:
+        """Add PDUs to the queue, and start the transmission loop if necessary
 
         Args:
-            pdu: pdu to send
+            pdus: pdus to send
         """
         if not self._catching_up or self._last_successful_stream_ordering is None:
             # only enqueue the PDU if we are not catching up (False) or do not
             # yet know if we have anything to catch up (None)
-            self._pending_pdus.append(pdu)
+            self._pending_pdus.extend(pdus)
         else:
-            assert pdu.internal_metadata.stream_ordering
-            self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
+            self._catchup_last_skipped = max(
+                pdu.internal_metadata.stream_ordering
+                for pdu in pdus
+                if pdu.internal_metadata.stream_ordering is not None
+            )
 
         self.attempt_new_transaction()