diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 8d28bd3f3f..3170f7c59b 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -14,7 +14,7 @@
import logging
from inspect import isawaitable
-from typing import TYPE_CHECKING, Generic, Optional, Type, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Generic, Optional, Type, TypeVar, cast
import attr
import txredisapi
@@ -62,7 +62,7 @@ class ConstantProperty(Generic[T, V]):
def __get__(self, obj: Optional[T], objtype: Optional[Type[T]] = None) -> V:
return self.constant
- def __set__(self, obj: Optional[T], value: V):
+ def __set__(self, obj: Optional[T], value: V) -> None:
pass
@@ -95,7 +95,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
synapse_stream_name: str
synapse_outbound_redis_connection: txredisapi.RedisProtocol
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
# a logcontext which we use for processing incoming commands. We declare it as a
@@ -108,12 +108,12 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
"replication_command_handler"
)
- def connectionMade(self):
+ def connectionMade(self) -> None:
logger.info("Connected to redis")
super().connectionMade()
run_as_background_process("subscribe-replication", self._send_subscribe)
- async def _send_subscribe(self):
+ async def _send_subscribe(self) -> None:
# it's important to make sure that we only send the REPLICATE command once we
# have successfully subscribed to the stream - otherwise we might miss the
# POSITION response sent back by the other end.
@@ -131,12 +131,12 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
# otherside won't know we've connected and so won't issue a REPLICATE.
self.synapse_handler.send_positions_to_connection(self)
- def messageReceived(self, pattern: str, channel: str, message: str):
+ def messageReceived(self, pattern: str, channel: str, message: str) -> None:
"""Received a message from redis."""
with PreserveLoggingContext(self._logging_context):
self._parse_and_dispatch_message(message)
- def _parse_and_dispatch_message(self, message: str):
+ def _parse_and_dispatch_message(self, message: str) -> None:
if message.strip() == "":
# Ignore blank lines
return
@@ -181,7 +181,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
"replication-" + cmd.get_logcontext_id(), lambda: res
)
- def connectionLost(self, reason):
+ def connectionLost(self, reason: Failure) -> None: # type: ignore[override]
logger.info("Lost connection to redis")
super().connectionLost(reason)
self.synapse_handler.lost_connection(self)
@@ -193,17 +193,17 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
# the sentinel context is now active, which may not be correct.
# PreserveLoggingContext() will restore the correct logging context.
- def send_command(self, cmd: Command):
+ def send_command(self, cmd: Command) -> None:
"""Send a command if connection has been established.
Args:
- cmd (Command)
+ cmd: The command to send
"""
run_as_background_process(
"send-cmd", self._async_send_command, cmd, bg_start_span=False
)
- async def _async_send_command(self, cmd: Command):
+ async def _async_send_command(self, cmd: Command) -> None:
"""Encode a replication command and send it over our outbound connection"""
string = "%s %s" % (cmd.NAME, cmd.to_line())
if "\n" in string:
@@ -259,7 +259,7 @@ class SynapseRedisFactory(txredisapi.RedisFactory):
hs.get_clock().looping_call(self._send_ping, 30 * 1000)
@wrap_as_background_process("redis_ping")
- async def _send_ping(self):
+ async def _send_ping(self) -> None:
for connection in self.pool:
try:
await make_deferred_yieldable(connection.ping())
@@ -269,13 +269,13 @@ class SynapseRedisFactory(txredisapi.RedisFactory):
# ReconnectingClientFactory has some logging (if you enable `self.noisy`), but
# it's rubbish. We add our own here.
- def startedConnecting(self, connector: IConnector):
+ def startedConnecting(self, connector: IConnector) -> None:
logger.info(
"Connecting to redis server %s", format_address(connector.getDestination())
)
super().startedConnecting(connector)
- def clientConnectionFailed(self, connector: IConnector, reason: Failure):
+ def clientConnectionFailed(self, connector: IConnector, reason: Failure) -> None:
logger.info(
"Connection to redis server %s failed: %s",
format_address(connector.getDestination()),
@@ -283,7 +283,7 @@ class SynapseRedisFactory(txredisapi.RedisFactory):
)
super().clientConnectionFailed(connector, reason)
- def clientConnectionLost(self, connector: IConnector, reason: Failure):
+ def clientConnectionLost(self, connector: IConnector, reason: Failure) -> None:
logger.info(
"Connection to redis server %s lost: %s",
format_address(connector.getDestination()),
@@ -330,7 +330,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
self.synapse_outbound_redis_connection = outbound_redis_connection
- def buildProtocol(self, addr):
+ def buildProtocol(self, addr: IAddress) -> RedisSubscriber:
p = super().buildProtocol(addr)
p = cast(RedisSubscriber, p)
@@ -373,7 +373,7 @@ def lazyConnection(
reactor = hs.get_reactor()
reactor.connectTCP(
- host, # type: ignore[arg-type]
+ host,
port,
factory,
timeout=30,
|