summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-03-27 14:11:17 +0100
committerErik Johnston <erik@matrix.org>2017-03-30 12:54:36 +0100
commit11880103b13146aa8e700827f36c81a26fb8e09e (patch)
treef8f9f53e9472ea3b0ad3bb9554e7dd822875246b
parentAdd a simple hook to wait for replication traffic (diff)
downloadsynapse-11880103b13146aa8e700827f36c81a26fb8e09e.tar.xz
Make federation send queue take the current position
-rw-r--r--synapse/federation/send_queue.py40
-rw-r--r--synapse/replication/resource.py2
2 files changed, 26 insertions, 16 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index bbb0195228..4bde66fbf8 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -220,10 +220,15 @@ class FederationRemoteSendQueue(object):
     def get_current_token(self):
         return self.pos - 1
 
-    def get_replication_rows(self, token, limit, federation_ack=None):
-        """
+    def federation_ack(self, token):
+        self._clear_queue_before_pos(token)
+
+    def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
+        """Get rows to be sent over federation between the two tokens
+
         Args:
-            token (int)
+            from_token (int)
+            to_token(int)
             limit (int)
             federation_ack (int): Optional. The position where the worker is
                 explicitly acknowledged it has handled. Allows us to drop
@@ -232,8 +237,8 @@ class FederationRemoteSendQueue(object):
         # TODO: Handle limit.
 
         # To handle restarts where we wrap around
-        if token > self.pos:
-            token = -1
+        if from_token > self.pos:
+            from_token = -1
 
         rows = []
 
@@ -244,10 +249,11 @@ class FederationRemoteSendQueue(object):
 
         # Fetch changed presence
         keys = self.presence_changed.keys()
-        i = keys.bisect_right(token)
+        i = keys.bisect_right(from_token)
+        j = keys.bisect_right(to_token) + 1
         dest_user_ids = set(
             (pos, dest_user_id)
-            for pos in keys[i:]
+            for pos in keys[i:j]
             for dest_user_id in self.presence_changed[pos]
         )
 
@@ -259,8 +265,9 @@ class FederationRemoteSendQueue(object):
 
         # Fetch changes keyed edus
         keys = self.keyed_edu_changed.keys()
-        i = keys.bisect_right(token)
-        keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
+        i = keys.bisect_right(from_token)
+        j = keys.bisect_right(to_token) + 1
+        keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
 
         for (pos, (destination, edu_key)) in keyed_edus:
             rows.append(
@@ -272,16 +279,18 @@ class FederationRemoteSendQueue(object):
 
         # Fetch changed edus
         keys = self.edus.keys()
-        i = keys.bisect_right(token)
-        edus = set((k, self.edus[k]) for k in keys[i:])
+        i = keys.bisect_right(from_token)
+        j = keys.bisect_right(to_token) + 1
+        edus = set((k, self.edus[k]) for k in keys[i:j])
 
         for (pos, edu) in edus:
             rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
 
         # Fetch changed failures
         keys = self.failures.keys()
-        i = keys.bisect_right(token)
-        failures = set((k, self.failures[k]) for k in keys[i:])
+        i = keys.bisect_right(from_token)
+        j = keys.bisect_right(to_token) + 1
+        failures = set((k, self.failures[k]) for k in keys[i:j])
 
         for (pos, (destination, failure)) in failures:
             rows.append((pos, FAILURE_TYPE, ujson.dumps({
@@ -291,8 +300,9 @@ class FederationRemoteSendQueue(object):
 
         # Fetch changed device messages
         keys = self.device_messages.keys()
-        i = keys.bisect_right(token)
-        device_messages = set((k, self.device_messages[k]) for k in keys[i:])
+        i = keys.bisect_right(from_token)
+        j = keys.bisect_right(to_token) + 1
+        device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
 
         for (pos, destination) in device_messages:
             rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 2d3ec2eca2..abd3fe7665 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -489,7 +489,7 @@ class ReplicationResource(Resource):
 
         if federation is not None and federation != current_position:
             federation_rows = self.federation_sender.get_replication_rows(
-                federation, limit, federation_ack=federation_ack,
+                federation, current_position, limit, federation_ack=federation_ack,
             )
             upto_token = _position_from_rows(federation_rows, current_position)
             writer.write_header_and_rows("federation", federation_rows, (