diff options
author | Erik Johnston <erik@matrix.org> | 2016-11-23 10:40:44 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-11-23 10:40:44 +0000 |
commit | 4c79a63fd76e982e5e60b22c7efd15b6e3cf9915 (patch) | |
tree | 47bb0578f3bdf45e675184759ed93a7650f1eacb /synapse/federation | |
parent | Fix tests and flake8 (diff) | |
download | synapse-4c79a63fd76e982e5e60b22c7efd15b6e3cf9915.tar.xz |
Explicit federation ack
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/send_queue.py | 13 |
1 files changed, 11 insertions, 2 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index ed2b03fad4..5c9f7a86f0 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -213,7 +213,15 @@ class FederationRemoteSendQueue(object): def get_current_token(self): return self.pos - 1 - def get_replication_rows(self, token, limit): + def get_replication_rows(self, token, limit, federation_ack=None): + """ + Args: + token (int) + limit (int) + federation_ack (int): Optional. The position where the worker is + explicitly acknowledged it has handled. Allows us to drop + data from before that point + """ # TODO: Handle limit. # To handle restarts where we wrap around @@ -224,7 +232,8 @@ class FederationRemoteSendQueue(object): # There should be only one reader, so lets delete everything its # acknowledged its seen. - self._clear_queue_before_pos(token) + if federation_ack: + self._clear_queue_before_pos(federation_ack) # Fetch changed presence keys = self.presence_changed.keys() |