summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-23 18:29:21 +0000
committerErik Johnston <erik@matrix.org>2020-03-23 18:29:21 +0000
commit3204b0e79fc0521281de6e5270375b9855201dfb (patch)
treec2efe4eb3a7a63e7a7032c20b4467c83f2bea507 /synapse/replication/tcp/protocol.py
parentReview comments (diff)
downloadsynapse-3204b0e79fc0521281de6e5270375b9855201dfb.tar.xz
Handle connection closing under us
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py11
1 files changed, 11 insertions, 0 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 8aa749265c..e266c72417 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -628,6 +628,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
             updates, current_token, limited = await stream.get_updates_since(
                 current_token, cmd.token
             )
+
+            # Check if the connection was closed underneath us, if so we bail
+            # rather than risk having concurrent catch ups going on.
+            if self.state == ConnectionStates.CLOSED:
+                return
+
             if updates:
                 await self.handler.on_rdata(
                     cmd.stream_name,
@@ -643,6 +649,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
         if not self.streams_connecting:
             self.handler.finished_connecting()
 
+        # Check if the connection was closed underneath us, if so we bail
+        # rather than risk having concurrent catch ups going on.
+        if self.state == ConnectionStates.CLOSED:
+            return
+
         # Handle any RDATA that came in while we were catching up.
         rows = self.pending_batches.pop(cmd.stream_name, [])
         if rows: