summary refs log tree commit diff
path: root/synapse/replication/tcp/redis.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/redis.py')
-rw-r--r--synapse/replication/tcp/redis.py45
1 files changed, 41 insertions, 4 deletions
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py

index 7560706b4b..2f4d407f94 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py
@@ -19,6 +19,11 @@ from typing import TYPE_CHECKING, Generic, Optional, Type, TypeVar, cast import attr import txredisapi +from zope.interface import implementer + +from twisted.internet.address import IPv4Address, IPv6Address +from twisted.internet.interfaces import IAddress, IConnector +from twisted.python.failure import Failure from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.metrics.background_process_metrics import ( @@ -32,7 +37,7 @@ from synapse.replication.tcp.commands import ( parse_command_from_line, ) from synapse.replication.tcp.protocol import ( - AbstractConnection, + IReplicationConnection, tcp_inbound_commands_counter, tcp_outbound_commands_counter, ) @@ -62,7 +67,8 @@ class ConstantProperty(Generic[T, V]): pass -class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): +@implementer(IReplicationConnection) +class RedisSubscriber(txredisapi.SubscriberProtocol): """Connection to redis subscribed to replication stream. This class fulfils two functions: @@ -71,7 +77,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): connection, parsing *incoming* messages into replication commands, and passing them to `ReplicationCommandHandler` - (b) it implements the AbstractConnection API, where it sends *outgoing* commands + (b) it implements the IReplicationConnection API, where it sends *outgoing* commands onto outbound_redis_connection. Due to the vagaries of `txredisapi` we don't want to have a custom @@ -253,6 +259,37 @@ class SynapseRedisFactory(txredisapi.RedisFactory): except Exception: logger.warning("Failed to send ping to a redis connection") + # ReconnectingClientFactory has some logging (if you enable `self.noisy`), but + # it's rubbish. We add our own here. + + def startedConnecting(self, connector: IConnector): + logger.info( + "Connecting to redis server %s", format_address(connector.getDestination()) + ) + super().startedConnecting(connector) + + def clientConnectionFailed(self, connector: IConnector, reason: Failure): + logger.info( + "Connection to redis server %s failed: %s", + format_address(connector.getDestination()), + reason.value, + ) + super().clientConnectionFailed(connector, reason) + + def clientConnectionLost(self, connector: IConnector, reason: Failure): + logger.info( + "Connection to redis server %s lost: %s", + format_address(connector.getDestination()), + reason.value, + ) + super().clientConnectionLost(connector, reason) + + +def format_address(address: IAddress) -> str: + if isinstance(address, (IPv4Address, IPv6Address)): + return "%s:%i" % (address.host, address.port) + return str(address) + class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): """This is a reconnecting factory that connects to redis and immediately @@ -328,6 +365,6 @@ def lazyConnection( factory.continueTrying = reconnect reactor = hs.get_reactor() - reactor.connectTCP(host, port, factory, timeout=30, bindAddress=None) + reactor.connectTCP(host.encode(), port, factory, timeout=30, bindAddress=None) return factory.handler