summary refs log tree commit diff
path: root/synapse/app/federation_sender.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-04-04 10:29:29 +0100
committerErik Johnston <erik@matrix.org>2017-04-04 11:02:44 +0100
commit6ce6bbedcb8721fc99200ecace94eeedb51da6f2 (patch)
tree9a3592597976757db9da486bb8dfb13961ea9ac0 /synapse/app/federation_sender.py
parentUpdate all the workers and master to use TCP replication (diff)
downloadsynapse-6ce6bbedcb8721fc99200ecace94eeedb51da6f2.tar.xz
Move where we ack federation
Diffstat (limited to '')
-rw-r--r--synapse/app/federation_sender.py23
1 files changed, 16 insertions, 7 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 8994891aeb..705fe5e3df 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -153,15 +153,13 @@ class FederationSenderServer(HomeServer):
 class FederationSenderReplicationHandler(ReplicationClientHandler):
     def __init__(self, hs):
         super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
-        self.send_handler = FederationSenderHandler(hs)
+        self.send_handler = FederationSenderHandler(hs, self)
 
     def on_rdata(self, stream_name, token, rows):
         super(FederationSenderReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         self.send_handler.process_replication_rows(stream_name, token, rows)
-        if stream_name == "federation":
-            self.send_federation_ack(token)
 
     def get_streams_to_replicate(self):
         args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
@@ -248,13 +246,16 @@ class FederationSenderHandler(object):
     """Processes the replication stream and forwards the appropriate entries
     to the federation sender.
     """
-    def __init__(self, hs):
+    def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
         self.federation_sender = hs.get_federation_sender()
+        self.replication_client = replication_client
 
         self.federation_position = self.store.federation_out_pos_startup
         self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
 
+        self._last_ack = self.federation_position
+
         self._room_serials = {}
         self._room_typing = {}
 
@@ -345,10 +346,18 @@ class FederationSenderHandler(object):
     @defer.inlineCallbacks
     def update_token(self, token):
         self.federation_position = token
+
+        # We linearize here to ensure we don't have races updating the token
         with (yield self._fed_position_linearizer.queue(None)):
-            yield self.store.update_federation_out_pos(
-                "federation", self.federation_position
-            )
+            if self._last_ack < self.federation_position:
+                yield self.store.update_federation_out_pos(
+                    "federation", self.federation_position
+                )
+
+                # We ACK this token over replication so that the master can drop
+                # its in memory queues
+                self.replication_client.send_federation_ack(self.federation_position)
+                self._last_ack = self.federation_position
 
 
 if __name__ == '__main__':