diff options
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r-- | synapse/replication/tcp/handler.py | 30 |
1 files changed, 14 insertions, 16 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 8ea8dcd587..d1d00c3717 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -303,7 +303,9 @@ class ReplicationCommandHandler: hs, outbound_redis_connection ) hs.get_reactor().connectTCP( - hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory, + hs.config.redis.redis_host, + hs.config.redis.redis_port, + self._factory, ) else: client_name = hs.get_instance_name() @@ -313,13 +315,11 @@ class ReplicationCommandHandler: hs.get_reactor().connectTCP(host, port, self._factory) def get_streams(self) -> Dict[str, Stream]: - """Get a map from stream name to all streams. - """ + """Get a map from stream name to all streams.""" return self._streams def get_streams_to_replicate(self) -> List[Stream]: - """Get a list of streams that this instances replicates. - """ + """Get a list of streams that this instances replicates.""" return self._streams_to_replicate def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand): @@ -340,7 +340,10 @@ class ReplicationCommandHandler: current_token = stream.current_token(self._instance_name) self.send_command( PositionCommand( - stream.NAME, self._instance_name, current_token, current_token, + stream.NAME, + self._instance_name, + current_token, + current_token, ) ) @@ -592,8 +595,7 @@ class ReplicationCommandHandler: self.send_command(cmd, ignore_conn=conn) def new_connection(self, connection: AbstractConnection): - """Called when we have a new connection. - """ + """Called when we have a new connection.""" self._connections.append(connection) # If we are connected to replication as a client (rather than a server) @@ -620,8 +622,7 @@ class ReplicationCommandHandler: ) def lost_connection(self, connection: AbstractConnection): - """Called when a connection is closed/lost. - """ + """Called when a connection is closed/lost.""" # we no longer need _streams_by_connection for this connection. streams = self._streams_by_connection.pop(connection, None) if streams: @@ -678,15 +679,13 @@ class ReplicationCommandHandler: def send_user_sync( self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int ): - """Poke the master that a user has started/stopped syncing. - """ + """Poke the master that a user has started/stopped syncing.""" self.send_command( UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms) ) def send_remove_pusher(self, app_id: str, push_key: str, user_id: str): - """Poke the master to remove a pusher for a user - """ + """Poke the master to remove a pusher for a user""" cmd = RemovePusherCommand(app_id, push_key, user_id) self.send_command(cmd) @@ -699,8 +698,7 @@ class ReplicationCommandHandler: device_id: str, last_seen: int, ): - """Tell the master that the user made a request. - """ + """Tell the master that the user made a request.""" cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen) self.send_command(cmd) |