summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-13 09:57:15 +0100
committerGitHub <noreply@github.com>2020-05-13 09:57:15 +0100
commit8ca79613e62201f0990a482130527fc88c4ffd40 (patch)
tree165ff43a8cf7bd954ac6b0ec6e65bf5db5fe5edc
parentUpdate documentation about SSO mapping providers (#7458) (diff)
downloadsynapse-8ca79613e62201f0990a482130527fc88c4ffd40.tar.xz
Fix Redis reconnection logic (#7482)
Proactively send out `POSITION` commands (as if we had just received a `REPLICATE`) when we connect to Redis. This is important as other instances won't notice we've connected to issue a `REPLICATE` command (unlike for direct TCP connections). This is only currently an issue if master process reconnects without restarting (if it restarts then it won't have written anything and so other instances probably won't have missed anything). 
-rw-r--r--changelog.d/7482.bugfix1
-rw-r--r--synapse/replication/tcp/handler.py9
-rw-r--r--synapse/replication/tcp/redis.py7
3 files changed, 15 insertions, 2 deletions
diff --git a/changelog.d/7482.bugfix b/changelog.d/7482.bugfix
new file mode 100644
index 0000000000..018bf5cc89
--- /dev/null
+++ b/changelog.d/7482.bugfix
@@ -0,0 +1 @@
+Fix Redis reconnection logic that can result in missed updates over replication if master reconnects to Redis without restarting.
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1b05468483..e6a50aa74e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -151,6 +151,13 @@ class ReplicationCommandHandler:
             hs.get_reactor().connectTCP(host, port, self._factory)
 
     async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
+        self.send_positions_to_connection(conn)
+
+    def send_positions_to_connection(self, conn: AbstractConnection):
+        """Send current position of all streams this process is source of to
+        the connection.
+        """
+
         # We only want to announce positions by the writer of the streams.
         # Currently this is just the master process.
         if not self._is_master:
@@ -158,7 +165,7 @@ class ReplicationCommandHandler:
 
         for stream_name, stream in self._streams.items():
             current_token = stream.current_token(self._instance_name)
-            self.send_command(
+            conn.send_command(
                 PositionCommand(stream_name, self._instance_name, current_token)
             )
 
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 55bfa71dfd..e776b63183 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -70,7 +70,6 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
         logger.info("Connected to redis")
         super().connectionMade()
         run_as_background_process("subscribe-replication", self._send_subscribe)
-        self.handler.new_connection(self)
 
     async def _send_subscribe(self):
         # it's important to make sure that we only send the REPLICATE command once we
@@ -81,9 +80,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
         logger.info(
             "Successfully subscribed to redis stream, sending REPLICATE command"
         )
+        self.handler.new_connection(self)
         await self._async_send_command(ReplicateCommand())
         logger.info("REPLICATE successfully sent")
 
+        # We send out our positions when there is a new connection in case the
+        # other side missed updates. We do this for Redis connections as the
+        # otherside won't know we've connected and so won't issue a REPLICATE.
+        self.handler.send_positions_to_connection(self)
+
     def messageReceived(self, pattern: str, channel: str, message: str):
         """Received a message from redis.
         """