summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-11-23 10:40:44 +0000
committerErik Johnston <erik@matrix.org>2016-11-23 10:40:44 +0000
commit4c79a63fd76e982e5e60b22c7efd15b6e3cf9915 (patch)
tree47bb0578f3bdf45e675184759ed93a7650f1eacb /synapse/replication
parentFix tests and flake8 (diff)
downloadsynapse-4c79a63fd76e982e5e60b22c7efd15b6e3cf9915.tar.xz
Explicit federation ack
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/resource.py15
1 files changed, 10 insertions, 5 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index b05ca62710..cb9697e378 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -171,8 +171,13 @@ class ReplicationResource(Resource):
         }
         request_streams["streams"] = parse_string(request, "streams")
 
+        federation_ack = parse_integer(request, "federation_ack", None)
+
         def replicate():
-            return self.replicate(request_streams, limit)
+            return self.replicate(
+                request_streams, limit,
+                federation_ack=federation_ack
+            )
 
         writer = yield self.notifier.wait_for_replication(replicate, timeout)
         result = writer.finish()
@@ -190,7 +195,7 @@ class ReplicationResource(Resource):
         finish_request(request)
 
     @defer.inlineCallbacks
-    def replicate(self, request_streams, limit):
+    def replicate(self, request_streams, limit, federation_ack=None):
         writer = _Writer()
         current_token = yield self.current_replication_token()
         logger.debug("Replicating up to %r", current_token)
@@ -209,7 +214,7 @@ class ReplicationResource(Resource):
         yield self.caches(writer, current_token, limit, request_streams)
         yield self.to_device(writer, current_token, limit, request_streams)
         yield self.public_rooms(writer, current_token, limit, request_streams)
-        self.federation(writer, current_token, limit, request_streams)
+        self.federation(writer, current_token, limit, request_streams, federation_ack)
         self.streams(writer, current_token, request_streams)
 
         logger.debug("Replicated %d rows", writer.total)
@@ -473,7 +478,7 @@ class ReplicationResource(Resource):
                 "position", "room_id", "visibility"
             ), position=upto_token)
 
-    def federation(self, writer, current_token, limit, request_streams):
+    def federation(self, writer, current_token, limit, request_streams, federation_ack):
         if self.config.send_federation:
             return
 
@@ -483,7 +488,7 @@ class ReplicationResource(Resource):
 
         if federation is not None and federation != current_position:
             federation_rows = self.federation_sender.get_replication_rows(
-                federation, limit,
+                federation, limit, federation_ack=federation_ack,
             )
             upto_token = _position_from_rows(federation_rows, current_position)
             writer.write_header_and_rows("federation", federation_rows, (