diff --git a/changelog.d/9640.misc b/changelog.d/9640.misc
new file mode 100644
index 0000000000..3d410ed4cd
--- /dev/null
+++ b/changelog.d/9640.misc
@@ -0,0 +1 @@
+Improve performance of federation catch up by sending events the latest events in the room to the remote, rather than just the last event sent by the local server.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9839d3d016..d84e362070 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -35,7 +35,7 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
-from synapse.api.constants import EduTypes, EventTypes, Membership
+from synapse.api.constants import EduTypes, EventTypes
from synapse.api.errors import (
AuthError,
Codes,
@@ -63,7 +63,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
-from synapse.types import JsonDict, get_domain_from_id
+from synapse.types import JsonDict
from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -727,27 +727,6 @@ class FederationServer(FederationBase):
if the event was unacceptable for any other reason (eg, too large,
too many prev_events, couldn't find the prev_events)
"""
- # check that it's actually being sent from a valid destination to
- # workaround bug #1753 in 0.18.5 and 0.18.6
- if origin != get_domain_from_id(pdu.sender):
- # We continue to accept join events from any server; this is
- # necessary for the federation join dance to work correctly.
- # (When we join over federation, the "helper" server is
- # responsible for sending out the join event, rather than the
- # origin. See bug #1893. This is also true for some third party
- # invites).
- if not (
- pdu.type == "m.room.member"
- and pdu.content
- and pdu.content.get("membership", None)
- in (Membership.JOIN, Membership.INVITE)
- ):
- logger.info(
- "Discarding PDU %s from invalid origin %s", pdu.event_id, origin
- )
- return
- else:
- logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
# We've already checked that we know the room version by this point
room_version = await self.store.get_room_version(pdu.room_id)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index cc0d765e5f..af85fe0a1e 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
import attr
from prometheus_client import Counter
@@ -77,6 +77,7 @@ class PerDestinationQueue:
self._transaction_manager = transaction_manager
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.worker.federation_shard_config
+ self._state = hs.get_state_handler()
self._should_send_on_this_instance = True
if not self._federation_shard_config.should_handle(
@@ -415,22 +416,95 @@ class PerDestinationQueue:
"This should not happen." % event_ids
)
- if logger.isEnabledFor(logging.INFO):
- rooms = [p.room_id for p in catchup_pdus]
- logger.info("Catching up rooms to %s: %r", self._destination, rooms)
+ # We send transactions with events from one room only, as its likely
+ # that the remote will have to do additional processing, which may
+ # take some time. It's better to give it small amounts of work
+ # rather than risk the request timing out and repeatedly being
+ # retried, and not making any progress.
+ #
+ # Note: `catchup_pdus` will have exactly one PDU per room.
+ for pdu in catchup_pdus:
+ # The PDU from the DB will be the last PDU in the room from
+ # *this server* that wasn't sent to the remote. However, other
+ # servers may have sent lots of events since then, and we want
+ # to try and tell the remote only about the *latest* events in
+ # the room. This is so that it doesn't get inundated by events
+ # from various parts of the DAG, which all need to be processed.
+ #
+ # Note: this does mean that in large rooms a server coming back
+ # online will get sent the same events from all the different
+ # servers, but the remote will correctly deduplicate them and
+ # handle it only once.
+
+ # Step 1, fetch the current extremities
+ extrems = await self._store.get_prev_events_for_room(pdu.room_id)
+
+ if pdu.event_id in extrems:
+ # If the event is in the extremities, then great! We can just
+ # use that without having to do further checks.
+ room_catchup_pdus = [pdu]
+ else:
+ # If not, fetch the extremities and figure out which we can
+ # send.
+ extrem_events = await self._store.get_events_as_list(extrems)
+
+ new_pdus = []
+ for p in extrem_events:
+ # We pulled this from the DB, so it'll be non-null
+ assert p.internal_metadata.stream_ordering
+
+ # Filter out events that happened before the remote went
+ # offline
+ if (
+ p.internal_metadata.stream_ordering
+ < self._last_successful_stream_ordering
+ ):
+ continue
- await self._transaction_manager.send_new_transaction(
- self._destination, catchup_pdus, []
- )
+ # Filter out events where the server is not in the room,
+ # e.g. it may have left/been kicked. *Ideally* we'd pull
+ # out the kick and send that, but it's a rare edge case
+ # so we don't bother for now (the server that sent the
+ # kick should send it out if its online).
+ hosts = await self._state.get_hosts_in_room_at_events(
+ p.room_id, [p.event_id]
+ )
+ if self._destination not in hosts:
+ continue
- sent_transactions_counter.inc()
- final_pdu = catchup_pdus[-1]
- self._last_successful_stream_ordering = cast(
- int, final_pdu.internal_metadata.stream_ordering
- )
- await self._store.set_destination_last_successful_stream_ordering(
- self._destination, self._last_successful_stream_ordering
- )
+ new_pdus.append(p)
+
+ # If we've filtered out all the extremities, fall back to
+ # sending the original event. This should ensure that the
+ # server gets at least some of missed events (especially if
+ # the other sending servers are up).
+ if new_pdus:
+ room_catchup_pdus = new_pdus
+
+ logger.info(
+ "Catching up rooms to %s: %r", self._destination, pdu.room_id
+ )
+
+ await self._transaction_manager.send_new_transaction(
+ self._destination, room_catchup_pdus, []
+ )
+
+ sent_transactions_counter.inc()
+
+ # We pulled this from the DB, so it'll be non-null
+ assert pdu.internal_metadata.stream_ordering
+
+ # Note that we mark the last successful stream ordering as that
+ # from the *original* PDU, rather than the PDU(s) we actually
+ # send. This is because we use it to mark our position in the
+ # queue of missed PDUs to process.
+ self._last_successful_stream_ordering = (
+ pdu.internal_metadata.stream_ordering
+ )
+
+ await self._store.set_destination_last_successful_stream_ordering(
+ self._destination, self._last_successful_stream_ordering
+ )
def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index 6f96cd7940..95eac6a5a3 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -2,6 +2,7 @@ from typing import List, Tuple
from mock import Mock
+from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.federation.sender import PerDestinationQueue, TransactionManager
from synapse.federation.units import Edu
@@ -421,3 +422,51 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
self.assertNotIn("zzzerver", woken)
# - all destinations are woken exactly once; they appear once in woken.
self.assertCountEqual(woken, server_names[:-1])
+
+ @override_config({"send_federation": True})
+ def test_not_latest_event(self):
+ """Test that we send the latest event in the room even if its not ours."""
+
+ per_dest_queue, sent_pdus = self.make_fake_destination_queue()
+
+ # Make a room with a local user, and two servers. One will go offline
+ # and one will send some events.
+ self.register_user("u1", "you the one")
+ u1_token = self.login("u1", "you the one")
+ room_1 = self.helper.create_room_as("u1", tok=u1_token)
+
+ self.get_success(
+ event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+ )
+ event_1 = self.get_success(
+ event_injection.inject_member_event(self.hs, room_1, "@user:host3", "join")
+ )
+
+ # First we send something from the local server, so that we notice the
+ # remote is down and go into catchup mode.
+ self.helper.send(room_1, "you hear me!!", tok=u1_token)
+
+ # Now simulate us receiving an event from the still online remote.
+ event_2 = self.get_success(
+ event_injection.inject_event(
+ self.hs,
+ type=EventTypes.Message,
+ sender="@user:host3",
+ room_id=room_1,
+ content={"msgtype": "m.text", "body": "Hello"},
+ )
+ )
+
+ self.get_success(
+ self.hs.get_datastore().set_destination_last_successful_stream_ordering(
+ "host2", event_1.internal_metadata.stream_ordering
+ )
+ )
+
+ self.get_success(per_dest_queue._catch_up_transmission_loop())
+
+ # We expect only the last message from the remote, event_2, to have been
+ # sent, rather than the last *local* event that was sent.
+ self.assertEqual(len(sent_pdus), 1)
+ self.assertEqual(sent_pdus[0].event_id, event_2.event_id)
+ self.assertFalse(per_dest_queue._catching_up)
|