diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 4613b2538c..f43a4ed009 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -33,10 +33,12 @@ class ReplicationRestResource(JsonResource):
self.register_servlets(hs)
def register_servlets(self, hs):
- send_event.register_servlets(hs, self)
- membership.register_servlets(hs, self)
- federation.register_servlets(hs, self)
- login.register_servlets(hs, self)
- register.register_servlets(hs, self)
- devices.register_servlets(hs, self)
+ if hs.config.worker_app is None:
+ send_event.register_servlets(hs, self)
+ membership.register_servlets(hs, self)
+ login.register_servlets(hs, self)
+ register.register_servlets(hs, self)
+ devices.register_servlets(hs, self)
+
streams.register_servlets(hs, self)
+ federation.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index a9ad601132..cdbc406c97 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -128,15 +128,23 @@ class ReplicationEndpoint(object):
Returns a callable that accepts the same parameters as `_serialize_payload`.
"""
clock = hs.get_clock()
- host = hs.config.worker_replication_host
- port = hs.config.worker_replication_http_port
+ master_host = hs.config.worker_replication_host
+ master_port = hs.config.worker_replication_http_port
+
+ instance_http_map = hs.config.instance_http_map
client = hs.get_simple_http_client()
@trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
def send_request(instance_name="master", **kwargs):
- if instance_name != "master":
+ if instance_name == "master":
+ host = master_host
+ port = master_port
+ elif instance_name in instance_http_map:
+ host = instance_http_map[instance_name]["host"]
+ port = instance_http_map[instance_name]["port"]
+ else:
raise Exception("Unknown instance")
data = yield cls._serialize_payload(**kwargs)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 7e23b565b9..0a11743845 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -277,8 +277,10 @@ class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
def register_servlets(hs, http_server):
- ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+ if hs.config.worker_app is None:
+ ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+ ReplicationGetQueryRestServlet(hs).register(http_server)
+ ReplicationCleanRoomRestServlet(hs).register(http_server)
+ ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
+
ReplicationFederationSendEduRestServlet(hs).register(http_server)
- ReplicationGetQueryRestServlet(hs).register(http_server)
- ReplicationCleanRoomRestServlet(hs).register(http_server)
- ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index 9c1fc9fb25..11a0270699 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -51,6 +51,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
# them ourselves we end up in an import loop).
self.streams = hs.get_replication_streamer().get_streams()
+ self.instance_name = hs.config.worker_name or "master"
+
@staticmethod
def _serialize_payload(stream_name, from_token, upto_token, limit):
return {"from_token": from_token, "upto_token": upto_token, "limit": limit}
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index cfba255897..ac4d6d1dd1 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -63,6 +63,8 @@ class ReplicationClientHandler:
self.presence_handler = hs.get_presence_handler()
self.instance_id = hs.get_instance_id()
+ self.instance_name = hs.config.worker.worker_name or "master"
+
self.connections = [] # type: List[Any]
self.streams = {
@@ -134,7 +136,9 @@ class ReplicationClientHandler:
for stream_name, stream in self.streams.items():
current_token = stream.current_token()
- self.send_command(PositionCommand(stream_name, "master", current_token))
+ self.send_command(
+ PositionCommand(stream_name, self.instance_name, current_token)
+ )
async def on_USER_SYNC(self, cmd: UserSyncCommand):
user_sync_counter.inc()
@@ -232,17 +236,17 @@ class ReplicationClientHandler:
return
# Find where we previously streamed up to.
- current_token = (
- self.replication_data_handler.get_streams_to_replicate()
- .get(cmd.stream_name, {})
- .get(cmd.instance_name)
+ current_tokens = self.replication_data_handler.get_streams_to_replicate().get(
+ cmd.stream_name
)
- if current_token is None:
+ if current_tokens is None:
logger.debug(
"Got POSITION for stream we're not subscribed to: %s", cmd.stream_name
)
return
+ current_token = current_tokens.get(cmd.instance_name, 0)
+
# Fetch all updates between then and now.
limited = cmd.token != current_token
while limited:
@@ -335,7 +339,7 @@ class ReplicationClientHandler:
We need to check if the client is interested in the stream or not
"""
- self.send_command(RdataCommand(stream_name, "master", token, data))
+ self.send_command(RdataCommand(stream_name, self.instance_name, token, data))
class ReplicationDataHandler:
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index bbd2c6ec41..d421cc477f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -74,9 +74,7 @@ class ReplicationStreamer(object):
self.streams = [] # type: List[Stream]
if hs.config.worker_app is None:
for stream in STREAMS_MAP.values():
- if stream == FederationStream and hs.config.send_federation:
- # We only support federation stream if federation sending
- # hase been disabled on the master.
+ if stream == FederationStream:
continue
if stream == TypingStream:
@@ -87,6 +85,9 @@ class ReplicationStreamer(object):
if hs.config.server.handle_typing:
self.streams.append(TypingStream(hs))
+ # We always add federation stream
+ self.streams.append(FederationStream(hs))
+
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
self.notifier.add_replication_callback(self.on_notifier_poke)
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 87ab631fd4..e68b220956 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -256,7 +256,7 @@ class TypingStream(Stream):
self.current_token = typing_handler.get_current_token # type: ignore
- if hs.config.worker_app is None:
+ if hs.config.handle_typing:
self.update_function = db_query_to_update_function(typing_handler.get_all_typing_updates) # type: ignore
else:
# Query master process
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 48c1d45718..40951e81c9 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,8 +15,6 @@
# limitations under the License.
from collections import namedtuple
-from twisted.internet import defer
-
from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function
@@ -41,12 +39,8 @@ class FederationStream(Stream):
# Not all synapse instances will have a federation sender instance,
# whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
# so we stub the stream out when that is the case.
- if hs.config.worker_app is None or hs.should_send_federation():
- federation_sender = hs.get_federation_sender()
- self.current_token = federation_sender.get_current_token # type: ignore
- self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore
- else:
- self.current_token = lambda: 0 # type: ignore
- self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool)) # type: ignore
+ federation_sender = hs.get_federation_sender()
+ self.current_token = federation_sender.get_current_token # type: ignore
+ self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore
super(FederationStream, self).__init__(hs)
|