diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 333ca9a97f..41d8b937af 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -37,6 +37,7 @@ from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
+from synapse.visibility import filter_events_for_server
if TYPE_CHECKING:
import synapse.server
@@ -77,6 +78,7 @@ class PerDestinationQueue:
):
self._server_name = hs.hostname
self._clock = hs.get_clock()
+ self._storage_controllers = hs.get_storage_controllers()
self._store = hs.get_datastores().main
self._transaction_manager = transaction_manager
self._instance_name = hs.get_instance_name()
@@ -442,6 +444,12 @@ class PerDestinationQueue:
"This should not happen." % event_ids
)
+ logger.info(
+ "Catching up destination %s with %d PDUs",
+ self._destination,
+ len(catchup_pdus),
+ )
+
# We send transactions with events from one room only, as its likely
# that the remote will have to do additional processing, which may
# take some time. It's better to give it small amounts of work
@@ -487,19 +495,20 @@ class PerDestinationQueue:
):
continue
- # Filter out events where the server is not in the room,
- # e.g. it may have left/been kicked. *Ideally* we'd pull
- # out the kick and send that, but it's a rare edge case
- # so we don't bother for now (the server that sent the
- # kick should send it out if its online).
- hosts = await self._state.get_hosts_in_room_at_events(
- p.room_id, [p.event_id]
- )
- if self._destination not in hosts:
- continue
-
new_pdus.append(p)
+ # Filter out events where the server is not in the room,
+ # e.g. it may have left/been kicked. *Ideally* we'd pull
+ # out the kick and send that, but it's a rare edge case
+ # so we don't bother for now (the server that sent the
+ # kick should send it out if its online).
+ new_pdus = await filter_events_for_server(
+ self._storage_controllers,
+ self._destination,
+ new_pdus,
+ redact=False,
+ )
+
# If we've filtered out all the extremities, fall back to
# sending the original event. This should ensure that the
# server gets at least some of missed events (especially if
|