summary refs log tree commit diff
path: root/synapse/replication/http/streams.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-01 17:19:56 +0100
committerGitHub <noreply@github.com>2020-05-01 17:19:56 +0100
commit0e719f23981b8294df66ba7f38b8c7cc99fad228 (patch)
tree42d9aa97954cdbea46b0969bceefd88d2953a623 /synapse/replication/http/streams.py
parentUse `stream.current_token()` and remove `stream_positions()` (#7172) (diff)
downloadsynapse-0e719f23981b8294df66ba7f38b8c7cc99fad228.tar.xz
Thread through instance name to replication client. (#7369)
For in memory streams when fetching updates on workers we need to query the source of the stream, which currently is hard coded to be master. This PR threads through the source instance we received via `POSITION` through to the update function in each stream, which can then be passed to the replication client for in memory streams.
Diffstat (limited to 'synapse/replication/http/streams.py')
-rw-r--r--synapse/replication/http/streams.py4
1 files changed, 3 insertions, 1 deletions
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index f35cebc710..0459f582bf 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -50,6 +50,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
     def __init__(self, hs):
         super().__init__(hs)
 
+        self._instance_name = hs.get_instance_name()
+
         # We pull the streams from the replication steamer (if we try and make
         # them ourselves we end up in an import loop).
         self.streams = hs.get_replication_streamer().get_streams()
@@ -67,7 +69,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         upto_token = parse_integer(request, "upto_token", required=True)
 
         updates, upto_token, limited = await stream.get_updates_since(
-            from_token, upto_token
+            self._instance_name, from_token, upto_token
         )
 
         return (