diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 464d7a41de..4b63a0755f 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -69,6 +69,9 @@ class FederationSender(object):
self._transaction_manager = TransactionManager(hs)
+ self._instance_name = hs.get_instance_name()
+ self._federation_shard_config = hs.config.federation.federation_shard_config
+
# map from destination to PerDestinationQueue
self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
@@ -191,7 +194,13 @@ class FederationSender(object):
)
return
- destinations = set(destinations)
+ destinations = {
+ d
+ for d in destinations
+ if self._federation_shard_config.should_send_to(
+ self._instance_name, d
+ )
+ }
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
@@ -322,7 +331,12 @@ class FederationSender(object):
# Work out which remote servers should be poked and poke them.
domains = yield self.state.get_current_hosts_in_room(room_id)
- domains = [d for d in domains if d != self.server_name]
+ domains = [
+ d
+ for d in domains
+ if d != self.server_name
+ and self._federation_shard_config.should_send_to(self._instance_name, d)
+ ]
if not domains:
return
@@ -427,6 +441,10 @@ class FederationSender(object):
for destination in destinations:
if destination == self.server_name:
continue
+ if not self._federation_shard_config.should_send_to(
+ self._instance_name, destination
+ ):
+ continue
self._get_per_destination_queue(destination).send_presence(states)
@measure_func("txnqueue._process_presence")
@@ -441,6 +459,12 @@ class FederationSender(object):
for destination in destinations:
if destination == self.server_name:
continue
+
+ if not self._federation_shard_config.should_send_to(
+ self._instance_name, destination
+ ):
+ continue
+
self._get_per_destination_queue(destination).send_presence(states)
def build_and_send_edu(
@@ -462,6 +486,11 @@ class FederationSender(object):
logger.info("Not sending EDU to ourselves")
return
+ if not self._federation_shard_config.should_send_to(
+ self._instance_name, destination
+ ):
+ return
+
edu = Edu(
origin=self.server_name,
destination=destination,
@@ -478,6 +507,11 @@ class FederationSender(object):
edu: edu to send
key: clobbering key for this edu
"""
+ if not self._federation_shard_config.should_send_to(
+ self._instance_name, edu.destination
+ ):
+ return
+
queue = self._get_per_destination_queue(edu.destination)
if key:
queue.send_keyed_edu(edu, key)
@@ -489,6 +523,11 @@ class FederationSender(object):
logger.warning("Not sending device update to ourselves")
return
+ if not self._federation_shard_config.should_send_to(
+ self._instance_name, destination
+ ):
+ return
+
self._get_per_destination_queue(destination).attempt_new_transaction()
def wake_destination(self, destination: str):
@@ -502,6 +541,11 @@ class FederationSender(object):
logger.warning("Not waking up ourselves")
return
+ if not self._federation_shard_config.should_send_to(
+ self._instance_name, destination
+ ):
+ return
+
self._get_per_destination_queue(destination).attempt_new_transaction()
@staticmethod
|