diff options
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r-- | synapse/replication/tcp/handler.py | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 2ad7a200bb..eae4515363 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -105,12 +105,12 @@ class ReplicationCommandHandler: hs.get_instance_name() in hs.config.worker.writers.presence ) - self._streams = { + self._streams: Dict[str, Stream] = { stream.NAME: stream(hs) for stream in STREAMS_MAP.values() - } # type: Dict[str, Stream] + } # List of streams that this instance is the source of - self._streams_to_replicate = [] # type: List[Stream] + self._streams_to_replicate: List[Stream] = [] for stream in self._streams.values(): if hs.config.redis.redis_enabled and stream.NAME == CachesStream.NAME: @@ -180,14 +180,14 @@ class ReplicationCommandHandler: # Map of stream name to batched updates. See RdataCommand for info on # how batching works. - self._pending_batches = {} # type: Dict[str, List[Any]] + self._pending_batches: Dict[str, List[Any]] = {} # The factory used to create connections. - self._factory = None # type: Optional[ReconnectingClientFactory] + self._factory: Optional[ReconnectingClientFactory] = None # The currently connected connections. (The list of places we need to send # outgoing replication commands to.) - self._connections = [] # type: List[IReplicationConnection] + self._connections: List[IReplicationConnection] = [] LaterGauge( "synapse_replication_tcp_resource_total_connections", @@ -200,7 +200,7 @@ class ReplicationCommandHandler: # them in order in a separate background process. # the streams which are currently being processed by _unsafe_process_queue - self._processing_streams = set() # type: Set[str] + self._processing_streams: Set[str] = set() # for each stream, a queue of commands that are awaiting processing, and the # connection that they arrived on. @@ -210,7 +210,7 @@ class ReplicationCommandHandler: # For each connection, the incoming stream names that have received a POSITION # from that connection. - self._streams_by_connection = {} # type: Dict[IReplicationConnection, Set[str]] + self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {} LaterGauge( "synapse_replication_tcp_command_queue", |