diff options
author | Erik Johnston <erik@matrix.org> | 2020-07-10 18:26:36 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-10 18:26:36 +0100 |
commit | f299441cc67f31dcd47b8fdfda4a218bee9df9ba (patch) | |
tree | bba78ca419a547249491c81f3c9968cf526c13b1 /synapse/federation/send_queue.py | |
parent | Fix resync remote devices on receive PDU in worker mode. (#7815) (diff) | |
download | synapse-f299441cc67f31dcd47b8fdfda4a218bee9df9ba.tar.xz |
Add ability to shard the federation sender (#7798)
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r-- | synapse/federation/send_queue.py | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 860b03f7b9..4fc9ff92e5 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -55,6 +55,11 @@ class FederationRemoteSendQueue(object): self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id + # We may have multiple federation sender instances, so we need to track + # their positions separately. + self._sender_instances = hs.config.federation.federation_shard_config.instances + self._sender_positions = {} + # Pending presence map user_id -> UserPresenceState self.presence_map = {} # type: Dict[str, UserPresenceState] @@ -261,7 +266,14 @@ class FederationRemoteSendQueue(object): def get_current_token(self): return self.pos - 1 - def federation_ack(self, token): + def federation_ack(self, instance_name, token): + if self._sender_instances: + # If we have configured multiple federation sender instances we need + # to track their positions separately, and only clear the queue up + # to the token all instances have acked. + self._sender_positions[instance_name] = token + token = min(self._sender_positions.values()) + self._clear_queue_before_pos(token) async def get_replication_rows( |