summary refs log tree commit diff
path: root/synapse/federation/send_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2017-04-04 10:07:57 +0100
committerGitHub <noreply@github.com>2017-04-04 10:07:57 +0100
commit27cc627e425a4a1d6baf3887b435005a571c2271 (patch)
treea5fa797ac7e65f85cae52cccd88451e5672257c6 /synapse/federation/send_queue.py
parentMerge pull request #2095 from matrix-org/rav/cull_log_preserves (diff)
parentMerge branch 'develop' of github.com:matrix-org/synapse into erikj/repl_tcp_s... (diff)
downloadsynapse-27cc627e425a4a1d6baf3887b435005a571c2271.tar.xz
Merge pull request #2082 from matrix-org/erikj/repl_tcp_server
Replace HTTP replication with TCP replication (Server side part)
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r--synapse/federation/send_queue.py40
1 files changed, 25 insertions, 15 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index bbb0195228..4bde66fbf8 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -220,10 +220,15 @@ class FederationRemoteSendQueue(object):
     def get_current_token(self):
         return self.pos - 1
 
-    def get_replication_rows(self, token, limit, federation_ack=None):
-        """
+    def federation_ack(self, token):
+        self._clear_queue_before_pos(token)
+
+    def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
+        """Get rows to be sent over federation between the two tokens
+
         Args:
-            token (int)
+            from_token (int)
+            to_token(int)
             limit (int)
             federation_ack (int): Optional. The position where the worker is
                 explicitly acknowledged it has handled. Allows us to drop
@@ -232,8 +237,8 @@ class FederationRemoteSendQueue(object):
         # TODO: Handle limit.
 
         # To handle restarts where we wrap around
-        if token > self.pos:
-            token = -1
+        if from_token > self.pos:
+            from_token = -1
 
         rows = []
 
@@ -244,10 +249,11 @@ class FederationRemoteSendQueue(object):
 
         # Fetch changed presence
         keys = self.presence_changed.keys()
-        i = keys.bisect_right(token)
+        i = keys.bisect_right(from_token)
+        j = keys.bisect_right(to_token) + 1
         dest_user_ids = set(
             (pos, dest_user_id)
-            for pos in keys[i:]
+            for pos in keys[i:j]
             for dest_user_id in self.presence_changed[pos]
         )
 
@@ -259,8 +265,9 @@ class FederationRemoteSendQueue(object):
 
         # Fetch changes keyed edus
         keys = self.keyed_edu_changed.keys()
-        i = keys.bisect_right(token)
-        keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
+        i = keys.bisect_right(from_token)
+        j = keys.bisect_right(to_token) + 1
+        keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
 
         for (pos, (destination, edu_key)) in keyed_edus:
             rows.append(
@@ -272,16 +279,18 @@ class FederationRemoteSendQueue(object):
 
         # Fetch changed edus
         keys = self.edus.keys()
-        i = keys.bisect_right(token)
-        edus = set((k, self.edus[k]) for k in keys[i:])
+        i = keys.bisect_right(from_token)
+        j = keys.bisect_right(to_token) + 1
+        edus = set((k, self.edus[k]) for k in keys[i:j])
 
         for (pos, edu) in edus:
             rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
 
         # Fetch changed failures
         keys = self.failures.keys()
-        i = keys.bisect_right(token)
-        failures = set((k, self.failures[k]) for k in keys[i:])
+        i = keys.bisect_right(from_token)
+        j = keys.bisect_right(to_token) + 1
+        failures = set((k, self.failures[k]) for k in keys[i:j])
 
         for (pos, (destination, failure)) in failures:
             rows.append((pos, FAILURE_TYPE, ujson.dumps({
@@ -291,8 +300,9 @@ class FederationRemoteSendQueue(object):
 
         # Fetch changed device messages
         keys = self.device_messages.keys()
-        i = keys.bisect_right(token)
-        device_messages = set((k, self.device_messages[k]) for k in keys[i:])
+        i = keys.bisect_right(from_token)
+        j = keys.bisect_right(to_token) + 1
+        device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
 
         for (pos, destination) in device_messages:
             rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({