author     Erik Johnston <erik@matrix.org>    2020-03-25 14:05:53 +0000
committer  Erik Johnston <erik@matrix.org>    2020-03-25 14:05:53 +0000
commit     0473f87a1791ed7cd9a0a7caa7b8e4f2cdfe8858 (patch)
tree       e275eb38323b7d1c5426b9fbbd19fc0ac2196321
parent     fixup! Thread through instance name to replication client (diff)
download   synapse-0473f87a1791ed7cd9a0a7caa7b8e4f2cdfe8858.tar.xz
Pass instance name through to rdata
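
This change threads the name of the instance that produced a batch of replication
updates through to the on_rdata callbacks, alongside the existing stream name,
token and rows. As a rough sketch of what the wider signature means for downstream
code (the subclass below is hypothetical and not part of this commit; only the
four-argument callback is taken from the diff), a ReplicationDataHandler subclass
would now override on_rdata like this:

    import logging

    from synapse.replication.tcp.handler import ReplicationDataHandler

    logger = logging.getLogger(__name__)


    class ExampleReplicationHandler(ReplicationDataHandler):
        """Hypothetical handler illustrating the new four-argument callback."""

        async def on_rdata(self, stream_name, instance_name, token, rows):
            # instance_name (new in this change) identifies which writer
            # emitted the batch; stream_name, token and rows are unchanged.
            await super().on_rdata(stream_name, instance_name, token, rows)
            logger.info(
                "stream %s from %s advanced to %s", stream_name, instance_name, token
            )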
-rw-r--r--   synapse/app/generic_worker.py         10
-rw-r--r--   synapse/replication/http/streams.py    2
-rw-r--r--   synapse/replication/tcp/handler.py     23
3 files changed, 23 insertions, 12 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 5a9be05183..383edf07ad 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -608,9 +608,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
         else:
             self.send_handler = None
 
-    async def on_rdata(self, stream_name, token, rows):
-        await super().on_rdata(stream_name, token, rows)
-        run_in_background(self.process_and_notify, stream_name, token, rows)
+    async def on_rdata(self, stream_name, instance_name, token, rows):
+        await super().on_rdata(stream_name, instance_name, token, rows)
+        run_in_background(
+            self.process_and_notify, stream_name, instance_name, token, rows
+        )
 
     def get_streams_to_replicate(self):
         args = super().get_streams_to_replicate()
@@ -619,7 +621,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
             args.update(self.send_handler.stream_positions())
         return args
 
-    async def process_and_notify(self, stream_name, token, rows):
+    async def process_and_notify(self, stream_name, instance_name, token, rows):
         try:
             if self.send_handler:
                 self.send_handler.process_replication_rows(stream_name, token, rows)
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index ffd4c61993..9c1fc9fb25 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -65,7 +65,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         limit = parse_integer(request, "limit", required=True)
 
         updates, upto_token, limited = await stream.get_updates_since(
-            from_token, upto_token, limit
+            self.instance_name, from_token, upto_token, limit
         )
 
         return (
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 401e4c1d4d..cfba255897 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -207,9 +207,11 @@ class ReplicationClientHandler:
             # Check if this is the last of a batch of updates
             rows = self.pending_batches.pop(stream_name, [])
             rows.append(row)
-            await self.on_rdata(stream_name, cmd.token, rows)
+            await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
 
-    async def on_rdata(self, stream_name: str, token: int, rows: list):
+    async def on_rdata(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
         """Called to handle a batch of replication data with a given stream token.
 
         Args:
@@ -218,8 +220,10 @@ class ReplicationClientHandler:
             rows: a list of Stream.ROW_TYPE objects as returned by
                 Stream.parse_row.
         """
-        logger.info("Received rdata %s -> %s", stream_name, token)
-        await self.replication_data_handler.on_rdata(stream_name, token, rows)
+        logger.info("Received rdata %s %s -> %s", stream_name, instance_name, token)
+        await self.replication_data_handler.on_rdata(
+            stream_name, instance_name, token, rows
+        )
 
     async def on_POSITION(self, cmd: PositionCommand):
         stream = self.streams.get(cmd.stream_name)
@@ -243,11 +247,12 @@ class ReplicationClientHandler:
         limited = cmd.token != current_token
         while limited:
             updates, current_token, limited = await stream.get_updates_since(
-                current_token, cmd.token
+                cmd.instance_name, current_token, cmd.token
             )
             if updates:
                 await self.on_rdata(
                     cmd.stream_name,
+                    cmd.instance_name,
                     current_token,
                     [stream.parse_row(update[1]) for update in updates],
                 )
@@ -258,7 +263,9 @@ class ReplicationClientHandler:
         # Handle any RDATA that came in while we were catching up.
         rows = self.pending_batches.pop(cmd.stream_name, [])
         if rows:
-            await self.on_rdata(cmd.stream_name, rows[-1].token, rows)
+            await self.on_rdata(
+                cmd.stream_name, cmd.instance_name, rows[-1].token, rows
+            )
 
     async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
         """Called when get a new REMOTE_SERVER_UP command."""
@@ -342,7 +349,9 @@ class ReplicationDataHandler:
         self.slaved_store = hs.config.worker_app is not None
         self.slaved_typing = not hs.config.server.handle_typing
 
-    async def on_rdata(self, stream_name: str, token: int, rows: list):
+    async def on_rdata(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
         """Called to handle a batch of replication data with a given stream token.
 
         By default this just pokes the slave store. Can be overridden in subclasses to