diff options
author | Erik Johnston <erik@matrix.org> | 2020-05-01 17:19:56 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-01 17:19:56 +0100 |
commit | 0e719f23981b8294df66ba7f38b8c7cc99fad228 (patch) | |
tree | 42d9aa97954cdbea46b0969bceefd88d2953a623 /synapse/app | |
parent | Use `stream.current_token()` and remove `stream_positions()` (#7172) (diff) | |
download | synapse-0e719f23981b8294df66ba7f38b8c7cc99fad228.tar.xz |
Thread through instance name to replication client. (#7369)
For in memory streams when fetching updates on workers we need to query the source of the stream, which currently is hard coded to be master. This PR threads through the source instance we received via `POSITION` through to the update function in each stream, which can then be passed to the replication client for in memory streams.
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/generic_worker.py | 10 |
1 files changed, 4 insertions, 6 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 97b9b81237..667ad20428 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -646,13 +646,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): else: self.send_handler = None - async def on_rdata(self, stream_name, token, rows): - await super(GenericWorkerReplicationHandler, self).on_rdata( - stream_name, token, rows - ) - await self.process_and_notify(stream_name, token, rows) + async def on_rdata(self, stream_name, instance_name, token, rows): + await super().on_rdata(stream_name, instance_name, token, rows) + await self._process_and_notify(stream_name, instance_name, token, rows) - async def process_and_notify(self, stream_name, token, rows): + async def _process_and_notify(self, stream_name, instance_name, token, rows): try: if self.send_handler: await self.send_handler.process_replication_rows( |