diff options
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r-- | synapse/replication/tcp/handler.py | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index f7e6bc1e62..17e1572393 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -261,7 +261,7 @@ class ReplicationCommandHandler: "process-replication-data", self._unsafe_process_queue, stream_name ) - async def _unsafe_process_queue(self, stream_name: str): + async def _unsafe_process_queue(self, stream_name: str) -> None: """Processes the command queue for the given stream, until it is empty Does not check if there is already a thread processing the queue, hence "unsafe" @@ -294,7 +294,7 @@ class ReplicationCommandHandler: # This shouldn't be possible raise Exception("Unrecognised command %s in stream queue", cmd.NAME) - def start_replication(self, hs: "HomeServer"): + def start_replication(self, hs: "HomeServer") -> None: """Helper method to start a replication connection to the remote server using TCP. """ @@ -345,10 +345,10 @@ class ReplicationCommandHandler: """Get a list of streams that this instances replicates.""" return self._streams_to_replicate - def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand): + def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None: self.send_positions_to_connection(conn) - def send_positions_to_connection(self, conn: IReplicationConnection): + def send_positions_to_connection(self, conn: IReplicationConnection) -> None: """Send current position of all streams this process is source of to the connection. """ @@ -392,7 +392,7 @@ class ReplicationCommandHandler: def on_FEDERATION_ACK( self, conn: IReplicationConnection, cmd: FederationAckCommand - ): + ) -> None: federation_ack_counter.inc() if self._federation_sender: @@ -408,7 +408,7 @@ class ReplicationCommandHandler: else: return None - async def _handle_user_ip(self, cmd: UserIpCommand): + async def _handle_user_ip(self, cmd: UserIpCommand) -> None: await self._store.insert_client_ip( cmd.user_id, cmd.access_token, @@ -421,7 +421,7 @@ class ReplicationCommandHandler: assert self._server_notices_sender is not None await self._server_notices_sender.on_user_ip(cmd.user_id) - def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand): + def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None: if cmd.instance_name == self._instance_name: # Ignore RDATA that are just our own echoes return @@ -497,7 +497,7 @@ class ReplicationCommandHandler: async def on_rdata( self, stream_name: str, instance_name: str, token: int, rows: list - ): + ) -> None: """Called to handle a batch of replication data with a given stream token. Args: @@ -512,7 +512,7 @@ class ReplicationCommandHandler: stream_name, instance_name, token, rows ) - def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand): + def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand) -> None: if cmd.instance_name == self._instance_name: # Ignore POSITION that are just our own echoes return @@ -581,7 +581,7 @@ class ReplicationCommandHandler: def on_REMOTE_SERVER_UP( self, conn: IReplicationConnection, cmd: RemoteServerUpCommand - ): + ) -> None: """Called when get a new REMOTE_SERVER_UP command.""" self._replication_data_handler.on_remote_server_up(cmd.data) @@ -604,7 +604,7 @@ class ReplicationCommandHandler: # between two instances, but that is not currently supported). self.send_command(cmd, ignore_conn=conn) - def new_connection(self, connection: IReplicationConnection): + def new_connection(self, connection: IReplicationConnection) -> None: """Called when we have a new connection.""" self._connections.append(connection) @@ -631,7 +631,7 @@ class ReplicationCommandHandler: UserSyncCommand(self._instance_id, user_id, True, now) ) - def lost_connection(self, connection: IReplicationConnection): + def lost_connection(self, connection: IReplicationConnection) -> None: """Called when a connection is closed/lost.""" # we no longer need _streams_by_connection for this connection. streams = self._streams_by_connection.pop(connection, None) @@ -653,7 +653,7 @@ class ReplicationCommandHandler: def send_command( self, cmd: Command, ignore_conn: Optional[IReplicationConnection] = None - ): + ) -> None: """Send a command to all connected connections. Args: @@ -680,7 +680,7 @@ class ReplicationCommandHandler: else: logger.warning("Dropping command as not connected: %r", cmd.NAME) - def send_federation_ack(self, token: int): + def send_federation_ack(self, token: int) -> None: """Ack data for the federation stream. This allows the master to drop data stored purely in memory. """ @@ -688,7 +688,7 @@ class ReplicationCommandHandler: def send_user_sync( self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int - ): + ) -> None: """Poke the master that a user has started/stopped syncing.""" self.send_command( UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms) @@ -702,15 +702,15 @@ class ReplicationCommandHandler: user_agent: str, device_id: str, last_seen: int, - ): + ) -> None: """Tell the master that the user made a request.""" cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen) self.send_command(cmd) - def send_remote_server_up(self, server: str): + def send_remote_server_up(self, server: str) -> None: self.send_command(RemoteServerUpCommand(server)) - def stream_update(self, stream_name: str, token: str, data: Any): + def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None: """Called when a new update is available to stream to clients. We need to check if the client is interested in the stream or not |