diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index a9d85f4f6c..ecd6190f5b 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -16,16 +16,18 @@
import logging
import random
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, List, Optional, Tuple
from prometheus_client import Counter
+from twisted.internet.interfaces import IAddress
from twisted.internet.protocol import ServerFactory
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import PositionCommand
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
from synapse.replication.tcp.streams import EventsStream
+from synapse.replication.tcp.streams._base import StreamRow, Token
from synapse.util.metrics import Measure
if TYPE_CHECKING:
@@ -56,7 +58,7 @@ class ReplicationStreamProtocolFactory(ServerFactory):
# listener config again or always starting a `ReplicationStreamer`.)
hs.get_replication_streamer()
- def buildProtocol(self, addr):
+ def buildProtocol(self, addr: IAddress) -> ServerReplicationStreamProtocol:
return ServerReplicationStreamProtocol(
self.server_name, self.clock, self.command_handler
)
@@ -105,7 +107,7 @@ class ReplicationStreamer:
if any(EventsStream.NAME == s.NAME for s in self.streams):
self.clock.looping_call(self.on_notifier_poke, 1000)
- def on_notifier_poke(self):
+ def on_notifier_poke(self) -> None:
"""Checks if there is actually any new data and sends it to the
connections if there are.
@@ -137,7 +139,7 @@ class ReplicationStreamer:
run_as_background_process("replication_notifier", self._run_notifier_loop)
- async def _run_notifier_loop(self):
+ async def _run_notifier_loop(self) -> None:
self.is_looping = True
try:
@@ -238,7 +240,9 @@ class ReplicationStreamer:
self.is_looping = False
-def _batch_updates(updates):
+def _batch_updates(
+ updates: List[Tuple[Token, StreamRow]]
+) -> List[Tuple[Optional[Token], StreamRow]]:
"""Takes a list of updates of form [(token, row)] and sets the token to
None for all rows where the next row has the same token. This is used to
implement batching.
@@ -254,7 +258,7 @@ def _batch_updates(updates):
if not updates:
return []
- new_updates = []
+ new_updates: List[Tuple[Optional[Token], StreamRow]] = []
for i, update in enumerate(updates[:-1]):
if update[0] == updates[i + 1][0]:
new_updates.append((None, update[1]))
|