summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/__init__.py2
-rw-r--r--synapse/replication/http/_base.py31
-rw-r--r--synapse/replication/http/account_data.py38
-rw-r--r--synapse/replication/http/devices.py14
-rw-r--r--synapse/replication/http/federation.py65
-rw-r--r--synapse/replication/http/login.py47
-rw-r--r--synapse/replication/http/membership.py31
-rw-r--r--synapse/replication/http/presence.py38
-rw-r--r--synapse/replication/http/push.py14
-rw-r--r--synapse/replication/http/register.py79
-rw-r--r--synapse/replication/http/send_event.py44
-rw-r--r--synapse/replication/http/streams.py16
12 files changed, 257 insertions, 162 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 1457d9d59b..aec040ee19 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -40,7 +40,7 @@ class ReplicationRestResource(JsonResource):
         super().__init__(hs, canonical_json=False, extract_context=True)
         self.register_servlets(hs)
 
-    def register_servlets(self, hs: "HomeServer"):
+    def register_servlets(self, hs: "HomeServer") -> None:
         send_event.register_servlets(hs, self)
         federation.register_servlets(hs, self)
         presence.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 585332b244..bc1d28dd19 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -15,16 +15,20 @@
 import abc
 import logging
 import re
-import urllib
+import urllib.parse
 from inspect import signature
 from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple
 
 from prometheus_client import Counter, Gauge
 
+from twisted.web.server import Request
+
 from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.http import RequestTimedOutError
+from synapse.http.server import HttpServer
 from synapse.logging import opentracing
 from synapse.logging.opentracing import trace
+from synapse.types import JsonDict
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.stringutils import random_string
 
@@ -113,10 +117,12 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         if hs.config.worker.worker_replication_secret:
             self._replication_secret = hs.config.worker.worker_replication_secret
 
-    def _check_auth(self, request) -> None:
+    def _check_auth(self, request: Request) -> None:
         # Get the authorization header.
         auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
 
+        if not auth_headers:
+            raise RuntimeError("Missing Authorization header.")
         if len(auth_headers) > 1:
             raise RuntimeError("Too many Authorization headers.")
         parts = auth_headers[0].split(b" ")
@@ -129,7 +135,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         raise RuntimeError("Invalid Authorization header.")
 
     @abc.abstractmethod
-    async def _serialize_payload(**kwargs):
+    async def _serialize_payload(**kwargs) -> JsonDict:
         """Static method that is called when creating a request.
 
         Concrete implementations should have explicit parameters (rather than
@@ -144,19 +150,20 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         return {}
 
     @abc.abstractmethod
-    async def _handle_request(self, request, **kwargs):
+    async def _handle_request(
+        self, request: Request, **kwargs: Any
+    ) -> Tuple[int, JsonDict]:
         """Handle incoming request.
 
         This is called with the request object and PATH_ARGS.
 
         Returns:
-            tuple[int, dict]: HTTP status code and a JSON serialisable dict
-            to be used as response body of request.
+            HTTP status code and a JSON serialisable dict to be used as response
+            body of request.
         """
-        pass
 
     @classmethod
-    def make_client(cls, hs: "HomeServer"):
+    def make_client(cls, hs: "HomeServer") -> Callable:
         """Create a client that makes requests.
 
         Returns a callable that accepts the same parameters as
@@ -182,7 +189,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
             )
 
         @trace(opname="outgoing_replication_request")
-        async def send_request(*, instance_name="master", **kwargs):
+        async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:
             with outgoing_gauge.track_inprogress():
                 if instance_name == local_instance_name:
                     raise Exception("Trying to send HTTP request to self")
@@ -268,7 +275,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
 
         return send_request
 
-    def register(self, http_server):
+    def register(self, http_server: HttpServer) -> None:
         """Called by the server to register this as a handler to the
         appropriate path.
         """
@@ -289,7 +296,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
             self.__class__.__name__,
         )
 
