summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py36
1 files changed, 18 insertions, 18 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index f7e6bc1e62..17e1572393 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -261,7 +261,7 @@ class ReplicationCommandHandler:
             "process-replication-data", self._unsafe_process_queue, stream_name
         )
 
-    async def _unsafe_process_queue(self, stream_name: str):
+    async def _unsafe_process_queue(self, stream_name: str) -> None:
         """Processes the command queue for the given stream, until it is empty
 
         Does not check if there is already a thread processing the queue, hence "unsafe"
@@ -294,7 +294,7 @@ class ReplicationCommandHandler:
             # This shouldn't be possible
             raise Exception("Unrecognised command %s in stream queue", cmd.NAME)
 
-    def start_replication(self, hs: "HomeServer"):
+    def start_replication(self, hs: "HomeServer") -> None:
         """Helper method to start a replication connection to the remote server
         using TCP.
         """
@@ -345,10 +345,10 @@ class ReplicationCommandHandler:
         """Get a list of streams that this instances replicates."""
         return self._streams_to_replicate
 
-    def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand):
+    def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
         self.send_positions_to_connection(conn)
 
-    def send_positions_to_connection(self, conn: IReplicationConnection):
+    def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
         """Send current position of all streams this process is source of to
         the connection.
         """
@@ -392,7 +392,7 @@ class ReplicationCommandHandler:
 
     def on_FEDERATION_ACK(
         self, conn: IReplicationConnection, cmd: FederationAckCommand
-    ):
+    ) -> None:
         federation_ack_counter.inc()
 
         if self._federation_sender:
@@ -408,7 +408,7 @@ class ReplicationCommandHandler:
         else:
             return None
 
-    async def _handle_user_ip(self, cmd: UserIpCommand):
+    async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
         await self._store.insert_client_ip(
             cmd.user_id,
             cmd.access_token,
@@ -421,7 +421,7 @@ class ReplicationCommandHandler:
         assert self._server_notices_sender is not None
         await self._server_notices_sender.on_user_ip(cmd.user_id)
 
-    def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand):
+    def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None:
         if cmd.instance_name == self._instance_name:
             # Ignore RDATA that are just our own echoes
             return
@@ -497,7 +497,7 @@ class ReplicationCommandHandler:
 
     async def on_rdata(
         self, stream_name: str, instance_name: str, token: int, rows: list
-    ):
+    ) -> None:
         """Called to handle a batch of replication data with a given stream token.
 
         Args:
@@ -512,7 +512,7 @@ class ReplicationCommandHandler:
             stream_name, instance_name, token, rows
         )
 
-    def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand):
+    def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand) -> None:
         if cmd.instance_name == self._instance_name:
             # Ignore POSITION that are just our own echoes
             return
@@ -581,7 +581,7 @@ class ReplicationCommandHandler:
 
     def on_REMOTE_SERVER_UP(
         self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
-    ):
+    ) -> None:
         """Called when get a new REMOTE_SERVER_UP command."""
         self._replication_data_handler.on_remote_server_up(cmd.data)
 
@@ -604,7 +604,7 @@ class ReplicationCommandHandler:
         # between two instances, but that is not currently supported).
         self.send_command(cmd, ignore_conn=conn)
 
-    def new_connection(self, connection: IReplicationConnection):
+    def new_connection(self, connection: IReplicationConnection) -> None:
         """Called when we have a new connection."""
         self._connections.append(connection)
 
@@ -631,7 +631,7 @@ class ReplicationCommandHandler:
                 UserSyncCommand(self._instance_id, user_id, True, now)
             )
 
-    def lost_connection(self, connection: IReplicationConnection):
+    def lost_connection(self, connection: IReplicationConnection) -> None:
         """Called when a connection is closed/lost."""
         # we no longer need _streams_by_connection for this connection.
         streams = self._streams_by_connection.pop(connection, None)
@@ -653,7 +653,7 @@ class ReplicationCommandHandler:
 
     def send_command(
         self, cmd: Command, ignore_conn: Optional[IReplicationConnection] = None
-    ):
+    ) -> None:
         """Send a command to all connected connections.
 
         Args:
@@ -680,7 +680,7 @@ class ReplicationCommandHandler:
         else:
             logger.warning("Dropping command as not connected: %r", cmd.NAME)
 
-    def send_federation_ack(self, token: int):
+    def send_federation_ack(self, token: int) -> None:
         """Ack data for the federation stream. This allows the master to drop
         data stored purely in memory.
         """
@@ -688,7 +688,7 @@ class ReplicationCommandHandler:
 
     def send_user_sync(
         self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
-    ):
+    ) -> None:
         """Poke the master that a user has started/stopped syncing."""
         self.send_command(
             UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
@@ -702,15 +702,15 @@ class ReplicationCommandHandler:
         user_agent: str,
         device_id: str,
         last_seen: int,
-    ):
+    ) -> None:
         """Tell the master that the user made a request."""
         cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
         self.send_command(cmd)
 
-    def send_remote_server_up(self, server: str):
+    def send_remote_server_up(self, server: str) -> None:
         self.send_command(RemoteServerUpCommand(server))
 
-    def stream_update(self, stream_name: str, token: str, data: Any):
+    def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None:
         """Called when a new update is available to stream to clients.
 
         We need to check if the client is interested in the stream or not