summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py10
-rw-r--r--synapse/federation/federation_server.py43
-rw-r--r--synapse/federation/send_queue.py26
-rw-r--r--synapse/federation/sender/__init__.py110
-rw-r--r--synapse/federation/sender/per_destination_queue.py34
-rw-r--r--synapse/federation/transport/client.py8
-rw-r--r--synapse/federation/transport/server.py120
7 files changed, 244 insertions, 107 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index ed09c6af1f..c767d30627 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -86,7 +86,7 @@ class FederationClient(FederationBase):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
-        self.pdu_destination_tried = {}  # type: Dict[str, Dict[str, int]]
+        self.pdu_destination_tried: Dict[str, Dict[str, int]] = {}
         self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
         self.state = hs.get_state_handler()
         self.transport_layer = hs.get_federation_transport_client()
@@ -94,13 +94,13 @@ class FederationClient(FederationBase):
         self.hostname = hs.hostname
         self.signing_key = hs.signing_key
 
-        self._get_pdu_cache = ExpiringCache(
+        self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache(
             cache_name="get_pdu_cache",
             clock=self._clock,
             max_len=1000,
             expiry_ms=120 * 1000,
             reset_expiry_on_get=False,
-        )  # type: ExpiringCache[str, EventBase]
+        )
 
     def _clear_tried_cache(self):
         """Clear pdu_destination_tried cache"""
@@ -293,10 +293,10 @@ class FederationClient(FederationBase):
                     transaction_data,
                 )
 
-                pdu_list = [
+                pdu_list: List[EventBase] = [
                     event_from_pdu_json(p, room_version, outlier=outlier)
                     for p in transaction_data["pdus"]
-                ]  # type: List[EventBase]
+                ]
 
                 if pdu_list and pdu_list[0]:
                     pdu = pdu_list[0]
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index ac0f2ccfb3..29619aeeb8 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -122,12 +122,12 @@ class FederationServer(FederationBase):
 
         # origins that we are currently processing a transaction from.
         # a dict from origin to txn id.
-        self._active_transactions = {}  # type: Dict[str, str]
+        self._active_transactions: Dict[str, str] = {}
 
         # We cache results for transaction with the same ID
-        self._transaction_resp_cache = ResponseCache(
+        self._transaction_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
             hs.get_clock(), "fed_txn_handler", timeout_ms=30000
-        )  # type: ResponseCache[Tuple[str, str]]
+        )
 
         self.transaction_actions = TransactionActions(self.store)
 
@@ -135,12 +135,12 @@ class FederationServer(FederationBase):
 
         # We cache responses to state queries, as they take a while and often
         # come in waves.
-        self._state_resp_cache = ResponseCache(
-            hs.get_clock(), "state_resp", timeout_ms=30000
-        )  # type: ResponseCache[Tuple[str, Optional[str]]]
-        self._state_ids_resp_cache = ResponseCache(
+        self._state_resp_cache: ResponseCache[
+            Tuple[str, Optional[str]]
+        ] = ResponseCache(hs.get_clock(), "state_resp", timeout_ms=30000)
+        self._state_ids_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
             hs.get_clock(), "state_ids_resp", timeout_ms=30000
-        )  # type: ResponseCache[Tuple[str, str]]
+        )
 
         self._federation_metrics_domains = (
             hs.config.federation.federation_metrics_domains
@@ -337,7 +337,7 @@ class FederationServer(FederationBase):
 
         origin_host, _ = parse_server_name(origin)
 
-        pdus_by_room = {}  # type: Dict[str, List[EventBase]]
+        pdus_by_room: Dict[str, List[EventBase]] = {}
 
         newest_pdu_ts = 0
 
@@ -516,9 +516,9 @@ class FederationServer(FederationBase):
         self, room_id: str, event_id: Optional[str]
     ) -> Dict[str, list]:
         if event_id:
-            pdus = await self.handler.get_state_for_pdu(
+            pdus: Iterable[EventBase] = await self.handler.get_state_for_pdu(
                 room_id, event_id
-            )  # type: Iterable[EventBase]
+            )
         else:
             pdus = (await self.state.get_current_state(room_id)).values()
 
@@ -562,8 +562,7 @@ class FederationServer(FederationBase):
             raise IncompatibleRoomVersionError(room_version=room_version)
 
         pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
