diff options
Diffstat (limited to 'synapse/replication/http')
-rw-r--r-- | synapse/replication/http/__init__.py | 14 | ||||
-rw-r--r-- | synapse/replication/http/_base.py | 14 | ||||
-rw-r--r-- | synapse/replication/http/federation.py | 10 | ||||
-rw-r--r-- | synapse/replication/http/streams.py | 2 |
4 files changed, 27 insertions, 13 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 4613b2538c..f43a4ed009 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -33,10 +33,12 @@ class ReplicationRestResource(JsonResource): self.register_servlets(hs) def register_servlets(self, hs): - send_event.register_servlets(hs, self) - membership.register_servlets(hs, self) - federation.register_servlets(hs, self) - login.register_servlets(hs, self) - register.register_servlets(hs, self) - devices.register_servlets(hs, self) + if hs.config.worker_app is None: + send_event.register_servlets(hs, self) + membership.register_servlets(hs, self) + login.register_servlets(hs, self) + register.register_servlets(hs, self) + devices.register_servlets(hs, self) + streams.register_servlets(hs, self) + federation.register_servlets(hs, self) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index a9ad601132..cdbc406c97 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -128,15 +128,23 @@ class ReplicationEndpoint(object): Returns a callable that accepts the same parameters as `_serialize_payload`. """ clock = hs.get_clock() - host = hs.config.worker_replication_host - port = hs.config.worker_replication_http_port + master_host = hs.config.worker_replication_host + master_port = hs.config.worker_replication_http_port + + instance_http_map = hs.config.instance_http_map client = hs.get_simple_http_client() @trace(opname="outgoing_replication_request") @defer.inlineCallbacks def send_request(instance_name="master", **kwargs): - if instance_name != "master": + if instance_name == "master": + host = master_host + port = master_port + elif instance_name in instance_http_map: + host = instance_http_map[instance_name]["host"] + port = instance_http_map[instance_name]["port"] + else: raise Exception("Unknown instance") data = yield cls._serialize_payload(**kwargs) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 7e23b565b9..0a11743845 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -277,8 +277,10 @@ class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint): def register_servlets(hs, http_server): - ReplicationFederationSendEventsRestServlet(hs).register(http_server) + if hs.config.worker_app is None: + ReplicationFederationSendEventsRestServlet(hs).register(http_server) + ReplicationGetQueryRestServlet(hs).register(http_server) + ReplicationCleanRoomRestServlet(hs).register(http_server) + ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server) + ReplicationFederationSendEduRestServlet(hs).register(http_server) - ReplicationGetQueryRestServlet(hs).register(http_server) - ReplicationCleanRoomRestServlet(hs).register(http_server) - ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py index 9c1fc9fb25..11a0270699 100644 --- a/synapse/replication/http/streams.py +++ b/synapse/replication/http/streams.py @@ -51,6 +51,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint): # them ourselves we end up in an import loop). self.streams = hs.get_replication_streamer().get_streams() + self.instance_name = hs.config.worker_name or "master" + @staticmethod def _serialize_payload(stream_name, from_token, upto_token, limit): return {"from_token": from_token, "upto_token": upto_token, "limit": limit} |