diff options
author | Erik Johnston <erik@matrix.org> | 2020-03-25 14:05:53 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-03-25 14:05:53 +0000 |
commit | 0473f87a1791ed7cd9a0a7caa7b8e4f2cdfe8858 (patch) | |
tree | e275eb38323b7d1c5426b9fbbd19fc0ac2196321 | |
parent | fixup! Thread through instance name to replication client (diff) | |
download | synapse-0473f87a1791ed7cd9a0a7caa7b8e4f2cdfe8858.tar.xz |
Pass instance name through to rdata
-rw-r--r-- | synapse/app/generic_worker.py | 10 | ||||
-rw-r--r-- | synapse/replication/http/streams.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/handler.py | 23 |
3 files changed, 23 insertions, 12 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 5a9be05183..383edf07ad 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -608,9 +608,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): else: self.send_handler = None - async def on_rdata(self, stream_name, token, rows): - await super().on_rdata(stream_name, token, rows) - run_in_background(self.process_and_notify, stream_name, token, rows) + async def on_rdata(self, stream_name, instance_name, token, rows): + await super().on_rdata(stream_name, instance_name, token, rows) + run_in_background( + self.process_and_notify, stream_name, instance_name, token, rows + ) def get_streams_to_replicate(self): args = super().get_streams_to_replicate() @@ -619,7 +621,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): args.update(self.send_handler.stream_positions()) return args - async def process_and_notify(self, stream_name, token, rows): + async def process_and_notify(self, stream_name, instance_name, token, rows): try: if self.send_handler: self.send_handler.process_replication_rows(stream_name, token, rows) diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py index ffd4c61993..9c1fc9fb25 100644 --- a/synapse/replication/http/streams.py +++ b/synapse/replication/http/streams.py @@ -65,7 +65,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint): limit = parse_integer(request, "limit", required=True) updates, upto_token, limited = await stream.get_updates_since( - from_token, upto_token, limit + self.instance_name, from_token, upto_token, limit ) return ( diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 401e4c1d4d..cfba255897 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -207,9 +207,11 @@ class ReplicationClientHandler: # Check if this is the last of a batch of updates rows = self.pending_batches.pop(stream_name, []) rows.append(row) - await self.on_rdata(stream_name, cmd.token, rows) + await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows) - async def on_rdata(self, stream_name: str, token: int, rows: list): + async def on_rdata( + self, stream_name: str, instance_name: str, token: int, rows: list + ): """Called to handle a batch of replication data with a given stream token. Args: @@ -218,8 +220,10 @@ class ReplicationClientHandler: rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row. """ - logger.info("Received rdata %s -> %s", stream_name, token) - await self.replication_data_handler.on_rdata(stream_name, token, rows) + logger.info("Received rdata %s %s -> %s", stream_name, instance_name, token) + await self.replication_data_handler.on_rdata( + stream_name, instance_name, token, rows + ) async def on_POSITION(self, cmd: PositionCommand): stream = self.streams.get(cmd.stream_name) @@ -243,11 +247,12 @@ class ReplicationClientHandler: limited = cmd.token != current_token while limited: updates, current_token, limited = await stream.get_updates_since( - current_token, cmd.token + cmd.instance_name, current_token, cmd.token ) if updates: await self.on_rdata( cmd.stream_name, + cmd.instance_name, current_token, [stream.parse_row(update[1]) for update in updates], ) @@ -258,7 +263,9 @@ class ReplicationClientHandler: # Handle any RDATA that came in while we were catching up. rows = self.pending_batches.pop(cmd.stream_name, []) if rows: - await self.on_rdata(cmd.stream_name, rows[-1].token, rows) + await self.on_rdata( + cmd.stream_name, cmd.instance_name, rows[-1].token, rows + ) async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): """Called when get a new REMOTE_SERVER_UP command.""" @@ -342,7 +349,9 @@ class ReplicationDataHandler: self.slaved_store = hs.config.worker_app is not None self.slaved_typing = not hs.config.server.handle_typing - async def on_rdata(self, stream_name: str, token: int, rows: list): + async def on_rdata( + self, stream_name: str, instance_name: str, token: int, rows: list + ): """Called to handle a batch of replication data with a given stream token. By default this just pokes the slave store. Can be overridden in subclasses to |