summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py42
1 files changed, 29 insertions, 13 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index dec5ac0913..74e892c104 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -63,6 +63,8 @@ from twisted.protocols.basic import LineOnlyReceiver
 from twisted.python.failure import Failure
 
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.stringutils import random_string
 
 from .commands import (
@@ -222,7 +224,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
 
         # Now lets try and call on_<CMD_NAME> function
         try:
-            getattr(self, "on_%s" % (cmd_name,))(cmd)
+            run_as_background_process(
+                "replication-" + cmd.get_logcontext_id(),
+                getattr(self, "on_%s" % (cmd_name,)),
+                cmd,
+            )
         except Exception:
             logger.exception("[%s] Failed to handle line: %r", self.id(), line)
 
@@ -387,7 +393,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
         self.name = cmd.data
 
     def on_USER_SYNC(self, cmd):
-        self.streamer.on_user_sync(
+        return self.streamer.on_user_sync(
             self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
         )
 
@@ -397,22 +403,33 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
         if stream_name == "ALL":
             # Subscribe to all streams we're publishing to.
-            for stream in iterkeys(self.streamer.streams_by_name):
-                self.subscribe_to_stream(stream, token)
+            deferreds = [
+                run_in_background(
+                    self.subscribe_to_stream,
+                    stream, token,
+                )
+                for stream in iterkeys(self.streamer.streams_by_name)
+            ]
+
+            return make_deferred_yieldable(
+                defer.gatherResults(deferreds, consumeErrors=True)
+            )
         else:
-            self.subscribe_to_stream(stream_name, token)
+            return self.subscribe_to_stream(stream_name, token)
 
     def on_FEDERATION_ACK(self, cmd):
-        self.streamer.federation_ack(cmd.token)
+        return self.streamer.federation_ack(cmd.token)
 
     def on_REMOVE_PUSHER(self, cmd):
-        self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
+        return self.streamer.on_remove_pusher(
+            cmd.app_id, cmd.push_key, cmd.user_id,
+        )
 
     def on_INVALIDATE_CACHE(self, cmd):
-        self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
+        return self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
 
     def on_USER_IP(self, cmd):
-        self.streamer.on_user_ip(
+        return self.streamer.on_user_ip(
             cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
             cmd.last_seen,
         )
@@ -542,14 +559,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
             # Check if this is the last of a batch of updates
             rows = self.pending_batches.pop(stream_name, [])
             rows.append(row)
-
-            self.handler.on_rdata(stream_name, cmd.token, rows)
+            return self.handler.on_rdata(stream_name, cmd.token, rows)
 
     def on_POSITION(self, cmd):
-        self.handler.on_position(cmd.stream_name, cmd.token)
+        return self.handler.on_position(cmd.stream_name, cmd.token)
 
     def on_SYNC(self, cmd):
-        self.handler.on_sync(cmd.data)
+        return self.handler.on_sync(cmd.data)
 
     def replicate(self, stream_name, token):
         """Send the subscription request to the server