From 11880103b13146aa8e700827f36c81a26fb8e09e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:11:17 +0100 Subject: Make federation send queue take the current position --- synapse/federation/send_queue.py | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) (limited to 'synapse/federation/send_queue.py') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index bbb0195228..4bde66fbf8 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -220,10 +220,15 @@ class FederationRemoteSendQueue(object): def get_current_token(self): return self.pos - 1 - def get_replication_rows(self, token, limit, federation_ack=None): - """ + def federation_ack(self, token): + self._clear_queue_before_pos(token) + + def get_replication_rows(self, from_token, to_token, limit, federation_ack=None): + """Get rows to be sent over federation between the two tokens + Args: - token (int) + from_token (int) + to_token(int) limit (int) federation_ack (int): Optional. The position where the worker is explicitly acknowledged it has handled. Allows us to drop @@ -232,8 +237,8 @@ class FederationRemoteSendQueue(object): # TODO: Handle limit. # To handle restarts where we wrap around - if token > self.pos: - token = -1 + if from_token > self.pos: + from_token = -1 rows = [] @@ -244,10 +249,11 @@ class FederationRemoteSendQueue(object): # Fetch changed presence keys = self.presence_changed.keys() - i = keys.bisect_right(token) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 dest_user_ids = set( (pos, dest_user_id) - for pos in keys[i:] + for pos in keys[i:j] for dest_user_id in self.presence_changed[pos] ) @@ -259,8 +265,9 @@ class FederationRemoteSendQueue(object): # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() - i = keys.bisect_right(token) - keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) for (pos, (destination, edu_key)) in keyed_edus: rows.append( @@ -272,16 +279,18 @@ class FederationRemoteSendQueue(object): # Fetch changed edus keys = self.edus.keys() - i = keys.bisect_right(token) - edus = set((k, self.edus[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) # Fetch changed failures keys = self.failures.keys() - i = keys.bisect_right(token) - failures = set((k, self.failures[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: rows.append((pos, FAILURE_TYPE, ujson.dumps({ @@ -291,8 +300,9 @@ class FederationRemoteSendQueue(object): # Fetch changed device messages keys = self.device_messages.keys() - i = keys.bisect_right(token) - device_messages = set((k, self.device_messages[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ -- cgit 1.4.1