summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2020-05-05 22:38:44 +0100
committerRichard van der Hoff <richard@matrix.org>2020-05-05 22:38:44 +0100
commit124226731676ac769d6e54e80bc46a6e33569bde (patch)
treef68bc1ba57855f9ee93db61606ee043b7e07bfda /synapse/replication/tcp/handler.py
parentchangelog (diff)
parentAdd backwards compatibility codepath to LoggingContext. (#7408) (diff)
downloadsynapse-124226731676ac769d6e54e80bc46a6e33569bde.tar.xz
Merge branch 'release-v1.13.0' into rav/fix_dropped_messages
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py29
1 files changed, 16 insertions, 13 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index ffce3db629..467e8695e0 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -281,19 +281,24 @@ class ReplicationCommandHandler:
                 # Check if this is the last of a batch of updates
                 rows = self._pending_batches.pop(stream_name, [])
                 rows.append(row)
-                await self.on_rdata(stream_name, cmd.token, rows)
+                await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
 
-    async def on_rdata(self, stream_name: str, token: int, rows: list):
+    async def on_rdata(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
         """Called to handle a batch of replication data with a given stream token.
 
         Args:
             stream_name: name of the replication stream for this batch of rows
+            instance_name: the instance that wrote the rows.
             token: stream token for this batch of rows
             rows: a list of Stream.ROW_TYPE objects as returned by
                 Stream.parse_row.
         """
         logger.debug("Received rdata %s -> %s", stream_name, token)
-        await self._replication_data_handler.on_rdata(stream_name, token, rows)
+        await self._replication_data_handler.on_rdata(
+            stream_name, instance_name, token, rows
+        )
 
     async def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand):
         if cmd.instance_name == self._instance_name:
@@ -321,14 +326,7 @@ class ReplicationCommandHandler:
             self._pending_batches.pop(stream_name, [])
 
             # Find where we previously streamed up to.
-            current_token = self._replication_data_handler.get_streams_to_replicate().get(
-                stream_name
-            )
-            if current_token is None:
-                logger.warning(
-                    "Got POSITION for stream we're not subscribed to: %s", stream_name,
-                )
-                return
+            current_token = stream.current_token()
 
             # If the position token matches our current token then we're up to
             # date and there's nothing to do. Otherwise, fetch all updates
@@ -345,7 +343,9 @@ class ReplicationCommandHandler:
                     updates,
                     current_token,
                     missing_updates,
-                ) = await stream.get_updates_since(current_token, cmd.token)
+                ) = await stream.get_updates_since(
+                    cmd.instance_name, current_token, cmd.token
+                )
 
                 # TODO: add some tests for this
 
@@ -354,7 +354,10 @@ class ReplicationCommandHandler:
 
                 for token, rows in _batch_updates(updates):
                     await self.on_rdata(
-                        stream_name, token, [stream.parse_row(row) for row in rows],
+                        stream_name,
+                        cmd.instance_name,
+                        token,
+                        [stream.parse_row(row) for row in rows],
                     )
 
             logger.info("Caught up with stream '%s' to %i", stream_name, cmd.token)