summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py16
1 files changed, 10 insertions, 6 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index a9d85f4f6c..ecd6190f5b 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -16,16 +16,18 @@
 
 import logging
 import random
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from prometheus_client import Counter
 
+from twisted.internet.interfaces import IAddress
 from twisted.internet.protocol import ServerFactory
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.tcp.commands import PositionCommand
 from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
 from synapse.replication.tcp.streams import EventsStream
+from synapse.replication.tcp.streams._base import StreamRow, Token
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -56,7 +58,7 @@ class ReplicationStreamProtocolFactory(ServerFactory):
         # listener config again or always starting a `ReplicationStreamer`.)
         hs.get_replication_streamer()
 
-    def buildProtocol(self, addr):
+    def buildProtocol(self, addr: IAddress) -> ServerReplicationStreamProtocol:
         return ServerReplicationStreamProtocol(
             self.server_name, self.clock, self.command_handler
         )
@@ -105,7 +107,7 @@ class ReplicationStreamer:
         if any(EventsStream.NAME == s.NAME for s in self.streams):
             self.clock.looping_call(self.on_notifier_poke, 1000)
 
-    def on_notifier_poke(self):
+    def on_notifier_poke(self) -> None:
         """Checks if there is actually any new data and sends it to the
         connections if there are.
 
@@ -137,7 +139,7 @@ class ReplicationStreamer:
 
         run_as_background_process("replication_notifier", self._run_notifier_loop)
 
-    async def _run_notifier_loop(self):
+    async def _run_notifier_loop(self) -> None:
         self.is_looping = True
 
         try:
@@ -238,7 +240,9 @@ class ReplicationStreamer:
             self.is_looping = False
 
 
-def _batch_updates(updates):
+def _batch_updates(
+    updates: List[Tuple[Token, StreamRow]]
+) -> List[Tuple[Optional[Token], StreamRow]]:
     """Takes a list of updates of form [(token, row)] and sets the token to
     None for all rows where the next row has the same token. This is used to
     implement batching.
@@ -254,7 +258,7 @@ def _batch_updates(updates):
     if not updates:
         return []
 
-    new_updates = []
+    new_updates: List[Tuple[Optional[Token], StreamRow]] = []
     for i, update in enumerate(updates[:-1]):
         if update[0] == updates[i + 1][0]:
             new_updates.append((None, update[1]))