-    async def _check_auth_and_handle(self, request, **kwargs):
+    async def _check_auth_and_handle(
+        self, request: Request, **kwargs: Any
+    ) -> Tuple[int, JsonDict]:
         """Called on new incoming requests when caching is enabled. Checks
         if there is a cached response for the request and returns that,
         otherwise calls `_handle_request` and caches its response.
diff --git a/synapse/replication/http/account_data.py b/synapse/replication/http/account_data.py
index 5f0f225aa9..310f609153 100644
--- a/synapse/replication/http/account_data.py
+++ b/synapse/replication/http/account_data.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -48,14 +52,18 @@ class ReplicationUserAccountDataRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(user_id, account_data_type, content):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str, account_data_type: str, content: JsonDict
+    ) -> JsonDict:
         payload = {
             "content": content,
         }
 
         return payload
 
-    async def _handle_request(self, request, user_id, account_data_type):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str, account_data_type: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         max_stream_id = await self.handler.add_account_data_for_user(
@@ -89,14 +97,18 @@ class ReplicationRoomAccountDataRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(user_id, room_id, account_data_type, content):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str, room_id: str, account_data_type: str, content: JsonDict
+    ) -> JsonDict:
         payload = {
             "content": content,
         }
 
         return payload
 
-    async def _handle_request(self, request, user_id, room_id, account_data_type):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str, room_id: str, account_data_type: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         max_stream_id = await self.handler.add_account_data_to_room(
@@ -130,14 +142,18 @@ class ReplicationAddTagRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(user_id, room_id, tag, content):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str, room_id: str, tag: str, content: JsonDict
+    ) -> JsonDict:
         payload = {
             "content": content,
         }
 
         return payload
 
-    async def _handle_request(self, request, user_id, room_id, tag):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str, room_id: str, tag: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         max_stream_id = await self.handler.add_tag_to_room(
@@ -173,11 +189,13 @@ class ReplicationRemoveTagRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(user_id, room_id, tag):
+    async def _serialize_payload(user_id: str, room_id: str, tag: str) -> JsonDict:  # type: ignore[override]
 
         return {}
 
-    async def _handle_request(self, request, user_id, room_id, tag):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str, room_id: str, tag: str
+    ) -> Tuple[int, JsonDict]:
         max_stream_id = await self.handler.remove_tag_from_room(
             user_id,
             room_id,
@@ -187,7 +205,7 @@ class ReplicationRemoveTagRestServlet(ReplicationEndpoint):
         return 200, {"max_stream_id": max_stream_id}
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationUserAccountDataRestServlet(hs).register(http_server)
     ReplicationRoomAccountDataRestServlet(hs).register(http_server)
     ReplicationAddTagRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 42dffb39cb..f2f40129fe 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -13,9 +13,13 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -63,14 +67,16 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(user_id):
+    async def _serialize_payload(user_id: str) -> JsonDict:  # type: ignore[override]
         return {}
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         user_devices = await self.device_list_updater.user_device_resync(user_id)
 
         return 200, user_devices
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 5ed535c90d..d529c8a19f 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -13,17 +13,22 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, List, Tuple
 
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.events import make_event_from_dict
+from twisted.web.server import Request
+
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
+from synapse.events import EventBase, make_event_from_dict
 from synapse.events.snapshot import EventContext
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
+    from synapse.storage.databases.main import DataStore
 
 logger = logging.getLogger(__name__)
 
@@ -69,14 +74,18 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         self.federation_event_handler = hs.get_federation_event_handler()
 
     @staticmethod
-    async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
+    async def _serialize_payload(  # type: ignore[override]
+        store: "DataStore",
+        room_id: str,
+        event_and_contexts: List[Tuple[EventBase, EventContext]],
+        backfilled: bool,
+    ) -> JsonDict:
         """
         Args:
             store
