diff options
author | Richard van der Hoff <richard@matrix.org> | 2020-05-05 22:38:44 +0100 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2020-05-05 22:38:44 +0100 |
commit | 124226731676ac769d6e54e80bc46a6e33569bde (patch) | |
tree | f68bc1ba57855f9ee93db61606ee043b7e07bfda /synapse/replication/tcp/handler.py | |
parent | changelog (diff) | |
parent | Add backwards compatibility codepath to LoggingContext. (#7408) (diff) | |
download | synapse-124226731676ac769d6e54e80bc46a6e33569bde.tar.xz |
Merge branch 'release-v1.13.0' into rav/fix_dropped_messages
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r-- | synapse/replication/tcp/handler.py | 29 |
1 files changed, 16 insertions, 13 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index ffce3db629..467e8695e0 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -281,19 +281,24 @@ class ReplicationCommandHandler: # Check if this is the last of a batch of updates rows = self._pending_batches.pop(stream_name, []) rows.append(row) - await self.on_rdata(stream_name, cmd.token, rows) + await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows) - async def on_rdata(self, stream_name: str, token: int, rows: list): + async def on_rdata( + self, stream_name: str, instance_name: str, token: int, rows: list + ): """Called to handle a batch of replication data with a given stream token. Args: stream_name: name of the replication stream for this batch of rows + instance_name: the instance that wrote the rows. token: stream token for this batch of rows rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row. """ logger.debug("Received rdata %s -> %s", stream_name, token) - await self._replication_data_handler.on_rdata(stream_name, token, rows) + await self._replication_data_handler.on_rdata( + stream_name, instance_name, token, rows + ) async def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand): if cmd.instance_name == self._instance_name: @@ -321,14 +326,7 @@ class ReplicationCommandHandler: self._pending_batches.pop(stream_name, []) # Find where we previously streamed up to. - current_token = self._replication_data_handler.get_streams_to_replicate().get( - stream_name - ) - if current_token is None: - logger.warning( - "Got POSITION for stream we're not subscribed to: %s", stream_name, - ) - return + current_token = stream.current_token() # If the position token matches our current token then we're up to # date and there's nothing to do. Otherwise, fetch all updates @@ -345,7 +343,9 @@ class ReplicationCommandHandler: updates, current_token, missing_updates, - ) = await stream.get_updates_since(current_token, cmd.token) + ) = await stream.get_updates_since( + cmd.instance_name, current_token, cmd.token + ) # TODO: add some tests for this @@ -354,7 +354,10 @@ class ReplicationCommandHandler: for token, rows in _batch_updates(updates): await self.on_rdata( - stream_name, token, [stream.parse_row(row) for row in rows], + stream_name, + cmd.instance_name, + token, + [stream.parse_row(row) for row in rows], ) logger.info("Caught up with stream '%s' to %i", stream_name, cmd.token) |