diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index ac532ed588..0a9da79c32 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -196,8 +196,7 @@ class ErrorCommand(_SimpleCommand):
class PingCommand(_SimpleCommand):
- """Sent by either side as a keep alive. The data is arbitrary (often timestamp)
- """
+ """Sent by either side as a keep alive. The data is arbitrary (often timestamp)"""
NAME = "PING"
diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py
new file mode 100644
index 0000000000..d89a36f25a
--- /dev/null
+++ b/synapse/replication/tcp/external_cache.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING, Any, Optional
+
+from prometheus_client import Counter
+
+from synapse.logging.context import make_deferred_yieldable
+from synapse.util import json_decoder, json_encoder
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+set_counter = Counter(
+ "synapse_external_cache_set",
+ "Number of times we set a cache",
+ labelnames=["cache_name"],
+)
+
+get_counter = Counter(
+ "synapse_external_cache_get",
+ "Number of times we get a cache",
+ labelnames=["cache_name", "hit"],
+)
+
+
+logger = logging.getLogger(__name__)
+
+
+class ExternalCache:
+ """A cache backed by an external Redis. Does nothing if no Redis is
+ configured.
+ """
+
+ def __init__(self, hs: "HomeServer"):
+ self._redis_connection = hs.get_outbound_redis_connection()
+
+ def _get_redis_key(self, cache_name: str, key: str) -> str:
+ return "cache_v1:%s:%s" % (cache_name, key)
+
+ def is_enabled(self) -> bool:
+ """Whether the external cache is used or not.
+
+ It's safe to use the cache when this returns false, the methods will
+ just no-op, but the function is useful to avoid doing unnecessary work.
+ """
+ return self._redis_connection is not None
+
+ async def set(self, cache_name: str, key: str, value: Any, expiry_ms: int) -> None:
+ """Add the key/value to the named cache, with the expiry time given."""
+
+ if self._redis_connection is None:
+ return
+
+ set_counter.labels(cache_name).inc()
+
+ # txredisapi requires the value to be string, bytes or numbers, so we
+ # encode stuff in JSON.
+ encoded_value = json_encoder.encode(value)
+
+ logger.debug("Caching %s %s: %r", cache_name, key, encoded_value)
+
+ return await make_deferred_yieldable(
+ self._redis_connection.set(
+ self._get_redis_key(cache_name, key),
+ encoded_value,
+ pexpire=expiry_ms,
+ )
+ )
+
+ async def get(self, cache_name: str, key: str) -> Optional[Any]:
+ """Look up a key/value in the named cache."""
+
+ if self._redis_connection is None:
+ return None
+
+ result = await make_deferred_yieldable(
+ self._redis_connection.get(self._get_redis_key(cache_name, key))
+ )
+
+ logger.debug("Got cache result %s %s: %r", cache_name, key, result)
+
+ get_counter.labels(cache_name, result is not None).inc()
+
+ if not result:
+ return None
+
+ # For some reason the integers get magically converted back to integers
+ if isinstance(result, int):
+ return result
+
+ return json_decoder.decode(result)
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 317796d5e0..d1d00c3717 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -15,6 +15,7 @@
# limitations under the License.
import logging
from typing import (
+ TYPE_CHECKING,
Any,
Awaitable,
Dict,
@@ -63,6 +64,9 @@ from synapse.replication.tcp.streams import (
TypingStream,
)
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
@@ -88,7 +92,7 @@ class ReplicationCommandHandler:
back out to connections.
"""
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self._replication_data_handler = hs.get_replication_data_handler()
self._presence_handler = hs.get_presence_handler()
self._store = hs.get_datastore()
@@ -282,13 +286,6 @@ class ReplicationCommandHandler:
if hs.config.redis.redis_enabled:
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
- lazyConnection,
- )
-
- logger.info(
- "Connecting to redis (host=%r port=%r)",
- hs.config.redis_host,
- hs.config.redis_port,
)
# First let's ensure that we have a ReplicationStreamer started.
@@ -299,20 +296,16 @@ class ReplicationCommandHandler:
# connection after SUBSCRIBE is called).
# First create the connection for sending commands.
- outbound_redis_connection = lazyConnection(
- reactor=hs.get_reactor(),
- host=hs.config.redis_host,
- port=hs.config.redis_port,
- password=hs.config.redis.redis_password,
- reconnect=True,
- )
+ outbound_redis_connection = hs.get_outbound_redis_connection()
# Now create the factory/connection for the subscription stream.
self._factory = RedisDirectTcpReplicationClientFactory(
hs, outbound_redis_connection
)
hs.get_reactor().connectTCP(
- hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
+ hs.config.redis.redis_host,
+ hs.config.redis.redis_port,
+ self._factory,
)
else:
client_name = hs.get_instance_name()
@@ -322,13 +315,11 @@ class ReplicationCommandHandler:
hs.get_reactor().connectTCP(host, port, self._factory)
def get_streams(self) -> Dict[str, Stream]:
- """Get a map from stream name to all streams.
- """
+ """Get a map from stream name to all streams."""
return self._streams
def get_streams_to_replicate(self) -> List[Stream]:
- """Get a list of streams that this instances replicates.
- """
+ """Get a list of streams that this instances replicates."""
return self._streams_to_replicate
def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
@@ -349,7 +340,10 @@ class ReplicationCommandHandler:
current_token = stream.current_token(self._instance_name)
self.send_command(
PositionCommand(
- stream.NAME, self._instance_name, current_token, current_token,
+ stream.NAME,
+ self._instance_name,
+ current_token,
+ current_token,
)
)
@@ -601,8 +595,7 @@ class ReplicationCommandHandler:
self.send_command(cmd, ignore_conn=conn)
def new_connection(self, connection: AbstractConnection):
- """Called when we have a new connection.
- """
+ """Called when we have a new connection."""
self._connections.append(connection)
# If we are connected to replication as a client (rather than a server)
@@ -629,8 +622,7 @@ class ReplicationCommandHandler:
)
def lost_connection(self, connection: AbstractConnection):
- """Called when a connection is closed/lost.
- """
+ """Called when a connection is closed/lost."""
# we no longer need _streams_by_connection for this connection.
streams = self._streams_by_connection.pop(connection, None)
if streams:
@@ -687,15 +679,13 @@ class ReplicationCommandHandler:
def send_user_sync(
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
):
- """Poke the master that a user has started/stopped syncing.
- """
+ """Poke the master that a user has started/stopped syncing."""
self.send_command(
UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
)
def send_remove_pusher(self, app_id: str, push_key: str, user_id: str):
- """Poke the master to remove a pusher for a user
- """
+ """Poke the master to remove a pusher for a user"""
cmd = RemovePusherCommand(app_id, push_key, user_id)
self.send_command(cmd)
@@ -708,8 +698,7 @@ class ReplicationCommandHandler:
device_id: str,
last_seen: int,
):
- """Tell the master that the user made a request.
- """
+ """Tell the master that the user made a request."""
cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
self.send_command(cmd)
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 804da994ea..e0b4ad314d 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -222,8 +222,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.send_error("ping timeout")
def lineReceived(self, line: bytes):
- """Called when we've received a line
- """
+ """Called when we've received a line"""
with PreserveLoggingContext(self._logging_context):
self._parse_and_dispatch_line(line)
@@ -299,8 +298,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.on_connection_closed()
def send_error(self, error_string, *args):
- """Send an error to remote and close the connection.
- """
+ """Send an error to remote and close the connection."""
self.send_command(ErrorCommand(error_string % args))
self.close()
@@ -341,8 +339,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.last_sent_command = self.clock.time_msec()
def _queue_command(self, cmd):
- """Queue the command until the connection is ready to write to again.
- """
+ """Queue the command until the connection is ready to write to again."""
logger.debug("[%s] Queueing as conn %r, cmd: %r", self.id(), self.state, cmd)
self.pending_commands.append(cmd)
@@ -355,8 +352,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.close()
def _send_pending_commands(self):
- """Send any queued commandes
- """
+ """Send any queued commandes"""
pending = self.pending_commands
self.pending_commands = []
for cmd in pending:
@@ -380,8 +376,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.state = ConnectionStates.PAUSED
def resumeProducing(self):
- """The remote has caught up after we started buffering!
- """
+ """The remote has caught up after we started buffering!"""
logger.info("[%s] Resume producing", self.id())
self.state = ConnectionStates.ESTABLISHED
self._send_pending_commands()
@@ -440,8 +435,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
return "%s-%s" % (self.name, self.conn_id)
def lineLengthExceeded(self, line):
- """Called when we receive a line that is above the maximum line length
- """
+ """Called when we receive a line that is above the maximum line length"""
self.send_error("Line length exceeded")
@@ -495,21 +489,18 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.send_error("Wrong remote")
def replicate(self):
- """Send the subscription request to the server
- """
+ """Send the subscription request to the server"""
logger.info("[%s] Subscribing to replication streams", self.id())
self.send_command(ReplicateCommand())
class AbstractConnection(abc.ABC):
- """An interface for replication connections.
- """
+ """An interface for replication connections."""
@abc.abstractmethod
def send_command(self, cmd: Command):
- """Send the command down the connection
- """
+ """Send the command down the connection"""
pass
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index bc6ba709a7..0e6155cf53 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -15,14 +15,16 @@
import logging
from inspect import isawaitable
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Generic, Optional, Type, TypeVar, cast
+import attr
import txredisapi
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import (
BackgroundProcessLoggingContext,
run_as_background_process,
+ wrap_as_background_process,
)
from synapse.replication.tcp.commands import (
Command,
@@ -41,6 +43,24 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+T = TypeVar("T")
+V = TypeVar("V")
+
+
+@attr.s
+class ConstantProperty(Generic[T, V]):
+ """A descriptor that returns the given constant, ignoring attempts to set
+ it.
+ """
+
+ constant = attr.ib() # type: V
+
+ def __get__(self, obj: Optional[T], objtype: Type[T] = None) -> V:
+ return self.constant
+
+ def __set__(self, obj: Optional[T], value: V):
+ pass
+
class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
"""Connection to redis subscribed to replication stream.
@@ -59,16 +79,16 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
immediately after initialisation.
Attributes:
- handler: The command handler to handle incoming commands.
- stream_name: The *redis* stream name to subscribe to and publish from
- (not anything to do with Synapse replication streams).
- outbound_redis_connection: The connection to redis to use to send
+ synapse_handler: The command handler to handle incoming commands.
+ synapse_stream_name: The *redis* stream name to subscribe to and publish
+ from (not anything to do with Synapse replication streams).
+ synapse_outbound_redis_connection: The connection to redis to use to send
commands.
"""
- handler = None # type: ReplicationCommandHandler
- stream_name = None # type: str
- outbound_redis_connection = None # type: txredisapi.RedisProtocol
+ synapse_handler = None # type: ReplicationCommandHandler
+ synapse_stream_name = None # type: str
+ synapse_outbound_redis_connection = None # type: txredisapi.RedisProtocol
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -88,23 +108,22 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
# it's important to make sure that we only send the REPLICATE command once we
# have successfully subscribed to the stream - otherwise we might miss the
# POSITION response sent back by the other end.
- logger.info("Sending redis SUBSCRIBE for %s", self.stream_name)
- await make_deferred_yieldable(self.subscribe(self.stream_name))
+ logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name)
+ await make_deferred_yieldable(self.subscribe(self.synapse_stream_name))
logger.info(
"Successfully subscribed to redis stream, sending REPLICATE command"
)
- self.handler.new_connection(self)
+ self.synapse_handler.new_connection(self)
await self._async_send_command(ReplicateCommand())
logger.info("REPLICATE successfully sent")
# We send out our positions when there is a new connection in case the
# other side missed updates. We do this for Redis connections as the
# otherside won't know we've connected and so won't issue a REPLICATE.
- self.handler.send_positions_to_connection(self)
+ self.synapse_handler.send_positions_to_connection(self)
def messageReceived(self, pattern: str, channel: str, message: str):
- """Received a message from redis.
- """
+ """Received a message from redis."""
with PreserveLoggingContext(self._logging_context):
self._parse_and_dispatch_message(message)
@@ -117,7 +136,8 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
cmd = parse_command_from_line(message)
except Exception:
logger.exception(
- "Failed to parse replication line: %r", message,
+ "Failed to parse replication line: %r",
+ message,
)
return
@@ -137,7 +157,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
cmd: received command
"""
- cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None)
+ cmd_func = getattr(self.synapse_handler, "on_%s" % (cmd.NAME,), None)
if not cmd_func:
logger.warning("Unhandled command: %r", cmd)
return
@@ -155,7 +175,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
def connectionLost(self, reason):
logger.info("Lost connection to redis")
super().connectionLost(reason)
- self.handler.lost_connection(self)
+ self.synapse_handler.lost_connection(self)
# mark the logging context as finished
self._logging_context.__exit__(None, None, None)
@@ -183,11 +203,58 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()
await make_deferred_yieldable(
- self.outbound_redis_connection.publish(self.stream_name, encoded_string)
+ self.synapse_outbound_redis_connection.publish(
+ self.synapse_stream_name, encoded_string
+ )
)
-class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory):
+class SynapseRedisFactory(txredisapi.RedisFactory):
+ """A subclass of RedisFactory that periodically sends pings to ensure that
+ we detect dead connections.
+ """
+
+ # We want to *always* retry connecting, txredisapi will stop if there is a
+ # failure during certain operations, e.g. during AUTH.
+ continueTrying = cast(bool, ConstantProperty(True))
+
+ def __init__(
+ self,
+ hs: "HomeServer",
+ uuid: str,
+ dbid: Optional[int],
+ poolsize: int,
+ isLazy: bool = False,
+ handler: Type = txredisapi.ConnectionHandler,
+ charset: str = "utf-8",
+ password: Optional[str] = None,
+ replyTimeout: int = 30,
+ convertNumbers: Optional[int] = True,
+ ):
+ super().__init__(
+ uuid=uuid,
+ dbid=dbid,
+ poolsize=poolsize,
+ isLazy=isLazy,
+ handler=handler,
+ charset=charset,
+ password=password,
+ replyTimeout=replyTimeout,
+ convertNumbers=convertNumbers,
+ )
+
+ hs.get_clock().looping_call(self._send_ping, 30 * 1000)
+
+ @wrap_as_background_process("redis_ping")
+ async def _send_ping(self):
+ for connection in self.pool:
+ try:
+ await make_deferred_yieldable(connection.ping())
+ except Exception:
+ logger.warning("Failed to send ping to a redis connection")
+
+
+class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
"""This is a reconnecting factory that connects to redis and immediately
subscribes to a stream.
@@ -199,72 +266,68 @@ class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory):
"""
maxDelay = 5
- continueTrying = True
protocol = RedisSubscriber
def __init__(
self, hs: "HomeServer", outbound_redis_connection: txredisapi.RedisProtocol
):
- super().__init__()
-
- # This sets the password on the RedisFactory base class (as
- # SubscriberFactory constructor doesn't pass it through).
- self.password = hs.config.redis.redis_password
+ super().__init__(
+ hs,
+ uuid="subscriber",
+ dbid=None,
+ poolsize=1,
+ replyTimeout=30,
+ password=hs.config.redis.redis_password,
+ )
- self.handler = hs.get_tcp_replication()
- self.stream_name = hs.hostname
+ self.synapse_handler = hs.get_tcp_replication()
+ self.synapse_stream_name = hs.hostname
- self.outbound_redis_connection = outbound_redis_connection
+ self.synapse_outbound_redis_connection = outbound_redis_connection
def buildProtocol(self, addr):
- p = super().buildProtocol(addr) # type: RedisSubscriber
+ p = super().buildProtocol(addr)
+ p = cast(RedisSubscriber, p)
# We do this here rather than add to the constructor of `RedisSubcriber`
# as to do so would involve overriding `buildProtocol` entirely, however
# the base method does some other things than just instantiating the
# protocol.
- p.handler = self.handler
- p.outbound_redis_connection = self.outbound_redis_connection
- p.stream_name = self.stream_name
- p.password = self.password
+ p.synapse_handler = self.synapse_handler
+ p.synapse_outbound_redis_connection = self.synapse_outbound_redis_connection
+ p.synapse_stream_name = self.synapse_stream_name
return p
def lazyConnection(
- reactor,
+ hs: "HomeServer",
host: str = "localhost",
port: int = 6379,
dbid: Optional[int] = None,
reconnect: bool = True,
- charset: str = "utf-8",
password: Optional[str] = None,
- connectTimeout: Optional[int] = None,
- replyTimeout: Optional[int] = None,
- convertNumbers: bool = True,
+ replyTimeout: int = 30,
) -> txredisapi.RedisProtocol:
- """Equivalent to `txredisapi.lazyConnection`, except allows specifying a
- reactor.
+ """Creates a connection to Redis that is lazily set up and reconnects if the
+ connections is lost.
"""
- isLazy = True
- poolsize = 1
-
uuid = "%s:%d" % (host, port)
- factory = txredisapi.RedisFactory(
- uuid,
- dbid,
- poolsize,
- isLazy,
- txredisapi.ConnectionHandler,
- charset,
- password,
- replyTimeout,
- convertNumbers,
+ factory = SynapseRedisFactory(
+ hs,
+ uuid=uuid,
+ dbid=dbid,
+ poolsize=1,
+ isLazy=True,
+ handler=txredisapi.ConnectionHandler,
+ password=password,
+ replyTimeout=replyTimeout,
)
factory.continueTrying = reconnect
- for x in range(poolsize):
- reactor.connectTCP(host, port, factory, connectTimeout)
+
+ reactor = hs.get_reactor()
+ reactor.connectTCP(host, port, factory, 30)
return factory.handler
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 1d4ceac0f1..2018f9f29e 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -36,8 +36,7 @@ logger = logging.getLogger(__name__)
class ReplicationStreamProtocolFactory(Factory):
- """Factory for new replication connections.
- """
+ """Factory for new replication connections."""
def __init__(self, hs):
self.command_handler = hs.get_tcp_replication()
@@ -181,7 +180,8 @@ class ReplicationStreamer:
raise
logger.debug(
- "Sending %d updates", len(updates),
+ "Sending %d updates",
+ len(updates),
)
if updates:
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 61b282ab2d..38809b5b7c 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -183,7 +183,10 @@ class Stream:
return [], upto_token, False
updates, upto_token, limited = await self.update_function(
- instance_name, from_token, upto_token, _STREAM_UPDATE_TARGET_ROW_COUNT,
+ instance_name,
+ from_token,
+ upto_token,
+ _STREAM_UPDATE_TARGET_ROW_COUNT,
)
return updates, upto_token, limited
@@ -339,8 +342,7 @@ class ReceiptsStream(Stream):
class PushRulesStream(Stream):
- """A user has changed their push rules
- """
+ """A user has changed their push rules"""
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
@@ -362,8 +364,7 @@ class PushRulesStream(Stream):
class PushersStream(Stream):
- """A user has added/changed/removed a pusher
- """
+ """A user has added/changed/removed a pusher"""
PushersStreamRow = namedtuple(
"PushersStreamRow",
@@ -416,8 +417,7 @@ class CachesStream(Stream):
class PublicRoomsStream(Stream):
- """The public rooms list changed
- """
+ """The public rooms list changed"""
PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
@@ -463,8 +463,7 @@ class DeviceListsStream(Stream):
class ToDeviceStream(Stream):
- """New to_device messages for a client
- """
+ """New to_device messages for a client"""
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
@@ -481,8 +480,7 @@ class ToDeviceStream(Stream):
class TagAccountDataStream(Stream):
- """Someone added/removed a tag for a room
- """
+ """Someone added/removed a tag for a room"""
TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
@@ -501,8 +499,7 @@ class TagAccountDataStream(Stream):
class AccountDataStream(Stream):
- """Global or per room account data was changed
- """
+ """Global or per room account data was changed"""
AccountDataStreamRow = namedtuple(
"AccountDataStream",
@@ -589,8 +586,7 @@ class GroupServerStream(Stream):
class UserSignatureStream(Stream):
- """A user has signed their own device with their user-signing key
- """
+ """A user has signed their own device with their user-signing key"""
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 86a62b71eb..fa5e37ba7b 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -113,8 +113,7 @@ TypeToRow = {Row.TypeId: Row for Row in _EventRows}
class EventsStream(Stream):
- """We received a new event, or an event went from being an outlier to not
- """
+ """We received a new event, or an event went from being an outlier to not"""
NAME = "events"
|