diff --git a/changelog.d/18484.bugfix b/changelog.d/18484.bugfix
new file mode 100644
index 0000000000..c816410da4
--- /dev/null
+++ b/changelog.d/18484.bugfix
@@ -0,0 +1 @@
+Remove destinations from sending if not whitelisted.
diff --git a/synapse/config/federation.py b/synapse/config/federation.py
index cf29fa2562..31f46e420d 100644
--- a/synapse/config/federation.py
+++ b/synapse/config/federation.py
@@ -94,5 +94,21 @@ class FederationConfig(Config):
2**62,
)
+ def is_domain_allowed_according_to_federation_whitelist(self, domain: str) -> bool:
+ """
+ Returns whether a domain is allowed according to the federation whitelist. If a
+ federation whitelist is not set, all domains are allowed.
+
+ Args:
+ domain: The domain to test.
+
+ Returns:
+ True if the domain is allowed or if a whitelist is not set, False otherwise.
+ """
+ if self.federation_domain_whitelist is None:
+ return True
+
+ return domain in self.federation_domain_whitelist
+
_METRICS_FOR_DOMAINS_SCHEMA = {"type": "array", "items": {"type": "string"}}
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index b95b3c629d..2eef7b707d 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -342,6 +342,8 @@ class _DestinationWakeupQueue:
destination, _ = self.queue.popitem(last=False)
queue = self.sender._get_per_destination_queue(destination)
+ if queue is None:
+ continue
if not queue._new_data_to_send:
# The per destination queue has already been woken up.
@@ -436,12 +438,23 @@ class FederationSender(AbstractFederationSender):
self._wake_destinations_needing_catchup,
)
- def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
+ def _get_per_destination_queue(
+ self, destination: str
+ ) -> Optional[PerDestinationQueue]:
"""Get or create a PerDestinationQueue for the given destination
Args:
destination: server_name of remote server
+
+ Returns:
+ None if the destination is not allowed by the federation whitelist.
+ Otherwise a PerDestinationQueue for this destination.
"""
+ if not self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
+ destination
+ ):
+ return None
+
queue = self._per_destination_queues.get(destination)
if not queue:
queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
@@ -718,6 +731,16 @@ class FederationSender(AbstractFederationSender):
# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
+ # Filter out any destinations not present in the federation_domain_whitelist, if
+ # the whitelist exists. These destinations should not be sent to so let's not
+ # waste time or space keeping track of events destined for them.
+ destinations = [
+ d
+ for d in destinations
+ if self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
+ d
+ )
+ ]
await self.store.store_destination_rooms_entries(
destinations,
pdu.room_id,
@@ -732,7 +755,12 @@ class FederationSender(AbstractFederationSender):
)
for destination in destinations:
- self._get_per_destination_queue(destination).send_pdu(pdu)
+ queue = self._get_per_destination_queue(destination)
+ # We expect `queue` to not be None as we already filtered out
+ # non-whitelisted destinations above.
+ assert queue is not None
+
+ queue.send_pdu(pdu)
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
@@ -841,12 +869,16 @@ class FederationSender(AbstractFederationSender):
for domain in immediate_domains:
# Add to destination queue and wake the destination up
queue = self._get_per_destination_queue(domain)
+ if queue is None:
+ continue
queue.queue_read_receipt(receipt)
queue.attempt_new_transaction()
for domain in delay_domains:
# Add to destination queue...
queue = self._get_per_destination_queue(domain)
+ if queue is None:
+ continue
queue.queue_read_receipt(receipt)
# ... and schedule the destination to be woken up.
@@ -882,9 +914,10 @@ class FederationSender(AbstractFederationSender):
if self.is_mine_server_name(destination):
continue
- self._get_per_destination_queue(destination).send_presence(
- states, start_loop=False
- )
+ queue = self._get_per_destination_queue(destination)
+ if queue is None:
+ continue
+ queue.send_presence(states, start_loop=False)
self._destination_wakeup_queue.add_to_queue(destination)
@@ -934,6 +967,8 @@ class FederationSender(AbstractFederationSender):
return
queue = self._get_per_destination_queue(edu.destination)
+ if queue is None:
+ return
if key:
queue.send_keyed_edu(edu, key)
else:
@@ -958,9 +993,15 @@ class FederationSender(AbstractFederationSender):
for destination in destinations:
if immediate:
- self._get_per_destination_queue(destination).attempt_new_transaction()
+ queue = self._get_per_destination_queue(destination)
+ if queue is None:
+ continue
+ queue.attempt_new_transaction()
else:
- self._get_per_destination_queue(destination).mark_new_data()
+ queue = self._get_per_destination_queue(destination)
+ if queue is None:
+ continue
+ queue.mark_new_data()
self._destination_wakeup_queue.add_to_queue(destination)
def wake_destination(self, destination: str) -> None:
@@ -979,7 +1020,9 @@ class FederationSender(AbstractFederationSender):
):
return
- self._get_per_destination_queue(destination).attempt_new_transaction()
+ queue = self._get_per_destination_queue(destination)
+ if queue is not None:
+ queue.attempt_new_transaction()
@staticmethod
def get_current_token() -> int:
@@ -1024,6 +1067,9 @@ class FederationSender(AbstractFederationSender):
d
for d in destinations_to_wake
if self._federation_shard_config.should_handle(self._instance_name, d)
+ and self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
+ d
+ )
]
for destination in destinations_to_wake:
|