diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index cfba255897..ac4d6d1dd1 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -63,6 +63,8 @@ class ReplicationClientHandler:
self.presence_handler = hs.get_presence_handler()
self.instance_id = hs.get_instance_id()
+ self.instance_name = hs.config.worker.worker_name or "master"
+
self.connections = [] # type: List[Any]
self.streams = {
@@ -134,7 +136,9 @@ class ReplicationClientHandler:
for stream_name, stream in self.streams.items():
current_token = stream.current_token()
- self.send_command(PositionCommand(stream_name, "master", current_token))
+ self.send_command(
+ PositionCommand(stream_name, self.instance_name, current_token)
+ )
async def on_USER_SYNC(self, cmd: UserSyncCommand):
user_sync_counter.inc()
@@ -232,17 +236,17 @@ class ReplicationClientHandler:
return
# Find where we previously streamed up to.
- current_token = (
- self.replication_data_handler.get_streams_to_replicate()
- .get(cmd.stream_name, {})
- .get(cmd.instance_name)
+ current_tokens = self.replication_data_handler.get_streams_to_replicate().get(
+ cmd.stream_name
)
- if current_token is None:
+ if current_tokens is None:
logger.debug(
"Got POSITION for stream we're not subscribed to: %s", cmd.stream_name
)
return
+ current_token = current_tokens.get(cmd.instance_name, 0)
+
# Fetch all updates between then and now.
limited = cmd.token != current_token
while limited:
@@ -335,7 +339,7 @@ class ReplicationClientHandler:
We need to check if the client is interested in the stream or not
"""
- self.send_command(RdataCommand(stream_name, "master", token, data))
+ self.send_command(RdataCommand(stream_name, self.instance_name, token, data))
class ReplicationDataHandler:
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index bbd2c6ec41..d421cc477f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -74,9 +74,7 @@ class ReplicationStreamer(object):
self.streams = [] # type: List[Stream]
if hs.config.worker_app is None:
for stream in STREAMS_MAP.values():
- if stream == FederationStream and hs.config.send_federation:
- # We only support federation stream if federation sending
- # hase been disabled on the master.
+ if stream == FederationStream:
continue
if stream == TypingStream:
@@ -87,6 +85,9 @@ class ReplicationStreamer(object):
if hs.config.server.handle_typing:
self.streams.append(TypingStream(hs))
+ # We always add federation stream
+ self.streams.append(FederationStream(hs))
+
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
self.notifier.add_replication_callback(self.on_notifier_poke)
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 87ab631fd4..e68b220956 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -256,7 +256,7 @@ class TypingStream(Stream):
self.current_token = typing_handler.get_current_token # type: ignore
- if hs.config.worker_app is None:
+ if hs.config.handle_typing:
self.update_function = db_query_to_update_function(typing_handler.get_all_typing_updates) # type: ignore
else:
# Query master process
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 48c1d45718..40951e81c9 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,8 +15,6 @@
# limitations under the License.
from collections import namedtuple
-from twisted.internet import defer
-
from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function
@@ -41,12 +39,8 @@ class FederationStream(Stream):
# Not all synapse instances will have a federation sender instance,
# whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
# so we stub the stream out when that is the case.
- if hs.config.worker_app is None or hs.should_send_federation():
- federation_sender = hs.get_federation_sender()
- self.current_token = federation_sender.get_current_token # type: ignore
- self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore
- else:
- self.current_token = lambda: 0 # type: ignore
- self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool)) # type: ignore
+ federation_sender = hs.get_federation_sender()
+ self.current_token = federation_sender.get_current_token # type: ignore
+ self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore
super(FederationStream, self).__init__(hs)
|