diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index ac532ed588..0a9da79c32 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -196,8 +196,7 @@ class ErrorCommand(_SimpleCommand):
class PingCommand(_SimpleCommand):
- """Sent by either side as a keep alive. The data is arbitrary (often timestamp)
- """
+ """Sent by either side as a keep alive. The data is arbitrary (often timestamp)"""
NAME = "PING"
diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py
index 34fa3ff5b3..d89a36f25a 100644
--- a/synapse/replication/tcp/external_cache.py
+++ b/synapse/replication/tcp/external_cache.py
@@ -60,8 +60,7 @@ class ExternalCache:
return self._redis_connection is not None
async def set(self, cache_name: str, key: str, value: Any, expiry_ms: int) -> None:
- """Add the key/value to the named cache, with the expiry time given.
- """
+ """Add the key/value to the named cache, with the expiry time given."""
if self._redis_connection is None:
return
@@ -76,13 +75,14 @@ class ExternalCache:
return await make_deferred_yieldable(
self._redis_connection.set(
- self._get_redis_key(cache_name, key), encoded_value, pexpire=expiry_ms,
+ self._get_redis_key(cache_name, key),
+ encoded_value,
+ pexpire=expiry_ms,
)
)
async def get(self, cache_name: str, key: str) -> Optional[Any]:
- """Look up a key/value in the named cache.
- """
+ """Look up a key/value in the named cache."""
if self._redis_connection is None:
return None
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 8ea8dcd587..d1d00c3717 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -303,7 +303,9 @@ class ReplicationCommandHandler:
hs, outbound_redis_connection
)
hs.get_reactor().connectTCP(
- hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
+ hs.config.redis.redis_host,
+ hs.config.redis.redis_port,
+ self._factory,
)
else:
client_name = hs.get_instance_name()
@@ -313,13 +315,11 @@ class ReplicationCommandHandler:
hs.get_reactor().connectTCP(host, port, self._factory)
def get_streams(self) -> Dict[str, Stream]:
- """Get a map from stream name to all streams.
- """
+ """Get a map from stream name to all streams."""
return self._streams
def get_streams_to_replicate(self) -> List[Stream]:
- """Get a list of streams that this instances replicates.
- """
+ """Get a list of streams that this instances replicates."""
return self._streams_to_replicate
def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
@@ -340,7 +340,10 @@ class ReplicationCommandHandler:
current_token = stream.current_token(self._instance_name)
self.send_command(
PositionCommand(
- stream.NAME, self._instance_name, current_token, current_token,
+ stream.NAME,
+ self._instance_name,
+ current_token,
+ current_token,
)
)
@@ -592,8 +595,7 @@ class ReplicationCommandHandler:
self.send_command(cmd, ignore_conn=conn)
def new_connection(self, connection: AbstractConnection):
- """Called when we have a new connection.
- """
+ """Called when we have a new connection."""
self._connections.append(connection)
# If we are connected to replication as a client (rather than a server)
@@ -620,8 +622,7 @@ class ReplicationCommandHandler:
)
def lost_connection(self, connection: AbstractConnection):
- """Called when a connection is closed/lost.
- """
+ """Called when a connection is closed/lost."""
# we no longer need _streams_by_connection for this connection.
streams = self._streams_by_connection.pop(connection, None)
if streams:
@@ -678,15 +679,13 @@ class ReplicationCommandHandler:
def send_user_sync(
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
):
- """Poke the master that a user has started/stopped syncing.
- """
+ """Poke the master that a user has started/stopped syncing."""
self.send_command(
UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
)
def send_remove_pusher(self, app_id: str, push_key: str, user_id: str):
- """Poke the master to remove a pusher for a user
- """
+ """Poke the master to remove a pusher for a user"""
cmd = RemovePusherCommand(app_id, push_key, user_id)
self.send_command(cmd)
@@ -699,8 +698,7 @@ class ReplicationCommandHandler:
device_id: str,
last_seen: int,
):
- """Tell the master that the user made a request.
- """
+ """Tell the master that the user made a request."""
cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
self.send_command(cmd)
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 804da994ea..e0b4ad314d 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -222,8 +222,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.send_error("ping timeout")
def lineReceived(self, line: bytes):
- """Called when we've received a line
- """
+ """Called when we've received a line"""
with PreserveLoggingContext(self._logging_context):
self._parse_and_dispatch_line(line)
@@ -299,8 +298,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.on_connection_closed()
def send_error(self, error_string, *args):
- """Send an error to remote and close the connection.
- """
+ """Send an error to remote and close the connection."""
self.send_command(ErrorCommand(error_string % args))
self.close()
@@ -341,8 +339,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.last_sent_command = self.clock.time_msec()
def _queue_command(self, cmd):
- """Queue the command until the connection is ready to write to again.
- """
+ """Queue the command until the connection is ready to write to again."""
logger.debug("[%s] Queueing as conn %r, cmd: %r", self.id(), self.state, cmd)
self.pending_commands.append(cmd)
@@ -355,8 +352,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.close()
def _send_pending_commands(self):
- """Send any queued commandes
- """
+ """Send any queued commandes"""
pending = self.pending_commands
self.pending_commands = []
for cmd in pending:
@@ -380,8 +376,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.state = ConnectionStates.PAUSED
def resumeProducing(self):
- """The remote has caught up after we started buffering!
- """
+ """The remote has caught up after we started buffering!"""
logger.info("[%s] Resume producing", self.id())
self.state = ConnectionStates.ESTABLISHED
self._send_pending_commands()
@@ -440,8 +435,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
return "%s-%s" % (self.name, self.conn_id)
def lineLengthExceeded(self, line):
- """Called when we receive a line that is above the maximum line length
- """
+ """Called when we receive a line that is above the maximum line length"""
self.send_error("Line length exceeded")
@@ -495,21 +489,18 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.send_error("Wrong remote")
def replicate(self):
- """Send the subscription request to the server
- """
+ """Send the subscription request to the server"""
logger.info("[%s] Subscribing to replication streams", self.id())
self.send_command(ReplicateCommand())
class AbstractConnection(abc.ABC):
- """An interface for replication connections.
- """
+ """An interface for replication connections."""
@abc.abstractmethod
def send_command(self, cmd: Command):
- """Send the command down the connection
- """
+ """Send the command down the connection"""
pass
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 89f8af0f36..0e6155cf53 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -123,8 +123,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
self.synapse_handler.send_positions_to_connection(self)
def messageReceived(self, pattern: str, channel: str, message: str):
- """Received a message from redis.
- """
+ """Received a message from redis."""
with PreserveLoggingContext(self._logging_context):
self._parse_and_dispatch_message(message)
@@ -137,7 +136,8 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
cmd = parse_command_from_line(message)
except Exception:
logger.exception(
- "Failed to parse replication line: %r", message,
+ "Failed to parse replication line: %r",
+ message,
)
return
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 1d4ceac0f1..2018f9f29e 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -36,8 +36,7 @@ logger = logging.getLogger(__name__)
class ReplicationStreamProtocolFactory(Factory):
- """Factory for new replication connections.
- """
+ """Factory for new replication connections."""
def __init__(self, hs):
self.command_handler = hs.get_tcp_replication()
@@ -181,7 +180,8 @@ class ReplicationStreamer:
raise
logger.debug(
- "Sending %d updates", len(updates),
+ "Sending %d updates",
+ len(updates),
)
if updates:
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 61b282ab2d..38809b5b7c 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -183,7 +183,10 @@ class Stream:
return [], upto_token, False
updates, upto_token, limited = await self.update_function(
- instance_name, from_token, upto_token, _STREAM_UPDATE_TARGET_ROW_COUNT,
+ instance_name,
+ from_token,
+ upto_token,
+ _STREAM_UPDATE_TARGET_ROW_COUNT,
)
return updates, upto_token, limited
@@ -339,8 +342,7 @@ class ReceiptsStream(Stream):
class PushRulesStream(Stream):
- """A user has changed their push rules
- """
+ """A user has changed their push rules"""
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
@@ -362,8 +364,7 @@ class PushRulesStream(Stream):
class PushersStream(Stream):
- """A user has added/changed/removed a pusher
- """
+ """A user has added/changed/removed a pusher"""
PushersStreamRow = namedtuple(
"PushersStreamRow",
@@ -416,8 +417,7 @@ class CachesStream(Stream):
class PublicRoomsStream(Stream):
- """The public rooms list changed
- """
+ """The public rooms list changed"""
PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
@@ -463,8 +463,7 @@ class DeviceListsStream(Stream):
class ToDeviceStream(Stream):
- """New to_device messages for a client
- """
+ """New to_device messages for a client"""
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
@@ -481,8 +480,7 @@ class ToDeviceStream(Stream):
class TagAccountDataStream(Stream):
- """Someone added/removed a tag for a room
- """
+ """Someone added/removed a tag for a room"""
TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
@@ -501,8 +499,7 @@ class TagAccountDataStream(Stream):
class AccountDataStream(Stream):
- """Global or per room account data was changed
- """
+ """Global or per room account data was changed"""
AccountDataStreamRow = namedtuple(
"AccountDataStream",
@@ -589,8 +586,7 @@ class GroupServerStream(Stream):
class UserSignatureStream(Stream):
- """A user has signed their own device with their user-signing key
- """
+ """A user has signed their own device with their user-signing key"""
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 86a62b71eb..fa5e37ba7b 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -113,8 +113,7 @@ TypeToRow = {Row.TypeId: Row for Row in _EventRows}
class EventsStream(Stream):
- """We received a new event, or an event went from being an outlier to not
- """
+ """We received a new event, or an event went from being an outlier to not"""
NAME = "events"
|