-            room_id (str)
-            event_and_contexts (list[tuple[FrozenEvent, EventContext]])
-            backfilled (bool): Whether or not the events are the result of
-                backfilling
+            room_id
+            event_and_contexts
+            backfilled: Whether or not the events are the result of backfilling
         """
         event_payloads = []
         for event, context in event_and_contexts:
@@ -102,7 +111,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
 
         return payload
 
-    async def _handle_request(self, request):
+    async def _handle_request(self, request: Request) -> Tuple[int, JsonDict]:  # type: ignore[override]
         with Measure(self.clock, "repl_fed_send_events_parse"):
             content = parse_json_object_from_request(request)
 
@@ -163,10 +172,14 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
         self.registry = hs.get_federation_registry()
 
     @staticmethod
-    async def _serialize_payload(edu_type, origin, content):
+    async def _serialize_payload(  # type: ignore[override]
+        edu_type: str, origin: str, content: JsonDict
+    ) -> JsonDict:
         return {"origin": origin, "content": content}
 
-    async def _handle_request(self, request, edu_type):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, edu_type: str
+    ) -> Tuple[int, JsonDict]:
         with Measure(self.clock, "repl_fed_send_edu_parse"):
             content = parse_json_object_from_request(request)
 
@@ -175,9 +188,9 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
 
         logger.info("Got %r edu from %s", edu_type, origin)
 
-        result = await self.registry.on_edu(edu_type, origin, edu_content)
+        await self.registry.on_edu(edu_type, origin, edu_content)
 
-        return 200, result
+        return 200, {}
 
 
 class ReplicationGetQueryRestServlet(ReplicationEndpoint):
@@ -206,15 +219,17 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
         self.registry = hs.get_federation_registry()
 
     @staticmethod
-    async def _serialize_payload(query_type, args):
+    async def _serialize_payload(query_type: str, args: JsonDict) -> JsonDict:  # type: ignore[override]
         """
         Args:
-            query_type (str)
-            args (dict): The arguments received for the given query type
+            query_type
+            args: The arguments received for the given query type
         """
         return {"args": args}
 
-    async def _handle_request(self, request, query_type):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, query_type: str
+    ) -> Tuple[int, JsonDict]:
         with Measure(self.clock, "repl_fed_query_parse"):
             content = parse_json_object_from_request(request)
 
@@ -248,14 +263,16 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
         self.store = hs.get_datastore()
 
     @staticmethod
-    async def _serialize_payload(room_id, args):
+    async def _serialize_payload(room_id: str) -> JsonDict:  # type: ignore[override]
         """
         Args:
-            room_id (str)
+            room_id
         """
         return {}
 
-    async def _handle_request(self, request, room_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, room_id: str
+    ) -> Tuple[int, JsonDict]:
         await self.store.clean_room_for_join(room_id)
 
         return 200, {}
@@ -283,17 +300,19 @@ class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint):
         self.store = hs.get_datastore()
 
     @staticmethod
-    async def _serialize_payload(room_id, room_version):
+    async def _serialize_payload(room_id: str, room_version: RoomVersion) -> JsonDict:  # type: ignore[override]
         return {"room_version": room_version.identifier}
 
-    async def _handle_request(self, request, room_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, room_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
         room_version = KNOWN_ROOM_VERSIONS[content["room_version"]]
         await self.store.maybe_store_room_on_outlier_membership(room_id, room_version)
         return 200, {}
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationFederationSendEventsRestServlet(hs).register(http_server)
     ReplicationFederationSendEduRestServlet(hs).register(http_server)
     ReplicationGetQueryRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
index daacc34cea..c68e18da12 100644
--- a/synapse/replication/http/login.py
+++ b/synapse/replication/http/login.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional, Tuple, cast
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -39,25 +43,24 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
         self.registration_handler = hs.get_registration_handler()
 
     @staticmethod
-    async def _serialize_payload(
-        user_id,
-        device_id,
-        initial_display_name,
-        is_guest,
-        is_appservice_ghost,
-        should_issue_refresh_token,
-        auth_provider_id,
-        auth_provider_session_id,
-    ):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str,
+        device_id: Optional[str],
+        initial_display_name: Optional[str],
+        is_guest: bool,
+        is_appservice_ghost: bool,
+        should_issue_refresh_token: bool,
+        auth_provider_id: Optional[str],
+        auth_provider_session_id: Optional[str],
+    ) -> JsonDict:
         """
         Args:
