summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/federation.py2
-rw-r--r--synapse/replication/tcp/client.py4
-rw-r--r--synapse/replication/tcp/commands.py12
-rw-r--r--synapse/replication/tcp/protocol.py42
4 files changed, 44 insertions, 16 deletions
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 2ddd18f73b..64a79da162 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -156,7 +156,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
             edu_content = content["content"]
 
         logger.info(
-            "Got %r edu from $s",
+            "Got %r edu from %s",
             edu_type, origin,
         )
 
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 970e94313e..cbe9645817 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -107,7 +107,7 @@ class ReplicationClientHandler(object):
         Can be overriden in subclasses to handle more.
         """
         logger.info("Received rdata %s -> %s", stream_name, token)
-        self.store.process_replication_rows(stream_name, token, rows)
+        return self.store.process_replication_rows(stream_name, token, rows)
 
     def on_position(self, stream_name, token):
         """Called when we get new position data. By default this just pokes
@@ -115,7 +115,7 @@ class ReplicationClientHandler(object):
 
         Can be overriden in subclasses to handle more.
         """
-        self.store.process_replication_rows(stream_name, token, [])
+        return self.store.process_replication_rows(stream_name, token, [])
 
     def on_sync(self, data):
         """When we received a SYNC we wake up any deferreds that were waiting
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index f3908df642..327556f6a1 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -59,6 +59,12 @@ class Command(object):
         """
         return self.data
 
+    def get_logcontext_id(self):
+        """Get a suitable string for the logcontext when processing this command"""
+
+        # by default, we just use the command name.
+        return self.NAME
+
 
 class ServerCommand(Command):
     """Sent by the server on new connection and includes the server_name.
@@ -116,6 +122,9 @@ class RdataCommand(Command):
             _json_encoder.encode(self.row),
         ))
 
+    def get_logcontext_id(self):
+        return "RDATA-" + self.stream_name
+
 
 class PositionCommand(Command):
     """Sent by the client to tell the client the stream postition without
@@ -190,6 +199,9 @@ class ReplicateCommand(Command):
     def to_line(self):
         return " ".join((self.stream_name, str(self.token),))
 
+    def get_logcontext_id(self):
+        return "REPLICATE-" + self.stream_name
+
 
 class UserSyncCommand(Command):
     """Sent by the client to inform the server that a user has started or
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index dec5ac0913..74e892c104 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -63,6 +63,8 @@ from twisted.protocols.basic import LineOnlyReceiver
 from twisted.python.failure import Failure
 
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.stringutils import random_string
 
 from .commands import (
@@ -222,7 +224,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
 
         # Now lets try and call on_<CMD_NAME> function
         try:
-            getattr(self, "on_%s" % (cmd_name,))(cmd)
+            run_as_background_process(
+                "replication-" + cmd.get_logcontext_id(),
+                getattr(self, "on_%s" % (cmd_name,)),
+                cmd,
+            )
         except Exception:
             logger.exception("[%s] Failed to handle line: %r", self.id(), line)
 
@@ -387,7 +393,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
         self.name = cmd.data
 
     def on_USER_SYNC(self, cmd):
-        self.streamer.on_user_sync(
+        return self.streamer.on_user_sync(
             self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
         )
 
@@ -397,22 +403,33 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
         if stream_name == "ALL":
             # Subscribe to all streams we're publishing to.
-            for stream in iterkeys(self.streamer.streams_by_name):
-                self.subscribe_to_stream(stream, token)
+            deferreds = [
+                run_in_background(
+                    self.subscribe_to_stream,
+                    stream, token,
+                )
+                for stream in iterkeys(self.streamer.streams_by_name)
+            ]
+
+            return make_deferred_yieldable(
+                defer.gatherResults(deferreds, consumeErrors=True)
+            )
         else:
-            self.subscribe_to_stream(stream_name, token)
+            return self.subscribe_to_stream(stream_name, token)
 
     def on_FEDERATION_ACK(self, cmd):
-        self.streamer.federation_ack(cmd.token)
+        return self.streamer.federation_ack(cmd.token)
 
     def on_REMOVE_PUSHER(self, cmd):
-        self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
+        return self.streamer.on_remove_pusher(
+            cmd.app_id, cmd.push_key, cmd.user_id,
+        )
 
     def on_INVALIDATE_CACHE(self, cmd):
-        self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
+        return self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
 
     def on_USER_IP(self, cmd):
-        self.streamer.on_user_ip(
+        return self.streamer.on_user_ip(
             cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
             cmd.last_seen,
         )
@@ -542,14 +559,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
             # Check if this is the last of a batch of updates
             rows = self.pending_batches.pop(stream_name, [])
             rows.append(row)
-
-            self.handler.on_rdata(stream_name, cmd.token, rows)
+            return self.handler.on_rdata(stream_name, cmd.token, rows)
 
     def on_POSITION(self, cmd):
-        self.handler.on_position(cmd.stream_name, cmd.token)
+        return self.handler.on_position(cmd.stream_name, cmd.token)
 
     def on_SYNC(self, cmd):
-        self.handler.on_sync(cmd.data)
+        return self.handler.on_sync(cmd.data)
 
     def replicate(self, stream_name, token):
         """Send the subscription request to the server