summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-08-17 00:43:43 +0100
committerRichard van der Hoff <richard@matrix.org>2018-08-17 00:43:43 +0100
commit0e8d78f6aa56020ffb948a4b2c6feadba2d16712 (patch)
tree391fb5435e9a314730c27c3e041cd06fee5d0e03 /synapse/replication/tcp/client.py
parentMerge pull request #3705 from matrix-org/erikj/fix_inbound_fed_worker (diff)
downloadsynapse-0e8d78f6aa56020ffb948a4b2c6feadba2d16712.tar.xz
Logcontexts for replication command handlers
Run the handlers for replication commands as background processes. This should
improve the visibility in our metrics, and reduce the number of "running db
transaction from sentinel context" warnings.

Ideally it means converting the things that fire off deferreds into the night
into things that actually return a Deferred when they are done. I've made a bit
of a stab at this, but it will probably be leaky.
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py4
1 files changed, 2 insertions, 2 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 970e94313e..cbe9645817 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -107,7 +107,7 @@ class ReplicationClientHandler(object):
         Can be overriden in subclasses to handle more.
         """
         logger.info("Received rdata %s -> %s", stream_name, token)
-        self.store.process_replication_rows(stream_name, token, rows)
+        return self.store.process_replication_rows(stream_name, token, rows)
 
     def on_position(self, stream_name, token):
         """Called when we get new position data. By default this just pokes
@@ -115,7 +115,7 @@ class ReplicationClientHandler(object):
 
         Can be overriden in subclasses to handle more.
         """
-        self.store.process_replication_rows(stream_name, token, [])
+        return self.store.process_replication_rows(stream_name, token, [])
 
     def on_sync(self, data):
         """When we received a SYNC we wake up any deferreds that were waiting