summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-07-16 11:25:22 +0100
committerErik Johnston <erik@matrix.org>2021-07-16 11:25:22 +0100
commitcc07548d7154811b46537db67e6735214551dd24 (patch)
tree79fa084a29bb512fe7a5eb826484140e34ef8a61 /synapse/replication/tcp/handler.py
parentMerge remote-tracking branch 'origin/release-v1.38' into matrix-org-hotfixes (diff)
parentUse inline type hints in `http/federation/`, `storage/` and `util/` (#10381) (diff)
downloadsynapse-cc07548d7154811b46537db67e6735214551dd24.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py16
1 files changed, 8 insertions, 8 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py

index 2ad7a200bb..eae4515363 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -105,12 +105,12 @@ class ReplicationCommandHandler: hs.get_instance_name() in hs.config.worker.writers.presence ) - self._streams = { + self._streams: Dict[str, Stream] = { stream.NAME: stream(hs) for stream in STREAMS_MAP.values() - } # type: Dict[str, Stream] + } # List of streams that this instance is the source of - self._streams_to_replicate = [] # type: List[Stream] + self._streams_to_replicate: List[Stream] = [] for stream in self._streams.values(): if hs.config.redis.redis_enabled and stream.NAME == CachesStream.NAME: @@ -180,14 +180,14 @@ class ReplicationCommandHandler: # Map of stream name to batched updates. See RdataCommand for info on # how batching works. - self._pending_batches = {} # type: Dict[str, List[Any]] + self._pending_batches: Dict[str, List[Any]] = {} # The factory used to create connections. - self._factory = None # type: Optional[ReconnectingClientFactory] + self._factory: Optional[ReconnectingClientFactory] = None # The currently connected connections. (The list of places we need to send # outgoing replication commands to.) - self._connections = [] # type: List[IReplicationConnection] + self._connections: List[IReplicationConnection] = [] LaterGauge( "synapse_replication_tcp_resource_total_connections", @@ -200,7 +200,7 @@ class ReplicationCommandHandler: # them in order in a separate background process. # the streams which are currently being processed by _unsafe_process_queue - self._processing_streams = set() # type: Set[str] + self._processing_streams: Set[str] = set() # for each stream, a queue of commands that are awaiting processing, and the # connection that they arrived on. @@ -210,7 +210,7 @@ class ReplicationCommandHandler: # For each connection, the incoming stream names that have received a POSITION # from that connection. - self._streams_by_connection = {} # type: Dict[IReplicationConnection, Set[str]] + self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {} LaterGauge( "synapse_replication_tcp_command_queue",