summary refs log tree commit diff
path: root/synapse/federation/sender
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r--synapse/federation/sender/__init__.py145
-rw-r--r--synapse/federation/sender/per_destination_queue.py15
2 files changed, 58 insertions, 102 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 022bbf7dad..deb40f4610 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,26 +14,19 @@
 
 import abc
 import logging
-from typing import (
-    TYPE_CHECKING,
-    Collection,
-    Dict,
-    Hashable,
-    Iterable,
-    List,
-    Optional,
-    Set,
-    Tuple,
-)
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
 
 from prometheus_client import Counter
 
+from twisted.internet import defer
+
 import synapse.metrics
 from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.federation.sender.per_destination_queue import PerDestinationQueue
 from synapse.federation.sender.transaction_manager import TransactionManager
 from synapse.federation.units import Edu
+from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics import (
     LaterGauge,
     event_processing_loop_counter,
@@ -262,27 +255,15 @@ class FederationSender(AbstractFederationSender):
                 if not events and next_token >= self._last_poked_id:
                     break
 
-                async def get_destinations_for_event(
-                    event: EventBase,
-                ) -> Collection[str]:
-                    """Computes the destinations to which this event must be sent.
-
-                    This returns an empty tuple when there are no destinations to send to,
-                    or if this event is not from this homeserver and it is not sending
-                    it on behalf of another server.
-
-                    Will also filter out destinations which this sender is not responsible for,
-                    if multiple federation senders exist.
-                    """
-
+                async def handle_event(event: EventBase) -> None:
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                     is_mine = self.is_mine_id(event.sender)
                     if not is_mine and send_on_behalf_of is None:
-                        return ()
+                        return
 
                     if not event.internal_metadata.should_proactively_send():
-                        return ()
+                        return
 
                     destinations = None  # type: Optional[Set[str]]
                     if not event.prev_event_ids():
@@ -317,7 +298,7 @@ class FederationSender(AbstractFederationSender):
                                 "Failed to calculate hosts in room for event: %s",
                                 event.event_id,
                             )
-                            return ()
+                            return
 
                     destinations = {
                         d
@@ -327,15 +308,17 @@ class FederationSender(AbstractFederationSender):
                         )
                     }
 
-                    destinations.discard(self.server_name)
-
                     if send_on_behalf_of is not None:
                         # If we are sending the event on behalf of another server
                         # then it already has the event and there is no reason to
                         # send the event to it.
                         destinations.discard(send_on_behalf_of)
 
+                    logger.debug("Sending %s to %r", event, destinations)
+
                     if destinations:
+                        await self._send_pdu(event, destinations)
+
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
 
@@ -343,29 +326,24 @@ class FederationSender(AbstractFederationSender):
                             "federation_sender"
                         ).observe((now - ts) / 1000)
 
-                        return destinations
-                    return ()
-
-                async def get_federatable_events_and_destinations(
-                    events: Iterable[EventBase],
-                ) -> List[Tuple[EventBase, Collection[str]]]:
-                    with Measure(self.clock, "get_destinations_for_events"):
-                        # Fetch federation destinations per event,
-                        # skip if get_destinations_for_event returns an empty collection,
-                        # return list of event->destinations pairs.
-                        return [
-                            (event, dests)
-                            for (event, dests) in [
-                                (event, await get_destinations_for_event(event))
-                                for event in events
-                            ]
-                            if dests
-                        ]
-
-                events_and_dests = await get_federatable_events_and_destinations(events)
-
-                # Send corresponding events to each destination queue
-                await self._distribute_events(events_and_dests)
+                async def handle_room_events(events: Iterable[EventBase]) -> None:
+                    with Measure(self.clock, "handle_room_events"):
+                        for event in events:
+                            await handle_event(event)
+
+                events_by_room = {}  # type: Dict[str, List[EventBase]]
+                for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                await make_deferred_yieldable(
+                    defer.gatherResults(
+                        [
+                            run_in_background(handle_room_events, evs)
+                            for evs in events_by_room.values()
+                        ],
+                        consumeErrors=True,
+                    )
+                )
 
                 await self.store.update_federation_out_pos("events", next_token)
 
