summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/notifier.py9
-rw-r--r--synapse/replication/tcp/handler.py33
2 files changed, 29 insertions, 13 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py

index 6132727cbd..88a5a97caf 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py
@@ -220,12 +220,6 @@ class Notifier(object): """ self.replication_callbacks.append(cb) - def add_remote_server_up_callback(self, cb: Callable[[str], None]): - """Add a callback that will be called when synapse detects a server - has been - """ - self.remote_server_up_callbacks.append(cb) - def on_new_room_event( self, event, room_stream_id, max_room_stream_id, extra_users=[] ): @@ -544,6 +538,3 @@ class Notifier(object): # circular dependencies. if self.federation_sender: self.federation_sender.wake_destination(server) - - for cb in self.remote_server_up_callbacks: - cb(server) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 3cdf87e140..7de8d94961 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -115,7 +115,6 @@ class ReplicationCommandHandler: self._server_notices_sender = None if self._is_master: self._server_notices_sender = hs.get_server_notices_sender() - self._notifier.add_remote_server_up_callback(self.send_remote_server_up) def start_replication(self, hs): """Helper method to start a replication connection to the remote server @@ -344,8 +343,24 @@ class ReplicationCommandHandler: """"Called when get a new REMOTE_SERVER_UP command.""" self._replication_data_handler.on_remote_server_up(cmd.data) - if self._is_master: - self._notifier.notify_remote_server_up(cmd.data) + self._notifier.notify_remote_server_up(cmd.data) + + # We relay to all other connections to ensure every instance gets the + # notification. + # + # When configured to use redis we'll always only have one connection and + # so this is a no-op (all instances will have already received the same + # REMOTE_SERVER_UP command). + # + # For direct TCP connections this will relay to all other connections + # connected to us. When on master this will correctly fan out to all + # other direct TCP clients and on workers there'll only be the one + # connection to master. + # + # (The logic here should also be sound if we have a mix of Redis and + # direct TCP connections so long as there is only one traffic route + # between two instances, but that is not currently supported). + self.send_command(cmd, ignore_conn=conn) def new_connection(self, connection: AbstractConnection): """Called when we have a new connection. @@ -390,11 +405,21 @@ class ReplicationCommandHandler: """ return bool(self._connections) - def send_command(self, cmd: Command): + def send_command( + self, cmd: Command, ignore_conn: Optional[AbstractConnection] = None + ): """Send a command to all connected connections. + + Args: + cmd + ignore_conn: If set don't send command to the given connection. + Used when relaying commands from one connection to all others. """ if self._connections: for connection in self._connections: + if connection == ignore_conn: + continue + try: connection.send_command(cmd) except Exception: