author    Erik Johnston <erik@matrix.org>  2021-03-18 15:52:26 +0000
committer GitHub <noreply@github.com>      2021-03-18 15:52:26 +0000
commit    dd71eb0f8ab5a6e0d8eda3be8c2d5ff01271d147 (patch)
tree      feef9f010e5593a5fc604876c06d75e706f7d364
parent    Ensure we use a copy of the event content dict before modifying it in seriali... (diff)
download  synapse-dd71eb0f8ab5a6e0d8eda3be8c2d5ff01271d147.tar.xz
Make federation catchup send last event from any server. (#9640)
Currently, federation catch-up sends the last *local* event that we
failed to send to the remote. This can cause issues for large rooms
where lots of servers have sent events while the remote server was
down: when it comes back up, it gets flooded with events from various
points in the DAG.

Instead, let's make it so that all the servers send the most recent
events in the room, even if those events aren't their own. The remote
should deduplicate the events, so there shouldn't be much overhead in
doing this. Alternatively, each server could send its local events only
if they were also forward extremities, and hope that the other servers
send the rest over, but that is a bit risky.
-rw-r--r--  changelog.d/9640.misc                               |   1
-rw-r--r--  synapse/federation/federation_server.py             |  25
-rw-r--r--  synapse/federation/sender/per_destination_queue.py  | 104
-rw-r--r--  tests/federation/test_federation_catch_up.py        |  49
4 files changed, 141 insertions(+), 38 deletions(-)
diff --git a/changelog.d/9640.misc b/changelog.d/9640.misc
new file mode 100644
index 0000000000..3d410ed4cd
--- /dev/null
+++ b/changelog.d/9640.misc
@@ -0,0 +1 @@
+Improve performance of federation catch-up by sending the latest events in the room to the remote, rather than just the last event sent by the local server.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9839d3d016..d84e362070 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -35,7 +35,7 @@ from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
 from twisted.python import failure
 
-from synapse.api.constants import EduTypes, EventTypes, Membership
+from synapse.api.constants import EduTypes, EventTypes
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -63,7 +63,7 @@ from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
     ReplicationGetQueryRestServlet,
 )
-from synapse.types import JsonDict, get_domain_from_id
+from synapse.types import JsonDict
 from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
@@ -727,27 +727,6 @@ class FederationServer(FederationBase):
             if the event was unacceptable for any other reason (eg, too large,
             too many prev_events, couldn't find the prev_events)
         """
-        # check that it's actually being sent from a valid destination to
-        # workaround bug #1753 in 0.18.5 and 0.18.6
-        if origin != get_domain_from_id(pdu.sender):
-            # We continue to accept join events from any server; this is
-            # necessary for the federation join dance to work correctly.
-            # (When we join over federation, the "helper" server is
-            # responsible for sending out the join event, rather than the
-            # origin. See bug #1893. This is also true for some third party
-            # invites).
-            if not (
-                pdu.type == "m.room.member"
-                and pdu.content
-                and pdu.content.get("membership", None)
-                in (Membership.JOIN, Membership.INVITE)
-            ):
-                logger.info(
-                    "Discarding PDU %s from invalid origin %s", pdu.event_id, origin
-                )
-                return
-            else:
-                logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
 
         # We've already checked that we know the room version by this point
         room_version = await self.store.get_room_version(pdu.room_id)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index cc0d765e5f..af85fe0a1e 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
 
 import attr
 from prometheus_client import Counter
@@ -77,6 +77,7 @@ class PerDestinationQueue:
         self._transaction_manager = transaction_manager
         self._instance_name = hs.get_instance_name()
         self._federation_shard_config = hs.config.worker.federation_shard_config
+        self._state = hs.get_state_handler()
 
         self._should_send_on_this_instance = True
         if not self._federation_shard_config.should_handle(
@@ -415,22 +416,97 @@
                     "This should not happen." % event_ids
                 )
 
-            if logger.isEnabledFor(logging.INFO):
-                rooms = [p.room_id for p in catchup_pdus]
-                logger.info("Catching up rooms to %s: %r", self._destination, rooms)
+            # We send transactions with events from one room only, as it's likely
+            # that the remote will have to do additional processing, which may
+            # take some time. It's better to give it small amounts of work
+            # rather than risk the request timing out and repeatedly being
+            # retried, and not making any progress.
+            #
+            # Note: `catchup_pdus` will have exactly one PDU per room.
+            for pdu in catchup_pdus:
+                # The PDU from the DB will be the last PDU in the room from
+                # *this server* that wasn't sent to the remote. However, other
+                # servers may have sent lots of events since then, and we want
+                # to try and tell the remote only about the *latest* events in
+                # the room. This is so that it doesn't get inundated by events
+                # from various parts of the DAG, which all need to be processed.
+                #
+                # Note: this does mean that in large rooms a server coming back
+                # online will get sent the same events from all the different
+                # servers, but the remote will correctly deduplicate them and
+                # handle each event only once.
+
+                # Step 1, fetch the current extremities
+                extrems = await self._store.get_prev_events_for_room(pdu.room_id)
+
+                if pdu.event_id in extrems:
+                    # If the event is in the extremities, then great! We can just
+                    # use that without having to do further checks.
+                    room_catchup_pdus = [pdu]
+                else:
+                    # If not, fetch the full extremity events and figure out
+                    # which of them we can send.
+                    extrem_events = await self._store.get_events_as_list(extrems)
+
+                    new_pdus = []
+                    for p in extrem_events:
+                        # We pulled this from the DB, so it'll be non-null
+                        assert p.internal_metadata.stream_ordering
+
+                        # Filter out events that happened before the remote went
+                        # offline
+                        if (
+                            p.internal_metadata.stream_ordering
+                            < self._last_successful_stream_ordering
+                        ):
+                            continue
 
-            await self._transaction_manager.send_new_transaction(
-                self._destination, catchup_pdus, []
-            )
+                        # Filter out events where the server is not in the room,
+                        # e.g. it may have left/been kicked. *Ideally* we'd pull
+                        # out the kick and send that, but it's a rare edge case
+                        # so we don't bother for now (the server that sent the
+                        # kick should send it out if it's online).
+                        hosts = await self._state.get_hosts_in_room_at_events(
+                            p.room_id, [p.event_id]
+                        )
+                        if self._destination not in hosts:
+                            continue
 
-            sent_transactions_counter.inc()
-            final_pdu = catchup_pdus[-1]
-            self._last_successful_stream_ordering = cast(
-                int, final_pdu.internal_metadata.stream_ordering
-            )
-            await self._store.set_destination_last_successful_stream_ordering(
-                self._destination, self._last_successful_stream_ordering
-            )
+                        new_pdus.append(p)
+
+                    # If we've filtered out all the extremities, fall back to
+                    # sending the original event. This should ensure that the
+                    # server gets at least some of the missed events
+                    # (especially if the other sending servers are up).
+                    if new_pdus:
+                        room_catchup_pdus = new_pdus
+                    else:
+                        room_catchup_pdus = [pdu]
+
+                logger.info(
+                    "Catching up rooms to %s: %r", self._destination, pdu.room_id
+                )
+
+                await self._transaction_manager.send_new_transaction(
+                    self._destination, room_catchup_pdus, []
+                )
+
+                sent_transactions_counter.inc()
+
+                # We pulled this from the DB, so it'll be non-null
+                assert pdu.internal_metadata.stream_ordering
+
+                # Note that we mark the last successful stream ordering as that
+                # from the *original* PDU, rather than the PDU(s) we actually
+                # send. This is because we use it to mark our position in the
+                # queue of missed PDUs to process.
+                self._last_successful_stream_ordering = (
+                    pdu.internal_metadata.stream_ordering
+                )
+
+                await self._store.set_destination_last_successful_stream_ordering(
+                    self._destination, self._last_successful_stream_ordering
+                )
 
     def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
         if not self._pending_rrs:
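
As a worked example of the cursor rule in the comment above (hypothetical
numbers, not from the commit): suppose the destination's
`_last_successful_stream_ordering` is 100 and the missed PDUs are at stream
orderings 105 (room A) and 110 (room B). For room A we may actually transmit
much newer extremities, say at ordering 250, but the cursor must only advance
to 105; jumping it to 250 would cause the loop to skip room B's missed PDU
entirely.

    cursor = 100
    missed = [("room_a", 105), ("room_b", 110)]  # ordered by stream ordering

    for room, ordering in missed:
        # ...send the latest extremities for `room` here, which may have much
        # higher stream orderings than `ordering`...
        cursor = ordering  # advance by the *original* missed PDU only

    assert cursor == 110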
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index 6f96cd7940..95eac6a5a3 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -2,6 +2,7 @@ from typing import List, Tuple
 
 from mock import Mock
 
+from synapse.api.constants import EventTypes
 from synapse.events import EventBase
 from synapse.federation.sender import PerDestinationQueue, TransactionManager
 from synapse.federation.units import Edu
@@ -421,3 +422,51 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
         self.assertNotIn("zzzerver", woken)
         # - all destinations are woken exactly once; they appear once in woken.
         self.assertCountEqual(woken, server_names[:-1])
+
+    @override_config({"send_federation": True})
+    def test_not_latest_event(self):
+        """Test that we send the latest event in the room even if its not ours."""
+
+        per_dest_queue, sent_pdus = self.make_fake_destination_queue()
+
+        # Make a room with a local user, and two servers. One will go offline
+        # and one will send some events.
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_1 = self.helper.create_room_as("u1", tok=u1_token)
+
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        )
+        event_1 = self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host3", "join")
+        )
+
+        # First we send something from the local server, so that we notice the
+        # remote is down and go into catchup mode.
+        self.helper.send(room_1, "you hear me!!", tok=u1_token)
+
+        # Now simulate us receiving an event from the still online remote.
+        event_2 = self.get_success(
+            event_injection.inject_event(
+                self.hs,
+                type=EventTypes.Message,
+                sender="@user:host3",
+                room_id=room_1,
+                content={"msgtype": "m.text", "body": "Hello"},
+            )
+        )
+
+        self.get_success(
+            self.hs.get_datastore().set_destination_last_successful_stream_ordering(
+                "host2", event_1.internal_metadata.stream_ordering
+            )
+        )
+
+        self.get_success(per_dest_queue._catch_up_transmission_loop())
+
+        # We expect only the last message from the remote, event_2, to have been
+        # sent, rather than the last *local* event that was sent.
+        self.assertEqual(len(sent_pdus), 1)
+        self.assertEqual(sent_pdus[0].event_id, event_2.event_id)
+        self.assertFalse(per_dest_queue._catching_up)
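
If needed, the new test can presumably be run on its own with Twisted's trial
runner (which Synapse's test suite uses), along the lines of:

    trial tests.federation.test_federation_catch_up.FederationCatchUpTestCases.test_not_latest_event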