author     Brendan Abolivier <babolivier@matrix.org>  2020-06-15 16:37:52 +0100
committer  Brendan Abolivier <babolivier@matrix.org>  2020-06-15 16:37:52 +0100
commit     6efb2b0ad44b4cfb1f05a77c1a5a22b527758b37 (patch)
tree       56c11988292ef856c99e8fb4f9b08b648e5d93d8 /synapse/replication/tcp/handler.py
parent     Merge branch 'develop' into babolivier/mark_unread (diff)
parent     Discard RDATA from already seen positions. (#7648) (diff)
download   synapse-6efb2b0ad44b4cfb1f05a77c1a5a22b527758b37.tar.xz
Merge branch 'develop' into babolivier/mark_unread
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--  synapse/replication/tcp/handler.py  30
1 file changed, 26 insertions, 4 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index cbcf46f3ae..e6a2e2598b 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -112,8 +112,8 @@ class ReplicationCommandHandler:
             "replication_position", clock=self._clock
         )
 
-        # Map of stream to batched updates. See RdataCommand for info on how
-        # batching works.
+        # Map of stream name to batched updates. See RdataCommand for info on
+        # how batching works.
         self._pending_batches = {}  # type: Dict[str, List[Any]]
 
         # The factory used to create connections.
@@ -123,7 +123,8 @@ class ReplicationCommandHandler:
         # outgoing replication commands to.)
         self._connections = []  # type: List[AbstractConnection]
 
-        # For each connection, the incoming streams that are coming from that connection
+        # For each connection, the incoming stream names that are coming from
+        # that connection.
         self._streams_by_connection = {}  # type: Dict[AbstractConnection, Set[str]]
 
         LaterGauge(
@@ -310,7 +311,28 @@ class ReplicationCommandHandler:
                 # Check if this is the last of a batch of updates
                 rows = self._pending_batches.pop(stream_name, [])
                 rows.append(row)
-                await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
+
+                stream = self._streams.get(stream_name)
+                if not stream:
+                    logger.error("Got RDATA for unknown stream: %s", stream_name)
+                    return
+
+                # Find where we previously streamed up to.
+                current_token = stream.current_token(cmd.instance_name)
+
+                # Discard this data if this token is earlier than the current
+                # position. Note that streams can be reset (in which case you
+                # expect an earlier token), but that must be preceded by a
+                # POSITION command.
+                if cmd.token <= current_token:
+                    logger.debug(
+                        "Discarding RDATA from stream %s at position %s before previous position %s",
+                        stream_name,
+                        cmd.token,
+                        current_token,
+                    )
+                else:
+                    await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
 
     async def on_rdata(
         self, stream_name: str, instance_name: str, token: int, rows: list
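
The `_pending_batches` map touched in the first hunk drives RDATA batching: each RDATA command carries a single row, and only the final command of a batch carries a real token, so earlier rows are stashed per stream name until that final command arrives. The following is a minimal, self-contained sketch of that accumulation pattern; the class and method names are illustrative, not Synapse's actual API, and it assumes a `None` token marks a non-final row.

# A minimal sketch (not Synapse's actual classes) of how batched RDATA
# updates are accumulated per stream: rows arrive one per command, and
# only the last command of a batch carries a real token.
from typing import Any, Dict, List, Optional


class BatchingSketch:
    def __init__(self) -> None:
        # Map of stream name to rows received so far for an unfinished batch.
        self._pending_batches: Dict[str, List[Any]] = {}

    def on_rdata_command(
        self, stream_name: str, token: Optional[int], row: Any
    ) -> Optional[List[Any]]:
        """Returns the full batch when the last row arrives, else None."""
        if token is None:
            # Not the last row of the batch: stash it and wait for more.
            self._pending_batches.setdefault(stream_name, []).append(row)
            return None

        # Last row of the batch: combine it with anything stashed earlier.
        rows = self._pending_batches.pop(stream_name, [])
        rows.append(row)
        return rows


batcher = BatchingSketch()
assert batcher.on_rdata_command("events", None, "row1") is None
assert batcher.on_rdata_command("events", 7, "row2") == ["row1", "row2"]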
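
The second hunk is the substance of the merged change (#7648): before handing a completed batch to on_rdata, the handler looks up where it previously streamed up to for that instance and drops the batch if its token is not newer, since only a POSITION command may legitimately move a stream backwards. Below is a small self-contained sketch of that guard, using hypothetical names rather than Synapse's actual classes.

# A minimal sketch (hypothetical names, not Synapse's API) of the guard this
# merge brings in: an RDATA batch whose token is at or before the position we
# have already streamed up to for that instance is logged and discarded.
import logging
from typing import Dict, List

logger = logging.getLogger(__name__)


class StreamPositionSketch:
    def __init__(self) -> None:
        # Per-stream, per-instance token we have streamed up to.
        self._current_tokens: Dict[str, Dict[str, int]] = {}

    def current_token(self, stream_name: str, instance_name: str) -> int:
        return self._current_tokens.get(stream_name, {}).get(instance_name, 0)

    def handle_rdata(
        self, stream_name: str, instance_name: str, token: int, rows: List[str]
    ) -> bool:
        """Returns True if the rows were applied, False if discarded."""
        current = self.current_token(stream_name, instance_name)
        if token <= current:
            # Already seen this position (e.g. a replay after reconnect):
            # discard rather than reprocessing the rows.
            logger.debug(
                "Discarding RDATA from stream %s at position %s before previous position %s",
                stream_name,
                token,
                current,
            )
            return False

        # New data: advance our position and process the rows.
        self._current_tokens.setdefault(stream_name, {})[instance_name] = token
        return True


sketch = StreamPositionSketch()
assert sketch.handle_rdata("events", "master", 5, ["a"]) is True
assert sketch.handle_rdata("events", "master", 5, ["a"]) is False  # duplicate
assert sketch.handle_rdata("events", "master", 6, ["b"]) is True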