summary refs log tree commit diff
diff options
context:
space:
mode:
authorPatrick Cloke <patrickc@matrix.org>2022-03-10 14:34:59 -0500
committerPatrick Cloke <patrickc@matrix.org>2022-03-11 10:34:59 -0500
commit7375bd4828a13ece1bc7d8a32d05c389e722174b (patch)
tree8d0ced40d4b41455bd015ff500dd1fc74c87f8dd
parentAttempt some progress (diff)
downloadsynapse-7375bd4828a13ece1bc7d8a32d05c389e722174b.tar.xz
More robust-ness against dying connections.
-rw-r--r--synapse/replication/tcp/redis.py13
1 files changed, 11 insertions, 2 deletions
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index b63c151cfb..2688c1ee8e 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -118,13 +118,21 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
         # have successfully subscribed to the stream - otherwise we might miss the
         # POSITION response sent back by the other end.
         logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name)
-        await make_deferred_yieldable(self.subscribe(self.synapse_stream_name))
+        try:
+            await make_deferred_yieldable(self.subscribe(self.synapse_stream_name))
+        except txredisapi.ConnectionError:
+            # The connection died, the factory will attempt to reconnect.
+            return
         logger.info(
             "Successfully subscribed to redis stream, sending REPLICATE command"
         )
+
+        # If the connection has been severed for some reason, bail.
+        if not self.connected:
+            return
+
         self.synapse_handler.new_connection(self)
         await self._async_send_command(ReplicateCommand())
-        logger.info("REPLICATE successfully sent")
 
         # We send out our positions when there is a new connection in case the
         # other side missed updates. We do this for Redis connections as the
@@ -360,6 +368,7 @@ def lazyConnection(
     reconnect: bool = True,
     password: Optional[str] = None,
     replyTimeout: int = 30,
+    handler: Optional[txredisapi.ConnectionHandler] = None,
 ) -> txredisapi.ConnectionHandler:
     """Creates a connection to Redis that is lazily set up and reconnects if the
     connections is lost.