summary refs log tree commit diff
path: root/synapse/replication/http
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/http')
-rw-r--r--synapse/replication/http/__init__.py14
-rw-r--r--synapse/replication/http/_base.py14
-rw-r--r--synapse/replication/http/federation.py10
-rw-r--r--synapse/replication/http/streams.py2
4 files changed, 27 insertions, 13 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 4613b2538c..f43a4ed009 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -33,10 +33,12 @@ class ReplicationRestResource(JsonResource):
         self.register_servlets(hs)
 
     def register_servlets(self, hs):
-        send_event.register_servlets(hs, self)
-        membership.register_servlets(hs, self)
-        federation.register_servlets(hs, self)
-        login.register_servlets(hs, self)
-        register.register_servlets(hs, self)
-        devices.register_servlets(hs, self)
+        if hs.config.worker_app is None:
+            send_event.register_servlets(hs, self)
+            membership.register_servlets(hs, self)
+            login.register_servlets(hs, self)
+            register.register_servlets(hs, self)
+            devices.register_servlets(hs, self)
+
         streams.register_servlets(hs, self)
+        federation.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index a9ad601132..cdbc406c97 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -128,15 +128,23 @@ class ReplicationEndpoint(object):
         Returns a callable that accepts the same parameters as `_serialize_payload`.
         """
         clock = hs.get_clock()
-        host = hs.config.worker_replication_host
-        port = hs.config.worker_replication_http_port
+        master_host = hs.config.worker_replication_host
+        master_port = hs.config.worker_replication_http_port
+
+        instance_http_map = hs.config.instance_http_map
 
         client = hs.get_simple_http_client()
 
         @trace(opname="outgoing_replication_request")
         @defer.inlineCallbacks
         def send_request(instance_name="master", **kwargs):
-            if instance_name != "master":
+            if instance_name == "master":
+                host = master_host
+                port = master_port
+            elif instance_name in instance_http_map:
+                host = instance_http_map[instance_name]["host"]
+                port = instance_http_map[instance_name]["port"]
+            else:
                 raise Exception("Unknown instance")
 
             data = yield cls._serialize_payload(**kwargs)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 7e23b565b9..0a11743845 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -277,8 +277,10 @@ class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
 
 
 def register_servlets(hs, http_server):
-    ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+    if hs.config.worker_app is None:
+        ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+        ReplicationGetQueryRestServlet(hs).register(http_server)
+        ReplicationCleanRoomRestServlet(hs).register(http_server)
+        ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
+
     ReplicationFederationSendEduRestServlet(hs).register(http_server)
-    ReplicationGetQueryRestServlet(hs).register(http_server)
-    ReplicationCleanRoomRestServlet(hs).register(http_server)
-    ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index 9c1fc9fb25..11a0270699 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -51,6 +51,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         # them ourselves we end up in an import loop).
         self.streams = hs.get_replication_streamer().get_streams()
 
+        self.instance_name = hs.config.worker_name or "master"
+
     @staticmethod
     def _serialize_payload(stream_name, from_token, upto_token, limit):
         return {"from_token": from_token, "upto_token": upto_token, "limit": limit}