summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorBen Banfield-Zanin <benbz@matrix.org>2021-03-01 10:06:09 +0000
committerBen Banfield-Zanin <benbz@matrix.org>2021-03-01 10:06:09 +0000
commitb26bee9faf957643cd34c4146b250b0009be205d (patch)
treea7a7e29f30acb437d010bdf6116c0f2729f21a1b /synapse/federation
parentMerge remote-tracking branch 'origin/release-v1.26.0' into toml/keycloak_hints (diff)
parentFixup changelog (diff)
downloadsynapse-toml/keycloak_hints.tar.xz
Merge remote-tracking branch 'origin/release-v1.28.0' into toml/keycloak_hints github/toml/keycloak_hints toml/keycloak_hints
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py133
-rw-r--r--synapse/federation/federation_server.py21
-rw-r--r--synapse/federation/persistence.py6
-rw-r--r--synapse/federation/send_queue.py8
-rw-r--r--synapse/federation/sender/__init__.py60
-rw-r--r--synapse/federation/sender/per_destination_queue.py12
-rw-r--r--synapse/federation/sender/transaction_manager.py5
-rw-r--r--synapse/federation/transport/client.py86
-rw-r--r--synapse/federation/transport/server.py105
-rw-r--r--synapse/federation/units.py8
10 files changed, 241 insertions, 203 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py

