diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index ed09c6af1f..c767d30627 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -86,7 +86,7 @@ class FederationClient(FederationBase):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
- self.pdu_destination_tried = {} # type: Dict[str, Dict[str, int]]
+ self.pdu_destination_tried: Dict[str, Dict[str, int]] = {}
self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()
@@ -94,13 +94,13 @@ class FederationClient(FederationBase):
self.hostname = hs.hostname
self.signing_key = hs.signing_key
- self._get_pdu_cache = ExpiringCache(
+ self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache(
cache_name="get_pdu_cache",
clock=self._clock,
max_len=1000,
expiry_ms=120 * 1000,
reset_expiry_on_get=False,
- ) # type: ExpiringCache[str, EventBase]
+ )
def _clear_tried_cache(self):
"""Clear pdu_destination_tried cache"""
@@ -293,10 +293,10 @@ class FederationClient(FederationBase):
transaction_data,
)
- pdu_list = [
+ pdu_list: List[EventBase] = [
event_from_pdu_json(p, room_version, outlier=outlier)
for p in transaction_data["pdus"]
- ] # type: List[EventBase]
+ ]
if pdu_list and pdu_list[0]:
pdu = pdu_list[0]
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index ac0f2ccfb3..29619aeeb8 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -122,12 +122,12 @@ class FederationServer(FederationBase):
# origins that we are currently processing a transaction from.
# a dict from origin to txn id.
- self._active_transactions = {} # type: Dict[str, str]
+ self._active_transactions: Dict[str, str] = {}
# We cache results for transaction with the same ID
- self._transaction_resp_cache = ResponseCache(
+ self._transaction_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
hs.get_clock(), "fed_txn_handler", timeout_ms=30000
- ) # type: ResponseCache[Tuple[str, str]]
+ )
self.transaction_actions = TransactionActions(self.store)
@@ -135,12 +135,12 @@ class FederationServer(FederationBase):
# We cache responses to state queries, as they take a while and often
# come in waves.
- self._state_resp_cache = ResponseCache(
- hs.get_clock(), "state_resp", timeout_ms=30000
- ) # type: ResponseCache[Tuple[str, Optional[str]]]
- self._state_ids_resp_cache = ResponseCache(
+ self._state_resp_cache: ResponseCache[
+ Tuple[str, Optional[str]]
+ ] = ResponseCache(hs.get_clock(), "state_resp", timeout_ms=30000)
+ self._state_ids_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
hs.get_clock(), "state_ids_resp", timeout_ms=30000
- ) # type: ResponseCache[Tuple[str, str]]
+ )
self._federation_metrics_domains = (
hs.config.federation.federation_metrics_domains
@@ -337,7 +337,7 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
- pdus_by_room = {} # type: Dict[str, List[EventBase]]
+ pdus_by_room: Dict[str, List[EventBase]] = {}
newest_pdu_ts = 0
@@ -516,9 +516,9 @@ class FederationServer(FederationBase):
self, room_id: str, event_id: Optional[str]
) -> Dict[str, list]:
if event_id:
- pdus = await self.handler.get_state_for_pdu(
+ pdus: Iterable[EventBase] = await self.handler.get_state_for_pdu(
room_id, event_id
- ) # type: Iterable[EventBase]
+ )
else:
pdus = (await self.state.get_current_state(room_id)).values()
@@ -562,8 +562,7 @@ class FederationServer(FederationBase):
raise IncompatibleRoomVersionError(room_version=room_version)
pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
- time_now = self._clock.time_msec()
- return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
+ return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
async def on_invite_request(
self, origin: str, content: JsonDict, room_version_id: str
@@ -611,8 +610,7 @@ class FederationServer(FederationBase):
room_version = await self.store.get_room_version_id(room_id)
- time_now = self._clock.time_msec()
- return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
+ return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
async def on_send_leave_request(
self, origin: str, content: JsonDict, room_id: str
@@ -659,9 +657,8 @@ class FederationServer(FederationBase):
)
pdu = await self.handler.on_make_knock_request(origin, room_id, user_id)
- time_now = self._clock.time_msec()
return {
- "event": pdu.get_pdu_json(time_now),
+ "event": pdu.get_templated_pdu_json(),
"room_version": room_version.identifier,
}
@@ -791,7 +788,7 @@ class FederationServer(FederationBase):
log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = await self.store.claim_e2e_one_time_keys(query)
- json_result = {} # type: Dict[str, Dict[str, dict]]
+ json_result: Dict[str, Dict[str, dict]] = {}
for user_id, device_keys in results.items():
for device_id, keys in device_keys.items():
for key_id, json_str in keys.items():
@@ -1119,17 +1116,13 @@ class FederationHandlerRegistry:
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
- self.edu_handlers = (
- {}
- ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
- self.query_handlers = (
- {}
- ) # type: Dict[str, Callable[[dict], Awaitable[JsonDict]]]
+ self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {}
+ self.query_handlers: Dict[str, Callable[[dict], Awaitable[JsonDict]]] = {}
# Map from type to instance names that we should route EDU handling to.
# We randomly choose one instance from the list to route to for each new
# EDU received.
- self._edu_type_to_instance = {} # type: Dict[str, List[str]]
+ self._edu_type_to_instance: Dict[str, List[str]] = {}
def register_edu_handler(
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 65d76ea974..1fbf325fdc 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -71,34 +71,32 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# We may have multiple federation sender instances, so we need to track
# their positions separately.
self._sender_instances = hs.config.worker.federation_shard_config.instances
- self._sender_positions = {} # type: Dict[str, int]
+ self._sender_positions: Dict[str, int] = {}
# Pending presence map user_id -> UserPresenceState
- self.presence_map = {} # type: Dict[str, UserPresenceState]
+ self.presence_map: Dict[str, UserPresenceState] = {}
# Stores the destinations we need to explicitly send presence to about a
# given user.
# Stream position -> (user_id, destinations)
- self.presence_destinations = (
- SortedDict()
- ) # type: SortedDict[int, Tuple[str, Iterable[str]]]
+ self.presence_destinations: SortedDict[
+ int, Tuple[str, Iterable[str]]
+ ] = SortedDict()
# (destination, key) -> EDU
- self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
+ self.keyed_edu: Dict[Tuple[str, tuple], Edu] = {}
# stream position -> (destination, key)
- self.keyed_edu_changed = (
- SortedDict()
- ) # type: SortedDict[int, Tuple[str, tuple]]
+ self.keyed_edu_changed: SortedDict[int, Tuple[str, tuple]] = SortedDict()
- self.edus = SortedDict() # type: SortedDict[int, Edu]
+ self.edus: SortedDict[int, Edu] = SortedDict()
# stream ID for the next entry into keyed_edu_changed/edus.
self.pos = 1
# map from stream ID to the time that stream entry was generated, so that we
# can clear out entries after a while
- self.pos_time = SortedDict() # type: SortedDict[int, int]
+ self.pos_time: SortedDict[int, int] = SortedDict()
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
@@ -291,7 +289,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# list of tuple(int, BaseFederationRow), where the first is the position
# of the federation stream.
- rows = [] # type: List[Tuple[int, BaseFederationRow]]
+ rows: List[Tuple[int, BaseFederationRow]] = []
# Fetch presence to send to destinations
i = self.presence_destinations.bisect_right(from_token)
@@ -445,11 +443,11 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
-_rowtypes = (
+_rowtypes: Tuple[Type[BaseFederationRow], ...] = (
PresenceDestinationsRow,
KeyedEduRow,
EduRow,
-) # type: Tuple[Type[BaseFederationRow], ...]
+)
TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index deb40f4610..d980e0d986 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,9 +14,12 @@
import abc
import logging
+from collections import OrderedDict
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
+import attr
from prometheus_client import Counter
+from typing_extensions import Literal
from twisted.internet import defer
@@ -33,8 +36,12 @@ from synapse.metrics import (
event_processing_loop_room_count,
events_processed_counter,
)
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.util import Clock
from synapse.util.metrics import Measure
if TYPE_CHECKING:
@@ -137,6 +144,84 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError()
+@attr.s
+class _PresenceQueue:
+ """A queue of destinations that need to be woken up due to new presence
+ updates.
+
+ Staggers waking up of per destination queues to ensure that we don't attempt
+ to start TLS connections with many hosts all at once, leading to pinned CPU.
+ """
+
+ # The maximum duration in seconds between queuing up a destination and it
+ # being woken up.
+ _MAX_TIME_IN_QUEUE = 30.0
+
+ # The maximum duration in seconds between waking up consecutive destination
+ # queues.
+ _MAX_DELAY = 0.1
+
+ sender: "FederationSender" = attr.ib()
+ clock: Clock = attr.ib()
+ queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
+ processing: bool = attr.ib(default=False)
+
+ def add_to_queue(self, destination: str) -> None:
+ """Add a destination to the queue to be woken up."""
+
+ self.queue[destination] = None
+
+ if not self.processing:
+ self._handle()
+
+ @wrap_as_background_process("_PresenceQueue.handle")
+ async def _handle(self) -> None:
+ """Background process to drain the queue."""
+
+ if not self.queue:
+ return
+
+ assert not self.processing
+ self.processing = True
+
+ try:
+ # We start with a delay that should drain the queue quickly enough that
+ # we process all destinations in the queue in _MAX_TIME_IN_QUEUE
+ # seconds.
+ #
+ # We also add an upper bound to the delay, to gracefully handle the
+ # case where the queue only has a few entries in it.
+ current_sleep_seconds = min(
+ self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
+ )
+
+ while self.queue:
+ destination, _ = self.queue.popitem(last=False)
+
+ queue = self.sender._get_per_destination_queue(destination)
+
+ if not queue._new_data_to_send:
+ # The per destination queue has already been woken up.
+ continue
+
+ queue.attempt_new_transaction()
+
+ await self.clock.sleep(current_sleep_seconds)
+
+ if not self.queue:
+ break
+
+ # More destinations may have been added to the queue, so we may
+ # need to reduce the delay to ensure everything gets processed
+ # within _MAX_TIME_IN_QUEUE seconds.
+ current_sleep_seconds = min(
+ current_sleep_seconds, self._MAX_TIME_IN_QUEUE / len(self.queue)
+ )
+
+ finally:
+ self.processing = False
+
+
class FederationSender(AbstractFederationSender):
def __init__(self, hs: "HomeServer"):
self.hs = hs
@@ -148,14 +233,14 @@ class FederationSender(AbstractFederationSender):
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
- self._presence_router = None # type: Optional[PresenceRouter]
+ self._presence_router: Optional["PresenceRouter"] = None
self._transaction_manager = TransactionManager(hs)
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.worker.federation_shard_config
# map from destination to PerDestinationQueue
- self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
+ self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
@@ -192,9 +277,7 @@ class FederationSender(AbstractFederationSender):
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
# and that there is a pending call to _flush_rrs_for_room in the system.
- self._queues_awaiting_rr_flush_by_room = (
- {}
- ) # type: Dict[str, Set[PerDestinationQueue]]
+ self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
self._rr_txn_interval_per_room_ms = (
1000.0 / hs.config.federation_rr_transactions_per_room_per_second
@@ -210,6 +293,8 @@ class FederationSender(AbstractFederationSender):
self._external_cache = hs.get_external_cache()
+ self._presence_queue = _PresenceQueue(self, self.clock)
+
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
@@ -265,7 +350,7 @@ class FederationSender(AbstractFederationSender):
if not event.internal_metadata.should_proactively_send():
return
- destinations = None # type: Optional[Set[str]]
+ destinations: Optional[Set[str]] = None
if not event.prev_event_ids():
# If there are no prev event IDs then the state is empty
# and so no remote servers in the room
@@ -331,7 +416,7 @@ class FederationSender(AbstractFederationSender):
for event in events:
await handle_event(event)
- events_by_room = {} # type: Dict[str, List[EventBase]]
+ events_by_room: Dict[str, List[EventBase]] = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@@ -519,7 +604,12 @@ class FederationSender(AbstractFederationSender):
self._instance_name, destination
):
continue
- self._get_per_destination_queue(destination).send_presence(states)
+
+ self._get_per_destination_queue(destination).send_presence(
+ states, start_loop=False
+ )
+
+ self._presence_queue.add_to_queue(destination)
def build_and_send_edu(
self,
@@ -628,7 +718,7 @@ class FederationSender(AbstractFederationSender):
In order to reduce load spikes, adds a delay between each destination.
"""
- last_processed = None # type: Optional[str]
+ last_processed: Optional[str] = None
while True:
destinations_to_wake = (
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3a2efd56ee..c11d1f6d31 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -105,34 +105,34 @@ class PerDestinationQueue:
# catch-up at startup.
# New events will only be sent once this is finished, at which point
# _catching_up is flipped to False.
- self._catching_up = True # type: bool
+ self._catching_up: bool = True
# The stream_ordering of the most recent PDU that was discarded due to
# being in catch-up mode.
- self._catchup_last_skipped = 0 # type: int
+ self._catchup_last_skipped: int = 0
# Cache of the last successfully-transmitted stream ordering for this
# destination (we are the only updater so this is safe)
- self._last_successful_stream_ordering = None # type: Optional[int]
+ self._last_successful_stream_ordering: Optional[int] = None
# a queue of pending PDUs
- self._pending_pdus = [] # type: List[EventBase]
+ self._pending_pdus: List[EventBase] = []
# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
- self._pending_edus = [] # type: List[Edu]
+ self._pending_edus: List[Edu] = []
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
- self._pending_edus_keyed = {} # type: Dict[Tuple[str, Hashable], Edu]
+ self._pending_edus_keyed: Dict[Tuple[str, Hashable], Edu] = {}
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
- self._pending_presence = {} # type: Dict[str, UserPresenceState]
+ self._pending_presence: Dict[str, UserPresenceState] = {}
# room_id -> receipt_type -> user_id -> receipt_dict
- self._pending_rrs = {} # type: Dict[str, Dict[str, Dict[str, dict]]]
+ self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
@@ -171,14 +171,24 @@ class PerDestinationQueue:
self.attempt_new_transaction()
- def send_presence(self, states: Iterable[UserPresenceState]) -> None:
- """Add presence updates to the queue. Start the transmission loop if necessary.
+ def send_presence(
+ self, states: Iterable[UserPresenceState], start_loop: bool = True
+ ) -> None:
+ """Add presence updates to the queue.
+
+ Args:
+ states: Presence updates to send
+ start_loop: Whether to start the transmission loop if not already
+ running.
Args:
states: presence to send
"""
self._pending_presence.update({state.user_id: state for state in states})
- self.attempt_new_transaction()
+ self._new_data_to_send = True
+
+ if start_loop:
+ self.attempt_new_transaction()
def queue_read_receipt(self, receipt: ReadReceipt) -> None:
"""Add a RR to the list to be sent. Doesn't start the transmission loop yet
@@ -243,7 +253,7 @@ class PerDestinationQueue:
)
async def _transaction_transmission_loop(self) -> None:
- pending_pdus = [] # type: List[EventBase]
+ pending_pdus: List[EventBase] = []
try:
self.transmission_loop_running = True
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index c9e7c57461..98b1bf77fd 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -395,9 +395,9 @@ class TransportLayerClient:
# this uses MSC2197 (Search Filtering over Federation)
path = _create_v1_path("/publicRooms")
- data = {
+ data: Dict[str, Any] = {
"include_all_networks": "true" if include_all_networks else "false"
- } # type: Dict[str, Any]
+ }
if third_party_instance_id:
data["third_party_instance_id"] = third_party_instance_id
if limit:
@@ -423,9 +423,9 @@ class TransportLayerClient:
else:
path = _create_v1_path("/publicRooms")
- args = {
+ args: Dict[str, Any] = {
"include_all_networks": "true" if include_all_networks else "false"
- } # type: Dict[str, Any]
+ }
if third_party_instance_id:
args["third_party_instance_id"] = (third_party_instance_id,)
if limit:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index d37d9565fc..2974d4d0cc 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1013,7 +1013,7 @@ class PublicRoomList(BaseFederationServlet):
if not self.allow_access:
raise FederationDeniedError(origin)
- limit = int(content.get("limit", 100)) # type: Optional[int]
+ limit: Optional[int] = int(content.get("limit", 100))
since_token = content.get("since", None)
search_filter = content.get("filter", None)
@@ -1095,7 +1095,9 @@ class FederationGroupsProfileServlet(BaseGroupsServerServlet):
query: Dict[bytes, List[bytes]],
group_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1110,7 +1112,9 @@ class FederationGroupsProfileServlet(BaseGroupsServerServlet):
query: Dict[bytes, List[bytes]],
group_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1131,7 +1135,9 @@ class FederationGroupsSummaryServlet(BaseGroupsServerServlet):
query: Dict[bytes, List[bytes]],
group_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1152,7 +1158,9 @@ class FederationGroupsRoomsServlet(BaseGroupsServerServlet):
query: Dict[bytes, List[bytes]],
group_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1174,7 +1182,9 @@ class FederationGroupsAddRoomsServlet(BaseGroupsServerServlet):
group_id: str,
room_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1192,7 +1202,9 @@ class FederationGroupsAddRoomsServlet(BaseGroupsServerServlet):
group_id: str,
room_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1220,7 +1232,9 @@ class FederationGroupsAddRoomsConfigServlet(BaseGroupsServerServlet):
room_id: str,
config_key: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1243,7 +1257,9 @@ class FederationGroupsUsersServlet(BaseGroupsServerServlet):
query: Dict[bytes, List[bytes]],
group_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1264,7 +1280,9 @@ class FederationGroupsInvitedUsersServlet(BaseGroupsServerServlet):
query: Dict[bytes, List[bytes]],
group_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1288,7 +1306,9 @@ class FederationGroupsInviteServlet(BaseGroupsServerServlet):
group_id: str,
user_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1354,7 +1374,9 @@ class FederationGroupsRemoveUserServlet(BaseGroupsServerServlet):
group_id: str,
user_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1487,7 +1509,9 @@ class FederationGroupsSummaryRoomsServlet(BaseGroupsServerServlet):
category_id: str,
room_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1523,7 +1547,9 @@ class FederationGroupsSummaryRoomsServlet(BaseGroupsServerServlet):
category_id: str,
room_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1549,7 +1575,9 @@ class FederationGroupsCategoriesServlet(BaseGroupsServerServlet):
query: Dict[bytes, List[bytes]],
group_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1571,7 +1599,9 @@ class FederationGroupsCategoryServlet(BaseGroupsServerServlet):
group_id: str,
category_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1589,7 +1619,9 @@ class FederationGroupsCategoryServlet(BaseGroupsServerServlet):
group_id: str,
category_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1618,7 +1650,9 @@ class FederationGroupsCategoryServlet(BaseGroupsServerServlet):
group_id: str,
category_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1644,7 +1678,9 @@ class FederationGroupsRolesServlet(BaseGroupsServerServlet):
query: Dict[bytes, List[bytes]],
group_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1666,7 +1702,9 @@ class FederationGroupsRoleServlet(BaseGroupsServerServlet):
group_id: str,
role_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1682,7 +1720,9 @@ class FederationGroupsRoleServlet(BaseGroupsServerServlet):
group_id: str,
role_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1713,7 +1753,9 @@ class FederationGroupsRoleServlet(BaseGroupsServerServlet):
group_id: str,
role_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1750,7 +1792,9 @@ class FederationGroupsSummaryUsersServlet(BaseGroupsServerServlet):
role_id: str,
user_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1784,7 +1828,9 @@ class FederationGroupsSummaryUsersServlet(BaseGroupsServerServlet):
role_id: str,
user_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1825,7 +1871,9 @@ class FederationGroupsSettingJoinPolicyServlet(BaseGroupsServerServlet):
query: Dict[bytes, List[bytes]],
group_id: str,
) -> Tuple[int, JsonDict]:
- requester_user_id = parse_string_from_args(query, "requester_user_id")
+ requester_user_id = parse_string_from_args(
+ query, "requester_user_id", required=True
+ )
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1943,7 +1991,7 @@ class RoomComplexityServlet(BaseFederationServlet):
return 200, complexity
-FEDERATION_SERVLET_CLASSES = (
+FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationSendServlet,
FederationEventServlet,
FederationStateV1Servlet,
@@ -1971,15 +2019,13 @@ FEDERATION_SERVLET_CLASSES = (
FederationSpaceSummaryServlet,
FederationV1SendKnockServlet,
FederationMakeKnockServlet,
-) # type: Tuple[Type[BaseFederationServlet], ...]
+)
-OPENID_SERVLET_CLASSES = (
- OpenIdUserInfo,
-) # type: Tuple[Type[BaseFederationServlet], ...]
+OPENID_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (OpenIdUserInfo,)
-ROOM_LIST_CLASSES = (PublicRoomList,) # type: Tuple[Type[PublicRoomList], ...]
+ROOM_LIST_CLASSES: Tuple[Type[PublicRoomList], ...] = (PublicRoomList,)
-GROUP_SERVER_SERVLET_CLASSES = (
+GROUP_SERVER_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationGroupsProfileServlet,
FederationGroupsSummaryServlet,
FederationGroupsRoomsServlet,
@@ -1998,19 +2044,19 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsAddRoomsServlet,
FederationGroupsAddRoomsConfigServlet,
FederationGroupsSettingJoinPolicyServlet,
-) # type: Tuple[Type[BaseFederationServlet], ...]
+)
-GROUP_LOCAL_SERVLET_CLASSES = (
+GROUP_LOCAL_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationGroupsLocalInviteServlet,
FederationGroupsRemoveLocalUserServlet,
FederationGroupsBulkPublicisedServlet,
-) # type: Tuple[Type[BaseFederationServlet], ...]
+)
-GROUP_ATTESTATION_SERVLET_CLASSES = (
+GROUP_ATTESTATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationGroupsRenewAttestaionServlet,
-) # type: Tuple[Type[BaseFederationServlet], ...]
+)
DEFAULT_SERVLET_GROUPS = (
|