summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py27
1 files changed, 9 insertions, 18 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 804da994ea..e0b4ad314d 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -222,8 +222,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
                 self.send_error("ping timeout")
 
     def lineReceived(self, line: bytes):
-        """Called when we've received a line
-        """
+        """Called when we've received a line"""
         with PreserveLoggingContext(self._logging_context):
             self._parse_and_dispatch_line(line)
 
@@ -299,8 +298,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         self.on_connection_closed()
 
     def send_error(self, error_string, *args):
-        """Send an error to remote and close the connection.
-        """
+        """Send an error to remote and close the connection."""
         self.send_command(ErrorCommand(error_string % args))
         self.close()
 
@@ -341,8 +339,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         self.last_sent_command = self.clock.time_msec()
 
     def _queue_command(self, cmd):
-        """Queue the command until the connection is ready to write to again.
-        """
+        """Queue the command until the connection is ready to write to again."""
         logger.debug("[%s] Queueing as conn %r, cmd: %r", self.id(), self.state, cmd)
         self.pending_commands.append(cmd)
 
@@ -355,8 +352,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
             self.close()
 
     def _send_pending_commands(self):
-        """Send any queued commandes
-        """
+        """Send any queued commandes"""
         pending = self.pending_commands
         self.pending_commands = []
         for cmd in pending:
@@ -380,8 +376,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         self.state = ConnectionStates.PAUSED
 
     def resumeProducing(self):
-        """The remote has caught up after we started buffering!
-        """
+        """The remote has caught up after we started buffering!"""
         logger.info("[%s] Resume producing", self.id())
         self.state = ConnectionStates.ESTABLISHED
         self._send_pending_commands()
@@ -440,8 +435,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         return "%s-%s" % (self.name, self.conn_id)
 
     def lineLengthExceeded(self, line):
-        """Called when we receive a line that is above the maximum line length
-        """
+        """Called when we receive a line that is above the maximum line length"""
         self.send_error("Line length exceeded")
 
 
@@ -495,21 +489,18 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
             self.send_error("Wrong remote")
 
     def replicate(self):
-        """Send the subscription request to the server
-        """
+        """Send the subscription request to the server"""
         logger.info("[%s] Subscribing to replication streams", self.id())
 
         self.send_command(ReplicateCommand())
 
 
 class AbstractConnection(abc.ABC):
-    """An interface for replication connections.
-    """
+    """An interface for replication connections."""
 
     @abc.abstractmethod
     def send_command(self, cmd: Command):
-        """Send the command down the connection
-        """
+        """Send the command down the connection"""
         pass