summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/send_queue.py14
-rw-r--r--synapse/federation/sender/__init__.py48
-rw-r--r--synapse/federation/sender/per_destination_queue.py22
3 files changed, 81 insertions, 3 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 860b03f7b9..4fc9ff92e5 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -55,6 +55,11 @@ class FederationRemoteSendQueue(object):
         self.notifier = hs.get_notifier()
         self.is_mine_id = hs.is_mine_id
 
+        # We may have multiple federation sender instances, so we need to track
+        # their positions separately.
+        self._sender_instances = hs.config.federation.federation_shard_config.instances
+        self._sender_positions = {}
+
         # Pending presence map user_id -> UserPresenceState
         self.presence_map = {}  # type: Dict[str, UserPresenceState]
 
@@ -261,7 +266,14 @@ class FederationRemoteSendQueue(object):
     def get_current_token(self):
         return self.pos - 1
 
-    def federation_ack(self, token):
+    def federation_ack(self, instance_name, token):
+        if self._sender_instances:
+            # If we have configured multiple federation sender instances we need
+            # to track their positions separately, and only clear the queue up
+            # to the token all instances have acked.
+            self._sender_positions[instance_name] = token
+            token = min(self._sender_positions.values())
+
         self._clear_queue_before_pos(token)
 
     async def get_replication_rows(
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 464d7a41de..4b63a0755f 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -69,6 +69,9 @@ class FederationSender(object):
 
         self._transaction_manager = TransactionManager(hs)
 
+        self._instance_name = hs.get_instance_name()
+        self._federation_shard_config = hs.config.federation.federation_shard_config
+
         # map from destination to PerDestinationQueue
         self._per_destination_queues = {}  # type: Dict[str, PerDestinationQueue]
 
@@ -191,7 +194,13 @@ class FederationSender(object):
                         )
                         return
 
-                    destinations = set(destinations)
+                    destinations = {
+                        d
+                        for d in destinations
+                        if self._federation_shard_config.should_send_to(
+                            self._instance_name, d
+                        )
+                    }
 
                     if send_on_behalf_of is not None:
                         # If we are sending the event on behalf of another server
@@ -322,7 +331,12 @@ class FederationSender(object):
 
         # Work out which remote servers should be poked and poke them.
         domains = yield self.state.get_current_hosts_in_room(room_id)
-        domains = [d for d in domains if d != self.server_name]
+        domains = [
+            d
+            for d in domains
+            if d != self.server_name
+            and self._federation_shard_config.should_send_to(self._instance_name, d)
+        ]
         if not domains:
             return
 
@@ -427,6 +441,10 @@ class FederationSender(object):
         for destination in destinations:
             if destination == self.server_name:
                 continue
+            if not self._federation_shard_config.should_send_to(
+                self._instance_name, destination
+            ):
+                continue
             self._get_per_destination_queue(destination).send_presence(states)
 
     @measure_func("txnqueue._process_presence")
@@ -441,6 +459,12 @@ class FederationSender(object):
             for destination in destinations:
                 if destination == self.server_name:
                     continue
+
+                if not self._federation_shard_config.should_send_to(
+                    self._instance_name, destination
+                ):
+                    continue
+
                 self._get_per_destination_queue(destination).send_presence(states)
 
     def build_and_send_edu(
@@ -462,6 +486,11 @@ class FederationSender(object):
             logger.info("Not sending EDU to ourselves")
             return
 
+        if not self._federation_shard_config.should_send_to(
+            self._instance_name, destination
+        ):
+            return
+
         edu = Edu(
             origin=self.server_name,
             destination=destination,
@@ -478,6 +507,11 @@ class FederationSender(object):
             edu: edu to send
             key: clobbering key for this edu
         """
+        if not self._federation_shard_config.should_send_to(
+            self._instance_name, edu.destination
+        ):
+            return
+
         queue = self._get_per_destination_queue(edu.destination)
         if key:
             queue.send_keyed_edu(edu, key)
@@ -489,6 +523,11 @@ class FederationSender(object):
             logger.warning("Not sending device update to ourselves")
             return
 
+        if not self._federation_shard_config.should_send_to(
+            self._instance_name, destination
+        ):
+            return
+
         self._get_per_destination_queue(destination).attempt_new_transaction()
 
     def wake_destination(self, destination: str):
@@ -502,6 +541,11 @@ class FederationSender(object):
             logger.warning("Not waking up ourselves")
             return
 
+        if not self._federation_shard_config.should_send_to(
+            self._instance_name, destination
+        ):
+            return
+
         self._get_per_destination_queue(destination).attempt_new_transaction()
 
     @staticmethod
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 12966e239b..6402136e8a 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -74,6 +74,20 @@ class PerDestinationQueue(object):
         self._clock = hs.get_clock()
         self._store = hs.get_datastore()
         self._transaction_manager = transaction_manager
+        self._instance_name = hs.get_instance_name()
+        self._federation_shard_config = hs.config.federation.federation_shard_config
+
+        self._should_send_on_this_instance = True
+        if not self._federation_shard_config.should_send_to(
+            self._instance_name, destination
+        ):
+            # We don't raise an exception here to avoid taking out any other
+            # processing. We have a guard in `attempt_new_transaction` that
+            # ensure we don't start sending stuff.
+            logger.error(
+                "Create a per destination queue for %s on wrong worker", destination,
+            )
+            self._should_send_on_this_instance = False
 
         self._destination = destination
         self.transmission_loop_running = False
@@ -180,6 +194,14 @@ class PerDestinationQueue(object):
             logger.debug("TX [%s] Transaction already in progress", self._destination)
             return
 
+        if not self._should_send_on_this_instance:
+            # We don't raise an exception here to avoid taking out any other
+            # processing.
+            logger.error(
+                "Trying to start a transaction to %s on wrong worker", self._destination
+            )
+            return
+
         logger.debug("TX [%s] Starting transaction loop", self._destination)
 
         run_as_background_process(