summary refs log tree commit diff
path: root/synapse/federation/sender/per_destination_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-07-10 18:26:36 +0100
committerGitHub <noreply@github.com>2020-07-10 18:26:36 +0100
commitf299441cc67f31dcd47b8fdfda4a218bee9df9ba (patch)
treebba78ca419a547249491c81f3c9968cf526c13b1 /synapse/federation/sender/per_destination_queue.py
parentFix resync remote devices on receive PDU in worker mode. (#7815) (diff)
downloadsynapse-f299441cc67f31dcd47b8fdfda4a218bee9df9ba.tar.xz
Add ability to shard the federation sender (#7798)
Diffstat (limited to 'synapse/federation/sender/per_destination_queue.py')
-rw-r--r--synapse/federation/sender/per_destination_queue.py22
1 files changed, 22 insertions, 0 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py

index 12966e239b..6402136e8a 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -74,6 +74,20 @@ class PerDestinationQueue(object): self._clock = hs.get_clock() self._store = hs.get_datastore() self._transaction_manager = transaction_manager + self._instance_name = hs.get_instance_name() + self._federation_shard_config = hs.config.federation.federation_shard_config + + self._should_send_on_this_instance = True + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + # We don't raise an exception here to avoid taking out any other + # processing. We have a guard in `attempt_new_transaction` that + # ensure we don't start sending stuff. + logger.error( + "Create a per destination queue for %s on wrong worker", destination, + ) + self._should_send_on_this_instance = False self._destination = destination self.transmission_loop_running = False @@ -180,6 +194,14 @@ class PerDestinationQueue(object): logger.debug("TX [%s] Transaction already in progress", self._destination) return + if not self._should_send_on_this_instance: + # We don't raise an exception here to avoid taking out any other + # processing. + logger.error( + "Trying to start a transaction to %s on wrong worker", self._destination + ) + return + logger.debug("TX [%s] Starting transaction loop", self._destination) run_as_background_process(