summary refs log tree commit diff
path: root/synapse/federation/send_queue.py
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-08-03 17:28:09 -0700
committerAndrew Morgan <andrew@amorgan.xyz>2020-08-03 17:28:09 -0700
commitbf6cf4b6512ce2553e34932de0de18ec994cfd39 (patch)
tree45c499f7a24908324029fc1ba0c5ce065b1c127f /synapse/federation/send_queue.py
parentMerge commit 'f1245dc3c' into anoa/dinsic_release_1_18_x (diff)
parentAdd ability to shard the federation sender (#7798) (diff)
downloadsynapse-bf6cf4b6512ce2553e34932de0de18ec994cfd39.tar.xz
Merge commit 'f299441cc' into anoa/dinsic_release_1_18_x
* commit 'f299441cc':
  Add ability to shard the federation sender (#7798)
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r--synapse/federation/send_queue.py14
1 files changed, 13 insertions, 1 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py

index 860b03f7b9..4fc9ff92e5 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py
@@ -55,6 +55,11 @@ class FederationRemoteSendQueue(object): self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id + # We may have multiple federation sender instances, so we need to track + # their positions separately. + self._sender_instances = hs.config.federation.federation_shard_config.instances + self._sender_positions = {} + # Pending presence map user_id -> UserPresenceState self.presence_map = {} # type: Dict[str, UserPresenceState] @@ -261,7 +266,14 @@ class FederationRemoteSendQueue(object): def get_current_token(self): return self.pos - 1 - def federation_ack(self, token): + def federation_ack(self, instance_name, token): + if self._sender_instances: + # If we have configured multiple federation sender instances we need + # to track their positions separately, and only clear the queue up + # to the token all instances have acked. + self._sender_positions[instance_name] = token + token = min(self._sender_positions.values()) + self._clear_queue_before_pos(token) async def get_replication_rows(