-        time_now = self._clock.time_msec()
-        return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
+        return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
 
     async def on_invite_request(
         self, origin: str, content: JsonDict, room_version_id: str
@@ -611,8 +610,7 @@ class FederationServer(FederationBase):
 
         room_version = await self.store.get_room_version_id(room_id)
 
-        time_now = self._clock.time_msec()
-        return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
+        return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
 
     async def on_send_leave_request(
         self, origin: str, content: JsonDict, room_id: str
@@ -659,9 +657,8 @@ class FederationServer(FederationBase):
             )
 
         pdu = await self.handler.on_make_knock_request(origin, room_id, user_id)
-        time_now = self._clock.time_msec()
         return {
-            "event": pdu.get_pdu_json(time_now),
+            "event": pdu.get_templated_pdu_json(),
             "room_version": room_version.identifier,
         }
 
@@ -791,7 +788,7 @@ class FederationServer(FederationBase):
         log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
         results = await self.store.claim_e2e_one_time_keys(query)
 
-        json_result = {}  # type: Dict[str, Dict[str, dict]]
+        json_result: Dict[str, Dict[str, dict]] = {}
         for user_id, device_keys in results.items():
             for device_id, keys in device_keys.items():
                 for key_id, json_str in keys.items():
@@ -1119,17 +1116,13 @@ class FederationHandlerRegistry:
         self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
         self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
 
-        self.edu_handlers = (
-            {}
-        )  # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
-        self.query_handlers = (
-            {}
-        )  # type: Dict[str, Callable[[dict], Awaitable[JsonDict]]]
+        self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {}
+        self.query_handlers: Dict[str, Callable[[dict], Awaitable[JsonDict]]] = {}
 
         # Map from type to instance names that we should route EDU handling to.
         # We randomly choose one instance from the list to route to for each new
         # EDU received.
-        self._edu_type_to_instance = {}  # type: Dict[str, List[str]]
+        self._edu_type_to_instance: Dict[str, List[str]] = {}
 
     def register_edu_handler(
         self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 65d76ea974..1fbf325fdc 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -71,34 +71,32 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         # We may have multiple federation sender instances, so we need to track
         # their positions separately.
         self._sender_instances = hs.config.worker.federation_shard_config.instances
-        self._sender_positions = {}  # type: Dict[str, int]
+        self._sender_positions: Dict[str, int] = {}
 
         # Pending presence map user_id -> UserPresenceState
-        self.presence_map = {}  # type: Dict[str, UserPresenceState]
+        self.presence_map: Dict[str, UserPresenceState] = {}
 
         # Stores the destinations we need to explicitly send presence to about a
         # given user.
         # Stream position -> (user_id, destinations)
-        self.presence_destinations = (
-            SortedDict()
-        )  # type: SortedDict[int, Tuple[str, Iterable[str]]]
+        self.presence_destinations: SortedDict[
+            int, Tuple[str, Iterable[str]]
+        ] = SortedDict()
 
         # (destination, key) -> EDU
-        self.keyed_edu = {}  # type: Dict[Tuple[str, tuple], Edu]
+        self.keyed_edu: Dict[Tuple[str, tuple], Edu] = {}
 
         # stream position -> (destination, key)
-        self.keyed_edu_changed = (
-            SortedDict()
-        )  # type: SortedDict[int, Tuple[str, tuple]]
+        self.keyed_edu_changed: SortedDict[int, Tuple[str, tuple]] = SortedDict()
 
-        self.edus = SortedDict()  # type: SortedDict[int, Edu]
+        self.edus: SortedDict[int, Edu] = SortedDict()
 
         # stream ID for the next entry into keyed_edu_changed/edus.
         self.pos = 1
 
         # map from stream ID to the time that stream entry was generated, so that we
         # can clear out entries after a while
-        self.pos_time = SortedDict()  # type: SortedDict[int, int]
+        self.pos_time: SortedDict[int, int] = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
         # we make a new function, so we need to make a new function so the inner
@@ -291,7 +289,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
 
         # list of tuple(int, BaseFederationRow), where the first is the position
         # of the federation stream.
-        rows = []  # type: List[Tuple[int, BaseFederationRow]]
+        rows: List[Tuple[int, BaseFederationRow]] = []
 
         # Fetch presence to send to destinations
         i = self.presence_destinations.bisect_right(from_token)
@@ -445,11 +443,11 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
         buff.edus.setdefault(self.edu.destination, []).append(self.edu)
 
 
-_rowtypes = (
+_rowtypes: Tuple[Type[BaseFederationRow], ...] = (
     PresenceDestinationsRow,
     KeyedEduRow,
     EduRow,
-)  # type: Tuple[Type[BaseFederationRow], ...]
+)
 
 TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
 
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index deb40f4610..d980e0d986 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,9 +14,12 @@
 
 import abc
 import logging
+from collections import OrderedDict
 from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
 
+import attr
 from prometheus_client import Counter
+from typing_extensions import Literal
 
 from twisted.internet import defer
 
@@ -33,8 +36,12 @@ from synapse.metrics import (
     event_processing_loop_room_count,
     events_processed_counter,
 )
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.util import Clock
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -137,6 +144,84 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
 
+@attr.s
+class _PresenceQueue:
+    """A queue of destinations that need to be woken up due to new presence
+    updates.
+
+    Staggers waking up of per destination queues to ensure that we don't attempt
+    to start TLS connections with many hosts all at once, leading to pinned CPU.
+    """
+
+    # The maximum duration in seconds between queuing up a destination and it
+    # being woken up.
+    _MAX_TIME_IN_QUEUE = 30.0
+
+    # The maximum duration in seconds between waking up consecutive destination
+    # queues.
+    _MAX_DELAY = 0.1
+
+    sender: "FederationSender" = attr.ib()
+    clock: Clock = attr.ib()
+    queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
+    processing: bool = attr.ib(default=False)
+
+    def add_to_queue(self, destination: str) -> None:
+        """Add a destination to the queue to be woken up."""
+
+        self.queue[destination] = None
+
+        if not self.processing:
+            self._handle()
+
+    @wrap_as_background_process("_PresenceQueue.handle")
+    async def _handle(self) -> None:
+        """Background process to drain the queue."""
+
+        if not self.queue:
+            return
+
+        assert not self.processing
+        self.processing = True
+
+        try:
+            # We start with a delay that should drain the queue quickly enough that
+            # we process all destinations in the queue in _MAX_TIME_IN_QUEUE
+            # seconds.
+            #
+            # We also add an upper bound to the delay, to gracefully handle the
+            # case where the queue only has a few entries in it.
+            current_sleep_seconds = min(
+                self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
+            )
+
+            while self.queue:
+                destination, _ = self.queue.popitem(last=False)
+
+                queue = self.sender._get_per_destination_queue(destination)
+
+                if not queue._new_data_to_send:
+                    # The per destination queue has already been woken up.
+                    continue
+
+                queue.attempt_new_transaction()
+
+                await self.clock.sleep(current_sleep_seconds)
+
+                if not self.queue:
+                    break
+
+                # More destinations may have been added to the queue, so we may
+                # need to reduce the delay to ensure everything gets processed
+                # within _MAX_TIME_IN_QUEUE seconds.
+                current_sleep_seconds = min(
+                    current_sleep_seconds, self._MAX_TIME_IN_QUEUE / len(self.queue)
+                )
+
+        finally:
+            self.processing = False
+
+
 class FederationSender(AbstractFederationSender):
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
@@ -148,14 +233,14 @@ class FederationSender(AbstractFederationSender):
         self.clock = hs.get_clock()
         self.is_mine_id = hs.is_mine_id
 
-        self._presence_router = None  # type: Optional[PresenceRouter]
+        self._presence_router: Optional["PresenceRouter"] = None
         self._transaction_manager = TransactionManager(hs)
 
         self._instance_name = hs.get_instance_name()
         self._federation_shard_config = hs.config.worker.federation_shard_config
 
         # map from destination to PerDestinationQueue
-        self._per_destination_queues = {}  # type: Dict[str, PerDestinationQueue]
+        self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
 
         LaterGauge(
             "synapse_federation_transaction_queue_pending_destinations",
@@ -192,9 +277,7 @@ class FederationSender(AbstractFederationSender):
         # awaiting a call to flush_read_receipts_for_room. The presence of an entry
         # here for a given room means that we are rate-limiting RR flushes to that room,
         # and that there is a pending call to _flush_rrs_for_room in the system.
-        self._queues_awaiting_rr_flush_by_room = (
-            {}
-        )  # type: Dict[str, Set[PerDestinationQueue]]
+        self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
 
         self._rr_txn_interval_per_room_ms = (
             1000.0 / hs.config.federation_rr_transactions_per_room_per_second
@@ -210,6 +293,8 @@ class FederationSender(AbstractFederationSender):
 
         self._external_cache = hs.get_external_cache()
 
+        self._presence_queue = _PresenceQueue(self, self.clock)
+
     def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
         """Get or create a PerDestinationQueue for the given destination
 
@@ -265,7 +350,7 @@ class FederationSender(AbstractFederationSender):
                     if not event.internal_metadata.should_proactively_send():
                         return
 
-                    destinations = None  # type: Optional[Set[str]]
+                    destinations: Optional[Set[str]] = None
                     if not event.prev_event_ids():
                         # If there are no prev event IDs then the state is empty
                         # and so no remote servers in the room
@@ -331,7 +416,7 @@ class FederationSender(AbstractFederationSender):
                         for event in events:
                             await handle_event(event)
 
-                events_by_room = {}  # type: Dict[str, List[EventBase]]
+                events_by_room: Dict[str, List[EventBase]] = {}
                 for event in events:
                     events_by_room.setdefault(event.room_id, []).append(event)
 
@@ -519,7 +604,12 @@ class FederationSender(AbstractFederationSender):
                 self._instance_name, destination
             ):
                 continue
-            self._get_per_destination_queue(destination).send_presence(states)
+
+            self._get_per_destination_queue(destination).send_presence(
+                states, start_loop=False
+            )
+
+            self._presence_queue.add_to_queue(destination)
 
     def build_and_send_edu(
         self,
@@ -628,7 +718,7 @@ class FederationSender(AbstractFederationSender):
         In order to reduce load spikes, adds a delay between each destination.
         """
 
-        last_processed = None  # type: Optional[str]
+        last_processed: Optional[str] = None
 
         while True:
             destinations_to_wake = (
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3a2efd56ee..c11d1f6d31 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -105,34 +105,34 @@ class PerDestinationQueue:
         # catch-up at startup.
         # New events will only be sent once this is finished, at which point
         # _catching_up is flipped to False.
-        self._catching_up = True  # type: bool
+        self._catching_up: bool = True
 
         # The stream_ordering of the most recent PDU that was discarded due to
         # being in catch-up mode.
-        self._catchup_last_skipped = 0  # type: int
+        self._catchup_last_skipped: int = 0
 
         # Cache of the last successfully-transmitted stream ordering for this
         # destination (we are the only updater so this is safe)
-        self._last_successful_stream_ordering = None  # type: Optional[int]
+        self._last_successful_stream_ordering: Optional[int] = None
 
         # a queue of pending PDUs
-        self._pending_pdus = []  # type: List[EventBase]
+        self._pending_pdus: List[EventBase] = []
 
         # XXX this is never actually used: see
         # https://github.com/matrix-org/synapse/issues/7549
-        self._pending_edus = []  # type: List[Edu]
+        self._pending_edus: List[Edu] = []
 
         # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
         # based on their key (e.g. typing events by room_id)
         # Map of (edu_type, key) -> Edu
-        self._pending_edus_keyed = {}  # type: Dict[Tuple[str, Hashable], Edu]
+        self._pending_edus_keyed: Dict[Tuple[str, Hashable], Edu] = {}
 
         # Map of user_id -> UserPresenceState of pending presence to be sent to this
         # destination
-        self._pending_presence = {}  # type: Dict[str, UserPresenceState]
+        self._pending_presence: Dict[str, UserPresenceState] = {}
 
         # room_id -> receipt_type -> user_id -> receipt_dict
-        self._pending_rrs = {}  # type: Dict[str, Dict[str, Dict[str, dict]]]
+        self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
         self._rrs_pending_flush = False
 
         # stream_id of last successfully sent to-device message.
@@ -171,14 +171,24 @@ class PerDestinationQueue:
 
         self.attempt_new_transaction()
 
-    def send_presence(self, states: Iterable[UserPresenceState]) -> None:
-        """Add presence updates to the queue. Start the transmission loop if necessary.
+    def send_presence(
+        self, states: Iterable[UserPresenceState], start_loop: bool = True
+    ) -> None:
+        """Add presence updates to the queue.
+
+        Args:
+            states: Presence updates to send
+            start_loop: Whether to start the transmission loop if not already
+                running.
 
         Args:
             states: presence to send
         """
         self._pending_presence.update({state.user_id: state for state in states})
-        self.attempt_new_transaction()
+        self._new_data_to_send = True
+
+        if start_loop:
+            self.attempt_new_transaction()
 
     def queue_read_receipt(self, receipt: ReadReceipt) -> None:
         """Add a RR to the list to be sent. Doesn't start the transmission loop yet
@@ -243,7 +253,7 @@ class PerDestinationQueue:
         )
 
     async def _transaction_transmission_loop(self) -> None:
-        pending_pdus = []  # type: List[EventBase]
+        pending_pdus: List[EventBase] = []
         try:
             self.transmission_loop_running = True
 
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index c9e7c57461..98b1bf77fd 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -395,9 +395,9 @@ class TransportLayerClient:
             # this uses MSC2197 (Search Filtering over Federation)
             path = _create_v1_path("/publicRooms")
 
-            data = {
+            data: Dict[str, Any] = {
                 "include_all_networks": "true" if include_all_networks else "false"
-            }  # type: Dict[str, Any]
+            }
             if third_party_instance_id:
                 data["third_party_instance_id"] = third_party_instance_id
             if limit:
@@ -423,9 +423,9 @@ class TransportLayerClient:
         else:
             path = _create_v1_path("/publicRooms")
 
-            args = {
+            args: Dict[str, Any] = {
                 "include_all_networks": "true" if include_all_networks else "false"
-            }  # type: Dict[str, Any]
+            }
             if third_party_instance_id:
                 args["third_party_instance_id"] = (third_party_instance_id,)
             if limit:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index d37d9565fc..2974d4d0cc 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1013,7 +1013,7 @@ class PublicRoomList(BaseFederationServlet):
         if not self.allow_access:
             raise FederationDeniedError(origin)
 
-        limit = int(content.get("limit", 100))  # type: Optional[int]
+        limit: Optional[int] = int(content.get("limit", 100))
         since_token = content.get("since", None)
         search_filter = content.get("filter", None)
 
@@ -1095,7 +1095,9 @@ class FederationGroupsProfileServlet(BaseGroupsServerServlet):
         query: Dict[bytes, List[bytes]],
         group_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1110,7 +1112,9 @@ class FederationGroupsProfileServlet(BaseGroupsServerServlet):
         query: Dict[bytes, List[bytes]],
         group_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1131,7 +1135,9 @@ class FederationGroupsSummaryServlet(BaseGroupsServerServlet):
         query: Dict[bytes, List[bytes]],
         group_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1152,7 +1158,9 @@ class FederationGroupsRoomsServlet(BaseGroupsServerServlet):
         query: Dict[bytes, List[bytes]],
         group_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1174,7 +1182,9 @@ class FederationGroupsAddRoomsServlet(BaseGroupsServerServlet):
         group_id: str,
         room_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1192,7 +1202,9 @@ class FederationGroupsAddRoomsServlet(BaseGroupsServerServlet):
         group_id: str,
         room_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1220,7 +1232,9 @@ class FederationGroupsAddRoomsConfigServlet(BaseGroupsServerServlet):
         room_id: str,
         config_key: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1243,7 +1257,9 @@ class FederationGroupsUsersServlet(BaseGroupsServerServlet):
         query: Dict[bytes, List[bytes]],
         group_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1264,7 +1280,9 @@ class FederationGroupsInvitedUsersServlet(BaseGroupsServerServlet):
         query: Dict[bytes, List[bytes]],
         group_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1288,7 +1306,9 @@ class FederationGroupsInviteServlet(BaseGroupsServerServlet):
         group_id: str,
         user_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1354,7 +1374,9 @@ class FederationGroupsRemoveUserServlet(BaseGroupsServerServlet):
         group_id: str,
         user_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1487,7 +1509,9 @@ class FederationGroupsSummaryRoomsServlet(BaseGroupsServerServlet):
         category_id: str,
         room_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1523,7 +1547,9 @@ class FederationGroupsSummaryRoomsServlet(BaseGroupsServerServlet):
         category_id: str,
         room_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1549,7 +1575,9 @@ class FederationGroupsCategoriesServlet(BaseGroupsServerServlet):
         query: Dict[bytes, List[bytes]],
         group_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1571,7 +1599,9 @@ class FederationGroupsCategoryServlet(BaseGroupsServerServlet):
         group_id: str,
         category_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1589,7 +1619,9 @@ class FederationGroupsCategoryServlet(BaseGroupsServerServlet):
         group_id: str,
         category_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1618,7 +1650,9 @@ class FederationGroupsCategoryServlet(BaseGroupsServerServlet):
         group_id: str,
         category_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1644,7 +1678,9 @@ class FederationGroupsRolesServlet(BaseGroupsServerServlet):
         query: Dict[bytes, List[bytes]],
         group_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1666,7 +1702,9 @@ class FederationGroupsRoleServlet(BaseGroupsServerServlet):
         group_id: str,
         role_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1682,7 +1720,9 @@ class FederationGroupsRoleServlet(BaseGroupsServerServlet):
         group_id: str,
         role_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1713,7 +1753,9 @@ class FederationGroupsRoleServlet(BaseGroupsServerServlet):
         group_id: str,
         role_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1750,7 +1792,9 @@ class FederationGroupsSummaryUsersServlet(BaseGroupsServerServlet):
         role_id: str,
         user_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1784,7 +1828,9 @@ class FederationGroupsSummaryUsersServlet(BaseGroupsServerServlet):
         role_id: str,
         user_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1825,7 +1871,9 @@ class FederationGroupsSettingJoinPolicyServlet(BaseGroupsServerServlet):
         query: Dict[bytes, List[bytes]],
         group_id: str,
     ) -> Tuple[int, JsonDict]:
-        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        requester_user_id = parse_string_from_args(
+            query, "requester_user_id", required=True
+        )
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
@@ -1943,7 +1991,7 @@ class RoomComplexityServlet(BaseFederationServlet):
         return 200, complexity
 
 
-FEDERATION_SERVLET_CLASSES = (
+FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationSendServlet,
     FederationEventServlet,
     FederationStateV1Servlet,
@@ -1971,15 +2019,13 @@ FEDERATION_SERVLET_CLASSES = (
     FederationSpaceSummaryServlet,
     FederationV1SendKnockServlet,
     FederationMakeKnockServlet,
-)  # type: Tuple[Type[BaseFederationServlet], ...]
+)
 
-OPENID_SERVLET_CLASSES = (
-    OpenIdUserInfo,
-)  # type: Tuple[Type[BaseFederationServlet], ...]
+OPENID_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (OpenIdUserInfo,)
 
-ROOM_LIST_CLASSES = (PublicRoomList,)  # type: Tuple[Type[PublicRoomList], ...]
+ROOM_LIST_CLASSES: Tuple[Type[PublicRoomList], ...] = (PublicRoomList,)
 
-GROUP_SERVER_SERVLET_CLASSES = (
+GROUP_SERVER_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationGroupsProfileServlet,
     FederationGroupsSummaryServlet,
     FederationGroupsRoomsServlet,
@@ -1998,19 +2044,19 @@ GROUP_SERVER_SERVLET_CLASSES = (
     FederationGroupsAddRoomsServlet,
     FederationGroupsAddRoomsConfigServlet,
     FederationGroupsSettingJoinPolicyServlet,
-)  # type: Tuple[Type[BaseFederationServlet], ...]
+)
 
 
-GROUP_LOCAL_SERVLET_CLASSES = (
+GROUP_LOCAL_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationGroupsLocalInviteServlet,
     FederationGroupsRemoveLocalUserServlet,
     FederationGroupsBulkPublicisedServlet,
-)  # type: Tuple[Type[BaseFederationServlet], ...]
+)
 
 
-GROUP_ATTESTATION_SERVLET_CLASSES = (
+GROUP_ATTESTATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationGroupsRenewAttestaionServlet,
-)  # type: Tuple[Type[BaseFederationServlet], ...]
+)
 
 
 DEFAULT_SERVLET_GROUPS = (