diff options
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r-- | synapse/replication/tcp/resource.py | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index a9d85f4f6c..ecd6190f5b 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -16,16 +16,18 @@ import logging import random -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, List, Optional, Tuple from prometheus_client import Counter +from twisted.internet.interfaces import IAddress from twisted.internet.protocol import ServerFactory from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.commands import PositionCommand from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol from synapse.replication.tcp.streams import EventsStream +from synapse.replication.tcp.streams._base import StreamRow, Token from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -56,7 +58,7 @@ class ReplicationStreamProtocolFactory(ServerFactory): # listener config again or always starting a `ReplicationStreamer`.) hs.get_replication_streamer() - def buildProtocol(self, addr): + def buildProtocol(self, addr: IAddress) -> ServerReplicationStreamProtocol: return ServerReplicationStreamProtocol( self.server_name, self.clock, self.command_handler ) @@ -105,7 +107,7 @@ class ReplicationStreamer: if any(EventsStream.NAME == s.NAME for s in self.streams): self.clock.looping_call(self.on_notifier_poke, 1000) - def on_notifier_poke(self): + def on_notifier_poke(self) -> None: """Checks if there is actually any new data and sends it to the connections if there are. @@ -137,7 +139,7 @@ class ReplicationStreamer: run_as_background_process("replication_notifier", self._run_notifier_loop) - async def _run_notifier_loop(self): + async def _run_notifier_loop(self) -> None: self.is_looping = True try: @@ -238,7 +240,9 @@ class ReplicationStreamer: self.is_looping = False -def _batch_updates(updates): +def _batch_updates( + updates: List[Tuple[Token, StreamRow]] +) -> List[Tuple[Optional[Token], StreamRow]]: """Takes a list of updates of form [(token, row)] and sets the token to None for all rows where the next row has the same token. This is used to implement batching. @@ -254,7 +258,7 @@ def _batch_updates(updates): if not updates: return [] - new_updates = [] + new_updates: List[Tuple[Optional[Token], StreamRow]] = [] for i, update in enumerate(updates[:-1]): if update[0] == updates[i + 1][0]: new_updates.append((None, update[1])) |