1 files changed, 13 insertions, 1 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 860b03f7b9..4fc9ff92e5 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -55,6 +55,11 @@ class FederationRemoteSendQueue(object):
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
+ # We may have multiple federation sender instances, so we need to track
+ # their positions separately.
+ self._sender_instances = hs.config.federation.federation_shard_config.instances
+ self._sender_positions = {}
+
# Pending presence map user_id -> UserPresenceState
self.presence_map = {} # type: Dict[str, UserPresenceState]
@@ -261,7 +266,14 @@ class FederationRemoteSendQueue(object):
def get_current_token(self):
return self.pos - 1
- def federation_ack(self, token):
+ def federation_ack(self, instance_name, token):
+ if self._sender_instances:
+ # If we have configured multiple federation sender instances we need
+ # to track their positions separately, and only clear the queue up
+ # to the token all instances have acked.
+ self._sender_positions[instance_name] = token
+ token = min(self._sender_positions.values())
+
self._clear_queue_before_pos(token)
async def get_replication_rows(
|