1 files changed, 11 insertions, 2 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index ed2b03fad4..5c9f7a86f0 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -213,7 +213,15 @@ class FederationRemoteSendQueue(object):
def get_current_token(self):
return self.pos - 1
- def get_replication_rows(self, token, limit):
+ def get_replication_rows(self, token, limit, federation_ack=None):
+ """
+ Args:
+ token (int)
+ limit (int)
+ federation_ack (int): Optional. The position where the worker is
+ explicitly acknowledged it has handled. Allows us to drop
+ data from before that point
+ """
# TODO: Handle limit.
# To handle restarts where we wrap around
@@ -224,7 +232,8 @@ class FederationRemoteSendQueue(object):
# There should be only one reader, so lets delete everything its
# acknowledged its seen.
- self._clear_queue_before_pos(token)
+ if federation_ack:
+ self._clear_queue_before_pos(federation_ack)
# Fetch changed presence
keys = self.presence_changed.keys()
|