summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/commands.py6
-rw-r--r--synapse/replication/tcp/streams/federation.py14
2 files changed, 14 insertions, 6 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index bb447f75b4..8abed1f52d 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -312,16 +312,16 @@ class FederationAckCommand(Command):
 
     NAME = "FEDERATION_ACK"
 
-    def __init__(self, instance_name, token):
+    def __init__(self, instance_name: str, token: int):
         self.instance_name = instance_name
         self.token = token
 
     @classmethod
-    def from_line(cls, line):
+    def from_line(cls, line: str) -> "FederationAckCommand":
         instance_name, token = line.split(" ")
         return cls(instance_name, int(token))
 
-    def to_line(self):
+    def to_line(self) -> str:
         return "%s %s" % (self.instance_name, self.token)
 
 
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 9bcd13b009..9bb8e9e177 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from collections import namedtuple
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
 
 from synapse.replication.tcp.streams._base import (
     Stream,
@@ -21,6 +22,9 @@ from synapse.replication.tcp.streams._base import (
     make_http_update_function,
 )
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class FederationStream(Stream):
     """Data to be sent over federation. Only available when master has federation
@@ -38,7 +42,7 @@ class FederationStream(Stream):
     NAME = "federation"
     ROW_TYPE = FederationStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         if hs.config.worker_app is None:
             # master process: get updates from the FederationRemoteSendQueue.
             # (if the master is configured to send federation itself, federation_sender
@@ -48,7 +52,9 @@ class FederationStream(Stream):
             current_token = current_token_without_instance(
                 federation_sender.get_current_token
             )
-            update_function = federation_sender.get_replication_rows
+            update_function = (
+                federation_sender.get_replication_rows
+            )  # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
 
         elif hs.should_send_federation():
             # federation sender: Query master process
@@ -69,5 +75,7 @@ class FederationStream(Stream):
         return 0
 
     @staticmethod
-    async def _stub_update_function(instance_name, from_token, upto_token, limit):
+    async def _stub_update_function(
+        instance_name: str, from_token: int, upto_token: int, limit: int
+    ) -> Tuple[list, int, bool]:
         return [], upto_token, False