summary refs log tree commit diff
path: root/synapse/app/federation_sender.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-04-27 14:18:00 +0100
committerRichard van der Hoff <richard@matrix.org>2018-04-27 14:18:00 +0100
commit61463323871ccbe17f8933c1695d7901cd6a8a20 (patch)
treed70648dd1b7466452da27a4b7d8993c58481ed03 /synapse/app/federation_sender.py
parentMerge branch 'develop' into rav/deferred_timeout (diff)
parentMerge branch 'master' of github.com:matrix-org/synapse into develop (diff)
downloadsynapse-61463323871ccbe17f8933c1695d7901cd6a8a20.tar.xz
Merge remote-tracking branch 'origin/develop' into rav/deferred_timeout
Diffstat (limited to 'synapse/app/federation_sender.py')
-rw-r--r--synapse/app/federation_sender.py27
1 files changed, 15 insertions, 12 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 0cc3331519..4f2a9ca21a 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -237,19 +237,22 @@ class FederationSenderHandler(object):
 
     @defer.inlineCallbacks
     def update_token(self, token):
-        self.federation_position = token
-
-        # We linearize here to ensure we don't have races updating the token
-        with (yield self._fed_position_linearizer.queue(None)):
-            if self._last_ack < self.federation_position:
-                yield self.store.update_federation_out_pos(
-                    "federation", self.federation_position
-                )
+        try:
+            self.federation_position = token
+
+            # We linearize here to ensure we don't have races updating the token
+            with (yield self._fed_position_linearizer.queue(None)):
+                if self._last_ack < self.federation_position:
+                    yield self.store.update_federation_out_pos(
+                        "federation", self.federation_position
+                    )
 
-                # We ACK this token over replication so that the master can drop
-                # its in memory queues
-                self.replication_client.send_federation_ack(self.federation_position)
-                self._last_ack = self.federation_position
+                    # We ACK this token over replication so that the master can drop
+                    # its in memory queues
+                    self.replication_client.send_federation_ack(self.federation_position)
+                    self._last_ack = self.federation_position
+        except Exception:
+            logger.exception("Error updating federation stream position")
 
 
 if __name__ == '__main__':