diff options
author | Richard van der Hoff <richard@matrix.org> | 2018-08-17 00:43:43 +0100 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2018-08-17 00:43:43 +0100 |
commit | 0e8d78f6aa56020ffb948a4b2c6feadba2d16712 (patch) | |
tree | 391fb5435e9a314730c27c3e041cd06fee5d0e03 /synapse/app | |
parent | Merge pull request #3705 from matrix-org/erikj/fix_inbound_fed_worker (diff) | |
download | synapse-0e8d78f6aa56020ffb948a4b2c6feadba2d16712.tar.xz |
Logcontexts for replication command handlers
Run the handlers for replication commands as background processes. This should improve the visibility in our metrics, and reduce the number of "running db transaction from sentinel context" warnings. Ideally it means converting the things that fire off deferreds into the night into things that actually return a Deferred when they are done. I've made a bit of a stab at this, but it will probably be leaky.
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/appservice.py | 3 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 3 | ||||
-rw-r--r-- | synapse/app/pusher.py | 3 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 3 | ||||
-rw-r--r-- | synapse/app/user_dir.py | 3 |
5 files changed, 10 insertions, 5 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 9a37384fb7..3348a8ec6d 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -117,8 +117,9 @@ class ASReplicationHandler(ReplicationClientHandler): super(ASReplicationHandler, self).__init__(hs.get_datastore()) self.appservice_handler = hs.get_application_service_handler() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(ASReplicationHandler, self).on_rdata(stream_name, token, rows) + yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows) if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 7a4310ca18..d59007099b 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -144,8 +144,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler): super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) self.send_handler = FederationSenderHandler(hs, self) + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(FederationSenderReplicationHandler, self).on_rdata( + yield super(FederationSenderReplicationHandler, self).on_rdata( stream_name, token, rows ) self.send_handler.process_replication_rows(stream_name, token, rows) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9295a51d5b..aa0938c376 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -148,8 +148,9 @@ class PusherReplicationHandler(ReplicationClientHandler): self.pusher_pool = hs.get_pusherpool() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) + yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e201f18efd..39c7cbc1ba 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -332,8 +332,9 @@ class SyncReplicationHandler(ReplicationClientHandler): self.presence_handler = hs.get_presence_handler() self.notifier = hs.get_notifier() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) + yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index cb78de8834..1388a42b59 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -169,8 +169,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore()) self.user_directory = hs.get_user_directory_handler() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(UserDirectoryReplicationHandler, self).on_rdata( + yield super(UserDirectoryReplicationHandler, self).on_rdata( stream_name, token, rows ) if stream_name == "current_state_deltas": |