-            user_id (int)
-            device_id (str|None): Device ID to use, if None a new one is
-                generated.
-            initial_display_name (str|None)
-            is_guest (bool)
-            is_appservice_ghost (bool)
-            should_issue_refresh_token (bool)
+            user_id
+            device_id: Device ID to use, if None a new one is generated.
+            initial_display_name
+            is_guest
+            is_appservice_ghost
+            should_issue_refresh_token
         """
         return {
             "device_id": device_id,
@@ -69,7 +72,9 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
             "auth_provider_session_id": auth_provider_session_id,
         }
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         device_id = content["device_id"]
@@ -91,8 +96,8 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
             auth_provider_session_id=auth_provider_session_id,
         )
 
-        return 200, res
+        return 200, cast(JsonDict, res)
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     RegisterDeviceReplicationServlet(hs).register(http_server)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 7371c240b2..0145858e47 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -16,6 +16,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from twisted.web.server import Request
 
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.http.site import SynapseRequest
 from synapse.replication.http._base import ReplicationEndpoint
@@ -53,7 +54,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(  # type: ignore
+    async def _serialize_payload(  # type: ignore[override]
         requester: Requester,
         room_id: str,
         user_id: str,
@@ -77,7 +78,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
             "content": content,
         }
 
-    async def _handle_request(  # type: ignore
+    async def _handle_request(  # type: ignore[override]
         self, request: SynapseRequest, room_id: str, user_id: str
     ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
@@ -122,13 +123,13 @@ class ReplicationRemoteKnockRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(  # type: ignore
+    async def _serialize_payload(  # type: ignore[override]
         requester: Requester,
         room_id: str,
         user_id: str,
         remote_room_hosts: List[str],
         content: JsonDict,
-    ):
+    ) -> JsonDict:
         """
         Args:
             requester: The user making the request, according to the access token.
@@ -143,12 +144,12 @@ class ReplicationRemoteKnockRestServlet(ReplicationEndpoint):
             "content": content,
         }
 
