diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 7341970a81..a1ef5dfa77 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -47,7 +47,7 @@ from synapse.storage.engines import create_engine
from synapse.storage.presence import PresenceStore, UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
@@ -328,11 +328,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
- if stream_name == "typing":
- self.typing_handler.process_replication_rows(token, rows)
- elif stream_name == "presence":
- self.presence_handler.process_replication_rows(token, rows)
- self.notify(stream_name, token, rows)
+ preserve_fn(self.process_and_notify)(stream_name, token, rows)
def get_streams_to_replicate(self):
args = super(SyncReplicationHandler, self).get_streams_to_replicate()
@@ -343,7 +339,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
return self.presence_handler.get_currently_syncing_users()
@defer.inlineCallbacks
- def notify(self, stream_name, token, rows):
+ def process_and_notify(self, stream_name, token, rows):
if stream_name == "events":
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
@@ -369,6 +365,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
"receipt_key", token, rooms=[row.room_id for row in rows],
)
elif stream_name == "typing":
+ self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows],
)
@@ -386,6 +383,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.notifier.on_new_event(
"device_list_key", token, rooms=all_room_ids,
)
+ elif stream_name == "presence":
+ yield self.presence_handler.process_replication_rows(token, rows)
def start(config_options):
|