diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 8ea8dcd587..d1d00c3717 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -303,7 +303,9 @@ class ReplicationCommandHandler:
hs, outbound_redis_connection
)
hs.get_reactor().connectTCP(
- hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
+ hs.config.redis.redis_host,
+ hs.config.redis.redis_port,
+ self._factory,
)
else:
client_name = hs.get_instance_name()
@@ -313,13 +315,11 @@ class ReplicationCommandHandler:
hs.get_reactor().connectTCP(host, port, self._factory)
def get_streams(self) -> Dict[str, Stream]:
- """Get a map from stream name to all streams.
- """
+ """Get a map from stream name to all streams."""
return self._streams
def get_streams_to_replicate(self) -> List[Stream]:
- """Get a list of streams that this instances replicates.
- """
+ """Get a list of streams that this instances replicates."""
return self._streams_to_replicate
def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
@@ -340,7 +340,10 @@ class ReplicationCommandHandler:
current_token = stream.current_token(self._instance_name)
self.send_command(
PositionCommand(
- stream.NAME, self._instance_name, current_token, current_token,
+ stream.NAME,
+ self._instance_name,
+ current_token,
+ current_token,
)
)
@@ -592,8 +595,7 @@ class ReplicationCommandHandler:
self.send_command(cmd, ignore_conn=conn)
def new_connection(self, connection: AbstractConnection):
- """Called when we have a new connection.
- """
+ """Called when we have a new connection."""
self._connections.append(connection)
# If we are connected to replication as a client (rather than a server)
@@ -620,8 +622,7 @@ class ReplicationCommandHandler:
)
def lost_connection(self, connection: AbstractConnection):
- """Called when a connection is closed/lost.
- """
+ """Called when a connection is closed/lost."""
# we no longer need _streams_by_connection for this connection.
streams = self._streams_by_connection.pop(connection, None)
if streams:
@@ -678,15 +679,13 @@ class ReplicationCommandHandler:
def send_user_sync(
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
):
- """Poke the master that a user has started/stopped syncing.
- """
+ """Poke the master that a user has started/stopped syncing."""
self.send_command(
UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
)
def send_remove_pusher(self, app_id: str, push_key: str, user_id: str):
- """Poke the master to remove a pusher for a user
- """
+ """Poke the master to remove a pusher for a user"""
cmd = RemovePusherCommand(app_id, push_key, user_id)
self.send_command(cmd)
@@ -699,8 +698,7 @@ class ReplicationCommandHandler:
device_id: str,
last_seen: int,
):
- """Tell the master that the user made a request.
- """
+ """Tell the master that the user made a request."""
cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
self.send_command(cmd)
|