summary refs log tree commit diff
path: root/synapse/replication/tcp/redis.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/redis.py')
-rw-r--r--synapse/replication/tcp/redis.py34
1 files changed, 17 insertions, 17 deletions
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 8d28bd3f3f..3170f7c59b 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -14,7 +14,7 @@
 
 import logging
 from inspect import isawaitable
-from typing import TYPE_CHECKING, Generic, Optional, Type, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Generic, Optional, Type, TypeVar, cast
 
 import attr
 import txredisapi
@@ -62,7 +62,7 @@ class ConstantProperty(Generic[T, V]):
     def __get__(self, obj: Optional[T], objtype: Optional[Type[T]] = None) -> V:
         return self.constant
 
-    def __set__(self, obj: Optional[T], value: V):
+    def __set__(self, obj: Optional[T], value: V) -> None:
         pass
 
 
@@ -95,7 +95,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
     synapse_stream_name: str
     synapse_outbound_redis_connection: txredisapi.RedisProtocol
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any):
         super().__init__(*args, **kwargs)
 
         # a logcontext which we use for processing incoming commands. We declare it as a
@@ -108,12 +108,12 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
                 "replication_command_handler"
             )
 
-    def connectionMade(self):
+    def connectionMade(self) -> None:
         logger.info("Connected to redis")
         super().connectionMade()
         run_as_background_process("subscribe-replication", self._send_subscribe)
 
-    async def _send_subscribe(self):
+    async def _send_subscribe(self) -> None:
         # it's important to make sure that we only send the REPLICATE command once we
         # have successfully subscribed to the stream - otherwise we might miss the
         # POSITION response sent back by the other end.
@@ -131,12 +131,12 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
         # otherside won't know we've connected and so won't issue a REPLICATE.
         self.synapse_handler.send_positions_to_connection(self)
 
-    def messageReceived(self, pattern: str, channel: str, message: str):
+    def messageReceived(self, pattern: str, channel: str, message: str) -> None:
         """Received a message from redis."""
         with PreserveLoggingContext(self._logging_context):
             self._parse_and_dispatch_message(message)
 
-    def _parse_and_dispatch_message(self, message: str):
+    def _parse_and_dispatch_message(self, message: str) -> None:
         if message.strip() == "":
             # Ignore blank lines
             return
@@ -181,7 +181,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
                 "replication-" + cmd.get_logcontext_id(), lambda: res
             )
 
-    def connectionLost(self, reason):
+    def connectionLost(self, reason: Failure) -> None:  # type: ignore[override]
         logger.info("Lost connection to redis")
         super().connectionLost(reason)
         self.synapse_handler.lost_connection(self)
@@ -193,17 +193,17 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
             # the sentinel context is now active, which may not be correct.
             # PreserveLoggingContext() will restore the correct logging context.
 
-    def send_command(self, cmd: Command):
+    def send_command(self, cmd: Command) -> None:
         """Send a command if connection has been established.
 
         Args:
-            cmd (Command)
+            cmd: The command to send
         """
         run_as_background_process(
             "send-cmd", self._async_send_command, cmd, bg_start_span=False
         )
 
-    async def _async_send_command(self, cmd: Command):
+    async def _async_send_command(self, cmd: Command) -> None:
         """Encode a replication command and send it over our outbound connection"""
         string = "%s %s" % (cmd.NAME, cmd.to_line())
         if "\n" in string:
@@ -259,7 +259,7 @@ class SynapseRedisFactory(txredisapi.RedisFactory):
         hs.get_clock().looping_call(self._send_ping, 30 * 1000)
 
     @wrap_as_background_process("redis_ping")
-    async def _send_ping(self):
+    async def _send_ping(self) -> None:
         for connection in self.pool:
             try:
                 await make_deferred_yieldable(connection.ping())
@@ -269,13 +269,13 @@ class SynapseRedisFactory(txredisapi.RedisFactory):
     # ReconnectingClientFactory has some logging (if you enable `self.noisy`), but
     # it's rubbish. We add our own here.
 
-    def startedConnecting(self, connector: IConnector):
+    def startedConnecting(self, connector: IConnector) -> None:
         logger.info(
             "Connecting to redis server %s", format_address(connector.getDestination())
         )
         super().startedConnecting(connector)
 
-    def clientConnectionFailed(self, connector: IConnector, reason: Failure):
+    def clientConnectionFailed(self, connector: IConnector, reason: Failure) -> None:
         logger.info(
             "Connection to redis server %s failed: %s",
             format_address(connector.getDestination()),
@@ -283,7 +283,7 @@ class SynapseRedisFactory(txredisapi.RedisFactory):
         )
         super().clientConnectionFailed(connector, reason)
 
-    def clientConnectionLost(self, connector: IConnector, reason: Failure):
+    def clientConnectionLost(self, connector: IConnector, reason: Failure) -> None:
         logger.info(
             "Connection to redis server %s lost: %s",
             format_address(connector.getDestination()),
@@ -330,7 +330,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
 
         self.synapse_outbound_redis_connection = outbound_redis_connection
 
-    def buildProtocol(self, addr):
+    def buildProtocol(self, addr: IAddress) -> RedisSubscriber:
         p = super().buildProtocol(addr)
         p = cast(RedisSubscriber, p)
 
@@ -373,7 +373,7 @@ def lazyConnection(
 
     reactor = hs.get_reactor()
     reactor.connectTCP(
-        host,  # type: ignore[arg-type]
+        host,
         port,
         factory,
         timeout=30,