-    async def _handle_request(  # type: ignore
+    async def _handle_request(  # type: ignore[override]
         self,
         request: SynapseRequest,
         room_id: str,
         user_id: str,
-    ):
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         remote_room_hosts = content["remote_room_hosts"]
@@ -192,7 +193,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
         self.member_handler = hs.get_room_member_handler()
 
     @staticmethod
-    async def _serialize_payload(  # type: ignore
+    async def _serialize_payload(  # type: ignore[override]
         invite_event_id: str,
         txn_id: Optional[str],
         requester: Requester,
@@ -215,7 +216,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
             "content": content,
         }
 
-    async def _handle_request(  # type: ignore
+    async def _handle_request(  # type: ignore[override]
         self, request: SynapseRequest, invite_event_id: str
     ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
@@ -262,12 +263,12 @@ class ReplicationRemoteRescindKnockRestServlet(ReplicationEndpoint):
         self.member_handler = hs.get_room_member_handler()
 
     @staticmethod
-    async def _serialize_payload(  # type: ignore
+    async def _serialize_payload(  # type: ignore[override]
         knock_event_id: str,
         txn_id: Optional[str],
         requester: Requester,
         content: JsonDict,
-    ):
+    ) -> JsonDict:
         """
         Args:
             knock_event_id: The ID of the knock to be rescinded.
@@ -281,11 +282,11 @@ class ReplicationRemoteRescindKnockRestServlet(ReplicationEndpoint):
             "content": content,
         }
 
-    async def _handle_request(  # type: ignore
+    async def _handle_request(  # type: ignore[override]
         self,
         request: SynapseRequest,
         knock_event_id: str,
-    ):
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         txn_id = content["txn_id"]
@@ -329,7 +330,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
         self.distributor = hs.get_distributor()
 
     @staticmethod
-    async def _serialize_payload(  # type: ignore
+    async def _serialize_payload(  # type: ignore[override]
         room_id: str, user_id: str, change: str
     ) -> JsonDict:
         """
@@ -345,7 +346,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
 
         return {}
 
-    async def _handle_request(  # type: ignore
+    async def _handle_request(  # type: ignore[override]
         self, request: Request, room_id: str, user_id: str, change: str
     ) -> Tuple[int, JsonDict]:
         logger.info("user membership change: %s in %s", user_id, room_id)
@@ -360,7 +361,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
         return 200, {}
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationRemoteJoinRestServlet(hs).register(http_server)
     ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
     ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py
index 63143085d5..4a5b08f56f 100644
--- a/synapse/replication/http/presence.py
+++ b/synapse/replication/http/presence.py
@@ -13,11 +13,14 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import UserID
+from synapse.types import JsonDict, UserID
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -49,18 +52,17 @@ class ReplicationBumpPresenceActiveTime(ReplicationEndpoint):
         self._presence_handler = hs.get_presence_handler()
 
     @staticmethod
-    async def _serialize_payload(user_id):
+    async def _serialize_payload(user_id: str) -> JsonDict:  # type: ignore[override]
         return {}
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         await self._presence_handler.bump_presence_active_time(
             UserID.from_string(user_id)
         )
 
-        return (
-            200,
-            {},
-        )
+        return (200, {})
 
 
 class ReplicationPresenceSetState(ReplicationEndpoint):
@@ -92,16 +94,21 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
         self._presence_handler = hs.get_presence_handler()
 
     @staticmethod
-    async def _serialize_payload(
-        user_id, state, ignore_status_msg=False, force_notify=False
-    ):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str,
+        state: JsonDict,
+        ignore_status_msg: bool = False,
+        force_notify: bool = False,
+    ) -> JsonDict:
         return {
             "state": state,
             "ignore_status_msg": ignore_status_msg,
             "force_notify": force_notify,
         }
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         await self._presence_handler.set_state(
@@ -111,12 +118,9 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
             content["force_notify"],
         )
 
-        return (
-            200,
-            {},
-        )
+        return (200, {})
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationBumpPresenceActiveTime(hs).register(http_server)
     ReplicationPresenceSetState(hs).register(http_server)
diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py
index 6c8db3061e..af5c2f66a7 100644
--- a/synapse/replication/http/push.py
+++ b/synapse/replication/http/push.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -48,7 +52,7 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
         self.pusher_pool = hs.get_pusherpool()
 
     @staticmethod
-    async def _serialize_payload(app_id, pushkey, user_id):
+    async def _serialize_payload(app_id: str, pushkey: str, user_id: str) -> JsonDict:  # type: ignore[override]
         payload = {
             "app_id": app_id,
             "pushkey": pushkey,
@@ -56,7 +60,9 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
 
         return payload
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         app_id = content["app_id"]
@@ -67,5 +73,5 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
         return 200, {}
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationRemovePusherRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index 7adfbb666f..c7f751b70d 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional, Tuple
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -36,34 +40,34 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
         self.registration_handler = hs.get_registration_handler()
 
     @staticmethod
-    async def _serialize_payload(
-        user_id,
-        password_hash,
-        was_guest,
-        make_guest,
-        appservice_id,
-        create_profile_with_displayname,
-        admin,
-        user_type,
-        address,
-        shadow_banned,
-    ):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str,
+        password_hash: Optional[str],
+        was_guest: bool,
+        make_guest: bool,
+        appservice_id: Optional[str],
+        create_profile_with_displayname: Optional[str],
+        admin: bool,
+        user_type: Optional[str],
+        address: Optional[str],
+        shadow_banned: bool,
+    ) -> JsonDict:
         """
         Args:
-            user_id (str): The desired user ID to register.
-            password_hash (str|None): Optional. The password hash for this user.
-            was_guest (bool): Optional. Whether this is a guest account being
-                upgraded to a non-guest account.
-            make_guest (boolean): True if the the new user should be guest,
-                false to add a regular user account.
-            appservice_id (str|None): The ID of the appservice registering the user.
-            create_profile_with_displayname (unicode|None): Optionally create a
-                profile for the user, setting their displayname to the given value
-            admin (boolean): is an admin user?
-            user_type (str|None): type of user. One of the values from
-                api.constants.UserTypes, or None for a normal user.
-            address (str|None): the IP address used to perform the regitration.
-            shadow_banned (bool): Whether to shadow-ban the user
+            user_id: The desired user ID to register.
+            password_hash: Optional. The password hash for this user.
+            was_guest: Optional. Whether this is a guest account being upgraded
+                to a non-guest account.
+            make_guest: True if the the new user should be guest, false to add a
+                regular user account.
+            appservice_id: The ID of the appservice registering the user.
+            create_profile_with_displayname: Optionally create a profile for the
+                user, setting their displayname to the given value
+            admin: is an admin user?
+            user_type: type of user. One of the values from api.constants.UserTypes,
+                or None for a normal user.
+            address: the IP address used to perform the regitration.
+            shadow_banned: Whether to shadow-ban the user
         """
         return {
             "password_hash": password_hash,
@@ -77,7 +81,9 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
             "shadow_banned": shadow_banned,
         }
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         await self.registration_handler.check_registration_ratelimit(content["address"])
@@ -110,18 +116,21 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
         self.registration_handler = hs.get_registration_handler()
 
     @staticmethod
-    async def _serialize_payload(user_id, auth_result, access_token):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str, auth_result: JsonDict, access_token: Optional[str]
+    ) -> JsonDict:
         """
         Args:
-            user_id (str): The user ID that consented
-            auth_result (dict): The authenticated credentials of the newly
-                registered user.
-            access_token (str|None): The access token of the newly logged in
+            user_id: The user ID that consented
+            auth_result: The authenticated credentials of the newly registered user.
+            access_token: The access token of the newly logged in
                 device, or None if `inhibit_login` enabled.
         """
         return {"auth_result": auth_result, "access_token": access_token}
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         auth_result = content["auth_result"]
@@ -134,6 +143,6 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
         return 200, {}
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationRegisterServlet(hs).register(http_server)
     ReplicationPostRegisterActionsServlet(hs).register(http_server)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 9f6851d059..33e98daf8a 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -13,18 +13,22 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, List, Tuple
+
+from twisted.web.server import Request
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.events import make_event_from_dict
+from synapse.events import EventBase, make_event_from_dict
 from synapse.events.snapshot import EventContext
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import Requester, UserID
+from synapse.types import JsonDict, Requester, UserID
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
+    from synapse.storage.databases.main import DataStore
 
 logger = logging.getLogger(__name__)
 
@@ -70,18 +74,24 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(
-        event_id, store, event, context, requester, ratelimit, extra_users
-    ):
+    async def _serialize_payload(  # type: ignore[override]
+        event_id: str,
+        store: "DataStore",
+        event: EventBase,
+        context: EventContext,
+        requester: Requester,
+        ratelimit: bool,
+        extra_users: List[UserID],
+    ) -> JsonDict:
         """
         Args:
-            event_id (str)
-            store (DataStore)
-            requester (Requester)
-            event (FrozenEvent)
-            context (EventContext)
-            ratelimit (bool)
-            extra_users (list(UserID)): Any extra users to notify about event
+            event_id
+            store
+            requester
+            event
+            context
+            ratelimit
+            extra_users: Any extra users to notify about event
         """
         serialized_context = await context.serialize(event, store)
 
@@ -100,7 +110,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
 
         return payload
 
-    async def _handle_request(self, request, event_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, event_id: str
+    ) -> Tuple[int, JsonDict]:
         with Measure(self.clock, "repl_send_event_parse"):
             content = parse_json_object_from_request(request)
 
@@ -120,8 +132,6 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             ratelimit = content["ratelimit"]
             extra_users = [UserID.from_string(u) for u in content["extra_users"]]
 
-        request.requester = requester
-
         logger.info(
             "Got event to send with ID: %s into room: %s", event.event_id, event.room_id
         )
@@ -139,5 +149,5 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
         )
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationSendEventRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index 3223bc2432..c065225362 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -13,11 +13,15 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple
+
+from twisted.web.server import Request
 
 from synapse.api.errors import SynapseError
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_integer
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -57,10 +61,14 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         self.streams = hs.get_replication_streams()
 
     @staticmethod
-    async def _serialize_payload(stream_name, from_token, upto_token):
+    async def _serialize_payload(  # type: ignore[override]
+        stream_name: str, from_token: int, upto_token: int
+    ) -> JsonDict:
         return {"from_token": from_token, "upto_token": upto_token}
 
-    async def _handle_request(self, request, stream_name):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, stream_name: str
+    ) -> Tuple[int, JsonDict]:
         stream = self.streams.get(stream_name)
         if stream is None:
             raise SynapseError(400, "Unknown stream")
@@ -78,5 +86,5 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         )
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationGetStreamUpdates(hs).register(http_server)