summary refs log tree commit diff
path: root/synapse/replication/http
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-01 17:19:56 +0100
committerGitHub <noreply@github.com>2020-05-01 17:19:56 +0100
commit0e719f23981b8294df66ba7f38b8c7cc99fad228 (patch)
tree42d9aa97954cdbea46b0969bceefd88d2953a623 /synapse/replication/http
parentUse `stream.current_token()` and remove `stream_positions()` (#7172) (diff)
downloadsynapse-0e719f23981b8294df66ba7f38b8c7cc99fad228.tar.xz
Thread through instance name to replication client. (#7369)
For in memory streams when fetching updates on workers we need to query the source of the stream, which currently is hard coded to be master. This PR threads through the source instance we received via `POSITION` through to the update function in each stream, which can then be passed to the replication client for in memory streams.
Diffstat (limited to 'synapse/replication/http')
-rw-r--r--synapse/replication/http/_base.py19
-rw-r--r--synapse/replication/http/streams.py4
2 files changed, 21 insertions, 2 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 1be1ccbdf3..f88c80ae84 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -16,6 +16,7 @@
 import abc
 import logging
 import re
+from inspect import signature
 from typing import Dict, List, Tuple
 
 from six import raise_from
@@ -60,6 +61,8 @@ class ReplicationEndpoint(object):
     must call `register` to register the path with the HTTP server.
 
     Requests can be sent by calling the client returned by `make_client`.
+    Requests are sent to master process by default, but can be sent to other
+    named processes by specifying an `instance_name` keyword argument.
 
     Attributes:
         NAME (str): A name for the endpoint, added to the path as well as used
@@ -91,6 +94,16 @@ class ReplicationEndpoint(object):
                 hs, "repl." + self.NAME, timeout_ms=30 * 60 * 1000
             )
 
+        # We reserve `instance_name` as a parameter to sending requests, so we
+        # assert here that sub classes don't try and use the name.
+        assert (
+            "instance_name" not in self.PATH_ARGS
+        ), "`instance_name` is a reserved paramater name"
+        assert (
+            "instance_name"
+            not in signature(self.__class__._serialize_payload).parameters
+        ), "`instance_name` is a reserved paramater name"
+
         assert self.METHOD in ("PUT", "POST", "GET")
 
     @abc.abstractmethod
@@ -135,7 +148,11 @@ class ReplicationEndpoint(object):
 
         @trace(opname="outgoing_replication_request")
         @defer.inlineCallbacks
-        def send_request(**kwargs):
+        def send_request(instance_name="master", **kwargs):
+            # Currently we only support sending requests to master process.
+            if instance_name != "master":
+                raise Exception("Unknown instance")
+
             data = yield cls._serialize_payload(**kwargs)
 
             url_args = [
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index f35cebc710..0459f582bf 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -50,6 +50,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
     def __init__(self, hs):
         super().__init__(hs)
 
+        self._instance_name = hs.get_instance_name()
+
         # We pull the streams from the replication steamer (if we try and make
         # them ourselves we end up in an import loop).
         self.streams = hs.get_replication_streamer().get_streams()
@@ -67,7 +69,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         upto_token = parse_integer(request, "upto_token", required=True)
 
         updates, upto_token, limited = await stream.get_updates_since(
-            from_token, upto_token
+            self._instance_name, from_token, upto_token
         )
 
         return (