diff options
author | Erik Johnston <erik@matrix.org> | 2020-07-10 18:26:36 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-10 18:26:36 +0100 |
commit | f299441cc67f31dcd47b8fdfda4a218bee9df9ba (patch) | |
tree | bba78ca419a547249491c81f3c9968cf526c13b1 /synapse/federation/sender/per_destination_queue.py | |
parent | Fix resync remote devices on receive PDU in worker mode. (#7815) (diff) | |
download | synapse-f299441cc67f31dcd47b8fdfda4a218bee9df9ba.tar.xz |
Add ability to shard the federation sender (#7798)
Diffstat (limited to 'synapse/federation/sender/per_destination_queue.py')
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 12966e239b..6402136e8a 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -74,6 +74,20 @@ class PerDestinationQueue(object): self._clock = hs.get_clock() self._store = hs.get_datastore() self._transaction_manager = transaction_manager + self._instance_name = hs.get_instance_name() + self._federation_shard_config = hs.config.federation.federation_shard_config + + self._should_send_on_this_instance = True + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + # We don't raise an exception here to avoid taking out any other + # processing. We have a guard in `attempt_new_transaction` that + # ensure we don't start sending stuff. + logger.error( + "Create a per destination queue for %s on wrong worker", destination, + ) + self._should_send_on_this_instance = False self._destination = destination self.transmission_loop_running = False @@ -180,6 +194,14 @@ class PerDestinationQueue(object): logger.debug("TX [%s] Transaction already in progress", self._destination) return + if not self._should_send_on_this_instance: + # We don't raise an exception here to avoid taking out any other + # processing. + logger.error( + "Trying to start a transaction to %s on wrong worker", self._destination + ) + return + logger.debug("TX [%s] Starting transaction loop", self._destination) run_as_background_process( |