diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6132727cbd..88a5a97caf 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -220,12 +220,6 @@ class Notifier(object):
"""
self.replication_callbacks.append(cb)
- def add_remote_server_up_callback(self, cb: Callable[[str], None]):
- """Add a callback that will be called when synapse detects a server
- has been
- """
- self.remote_server_up_callbacks.append(cb)
-
def on_new_room_event(
self, event, room_stream_id, max_room_stream_id, extra_users=[]
):
@@ -544,6 +538,3 @@ class Notifier(object):
# circular dependencies.
if self.federation_sender:
self.federation_sender.wake_destination(server)
-
- for cb in self.remote_server_up_callbacks:
- cb(server)
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 3cdf87e140..7de8d94961 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -115,7 +115,6 @@ class ReplicationCommandHandler:
self._server_notices_sender = None
if self._is_master:
self._server_notices_sender = hs.get_server_notices_sender()
- self._notifier.add_remote_server_up_callback(self.send_remote_server_up)
def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
@@ -344,8 +343,24 @@ class ReplicationCommandHandler:
""""Called when get a new REMOTE_SERVER_UP command."""
self._replication_data_handler.on_remote_server_up(cmd.data)
- if self._is_master:
- self._notifier.notify_remote_server_up(cmd.data)
+ self._notifier.notify_remote_server_up(cmd.data)
+
+ # We relay to all other connections to ensure every instance gets the
+ # notification.
+ #
+ # When configured to use redis we'll always only have one connection and
+ # so this is a no-op (all instances will have already received the same
+ # REMOTE_SERVER_UP command).
+ #
+ # For direct TCP connections this will relay to all other connections
+ # connected to us. When on master this will correctly fan out to all
+ # other direct TCP clients and on workers there'll only be the one
+ # connection to master.
+ #
+ # (The logic here should also be sound if we have a mix of Redis and
+ # direct TCP connections so long as there is only one traffic route
+ # between two instances, but that is not currently supported).
+ self.send_command(cmd, ignore_conn=conn)
def new_connection(self, connection: AbstractConnection):
"""Called when we have a new connection.
@@ -390,11 +405,21 @@ class ReplicationCommandHandler:
"""
return bool(self._connections)
- def send_command(self, cmd: Command):
+ def send_command(
+ self, cmd: Command, ignore_conn: Optional[AbstractConnection] = None
+ ):
"""Send a command to all connected connections.
+
+ Args:
+ cmd
+ ignore_conn: If set don't send command to the given connection.
+ Used when relaying commands from one connection to all others.
"""
if self._connections:
for connection in self._connections:
+ if connection == ignore_conn:
+ continue
+
try:
connection.send_command(cmd)
except Exception:
|