summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py16
1 files changed, 8 insertions, 8 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 2ad7a200bb..eae4515363 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -105,12 +105,12 @@ class ReplicationCommandHandler:
             hs.get_instance_name() in hs.config.worker.writers.presence
         )
 
-        self._streams = {
+        self._streams: Dict[str, Stream] = {
             stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
-        }  # type: Dict[str, Stream]
+        }
 
         # List of streams that this instance is the source of
-        self._streams_to_replicate = []  # type: List[Stream]
+        self._streams_to_replicate: List[Stream] = []
 
         for stream in self._streams.values():
             if hs.config.redis.redis_enabled and stream.NAME == CachesStream.NAME:
@@ -180,14 +180,14 @@ class ReplicationCommandHandler:
 
         # Map of stream name to batched updates. See RdataCommand for info on
         # how batching works.
-        self._pending_batches = {}  # type: Dict[str, List[Any]]
+        self._pending_batches: Dict[str, List[Any]] = {}
 
         # The factory used to create connections.
-        self._factory = None  # type: Optional[ReconnectingClientFactory]
+        self._factory: Optional[ReconnectingClientFactory] = None
 
         # The currently connected connections. (The list of places we need to send
         # outgoing replication commands to.)
-        self._connections = []  # type: List[IReplicationConnection]
+        self._connections: List[IReplicationConnection] = []
 
         LaterGauge(
             "synapse_replication_tcp_resource_total_connections",
@@ -200,7 +200,7 @@ class ReplicationCommandHandler:
         # them in order in a separate background process.
 
         # the streams which are currently being processed by _unsafe_process_queue
-        self._processing_streams = set()  # type: Set[str]
+        self._processing_streams: Set[str] = set()
 
         # for each stream, a queue of commands that are awaiting processing, and the
         # connection that they arrived on.
@@ -210,7 +210,7 @@ class ReplicationCommandHandler:
 
         # For each connection, the incoming stream names that have received a POSITION
         # from that connection.
-        self._streams_by_connection = {}  # type: Dict[IReplicationConnection, Set[str]]
+        self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
 
         LaterGauge(
             "synapse_replication_tcp_command_queue",