summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-04-27 12:01:23 +0100
committerErik Johnston <erik@matrix.org>2020-04-27 12:05:57 +0100
commit659b6bec35cc19b64cf8b526f82c6ecbedd1749e (patch)
tree9d6d81d4b656a60bb5dd7e237a2a2e2dbd240b1e
parentPass connection to 'on_*' replication handler (diff)
downloadsynapse-659b6bec35cc19b64cf8b526f82c6ecbedd1749e.tar.xz
Don't relay REMOTE_SERVER_UP cmds to same conn.
For direct TCP connections we need the master to relay REMOTE_SERVER_UP
commands to the other connections so that all instances get notified
about it. The old implementation just relayed to all connections,
assuming that sending back to the original sender of the command was
safe. This is not true for redis, where commands sent get echoed back to
the sender, which was causing master to effectively infinite loop
sending and then re-receiving REMOTE_SERVER_UP commands that it sent.

The fix is to ensure that we only relay to *other* connections and not
to the connection we received the notification from.

Fixes #7334.
-rw-r--r--synapse/notifier.py9
-rw-r--r--synapse/replication/tcp/handler.py33
2 files changed, 29 insertions, 13 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6132727cbd..88a5a97caf 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -220,12 +220,6 @@ class Notifier(object):
         """
         self.replication_callbacks.append(cb)
 
-    def add_remote_server_up_callback(self, cb: Callable[[str], None]):
-        """Add a callback that will be called when synapse detects a server
-        has been
-        """
-        self.remote_server_up_callbacks.append(cb)
-
     def on_new_room_event(
         self, event, room_stream_id, max_room_stream_id, extra_users=[]
     ):
@@ -544,6 +538,3 @@ class Notifier(object):
         # circular dependencies.
         if self.federation_sender:
             self.federation_sender.wake_destination(server)
-
-        for cb in self.remote_server_up_callbacks:
-            cb(server)
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 3cdf87e140..7de8d94961 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -115,7 +115,6 @@ class ReplicationCommandHandler:
         self._server_notices_sender = None
         if self._is_master:
             self._server_notices_sender = hs.get_server_notices_sender()
-            self._notifier.add_remote_server_up_callback(self.send_remote_server_up)
 
     def start_replication(self, hs):
         """Helper method to start a replication connection to the remote server
@@ -344,8 +343,24 @@ class ReplicationCommandHandler:
         """"Called when get a new REMOTE_SERVER_UP command."""
         self._replication_data_handler.on_remote_server_up(cmd.data)
 
-        if self._is_master:
-            self._notifier.notify_remote_server_up(cmd.data)
+        self._notifier.notify_remote_server_up(cmd.data)
+
+        # We relay to all other connections to ensure every instance gets the
+        # notification.
+        #
+        # When configured to use redis we'll always only have one connection and
+        # so this is a no-op (all instances will have already received the same
+        # REMOTE_SERVER_UP command).
+        #
+        # For direct TCP connections this will relay to all other connections
+        # connected to us. When on master this will correctly fan out to all
+        # other direct TCP clients and on workers there'll only be the one
+        # connection to master.
+        #
+        # (The logic here should also be sound if we have a mix of Redis and
+        # direct TCP connections so long as there is only one traffic route
+        # between two instances, but that is not currently supported).
+        self.send_command(cmd, ignore_conn=conn)
 
     def new_connection(self, connection: AbstractConnection):
         """Called when we have a new connection.
@@ -390,11 +405,21 @@ class ReplicationCommandHandler:
         """
         return bool(self._connections)
 
-    def send_command(self, cmd: Command):
+    def send_command(
+        self, cmd: Command, ignore_conn: Optional[AbstractConnection] = None
+    ):
         """Send a command to all connected connections.
+
+        Args:
+            cmd
+            ignore_conn: If set don't send command to the given connection.
+                Used when relaying commands from one connection to all others.
         """
         if self._connections:
             for connection in self._connections:
+                if connection == ignore_conn:
+                    continue
+
                 try:
                     connection.send_command(cmd)
                 except Exception: