From 4c79a63fd76e982e5e60b22c7efd15b6e3cf9915 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Nov 2016 10:40:44 +0000 Subject: Explicit federation ack --- synapse/federation/send_queue.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index ed2b03fad4..5c9f7a86f0 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -213,7 +213,15 @@ class FederationRemoteSendQueue(object): def get_current_token(self): return self.pos - 1 - def get_replication_rows(self, token, limit): + def get_replication_rows(self, token, limit, federation_ack=None): + """ + Args: + token (int) + limit (int) + federation_ack (int): Optional. The position where the worker is + explicitly acknowledged it has handled. Allows us to drop + data from before that point + """ # TODO: Handle limit. # To handle restarts where we wrap around @@ -224,7 +232,8 @@ class FederationRemoteSendQueue(object): # There should be only one reader, so lets delete everything its # acknowledged its seen. - self._clear_queue_before_pos(token) + if federation_ack: + self._clear_queue_before_pos(federation_ack) # Fetch changed presence keys = self.presence_changed.keys() -- cgit 1.5.1