diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 21293038ef..17e1572393 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -261,7 +261,7 @@ class ReplicationCommandHandler:
"process-replication-data", self._unsafe_process_queue, stream_name
)
- async def _unsafe_process_queue(self, stream_name: str):
+ async def _unsafe_process_queue(self, stream_name: str) -> None:
"""Processes the command queue for the given stream, until it is empty
Does not check if there is already a thread processing the queue, hence "unsafe"
@@ -294,7 +294,7 @@ class ReplicationCommandHandler:
# This shouldn't be possible
raise Exception("Unrecognised command %s in stream queue", cmd.NAME)
- def start_replication(self, hs: "HomeServer"):
+ def start_replication(self, hs: "HomeServer") -> None:
"""Helper method to start a replication connection to the remote server
using TCP.
"""
@@ -318,7 +318,7 @@ class ReplicationCommandHandler:
hs, outbound_redis_connection
)
hs.get_reactor().connectTCP(
- hs.config.redis.redis_host, # type: ignore[arg-type]
+ hs.config.redis.redis_host,
hs.config.redis.redis_port,
self._factory,
timeout=30,
@@ -330,7 +330,7 @@ class ReplicationCommandHandler:
host = hs.config.worker.worker_replication_host
port = hs.config.worker.worker_replication_port
hs.get_reactor().connectTCP(
- host, # type: ignore[arg-type]
+ host,
port,
self._factory,
timeout=30,
@@ -345,10 +345,10 @@ class ReplicationCommandHandler:
"""Get a list of streams that this instances replicates."""
return self._streams_to_replicate
- def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand):
+ def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
self.send_positions_to_connection(conn)
- def send_positions_to_connection(self, conn: IReplicationConnection):
+ def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
"""Send current position of all streams this process is source of to
the connection.
"""
@@ -392,7 +392,7 @@ class ReplicationCommandHandler:
def on_FEDERATION_ACK(
self, conn: IReplicationConnection, cmd: FederationAckCommand
- ):
+ ) -> None:
federation_ack_counter.inc()
if self._federation_sender:
@@ -408,7 +408,7 @@ class ReplicationCommandHandler:
else:
return None
- async def _handle_user_ip(self, cmd: UserIpCommand):
+ async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
await self._store.insert_client_ip(
cmd.user_id,
cmd.access_token,
@@ -421,7 +421,7 @@ class ReplicationCommandHandler:
assert self._server_notices_sender is not None
await self._server_notices_sender.on_user_ip(cmd.user_id)
- def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand):
+ def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None:
if cmd.instance_name == self._instance_name:
# Ignore RDATA that are just our own echoes
return
@@ -497,7 +497,7 @@ class ReplicationCommandHandler:
async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list
- ):
+ ) -> None:
"""Called to handle a batch of replication data with a given stream token.
Args:
@@ -512,7 +512,7 @@ class ReplicationCommandHandler:
stream_name, instance_name, token, rows
)
- def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand):
+ def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand) -> None:
if cmd.instance_name == self._instance_name:
# Ignore POSITION that are just our own echoes
return
@@ -581,7 +581,7 @@ class ReplicationCommandHandler:
def on_REMOTE_SERVER_UP(
self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
- ):
+ ) -> None:
"""Called when get a new REMOTE_SERVER_UP command."""
self._replication_data_handler.on_remote_server_up(cmd.data)
@@ -604,7 +604,7 @@ class ReplicationCommandHandler:
# between two instances, but that is not currently supported).
self.send_command(cmd, ignore_conn=conn)
- def new_connection(self, connection: IReplicationConnection):
+ def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection."""
self._connections.append(connection)
@@ -631,7 +631,7 @@ class ReplicationCommandHandler:
UserSyncCommand(self._instance_id, user_id, True, now)
)
- def lost_connection(self, connection: IReplicationConnection):
+ def lost_connection(self, connection: IReplicationConnection) -> None:
"""Called when a connection is closed/lost."""
# we no longer need _streams_by_connection for this connection.
streams = self._streams_by_connection.pop(connection, None)
@@ -653,7 +653,7 @@ class ReplicationCommandHandler:
def send_command(
self, cmd: Command, ignore_conn: Optional[IReplicationConnection] = None
- ):
+ ) -> None:
"""Send a command to all connected connections.
Args:
@@ -680,7 +680,7 @@ class ReplicationCommandHandler:
else:
logger.warning("Dropping command as not connected: %r", cmd.NAME)
- def send_federation_ack(self, token: int):
+ def send_federation_ack(self, token: int) -> None:
"""Ack data for the federation stream. This allows the master to drop
data stored purely in memory.
"""
@@ -688,7 +688,7 @@ class ReplicationCommandHandler:
def send_user_sync(
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
- ):
+ ) -> None:
"""Poke the master that a user has started/stopped syncing."""
self.send_command(
UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
@@ -702,15 +702,15 @@ class ReplicationCommandHandler:
user_agent: str,
device_id: str,
last_seen: int,
- ):
+ ) -> None:
"""Tell the master that the user made a request."""
cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
self.send_command(cmd)
- def send_remote_server_up(self, server: str):
+ def send_remote_server_up(self, server: str) -> None:
self.send_command(RemoteServerUpCommand(server))
- def stream_update(self, stream_name: str, token: str, data: Any):
+ def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None:
"""Called when a new update is available to stream to clients.
We need to check if the client is interested in the stream or not
|