index 302b2f69bc..bee81fc019 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -18,6 +18,7 @@ import copy import itertools import logging from typing import ( + TYPE_CHECKING, Any, Awaitable, Callable, @@ -26,7 +27,6 @@ from typing import ( List, Mapping, Optional, - Sequence, Tuple, TypeVar, Union, @@ -61,6 +61,9 @@ from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"]) @@ -80,10 +83,10 @@ class InvalidResponseError(RuntimeError): class FederationClient(FederationBase): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.pdu_destination_tried = {} + self.pdu_destination_tried = {} # type: Dict[str, Dict[str, int]] self._clock.looping_call(self._clear_tried_cache, 60 * 1000) self.state = hs.get_state_handler() self.transport_layer = hs.get_federation_transport_client() @@ -116,33 +119,32 @@ class FederationClient(FederationBase): self.pdu_destination_tried[event_id] = destination_dict @log_function - def make_query( + async def make_query( self, - destination, - query_type, - args, - retry_on_dns_fail=False, - ignore_backoff=False, - ): + destination: str, + query_type: str, + args: dict, + retry_on_dns_fail: bool = False, + ignore_backoff: bool = False, + ) -> JsonDict: """Sends a federation Query to a remote homeserver of the given type and arguments. Args: - destination (str): Domain name of the remote homeserver - query_type (str): Category of the query type; should match the + destination: Domain name of the remote homeserver + query_type: Category of the query type; should match the handler name used in register_query_handler(). - args (dict): Mapping of strings to strings containing the details + args: Mapping of strings to strings containing the details of the query request. - ignore_backoff (bool): true to ignore the historical backoff data + ignore_backoff: true to ignore the historical backoff data and try the request anyway. Returns: - a Awaitable which will eventually yield a JSON object from the - response + The JSON object from the response """ sent_queries_counter.labels(query_type).inc() - return self.transport_layer.make_query( + return await self.transport_layer.make_query( destination, query_type, args, @@ -151,42 +153,52 @@ class FederationClient(FederationBase): ) @log_function - def query_client_keys(self, destination, content, timeout): + async def query_client_keys( + self, destination: str, content: JsonDict, timeout: int + ) -> JsonDict: """Query device keys for a device hosted on a remote server. Args: - destination (str): Domain name of the remote homeserver - content (dict): The query content. + destination: Domain name of the remote homeserver + content: The query content. Returns: - an Awaitable which will eventually yield a JSON object from the - response + The JSON object from the response """ sent_queries_counter.labels("client_device_keys").inc() - return self.transport_layer.query_client_keys(destination, content, timeout) + return await self.transport_layer.query_client_keys( + destination, content, timeout + ) @log_function - def query_user_devices(self, destination, user_id, timeout=30000): + async def query_user_devices( + self, destination: str, user_id: str, timeout: int = 30000 + ) -> JsonDict: """Query the device keys for a list of user ids hosted on a remote server. """ sent_queries_counter.labels("user_devices").inc() - return self.transport_layer.query_user_devices(destination, user_id, timeout) + return await self.transport_layer.query_user_devices( + destination, user_id, timeout + ) @log_function - def claim_client_keys(self, destination, content, timeout): + async def claim_client_keys( + self, destination: str, content: JsonDict, timeout: int + ) -> JsonDict: """Claims one-time keys for a device hosted on a remote server. Args: - destination (str): Domain name of the remote homeserver - content (dict): The query content. + destination: Domain name of the remote homeserver + content: The query content. Returns: - an Awaitable which will eventually yield a JSON object from the - response + The JSON object from the response """ sent_queries_counter.labels("client_one_time_keys").inc() - return self.transport_layer.claim_client_keys(destination, content, timeout) + return await self.transport_layer.claim_client_keys( + destination, content, timeout + ) async def backfill( self, dest: str, room_id: str, limit: int, extremities: Iterable[str] @@ -195,10 +207,10 @@ class FederationClient(FederationBase): given destination server. Args: - dest (str): The remote homeserver to ask. - room_id (str): The room_id to backfill. - limit (int): The maximum number of events to return. - extremities (list): our current backwards extremities, to backfill from + dest: The remote homeserver to ask. + room_id: The room_id to backfill. + limit: The maximum number of events to return. + extremities: our current backwards extremities, to backfill from """ logger.debug("backfill extrem=%s", extremities) @@ -370,7 +382,7 @@ class FederationClient(FederationBase): for events that have failed their checks Returns: - Deferred : A list of PDUs that have valid signatures and hashes. + A list of PDUs that have valid signatures and hashes. """ deferreds = self._check_sigs_and_hashes(room_version, pdus) @@ -418,7 +430,9 @@ class FederationClient(FederationBase): else: return [p for p in valid_pdus if p] - async def get_event_auth(self, destination, room_id, event_id): + async def get_event_auth( + self, destination: str, room_id: str, event_id: str + ) -> List[EventBase]: res = await self.transport_layer.get_event_auth(destination, room_id, event_id) room_version = await self.store.get_room_version(room_id) @@ -700,18 +714,16 @@ class FederationClient(FederationBase): return await self._try_destination_list("send_join", destinations, send_request) - async def _do_send_join(self, destination: str, pdu: EventBase): + async def _do_send_join(self, destination: str, pdu: EventBase) -> JsonDict: time_now = self._clock.time_msec() try: - content = await self.transport_layer.send_join_v2( + return await self.transport_layer.send_join_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, content=pdu.get_pdu_json(time_now), ) - - return content except HttpResponseException as e: if e.code in [400, 404]: err = e.to_synapse_error() @@ -738,7 +750,11 @@ class FederationClient(FederationBase): return resp[1] async def send_invite( - self, destination: str, room_id: str, event_id: str, pdu: EventBase, + self, + destination: str, + room_id: str, + event_id: str, + pdu: EventBase, ) -> EventBase: room_version = await self.store.get_room_version(room_id) @@ -769,7 +785,7 @@ class FederationClient(FederationBase): time_now = self._clock.time_msec() try: - content = await self.transport_layer.send_invite_v2( + return await self.transport_layer.send_invite_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -779,7 +795,6 @@ class FederationClient(FederationBase): "invite_room_state": pdu.unsigned.get("invite_room_state", []), }, ) - return content except HttpResponseException as e: if e.code in [400, 404]: err = e.to_synapse_error() @@ -799,7 +814,7 @@ class FederationClient(FederationBase): "User's homeserver does not support this room version", Codes.UNSUPPORTED_ROOM_VERSION, ) - elif e.code == 403: + elif e.code in (403, 429): raise e.to_synapse_error() else: raise @@ -842,18 +857,16 @@ class FederationClient(FederationBase): "send_leave", destinations, send_request ) - async def _do_send_leave(self, destination, pdu): + async def _do_send_leave(self, destination: str, pdu: EventBase) -> JsonDict: time_now = self._clock.time_msec() try: - content = await self.transport_layer.send_leave_v2( + return await self.transport_layer.send_leave_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, content=pdu.get_pdu_json(time_now), ) - - return content except HttpResponseException as e: if e.code in [400, 404]: err = e.to_synapse_error() @@ -879,7 +892,7 @@ class FederationClient(FederationBase): # content. return resp[1] - def get_public_rooms( + async def get_public_rooms( self, remote_server: str, limit: Optional[int] = None, @@ -887,7 +900,7 @@ class FederationClient(FederationBase): search_filter: Optional[Dict] = None, include_all_networks: bool = False, third_party_instance_id: Optional[str] = None, - ): + ) -> JsonDict: """Get the list of public rooms from a remote homeserver Args: @@ -901,8 +914,7 @@ class FederationClient(FederationBase): party instance Returns: - Awaitable[Dict[str, Any]]: The response from the remote server, or None if - `remote_server` is the same as the local server_name + The response from the remote server. Raises: HttpResponseException: There was an exception returned from the remote server @@ -910,7 +922,7 @@ class FederationClient(FederationBase): requests over federation """ - return self.transport_layer.get_public_rooms( + return await self.transport_layer.get_public_rooms( remote_server, limit, since_token, @@ -923,7 +935,7 @@ class FederationClient(FederationBase): self, destination: str, room_id: str, - earliest_events_ids: Sequence[str], + earliest_events_ids: Iterable[str], latest_events: Iterable[EventBase], limit: int, min_depth: int, @@ -974,7 +986,9 @@ class FederationClient(FederationBase): return signed_events - async def forward_third_party_invite(self, destinations, room_id, event_dict): + async def forward_third_party_invite( + self, destinations: Iterable[str], room_id: str, event_dict: JsonDict + ) -> None: for destination in destinations: if destination == self.server_name: continue @@ -983,7 +997,7 @@ class FederationClient(FederationBase): await self.transport_layer.exchange_third_party_invite( destination=destination, room_id=room_id, event_dict=event_dict ) - return None + return except CodeMessageException: raise except Exception as e: @@ -995,7 +1009,7 @@ class FederationClient(FederationBase): async def get_room_complexity( self, destination: str, room_id: str - ) -> Optional[dict]: + ) -> Optional[JsonDict]: """ Fetch the complexity of a remote room from another server. @@ -1008,10 +1022,9 @@ class FederationClient(FederationBase): could not fetch the complexity. """ try: - complexity = await self.transport_layer.get_room_complexity( + return await self.transport_layer.get_room_complexity( destination=destination, room_id=room_id ) - return complexity except CodeMessageException as e: # We didn't manage to get it -- probably a 404. We are okay if other # servers don't give it to us. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 171d25c945..8d4bb621e7 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -85,7 +85,8 @@ received_queries_counter = Counter( ) pdu_process_time = Histogram( - "synapse_federation_server_pdu_process_time", "Time taken to process an event", + "synapse_federation_server_pdu_process_time", + "Time taken to process an event", ) @@ -204,7 +205,7 @@ class FederationServer(FederationBase): async def _handle_incoming_transaction( self, origin: str, transaction: Transaction, request_time: int ) -> Tuple[int, Dict[str, Any]]: - """ Process an incoming transaction and return the HTTP response + """Process an incoming transaction and return the HTTP response Args: origin: the server making the request @@ -373,8 +374,7 @@ class FederationServer(FederationBase): return pdu_results async def _handle_edus_in_txn(self, origin: str, transaction: Transaction): - """Process the EDUs in a received transaction. - """ + """Process the EDUs in a received transaction.""" async def _process_edu(edu_dict): received_edus_counter.inc() @@ -437,7 +437,10 @@ class FederationServer(FederationBase): raise AuthError(403, "Host not in room.") resp = await self._state_ids_resp_cache.wrap( - (room_id, event_id), self._on_state_ids_request_compute, room_id, event_id, + (room_id, event_id), + self._on_state_ids_request_compute, + room_id, + event_id, ) return 200, resp @@ -679,7 +682,7 @@ class FederationServer(FederationBase): ) async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None: - """ Process a PDU received in a federation /send/ transaction. + """Process a PDU received in a federation /send/ transaction. If the event is invalid, then this method throws a FederationError. (The error will then be logged and sent back to the sender (which @@ -906,13 +909,11 @@ class FederationHandlerRegistry: self.query_handlers[query_type] = handler def register_instance_for_edu(self, edu_type: str, instance_name: str): - """Register that the EDU handler is on a different instance than master. - """ + """Register that the EDU handler is on a different instance than master.""" self._edu_type_to_instance[edu_type] = [instance_name] def register_instances_for_edu(self, edu_type: str, instance_names: List[str]): - """Register that the EDU handler is on multiple instances. - """ + """Register that the EDU handler is on multiple instances.""" self._edu_type_to_instance[edu_type] = instance_names async def on_edu(self, edu_type: str, origin: str, content: dict): diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 079e2b2fe0..ce5fc758f0 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py
@@ -30,8 +30,7 @@ logger = logging.getLogger(__name__) class TransactionActions: - """ Defines persistence actions that relate to handling Transactions. - """ + """Defines persistence actions that relate to handling Transactions.""" def __init__(self, datastore): self.store = datastore @@ -57,8 +56,7 @@ class TransactionActions: async def set_response( self, origin: str, transaction: Transaction, code: int, response: JsonDict ) -> None: - """Persist how we responded to a transaction. - """ + """Persist how we responded to a transaction.""" transaction_id = transaction.transaction_id # type: ignore if not transaction_id: raise RuntimeError("Cannot persist a transaction with no transaction_id") diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 5f1bf492c1..3e993b428b 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py
@@ -468,8 +468,7 @@ class KeyedEduRow( class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu - """Streams EDUs that don't have keys. See KeyedEduRow - """ + """Streams EDUs that don't have keys. See KeyedEduRow""" TypeId = "e" @@ -519,7 +518,10 @@ def process_rows_for_federation(transaction_queue, rows): # them into the appropriate collection and then send them off. buff = ParsedFederationStreamData( - presence=[], presence_destinations=[], keyed_edus={}, edus={}, + presence=[], + presence_destinations=[], + keyed_edus={}, + edus={}, ) # Parse the rows in the stream and add to the buffer diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 604cfd1935..97fc4d0a82 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -142,6 +142,8 @@ class FederationSender: self._wake_destinations_needing_catchup, ) + self._external_cache = hs.get_external_cache() + def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: """Get or create a PerDestinationQueue for the given destination @@ -197,22 +199,40 @@ class FederationSender: if not event.internal_metadata.should_proactively_send(): return - try: - # Get the state from before the event. - # We need to make sure that this is the state from before - # the event and not from after it. - # Otherwise if the last member on a server in a room is - # banned then it won't receive the event because it won't - # be in the room after the ban. - destinations = await self.state.get_hosts_in_room_at_events( - event.room_id, event_ids=event.prev_event_ids() - ) - except Exception: - logger.exception( - "Failed to calculate hosts in room for event: %s", - event.event_id, + destinations = None # type: Optional[Set[str]] + if not event.prev_event_ids(): + # If there are no prev event IDs then the state is empty + # and so no remote servers in the room + destinations = set() + else: + # We check the external cache for the destinations, which is + # stored per state group. + + sg = await self._external_cache.get( + "event_to_prev_state_group", event.event_id ) - return + if sg: + destinations = await self._external_cache.get( + "get_joined_hosts", str(sg) + ) + + if destinations is None: + try: + # Get the state from before the event. + # We need to make sure that this is the state from before + # the event and not from after it. + # Otherwise if the last member on a server in a room is + # banned then it won't receive the event because it won't + # be in the room after the ban. + destinations = await self.state.get_hosts_in_room_at_events( + event.room_id, event_ids=event.prev_event_ids() + ) + except Exception: + logger.exception( + "Failed to calculate hosts in room for event: %s", + event.event_id, + ) + return destinations = { d @@ -308,7 +328,9 @@ class FederationSender: # to allow us to perform catch-up later on if the remote is unreachable # for a while. await self.store.store_destination_rooms_entries( - destinations, pdu.room_id, pdu.internal_metadata.stream_ordering, + destinations, + pdu.room_id, + pdu.internal_metadata.stream_ordering, ) for destination in destinations: @@ -455,7 +477,7 @@ class FederationSender: self, states: List[UserPresenceState], destinations: List[str] ) -> None: """Send the given presence states to the given destinations. - destinations (list[str]) + destinations (list[str]) """ if not states or not self.hs.config.use_presence: @@ -596,8 +618,8 @@ class FederationSender: last_processed = None # type: Optional[str] while True: - destinations_to_wake = await self.store.get_catch_up_outstanding_destinations( - last_processed + destinations_to_wake = ( + await self.store.get_catch_up_outstanding_destinations(last_processed) ) if not destinations_to_wake: diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index db8e456fe8..deb519f3ef 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -85,7 +85,8 @@ class PerDestinationQueue: # processing. We have a guard in `attempt_new_transaction` that # ensure we don't start sending stuff. logger.error( - "Create a per destination queue for %s on wrong worker", destination, + "Create a per destination queue for %s on wrong worker", + destination, ) self._should_send_on_this_instance = False @@ -440,8 +441,10 @@ class PerDestinationQueue: if first_catch_up_check: # first catchup so get last_successful_stream_ordering from database - self._last_successful_stream_ordering = await self._store.get_destination_last_successful_stream_ordering( - self._destination + self._last_successful_stream_ordering = ( + await self._store.get_destination_last_successful_stream_ordering( + self._destination + ) ) if self._last_successful_stream_ordering is None: @@ -457,7 +460,8 @@ class PerDestinationQueue: # get at most 50 catchup room/PDUs while True: event_ids = await self._store.get_catch_up_room_event_ids( - self._destination, self._last_successful_stream_ordering, + self._destination, + self._last_successful_stream_ordering, ) if not event_ids: diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 3e07f925e0..763aff296c 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py
@@ -65,7 +65,10 @@ class TransactionManager: @measure_func("_send_new_transaction") async def send_new_transaction( - self, destination: str, pdus: List[EventBase], edus: List[Edu], + self, + destination: str, + pdus: List[EventBase], + edus: List[Edu], ) -> bool: """ Args: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index abe9168c78..10c4747f97 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py
@@ -39,7 +39,7 @@ class TransportLayerClient: @log_function def get_room_state_ids(self, destination, room_id, event_id): - """ Requests all state for a given room from the given server at the + """Requests all state for a given room from the given server at the given event. Returns the state's event_id's Args: @@ -63,7 +63,7 @@ class TransportLayerClient: @log_function def get_event(self, destination, event_id, timeout=None): - """ Requests the pdu with give id and origin from the given server. + """Requests the pdu with give id and origin from the given server. Args: destination (str): The host name of the remote homeserver we want @@ -84,7 +84,7 @@ class TransportLayerClient: @log_function def backfill(self, destination, room_id, event_tuples, limit): - """ Requests `limit` previous PDUs in a given context before list of + """Requests `limit` previous PDUs in a given context before list of PDUs. Args: @@ -118,7 +118,7 @@ class TransportLayerClient: @log_function async def send_transaction(self, transaction, json_data_callback=None): - """ Sends the given Transaction to its destination + """Sends the given Transaction to its destination Args: transaction (Transaction) @@ -551,8 +551,7 @@ class TransportLayerClient: @log_function def get_group_profile(self, destination, group_id, requester_user_id): - """Get a group profile - """ + """Get a group profile""" path = _create_v1_path("/groups/%s/profile", group_id) return self.client.get_json( @@ -584,8 +583,7 @@ class TransportLayerClient: @log_function def get_group_summary(self, destination, group_id, requester_user_id): - """Get a group summary - """ + """Get a group summary""" path = _create_v1_path("/groups/%s/summary", group_id) return self.client.get_json( @@ -597,8 +595,7 @@ class TransportLayerClient: @log_function def get_rooms_in_group(self, destination, group_id, requester_user_id): - """Get all rooms in a group - """ + """Get all rooms in a group""" path = _create_v1_path("/groups/%s/rooms", group_id) return self.client.get_json( @@ -611,8 +608,7 @@ class TransportLayerClient: def add_room_to_group( self, destination, group_id, requester_user_id, room_id, content ): - """Add a room to a group - """ + """Add a room to a group""" path = _create_v1_path("/groups/%s/room/%s", group_id, room_id) return self.client.post_json( @@ -626,8 +622,7 @@ class TransportLayerClient: def update_room_in_group( self, destination, group_id, requester_user_id, room_id, config_key, content ): - """Update room in group - """ + """Update room in group""" path = _create_v1_path( "/groups/%s/room/%s/config/%s", group_id, room_id, config_key ) @@ -641,8 +636,7 @@ class TransportLayerClient: ) def remove_room_from_group(self, destination, group_id, requester_user_id, room_id): - """Remove a room from a group - """ + """Remove a room from a group""" path = _create_v1_path("/groups/%s/room/%s", group_id, room_id) return self.client.delete_json( @@ -654,8 +648,7 @@ class TransportLayerClient: @log_function def get_users_in_group(self, destination, group_id, requester_user_id): - """Get users in a group - """ + """Get users in a group""" path = _create_v1_path("/groups/%s/users", group_id) return self.client.get_json( @@ -667,8 +660,7 @@ class TransportLayerClient: @log_function def get_invited_users_in_group(self, destination, group_id, requester_user_id): - """Get users that have been invited to a group - """ + """Get users that have been invited to a group""" path = _create_v1_path("/groups/%s/invited_users", group_id) return self.client.get_json( @@ -680,8 +672,7 @@ class TransportLayerClient: @log_function def accept_group_invite(self, destination, group_id, user_id, content): - """Accept a group invite - """ + """Accept a group invite""" path = _create_v1_path("/groups/%s/users/%s/accept_invite", group_id, user_id) return self.client.post_json( @@ -690,8 +681,7 @@ class TransportLayerClient: @log_function def join_group(self, destination, group_id, user_id, content): - """Attempts to join a group - """ + """Attempts to join a group""" path = _create_v1_path("/groups/%s/users/%s/join", group_id, user_id) return self.client.post_json( @@ -702,8 +692,7 @@ class TransportLayerClient: def invite_to_group( self, destination, group_id, user_id, requester_user_id, content ): - """Invite a user to a group - """ + """Invite a user to a group""" path = _create_v1_path("/groups/%s/users/%s/invite", group_id, user_id) return self.client.post_json( @@ -730,8 +719,7 @@ class TransportLayerClient: def remove_user_from_group( self, destination, group_id, requester_user_id, user_id, content ): - """Remove a user from a group - """ + """Remove a user from a group""" path = _create_v1_path("/groups/%s/users/%s/remove", group_id, user_id) return self.client.post_json( @@ -772,8 +760,7 @@ class TransportLayerClient: def update_group_summary_room( self, destination, group_id, user_id, room_id, category_id, content ): - """Update a room entry in a group summary - """ + """Update a room entry in a group summary""" if category_id: path = _create_v1_path( "/groups/%s/summary/categories/%s/rooms/%s", @@ -796,8 +783,7 @@ class TransportLayerClient: def delete_group_summary_room( self, destination, group_id, user_id, room_id, category_id ): - """Delete a room entry in a group summary - """ + """Delete a room entry in a group summary""" if category_id: path = _create_v1_path( "/groups/%s/summary/categories/%s/rooms/%s", @@ -817,8 +803,7 @@ class TransportLayerClient: @log_function def get_group_categories(self, destination, group_id, requester_user_id): - """Get all categories in a group - """ + """Get all categories in a group""" path = _create_v1_path("/groups/%s/categories", group_id) return self.client.get_json( @@ -830,8 +815,7 @@ class TransportLayerClient: @log_function def get_group_category(self, destination, group_id, requester_user_id, category_id): - """Get category info in a group - """ + """Get category info in a group""" path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id) return self.client.get_json( @@ -845,8 +829,7 @@ class TransportLayerClient: def update_group_category( self, destination, group_id, requester_user_id, category_id, content ): - """Update a category in a group - """ + """Update a category in a group""" path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id) return self.client.post_json( @@ -861,8 +844,7 @@ class TransportLayerClient: def delete_group_category( self, destination, group_id, requester_user_id, category_id ): - """Delete a category in a group - """ + """Delete a category in a group""" path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id) return self.client.delete_json( @@ -874,8 +856,7 @@ class TransportLayerClient: @log_function def get_group_roles(self, destination, group_id, requester_user_id): - """Get all roles in a group - """ + """Get all roles in a group""" path = _create_v1_path("/groups/%s/roles", group_id) return self.client.get_json( @@ -887,8 +868,7 @@ class TransportLayerClient: @log_function def get_group_role(self, destination, group_id, requester_user_id, role_id): - """Get a roles info - """ + """Get a roles info""" path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id) return self.client.get_json( @@ -902,8 +882,7 @@ class TransportLayerClient: def update_group_role( self, destination, group_id, requester_user_id, role_id, content ): - """Update a role in a group - """ + """Update a role in a group""" path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id) return self.client.post_json( @@ -916,8 +895,7 @@ class TransportLayerClient: @log_function def delete_group_role(self, destination, group_id, requester_user_id, role_id): - """Delete a role in a group - """ + """Delete a role in a group""" path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id) return self.client.delete_json( @@ -931,8 +909,7 @@ class TransportLayerClient: def update_group_summary_user( self, destination, group_id, requester_user_id, user_id, role_id, content ): - """Update a users entry in a group - """ + """Update a users entry in a group""" if role_id: path = _create_v1_path( "/groups/%s/summary/roles/%s/users/%s", group_id, role_id, user_id @@ -950,8 +927,7 @@ class TransportLayerClient: @log_function def set_group_join_policy(self, destination, group_id, requester_user_id, content): - """Sets the join policy for a group - """ + """Sets the join policy for a group""" path = _create_v1_path("/groups/%s/settings/m.join_policy", group_id) return self.client.put_json( @@ -966,8 +942,7 @@ class TransportLayerClient: def delete_group_summary_user( self, destination, group_id, requester_user_id, user_id, role_id ): - """Delete a users entry in a group - """ + """Delete a users entry in a group""" if role_id: path = _create_v1_path( "/groups/%s/summary/roles/%s/users/%s", group_id, role_id, user_id @@ -983,8 +958,7 @@ class TransportLayerClient: ) def bulk_get_publicised_groups(self, destination, user_ids): - """Get the groups a list of users are publicising - """ + """Get the groups a list of users are publicising""" path = _create_v1_path("/get_groups_publicised") diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 95c64510a9..cce83704d4 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py
@@ -21,6 +21,7 @@ import re from typing import Optional, Tuple, Type import synapse +from synapse.api.constants import MAX_GROUP_CATEGORYID_LENGTH, MAX_GROUP_ROLEID_LENGTH from synapse.api.errors import Codes, FederationDeniedError, SynapseError from synapse.api.room_versions import RoomVersions from synapse.api.urls import ( @@ -364,7 +365,10 @@ class BaseFederationServlet: continue server.register_paths( - method, (pattern,), self._wrap(code), self.__class__.__name__, + method, + (pattern,), + self._wrap(code), + self.__class__.__name__, ) @@ -381,7 +385,7 @@ class FederationSendServlet(BaseFederationServlet): # This is when someone is trying to send us a bunch of data. async def on_PUT(self, origin, content, query, transaction_id): - """ Called on PUT /send/<transaction_id>/ + """Called on PUT /send/<transaction_id>/ Args: request (twisted.web.http.Request): The HTTP request. @@ -855,8 +859,7 @@ class FederationVersionServlet(BaseFederationServlet): class FederationGroupsProfileServlet(BaseFederationServlet): - """Get/set the basic profile of a group on behalf of a user - """ + """Get/set the basic profile of a group on behalf of a user""" PATH = "/groups/(?P<group_id>[^/]*)/profile" @@ -895,8 +898,7 @@ class FederationGroupsSummaryServlet(BaseFederationServlet): class FederationGroupsRoomsServlet(BaseFederationServlet): - """Get the rooms in a group on behalf of a user - """ + """Get the rooms in a group on behalf of a user""" PATH = "/groups/(?P<group_id>[^/]*)/rooms" @@ -911,8 +913,7 @@ class FederationGroupsRoomsServlet(BaseFederationServlet): class FederationGroupsAddRoomsServlet(BaseFederationServlet): - """Add/remove room from group - """ + """Add/remove room from group""" PATH = "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)" @@ -940,8 +941,7 @@ class FederationGroupsAddRoomsServlet(BaseFederationServlet): class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet): - """Update room config in group - """ + """Update room config in group""" PATH = ( "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)" @@ -961,8 +961,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet): class FederationGroupsUsersServlet(BaseFederationServlet): - """Get the users in a group on behalf of a user - """ + """Get the users in a group on behalf of a user""" PATH = "/groups/(?P<group_id>[^/]*)/users" @@ -977,8 +976,7 @@ class FederationGroupsUsersServlet(BaseFederationServlet): class FederationGroupsInvitedUsersServlet(BaseFederationServlet): - """Get the users that have been invited to a group - """ + """Get the users that have been invited to a group""" PATH = "/groups/(?P<group_id>[^/]*)/invited_users" @@ -995,8 +993,7 @@ class FederationGroupsInvitedUsersServlet(BaseFederationServlet): class FederationGroupsInviteServlet(BaseFederationServlet): - """Ask a group server to invite someone to the group - """ + """Ask a group server to invite someone to the group""" PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite" @@ -1013,8 +1010,7 @@ class FederationGroupsInviteServlet(BaseFederationServlet): class FederationGroupsAcceptInviteServlet(BaseFederationServlet): - """Accept an invitation from the group server - """ + """Accept an invitation from the group server""" PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite" @@ -1028,8 +1024,7 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet): class FederationGroupsJoinServlet(BaseFederationServlet): - """Attempt to join a group - """ + """Attempt to join a group""" PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join" @@ -1043,8 +1038,7 @@ class FederationGroupsJoinServlet(BaseFederationServlet): class FederationGroupsRemoveUserServlet(BaseFederationServlet): - """Leave or kick a user from the group - """ + """Leave or kick a user from the group""" PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove" @@ -1061,8 +1055,7 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet): class FederationGroupsLocalInviteServlet(BaseFederationServlet): - """A group server has invited a local user - """ + """A group server has invited a local user""" PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite" @@ -1076,8 +1069,7 @@ class FederationGroupsLocalInviteServlet(BaseFederationServlet): class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): - """A group server has removed a local user - """ + """A group server has removed a local user""" PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove" @@ -1093,8 +1085,7 @@ class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): class FederationGroupsRenewAttestaionServlet(BaseFederationServlet): - """A group or user's server renews their attestation - """ + """A group or user's server renews their attestation""" PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)" @@ -1128,7 +1119,17 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): raise SynapseError(403, "requester_user_id doesn't match origin") if category_id == "": - raise SynapseError(400, "category_id cannot be empty string") + raise SynapseError( + 400, "category_id cannot be empty string", Codes.INVALID_PARAM + ) + + if len(category_id) > MAX_GROUP_CATEGORYID_LENGTH: + raise SynapseError( + 400, + "category_id may not be longer than %s characters" + % (MAX_GROUP_CATEGORYID_LENGTH,), + Codes.INVALID_PARAM, + ) resp = await self.handler.update_group_summary_room( group_id, @@ -1156,8 +1157,7 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): class FederationGroupsCategoriesServlet(BaseFederationServlet): - """Get all categories for a group - """ + """Get all categories for a group""" PATH = "/groups/(?P<group_id>[^/]*)/categories/?" @@ -1172,8 +1172,7 @@ class FederationGroupsCategoriesServlet(BaseFederationServlet): class FederationGroupsCategoryServlet(BaseFederationServlet): - """Add/remove/get a category in a group - """ + """Add/remove/get a category in a group""" PATH = "/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)" @@ -1196,6 +1195,14 @@ class FederationGroupsCategoryServlet(BaseFederationServlet): if category_id == "": raise SynapseError(400, "category_id cannot be empty string") + if len(category_id) > MAX_GROUP_CATEGORYID_LENGTH: + raise SynapseError( + 400, + "category_id may not be longer than %s characters" + % (MAX_GROUP_CATEGORYID_LENGTH,), + Codes.INVALID_PARAM, + ) + resp = await self.handler.upsert_group_category( group_id, requester_user_id, category_id, content ) @@ -1218,8 +1225,7 @@ class FederationGroupsCategoryServlet(BaseFederationServlet): class FederationGroupsRolesServlet(BaseFederationServlet): - """Get roles in a group - """ + """Get roles in a group""" PATH = "/groups/(?P<group_id>[^/]*)/roles/?" @@ -1234,8 +1240,7 @@ class FederationGroupsRolesServlet(BaseFederationServlet): class FederationGroupsRoleServlet(BaseFederationServlet): - """Add/remove/get a role in a group - """ + """Add/remove/get a role in a group""" PATH = "/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)" @@ -1254,7 +1259,17 @@ class FederationGroupsRoleServlet(BaseFederationServlet): raise SynapseError(403, "requester_user_id doesn't match origin") if role_id == "": - raise SynapseError(400, "role_id cannot be empty string") + raise SynapseError( + 400, "role_id cannot be empty string", Codes.INVALID_PARAM + ) + + if len(role_id) > MAX_GROUP_ROLEID_LENGTH: + raise SynapseError( + 400, + "role_id may not be longer than %s characters" + % (MAX_GROUP_ROLEID_LENGTH,), + Codes.INVALID_PARAM, + ) resp = await self.handler.update_group_role( group_id, requester_user_id, role_id, content @@ -1299,6 +1314,14 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet): if role_id == "": raise SynapseError(400, "role_id cannot be empty string") + if len(role_id) > MAX_GROUP_ROLEID_LENGTH: + raise SynapseError( + 400, + "role_id may not be longer than %s characters" + % (MAX_GROUP_ROLEID_LENGTH,), + Codes.INVALID_PARAM, + ) + resp = await self.handler.update_group_summary_user( group_id, requester_user_id, @@ -1325,8 +1348,7 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet): class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): - """Get roles in a group - """ + """Get roles in a group""" PATH = "/get_groups_publicised" @@ -1339,8 +1361,7 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): - """Sets whether a group is joinable without an invite or knock - """ + """Sets whether a group is joinable without an invite or knock""" PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy" diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 64d98fc8f6..b662c42621 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py
@@ -29,7 +29,7 @@ logger = logging.getLogger(__name__) @attr.s(slots=True) class Edu(JsonEncodedObject): - """ An Edu represents a piece of data sent from one homeserver to another. + """An Edu represents a piece of data sent from one homeserver to another. In comparison to Pdus, Edus are not persisted for a long time on disk, are not meaningful beyond a given pair of homeservers, and don't have an @@ -63,7 +63,7 @@ class Edu(JsonEncodedObject): class Transaction(JsonEncodedObject): - """ A transaction is a list of Pdus and Edus to be sent to a remote home + """A transaction is a list of Pdus and Edus to be sent to a remote home server with some extra metadata. Example transaction:: @@ -99,7 +99,7 @@ class Transaction(JsonEncodedObject): ] def __init__(self, transaction_id=None, pdus=[], **kwargs): - """ If we include a list of pdus then we decode then as PDU's + """If we include a list of pdus then we decode then as PDU's automatically. """ @@ -111,7 +111,7 @@ class Transaction(JsonEncodedObject): @staticmethod def create_new(pdus, **kwargs): - """ Used to create a new transaction. Will auto fill out + """Used to create a new transaction. Will auto fill out transaction_id and origin_server_ts keys. """ if "origin_server_ts" not in kwargs: