diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 2ad7a200bb..eae4515363 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -105,12 +105,12 @@ class ReplicationCommandHandler:
hs.get_instance_name() in hs.config.worker.writers.presence
)
- self._streams = {
+ self._streams: Dict[str, Stream] = {
stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
- } # type: Dict[str, Stream]
+ }
# List of streams that this instance is the source of
- self._streams_to_replicate = [] # type: List[Stream]
+ self._streams_to_replicate: List[Stream] = []
for stream in self._streams.values():
if hs.config.redis.redis_enabled and stream.NAME == CachesStream.NAME:
@@ -180,14 +180,14 @@ class ReplicationCommandHandler:
# Map of stream name to batched updates. See RdataCommand for info on
# how batching works.
- self._pending_batches = {} # type: Dict[str, List[Any]]
+ self._pending_batches: Dict[str, List[Any]] = {}
# The factory used to create connections.
- self._factory = None # type: Optional[ReconnectingClientFactory]
+ self._factory: Optional[ReconnectingClientFactory] = None
# The currently connected connections. (The list of places we need to send
# outgoing replication commands to.)
- self._connections = [] # type: List[IReplicationConnection]
+ self._connections: List[IReplicationConnection] = []
LaterGauge(
"synapse_replication_tcp_resource_total_connections",
@@ -200,7 +200,7 @@ class ReplicationCommandHandler:
# them in order in a separate background process.
# the streams which are currently being processed by _unsafe_process_queue
- self._processing_streams = set() # type: Set[str]
+ self._processing_streams: Set[str] = set()
# for each stream, a queue of commands that are awaiting processing, and the
# connection that they arrived on.
@@ -210,7 +210,7 @@ class ReplicationCommandHandler:
# For each connection, the incoming stream names that have received a POSITION
# from that connection.
- self._streams_by_connection = {} # type: Dict[IReplicationConnection, Set[str]]
+ self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
LaterGauge(
"synapse_replication_tcp_command_queue",
|