summary refs log tree commit diff
path: root/synapse/app/federation_sender.py
diff options
context:
space:
mode:
authorNeil Johnson <neil@fragile.org.uk>2018-05-14 09:31:42 +0100
committerNeil Johnson <neil@fragile.org.uk>2018-05-14 09:31:42 +0100
commit977765bde2987770f63065d839f9686a7a144140 (patch)
tree41d3a247f546cfe50500f465e50a798a597ef464 /synapse/app/federation_sender.py
parentremove user agent from data model, will just join on user_ips (diff)
parentMerge pull request #2846 from kaiyou/feat-dockerfile (diff)
downloadsynapse-977765bde2987770f63065d839f9686a7a144140.tar.xz
Merge branch 'develop' of https://github.com/matrix-org/synapse into cohort_analytics
Diffstat (limited to 'synapse/app/federation_sender.py')
-rw-r--r--synapse/app/federation_sender.py32
1 files changed, 18 insertions, 14 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 0cc3331519..6f24e32d6d 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,7 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
@@ -101,6 +101,7 @@ class FederationSenderServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
@@ -229,7 +230,7 @@ class FederationSenderHandler(object):
         # presence, typing, etc.
         if stream_name == "federation":
             send_queue.process_rows_for_federation(self.federation_sender, rows)
-            preserve_fn(self.update_token)(token)
+            run_in_background(self.update_token, token)
 
         # We also need to poke the federation sender when new events happen
         elif stream_name == "events":
@@ -237,19 +238,22 @@ class FederationSenderHandler(object):
 
     @defer.inlineCallbacks
     def update_token(self, token):
-        self.federation_position = token
-
-        # We linearize here to ensure we don't have races updating the token
-        with (yield self._fed_position_linearizer.queue(None)):
-            if self._last_ack < self.federation_position:
-                yield self.store.update_federation_out_pos(
-                    "federation", self.federation_position
-                )
+        try:
+            self.federation_position = token
+
+            # We linearize here to ensure we don't have races updating the token
+            with (yield self._fed_position_linearizer.queue(None)):
+                if self._last_ack < self.federation_position:
+                    yield self.store.update_federation_out_pos(
+                        "federation", self.federation_position
+                    )
 
-                # We ACK this token over replication so that the master can drop
-                # its in memory queues
-                self.replication_client.send_federation_ack(self.federation_position)
-                self._last_ack = self.federation_position
+                    # We ACK this token over replication so that the master can drop
+                    # its in memory queues
+                    self.replication_client.send_federation_ack(self.federation_position)
+                    self._last_ack = self.federation_position
+        except Exception:
+            logger.exception("Error updating federation stream position")
 
 
 if __name__ == '__main__':