@@ -383,7 +361,7 @@ class FederationSender(AbstractFederationSender):
                     events_processed_counter.inc(len(events))
 
                     event_processing_loop_room_count.labels("federation_sender").inc(
-                        len({event.room_id for event in events})
+                        len(events_by_room)
                     )
 
                 event_processing_loop_counter.labels("federation_sender").inc()
@@ -395,53 +373,34 @@ class FederationSender(AbstractFederationSender):
         finally:
             self._is_processing = False
 
-    async def _distribute_events(
-        self,
-        events_and_dests: Iterable[Tuple[EventBase, Collection[str]]],
-    ) -> None:
-        """Distribute events to the respective per_destination queues.
-
-        Also persists last-seen per-room stream_ordering to 'destination_rooms'.
-
-        Args:
-            events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]).
-                              Every event is paired with its intended destinations (in federation).
-        """
-        # Tuples of room_id + destination to their max-seen stream_ordering
-        room_with_dest_stream_ordering = {}  # type: Dict[Tuple[str, str], int]
-
-        # List of events to send to each destination
-        events_by_dest = {}  # type: Dict[str, List[EventBase]]
+    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
+        # We loop through all destinations to see whether we already have
+        # a transaction in progress. If we do, stick it in the pending_pdus
+        # table and we'll get back to it later.
 
-        # For each event-destinations pair...
-        for event, destinations in events_and_dests:
+        destinations = set(destinations)
+        destinations.discard(self.server_name)
+        logger.debug("Sending to: %s", str(destinations))
 
-            # (we got this from the database, it's filled)
-            assert event.internal_metadata.stream_ordering
-
-            sent_pdus_destination_dist_total.inc(len(destinations))
-            sent_pdus_destination_dist_count.inc()
+        if not destinations:
+            return
 
-            # ...iterate over those destinations..
-            for destination in destinations:
-                # ...update their stream-ordering...
-                room_with_dest_stream_ordering[(event.room_id, destination)] = max(
-                    event.internal_metadata.stream_ordering,
-                    room_with_dest_stream_ordering.get((event.room_id, destination), 0),
-                )
+        sent_pdus_destination_dist_total.inc(len(destinations))
+        sent_pdus_destination_dist_count.inc()
 
-                # ...and add the event to each destination queue.
-                events_by_dest.setdefault(destination, []).append(event)
+        assert pdu.internal_metadata.stream_ordering
 
-        # Bulk-store destination_rooms stream_ids
-        await self.store.bulk_store_destination_rooms_entries(
-            room_with_dest_stream_ordering
+        # track the fact that we have a PDU for these destinations,
+        # to allow us to perform catch-up later on if the remote is unreachable
+        # for a while.
+        await self.store.store_destination_rooms_entries(
+            destinations,
+            pdu.room_id,
+            pdu.internal_metadata.stream_ordering,
         )
 
-        for destination, pdus in events_by_dest.items():
-            logger.debug("Sending %d pdus to %s", len(pdus), destination)
-
-            self._get_per_destination_queue(destination).send_pdus(pdus)
+        for destination in destinations:
+            self._get_per_destination_queue(destination).send_pdu(pdu)
 
     async def send_read_receipt(self, receipt: ReadReceipt) -> None:
         """Send a RR to any other servers in the room
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3bb66bce32..3b053ebcfb 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -154,22 +154,19 @@ class PerDestinationQueue:
             + len(self._pending_edus_keyed)
         )
 
-    def send_pdus(self, pdus: Iterable[EventBase]) -> None:
-        """Add PDUs to the queue, and start the transmission loop if necessary
+    def send_pdu(self, pdu: EventBase) -> None:
+        """Add a PDU to the queue, and start the transmission loop if necessary
 
         Args:
-            pdus: pdus to send
+            pdu: pdu to send
         """
         if not self._catching_up or self._last_successful_stream_ordering is None:
             # only enqueue the PDU if we are not catching up (False) or do not
             # yet know if we have anything to catch up (None)
-            self._pending_pdus.extend(pdus)
+            self._pending_pdus.append(pdu)
         else:
-            self._catchup_last_skipped = max(
-                pdu.internal_metadata.stream_ordering
-                for pdu in pdus
-                if pdu.internal_metadata.stream_ordering is not None
-            )
+            assert pdu.internal_metadata.stream_ordering
+            self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
 
         self.attempt_new_transaction()