Diffstat (limited to 'synapse/federation/sender/per_destination_queue.py')
 synapse/federation/sender/per_destination_queue.py | 104 ++++++++++++++++++----
 1 file changed, 89 insertions(+), 15 deletions(-)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index cc0d765e5f..af85fe0a1e 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
 
 import attr
 from prometheus_client import Counter
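
Note on the import change above: the old code used typing.cast to force the
Optional stream ordering to an int, while the new code narrows the type with
an assert instead, so the cast import is no longer needed. A minimal
illustrative sketch of the two idioms, using a bare Optional[int] rather than
Synapse's real EventInternalMetadata:

    from typing import Optional, cast

    def old_style(stream_ordering: Optional[int]) -> int:
        # cast() only appeases the type checker; a None value would slip
        # through silently and fail somewhere later.
        return cast(int, stream_ordering)

    def new_style(stream_ordering: Optional[int]) -> int:
        # assert fails loudly if the value really is None, and also narrows
        # the type from Optional[int] to int for mypy.
        assert stream_ordering is not None
        return stream_ordering

The asserts added in the hunk below follow the new_style pattern.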
@@ -77,6 +77,7 @@ class PerDestinationQueue:
         self._transaction_manager = transaction_manager
         self._instance_name = hs.get_instance_name()
         self._federation_shard_config = hs.config.worker.federation_shard_config
+        self._state = hs.get_state_handler()
 
         self._should_send_on_this_instance = True
         if not self._federation_shard_config.should_handle(
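
The _state handler added above is used later in the next hunk to ask which
servers were in a room at a given event. A hedged sketch of that membership
check, where state stands in for the object returned by hs.get_state_handler()
and the helper name destination_in_room is ours, not Synapse's:

    async def destination_in_room(
        state, destination: str, room_id: str, event_id: str
    ) -> bool:
        # get_hosts_in_room_at_events returns the names of the servers that
        # had members in the room at the given events.
        hosts = await state.get_hosts_in_room_at_events(room_id, [event_id])
        return destination in hosts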
@@ -415,22 +416,95 @@ class PerDestinationQueue:
                     "This should not happen." % event_ids
                 )
 
-            if logger.isEnabledFor(logging.INFO):
-                rooms = [p.room_id for p in catchup_pdus]
-                logger.info("Catching up rooms to %s: %r", self._destination, rooms)
+            # We send transactions with events from one room only, as it's likely
+            # that the remote will have to do additional processing, which may
+            # take some time. It's better to give it small amounts of work
+            # rather than risk the request timing out and repeatedly being
+            # retried, and not making any progress.
+            #
+            # Note: `catchup_pdus` will have exactly one PDU per room.
+            for pdu in catchup_pdus:
+                # The PDU from the DB will be the last PDU in the room from
+                # *this server* that wasn't sent to the remote. However, other
+                # servers may have sent lots of events since then, and we want
+                # to try and tell the remote only about the *latest* events in
+                # the room. This is so that it doesn't get inundated by events
+                # from various parts of the DAG, which all need to be processed.
+                #
+                # Note: this does mean that in large rooms a server coming back
+                # online will get sent the same events from all the different
+                # servers, but the remote will correctly deduplicate them and
+                # handle each event only once.
+
+                # Step 1, fetch the current extremities
+                extrems = await self._store.get_prev_events_for_room(pdu.room_id)
+
+                if pdu.event_id in extrems:
+                    # If the event is in the extremities, then great! We can just
+                    # use that without having to do further checks.
+                    room_catchup_pdus = [pdu]
+                else:
+                    # If not, fetch the extremity events and figure out which
+                    # we can send.
+                    extrem_events = await self._store.get_events_as_list(extrems)
+
+                    new_pdus = []
+                    for p in extrem_events:
+                        # We pulled this from the DB, so it'll be non-null
+                        assert p.internal_metadata.stream_ordering
+
+                        # Filter out events that happened before the remote went
+                        # offline
+                        if (
+                            p.internal_metadata.stream_ordering
+                            < self._last_successful_stream_ordering
+                        ):
+                            continue
 
-            await self._transaction_manager.send_new_transaction(
-                self._destination, catchup_pdus, []
-            )
+                        # Filter out events where the server is not in the room,
+                        # e.g. it may have left/been kicked. *Ideally* we'd pull
+                        # out the kick and send that, but it's a rare edge case
+                        # so we don't bother for now (the server that sent the
+                        # kick should send it out if it's online).
+                        hosts = await self._state.get_hosts_in_room_at_events(
+                            p.room_id, [p.event_id]
+                        )
+                        if self._destination not in hosts:
+                            continue
 
-            sent_transactions_counter.inc()
-            final_pdu = catchup_pdus[-1]
-            self._last_successful_stream_ordering = cast(
-                int, final_pdu.internal_metadata.stream_ordering
-            )
-            await self._store.set_destination_last_successful_stream_ordering(
-                self._destination, self._last_successful_stream_ordering
-            )
+                        new_pdus.append(p)
+
+                    # If we've filtered out all the extremities, fall back to
+                    # sending the original event. This should ensure that the
+                    # server gets at least some of the missed events (especially
+                    # if the other sending servers are up).
+                    if new_pdus:
+                        room_catchup_pdus = new_pdus
+                    else:
+                        room_catchup_pdus = [pdu]
+
+                logger.info(
+                    "Catching up rooms to %s: %r", self._destination, pdu.room_id
+                )
+
+                await self._transaction_manager.send_new_transaction(
+                    self._destination, room_catchup_pdus, []
+                )
+
+                sent_transactions_counter.inc()
+
+                # We pulled this from the DB, so it'll be non-null
+                assert pdu.internal_metadata.stream_ordering
+
+                # Note that we mark the last successful stream ordering as that
+                # from the *original* PDU, rather than the PDU(s) we actually
+                # send. This is because we use it to mark our position in the
+                # queue of missed PDUs to process.
+                self._last_successful_stream_ordering = (
+                    pdu.internal_metadata.stream_ordering
+                )
+
+                await self._store.set_destination_last_successful_stream_ordering(
+                    self._destination, self._last_successful_stream_ordering
+                )
 
     def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
         if not self._pending_rrs:
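
Putting the main hunk together: for each room with a missed PDU, the loop
prefers the room's current forward extremities, filtered down to events the
destination was entitled to see and that post-date its last successful send,
and falls back to the original missed event otherwise. A condensed,
self-contained sketch of that selection step, with store and state as
simplified stand-ins for Synapse's datastore and state handler:

    from typing import List

    async def select_catchup_pdus(
        store, state, destination: str, pdu, last_successful_stream_ordering: int
    ) -> List:
        # Current forward extremities of the room the missed PDU is in.
        extrems = await store.get_prev_events_for_room(pdu.room_id)
        if pdu.event_id in extrems:
            # The missed event is still an extremity: send it as-is.
            return [pdu]

        new_pdus = []
        for p in await store.get_events_as_list(extrems):
            # Drop extremities from before the remote went offline.
            if p.internal_metadata.stream_ordering < last_successful_stream_ordering:
                continue
            # Drop extremities the destination was not in the room to see.
            hosts = await state.get_hosts_in_room_at_events(p.room_id, [p.event_id])
            if destination not in hosts:
                continue
            new_pdus.append(p)

        # If everything was filtered out, fall back to the original event.
        return new_pdus or [pdu]

The real loop then sends one transaction per room and advances
_last_successful_stream_ordering from the *original* PDU, so the catch-up
cursor keeps moving even when newer extremities were sent in its place.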