summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/app/appservice.py6
-rw-r--r--synapse/app/federation_sender.py4
-rw-r--r--synapse/app/pusher.py13
-rw-r--r--synapse/app/synchrotron.py13
4 files changed, 19 insertions, 17 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 13b69d8dee..9a476efa63 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -136,7 +136,9 @@ class ASReplicationHandler(ReplicationClientHandler):
 
         if stream_name == "events":
             max_stream_id = self.store.get_room_max_stream_ordering()
-            self.appservice_handler.notify_interested_services(max_stream_id)
+            preserve_fn(
+                self.appservice_handler.notify_interested_services
+            )(max_stream_id)
 
 
 def start(config_options):
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 705fe5e3df..fa7b19e53a 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -36,7 +36,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -337,7 +337,7 @@ class FederationSenderHandler(object):
             for destination in device_destinations:
                 self.federation_sender.send_device_messages(destination)
 
-            self.update_token(token)
+            preserve_fn(self.update_token)(token)
 
         # We also need to poke the federation sender when new events happen
         elif stream_name == "events":
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index cb76f058b0..f9114acfcb 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -39,7 +39,7 @@ from synapse.util.versionstring import get_version_string
 
 from synapse import events
 
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -170,21 +170,22 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
-        self.poke_pushers(stream_name, token, rows)
+        preserve_fn(self.poke_pushers)(stream_name, token, rows)
 
+    @defer.inlineCallbacks
     def poke_pushers(self, stream_name, token, rows):
         if stream_name == "pushers":
             for row in rows:
                 if row.deleted:
-                    self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+                    yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
                 else:
-                    self.start_pusher(row.user_id, row.app_id, row.pushkey)
+                    yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
         elif stream_name == "events":
-            preserve_fn(self.pusher_pool.on_new_notifications)(
+            yield self.pusher_pool.on_new_notifications(
                 token, token,
             )
         elif stream_name == "receipts":
-            preserve_fn(self.pusher_pool.on_new_receipts)(
+            yield self.pusher_pool.on_new_receipts(
                 token, token, set(row.room_id for row in rows)
             )
 
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 7341970a81..a1ef5dfa77 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -47,7 +47,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.presence import PresenceStore, UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
@@ -328,11 +328,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
     def on_rdata(self, stream_name, token, rows):
         super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
 
-        if stream_name == "typing":
-            self.typing_handler.process_replication_rows(token, rows)
-        elif stream_name == "presence":
-            self.presence_handler.process_replication_rows(token, rows)
-        self.notify(stream_name, token, rows)
+        preserve_fn(self.process_and_notify)(stream_name, token, rows)
 
     def get_streams_to_replicate(self):
         args = super(SyncReplicationHandler, self).get_streams_to_replicate()
@@ -343,7 +339,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
         return self.presence_handler.get_currently_syncing_users()
 
     @defer.inlineCallbacks
-    def notify(self, stream_name, token, rows):
+    def process_and_notify(self, stream_name, token, rows):
         if stream_name == "events":
             # We shouldn't get multiple rows per token for events stream, so
             # we don't need to optimise this for multiple rows.
@@ -369,6 +365,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
                 "receipt_key", token, rooms=[row.room_id for row in rows],
             )
         elif stream_name == "typing":
+            self.typing_handler.process_replication_rows(token, rows)
             self.notifier.on_new_event(
                 "typing_key", token, rooms=[row.room_id for row in rows],
             )
@@ -386,6 +383,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
             self.notifier.on_new_event(
                 "device_list_key", token, rooms=all_room_ids,
             )
+        elif stream_name == "presence":
+            yield self.presence_handler.process_replication_rows(token, rows)
 
 
 def start(config_options):