diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 302b2f69bc..bee81fc019 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -18,6 +18,7 @@ import copy
import itertools
import logging
from typing import (
+ TYPE_CHECKING,
Any,
Awaitable,
Callable,
@@ -26,7 +27,6 @@ from typing import (
List,
Mapping,
Optional,
- Sequence,
Tuple,
TypeVar,
Union,
@@ -61,6 +61,9 @@ from synapse.util import unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
+if TYPE_CHECKING:
+ from synapse.app.homeserver import HomeServer
+
logger = logging.getLogger(__name__)
sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
@@ -80,10 +83,10 @@ class InvalidResponseError(RuntimeError):
class FederationClient(FederationBase):
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
- self.pdu_destination_tried = {}
+ self.pdu_destination_tried = {} # type: Dict[str, Dict[str, int]]
self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()
@@ -116,33 +119,32 @@ class FederationClient(FederationBase):
self.pdu_destination_tried[event_id] = destination_dict
@log_function
- def make_query(
+ async def make_query(
self,
- destination,
- query_type,
- args,
- retry_on_dns_fail=False,
- ignore_backoff=False,
- ):
+ destination: str,
+ query_type: str,
+ args: dict,
+ retry_on_dns_fail: bool = False,
+ ignore_backoff: bool = False,
+ ) -> JsonDict:
"""Sends a federation Query to a remote homeserver of the given type
and arguments.
Args:
- destination (str): Domain name of the remote homeserver
- query_type (str): Category of the query type; should match the
+ destination: Domain name of the remote homeserver
+ query_type: Category of the query type; should match the
handler name used in register_query_handler().
- args (dict): Mapping of strings to strings containing the details
+ args: Mapping of strings to strings containing the details
of the query request.
- ignore_backoff (bool): true to ignore the historical backoff data
+ ignore_backoff: true to ignore the historical backoff data
and try the request anyway.
Returns:
- a Awaitable which will eventually yield a JSON object from the
- response
+ The JSON object from the response
"""
sent_queries_counter.labels(query_type).inc()
- return self.transport_layer.make_query(
+ return await self.transport_layer.make_query(
destination,
query_type,
args,
@@ -151,42 +153,52 @@ class FederationClient(FederationBase):
)
@log_function
- def query_client_keys(self, destination, content, timeout):
+ async def query_client_keys(
+ self, destination: str, content: JsonDict, timeout: int
+ ) -> JsonDict:
"""Query device keys for a device hosted on a remote server.
Args:
- destination (str): Domain name of the remote homeserver
- content (dict): The query content.
+ destination: Domain name of the remote homeserver
+ content: The query content.
Returns:
- an Awaitable which will eventually yield a JSON object from the
- response
+ The JSON object from the response
"""
sent_queries_counter.labels("client_device_keys").inc()
- return self.transport_layer.query_client_keys(destination, content, timeout)
+ return await self.transport_layer.query_client_keys(
+ destination, content, timeout
+ )
@log_function
- def query_user_devices(self, destination, user_id, timeout=30000):
+ async def query_user_devices(
+ self, destination: str, user_id: str, timeout: int = 30000
+ ) -> JsonDict:
"""Query the device keys for a list of user ids hosted on a remote
server.
"""
sent_queries_counter.labels("user_devices").inc()
- return self.transport_layer.query_user_devices(destination, user_id, timeout)
+ return await self.transport_layer.query_user_devices(
+ destination, user_id, timeout
+ )
@log_function
- def claim_client_keys(self, destination, content, timeout):
+ async def claim_client_keys(
+ self, destination: str, content: JsonDict, timeout: int
+ ) -> JsonDict:
"""Claims one-time keys for a device hosted on a remote server.
Args:
- destination (str): Domain name of the remote homeserver
- content (dict): The query content.
+ destination: Domain name of the remote homeserver
+ content: The query content.
Returns:
- an Awaitable which will eventually yield a JSON object from the
- response
+ The JSON object from the response
"""
sent_queries_counter.labels("client_one_time_keys").inc()
- return self.transport_layer.claim_client_keys(destination, content, timeout)
+ return await self.transport_layer.claim_client_keys(
+ destination, content, timeout
+ )
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
@@ -195,10 +207,10 @@ class FederationClient(FederationBase):
given destination server.
Args:
- dest (str): The remote homeserver to ask.
- room_id (str): The room_id to backfill.
- limit (int): The maximum number of events to return.
- extremities (list): our current backwards extremities, to backfill from
+ dest: The remote homeserver to ask.
+ room_id: The room_id to backfill.
+ limit: The maximum number of events to return.
+ extremities: our current backwards extremities, to backfill from
"""
logger.debug("backfill extrem=%s", extremities)
@@ -370,7 +382,7 @@ class FederationClient(FederationBase):
for events that have failed their checks
Returns:
- Deferred : A list of PDUs that have valid signatures and hashes.
+ A list of PDUs that have valid signatures and hashes.
"""
deferreds = self._check_sigs_and_hashes(room_version, pdus)
@@ -418,7 +430,9 @@ class FederationClient(FederationBase):
else:
return [p for p in valid_pdus if p]
- async def get_event_auth(self, destination, room_id, event_id):
+ async def get_event_auth(
+ self, destination: str, room_id: str, event_id: str
+ ) -> List[EventBase]:
res = await self.transport_layer.get_event_auth(destination, room_id, event_id)
room_version = await self.store.get_room_version(room_id)
@@ -700,18 +714,16 @@ class FederationClient(FederationBase):
return await self._try_destination_list("send_join", destinations, send_request)
- async def _do_send_join(self, destination: str, pdu: EventBase):
+ async def _do_send_join(self, destination: str, pdu: EventBase) -> JsonDict:
time_now = self._clock.time_msec()
try:
- content = await self.transport_layer.send_join_v2(
+ return await self.transport_layer.send_join_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
-
- return content
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
@@ -738,7 +750,11 @@ class FederationClient(FederationBase):
return resp[1]
async def send_invite(
- self, destination: str, room_id: str, event_id: str, pdu: EventBase,
+ self,
+ destination: str,
+ room_id: str,
+ event_id: str,
+ pdu: EventBase,
) -> EventBase:
room_version = await self.store.get_room_version(room_id)
@@ -769,7 +785,7 @@ class FederationClient(FederationBase):
time_now = self._clock.time_msec()
try:
- content = await self.transport_layer.send_invite_v2(
+ return await self.transport_layer.send_invite_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@@ -779,7 +795,6 @@ class FederationClient(FederationBase):
"invite_room_state": pdu.unsigned.get("invite_room_state", []),
},
)
- return content
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
@@ -799,7 +814,7 @@ class FederationClient(FederationBase):
"User's homeserver does not support this room version",
Codes.UNSUPPORTED_ROOM_VERSION,
)
- elif e.code == 403:
+ elif e.code in (403, 429):
raise e.to_synapse_error()
else:
raise
@@ -842,18 +857,16 @@ class FederationClient(FederationBase):
"send_leave", destinations, send_request
)
- async def _do_send_leave(self, destination, pdu):
+ async def _do_send_leave(self, destination: str, pdu: EventBase) -> JsonDict:
time_now = self._clock.time_msec()
try:
- content = await self.transport_layer.send_leave_v2(
+ return await self.transport_layer.send_leave_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
-
- return content
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
@@ -879,7 +892,7 @@ class FederationClient(FederationBase):
# content.
return resp[1]
- def get_public_rooms(
+ async def get_public_rooms(
self,
remote_server: str,
limit: Optional[int] = None,
@@ -887,7 +900,7 @@ class FederationClient(FederationBase):
search_filter: Optional[Dict] = None,
include_all_networks: bool = False,
third_party_instance_id: Optional[str] = None,
- ):
+ ) -> JsonDict:
"""Get the list of public rooms from a remote homeserver
Args:
@@ -901,8 +914,7 @@ class FederationClient(FederationBase):
party instance
Returns:
- Awaitable[Dict[str, Any]]: The response from the remote server, or None if
- `remote_server` is the same as the local server_name
+ The response from the remote server.
Raises:
HttpResponseException: There was an exception returned from the remote server
@@ -910,7 +922,7 @@ class FederationClient(FederationBase):
requests over federation
"""
- return self.transport_layer.get_public_rooms(
+ return await self.transport_layer.get_public_rooms(
remote_server,
limit,
since_token,
@@ -923,7 +935,7 @@ class FederationClient(FederationBase):
self,
destination: str,
room_id: str,
- earliest_events_ids: Sequence[str],
+ earliest_events_ids: Iterable[str],
latest_events: Iterable[EventBase],
limit: int,
min_depth: int,
@@ -974,7 +986,9 @@ class FederationClient(FederationBase):
return signed_events
- async def forward_third_party_invite(self, destinations, room_id, event_dict):
+ async def forward_third_party_invite(
+ self, destinations: Iterable[str], room_id: str, event_dict: JsonDict
+ ) -> None:
for destination in destinations:
if destination == self.server_name:
continue
@@ -983,7 +997,7 @@ class FederationClient(FederationBase):
await self.transport_layer.exchange_third_party_invite(
destination=destination, room_id=room_id, event_dict=event_dict
)
- return None
+ return
except CodeMessageException:
raise
except Exception as e:
@@ -995,7 +1009,7 @@ class FederationClient(FederationBase):
async def get_room_complexity(
self, destination: str, room_id: str
- ) -> Optional[dict]:
+ ) -> Optional[JsonDict]:
"""
Fetch the complexity of a remote room from another server.
@@ -1008,10 +1022,9 @@ class FederationClient(FederationBase):
could not fetch the complexity.
"""
try:
- complexity = await self.transport_layer.get_room_complexity(
+ return await self.transport_layer.get_room_complexity(
destination=destination, room_id=room_id
)
- return complexity
except CodeMessageException as e:
# We didn't manage to get it -- probably a 404. We are okay if other
# servers don't give it to us.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 171d25c945..8d4bb621e7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -85,7 +85,8 @@ received_queries_counter = Counter(
)
pdu_process_time = Histogram(
- "synapse_federation_server_pdu_process_time", "Time taken to process an event",
+ "synapse_federation_server_pdu_process_time",
+ "Time taken to process an event",
)
@@ -204,7 +205,7 @@ class FederationServer(FederationBase):
async def _handle_incoming_transaction(
self, origin: str, transaction: Transaction, request_time: int
) -> Tuple[int, Dict[str, Any]]:
- """ Process an incoming transaction and return the HTTP response
+ """Process an incoming transaction and return the HTTP response
Args:
origin: the server making the request
@@ -373,8 +374,7 @@ class FederationServer(FederationBase):
return pdu_results
async def _handle_edus_in_txn(self, origin: str, transaction: Transaction):
- """Process the EDUs in a received transaction.
- """
+ """Process the EDUs in a received transaction."""
async def _process_edu(edu_dict):
received_edus_counter.inc()
@@ -437,7 +437,10 @@ class FederationServer(FederationBase):
raise AuthError(403, "Host not in room.")
resp = await self._state_ids_resp_cache.wrap(
- (room_id, event_id), self._on_state_ids_request_compute, room_id, event_id,
+ (room_id, event_id),
+ self._on_state_ids_request_compute,
+ room_id,
+ event_id,
)
return 200, resp
@@ -679,7 +682,7 @@ class FederationServer(FederationBase):
)
async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
- """ Process a PDU received in a federation /send/ transaction.
+ """Process a PDU received in a federation /send/ transaction.
If the event is invalid, then this method throws a FederationError.
(The error will then be logged and sent back to the sender (which
@@ -906,13 +909,11 @@ class FederationHandlerRegistry:
self.query_handlers[query_type] = handler
def register_instance_for_edu(self, edu_type: str, instance_name: str):
- """Register that the EDU handler is on a different instance than master.
- """
+ """Register that the EDU handler is on a different instance than master."""
self._edu_type_to_instance[edu_type] = [instance_name]
def register_instances_for_edu(self, edu_type: str, instance_names: List[str]):
- """Register that the EDU handler is on multiple instances.
- """
+ """Register that the EDU handler is on multiple instances."""
self._edu_type_to_instance[edu_type] = instance_names
async def on_edu(self, edu_type: str, origin: str, content: dict):
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 079e2b2fe0..ce5fc758f0 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -30,8 +30,7 @@ logger = logging.getLogger(__name__)
class TransactionActions:
- """ Defines persistence actions that relate to handling Transactions.
- """
+ """Defines persistence actions that relate to handling Transactions."""
def __init__(self, datastore):
self.store = datastore
@@ -57,8 +56,7 @@ class TransactionActions:
async def set_response(
self, origin: str, transaction: Transaction, code: int, response: JsonDict
) -> None:
- """Persist how we responded to a transaction.
- """
+ """Persist how we responded to a transaction."""
transaction_id = transaction.transaction_id # type: ignore
if not transaction_id:
raise RuntimeError("Cannot persist a transaction with no transaction_id")
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 5f1bf492c1..3e993b428b 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -468,8 +468,7 @@ class KeyedEduRow(
class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
- """Streams EDUs that don't have keys. See KeyedEduRow
- """
+ """Streams EDUs that don't have keys. See KeyedEduRow"""
TypeId = "e"
@@ -519,7 +518,10 @@ def process_rows_for_federation(transaction_queue, rows):
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
- presence=[], presence_destinations=[], keyed_edus={}, edus={},
+ presence=[],
+ presence_destinations=[],
+ keyed_edus={},
+ edus={},
)
# Parse the rows in the stream and add to the buffer
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 604cfd1935..97fc4d0a82 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -142,6 +142,8 @@ class FederationSender:
self._wake_destinations_needing_catchup,
)
+ self._external_cache = hs.get_external_cache()
+
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
@@ -197,22 +199,40 @@ class FederationSender:
if not event.internal_metadata.should_proactively_send():
return
- try:
- # Get the state from before the event.
- # We need to make sure that this is the state from before
- # the event and not from after it.
- # Otherwise if the last member on a server in a room is
- # banned then it won't receive the event because it won't
- # be in the room after the ban.
- destinations = await self.state.get_hosts_in_room_at_events(
- event.room_id, event_ids=event.prev_event_ids()
- )
- except Exception:
- logger.exception(
- "Failed to calculate hosts in room for event: %s",
- event.event_id,
+ destinations = None # type: Optional[Set[str]]
+ if not event.prev_event_ids():
+ # If there are no prev event IDs then the state is empty
+ # and so no remote servers in the room
+ destinations = set()
+ else:
+ # We check the external cache for the destinations, which is
+ # stored per state group.
+
+ sg = await self._external_cache.get(
+ "event_to_prev_state_group", event.event_id
)
- return
+ if sg:
+ destinations = await self._external_cache.get(
+ "get_joined_hosts", str(sg)
+ )
+
+ if destinations is None:
+ try:
+ # Get the state from before the event.
+ # We need to make sure that this is the state from before
+ # the event and not from after it.
+ # Otherwise if the last member on a server in a room is
+ # banned then it won't receive the event because it won't
+ # be in the room after the ban.
+ destinations = await self.state.get_hosts_in_room_at_events(
+ event.room_id, event_ids=event.prev_event_ids()
+ )
+ except Exception:
+ logger.exception(
+ "Failed to calculate hosts in room for event: %s",
+ event.event_id,
+ )
+ return
destinations = {
d
@@ -308,7 +328,9 @@ class FederationSender:
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
await self.store.store_destination_rooms_entries(
- destinations, pdu.room_id, pdu.internal_metadata.stream_ordering,
+ destinations,
+ pdu.room_id,
+ pdu.internal_metadata.stream_ordering,
)
for destination in destinations:
@@ -455,7 +477,7 @@ class FederationSender:
self, states: List[UserPresenceState], destinations: List[str]
) -> None:
"""Send the given presence states to the given destinations.
- destinations (list[str])
+ destinations (list[str])
"""
if not states or not self.hs.config.use_presence:
@@ -596,8 +618,8 @@ class FederationSender:
last_processed = None # type: Optional[str]
while True:
- destinations_to_wake = await self.store.get_catch_up_outstanding_destinations(
- last_processed
+ destinations_to_wake = (
+ await self.store.get_catch_up_outstanding_destinations(last_processed)
)
if not destinations_to_wake:
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index db8e456fe8..deb519f3ef 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -85,7 +85,8 @@ class PerDestinationQueue:
# processing. We have a guard in `attempt_new_transaction` that
# ensure we don't start sending stuff.
logger.error(
- "Create a per destination queue for %s on wrong worker", destination,
+ "Create a per destination queue for %s on wrong worker",
+ destination,
)
self._should_send_on_this_instance = False
@@ -440,8 +441,10 @@ class PerDestinationQueue:
if first_catch_up_check:
# first catchup so get last_successful_stream_ordering from database
- self._last_successful_stream_ordering = await self._store.get_destination_last_successful_stream_ordering(
- self._destination
+ self._last_successful_stream_ordering = (
+ await self._store.get_destination_last_successful_stream_ordering(
+ self._destination
+ )
)
if self._last_successful_stream_ordering is None:
@@ -457,7 +460,8 @@ class PerDestinationQueue:
# get at most 50 catchup room/PDUs
while True:
event_ids = await self._store.get_catch_up_room_event_ids(
- self._destination, self._last_successful_stream_ordering,
+ self._destination,
+ self._last_successful_stream_ordering,
)
if not event_ids:
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 3e07f925e0..763aff296c 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -65,7 +65,10 @@ class TransactionManager:
@measure_func("_send_new_transaction")
async def send_new_transaction(
- self, destination: str, pdus: List[EventBase], edus: List[Edu],
+ self,
+ destination: str,
+ pdus: List[EventBase],
+ edus: List[Edu],
) -> bool:
"""
Args:
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index abe9168c78..10c4747f97 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -39,7 +39,7 @@ class TransportLayerClient:
@log_function
def get_room_state_ids(self, destination, room_id, event_id):
- """ Requests all state for a given room from the given server at the
+ """Requests all state for a given room from the given server at the
given event. Returns the state's event_id's
Args:
@@ -63,7 +63,7 @@ class TransportLayerClient:
@log_function
def get_event(self, destination, event_id, timeout=None):
- """ Requests the pdu with give id and origin from the given server.
+ """Requests the pdu with give id and origin from the given server.
Args:
destination (str): The host name of the remote homeserver we want
@@ -84,7 +84,7 @@ class TransportLayerClient:
@log_function
def backfill(self, destination, room_id, event_tuples, limit):
- """ Requests `limit` previous PDUs in a given context before list of
+ """Requests `limit` previous PDUs in a given context before list of
PDUs.
Args:
@@ -118,7 +118,7 @@ class TransportLayerClient:
@log_function
async def send_transaction(self, transaction, json_data_callback=None):
- """ Sends the given Transaction to its destination
+ """Sends the given Transaction to its destination
Args:
transaction (Transaction)
@@ -551,8 +551,7 @@ class TransportLayerClient:
@log_function
def get_group_profile(self, destination, group_id, requester_user_id):
- """Get a group profile
- """
+ """Get a group profile"""
path = _create_v1_path("/groups/%s/profile", group_id)
return self.client.get_json(
@@ -584,8 +583,7 @@ class TransportLayerClient:
@log_function
def get_group_summary(self, destination, group_id, requester_user_id):
- """Get a group summary
- """
+ """Get a group summary"""
path = _create_v1_path("/groups/%s/summary", group_id)
return self.client.get_json(
@@ -597,8 +595,7 @@ class TransportLayerClient:
@log_function
def get_rooms_in_group(self, destination, group_id, requester_user_id):
- """Get all rooms in a group
- """
+ """Get all rooms in a group"""
path = _create_v1_path("/groups/%s/rooms", group_id)
return self.client.get_json(
@@ -611,8 +608,7 @@ class TransportLayerClient:
def add_room_to_group(
self, destination, group_id, requester_user_id, room_id, content
):
- """Add a room to a group
- """
+ """Add a room to a group"""
path = _create_v1_path("/groups/%s/room/%s", group_id, room_id)
return self.client.post_json(
@@ -626,8 +622,7 @@ class TransportLayerClient:
def update_room_in_group(
self, destination, group_id, requester_user_id, room_id, config_key, content
):
- """Update room in group
- """
+ """Update room in group"""
path = _create_v1_path(
"/groups/%s/room/%s/config/%s", group_id, room_id, config_key
)
@@ -641,8 +636,7 @@ class TransportLayerClient:
)
def remove_room_from_group(self, destination, group_id, requester_user_id, room_id):
- """Remove a room from a group
- """
+ """Remove a room from a group"""
path = _create_v1_path("/groups/%s/room/%s", group_id, room_id)
return self.client.delete_json(
@@ -654,8 +648,7 @@ class TransportLayerClient:
@log_function
def get_users_in_group(self, destination, group_id, requester_user_id):
- """Get users in a group
- """
+ """Get users in a group"""
path = _create_v1_path("/groups/%s/users", group_id)
return self.client.get_json(
@@ -667,8 +660,7 @@ class TransportLayerClient:
@log_function
def get_invited_users_in_group(self, destination, group_id, requester_user_id):
- """Get users that have been invited to a group
- """
+ """Get users that have been invited to a group"""
path = _create_v1_path("/groups/%s/invited_users", group_id)
return self.client.get_json(
@@ -680,8 +672,7 @@ class TransportLayerClient:
@log_function
def accept_group_invite(self, destination, group_id, user_id, content):
- """Accept a group invite
- """
+ """Accept a group invite"""
path = _create_v1_path("/groups/%s/users/%s/accept_invite", group_id, user_id)
return self.client.post_json(
@@ -690,8 +681,7 @@ class TransportLayerClient:
@log_function
def join_group(self, destination, group_id, user_id, content):
- """Attempts to join a group
- """
+ """Attempts to join a group"""
path = _create_v1_path("/groups/%s/users/%s/join", group_id, user_id)
return self.client.post_json(
@@ -702,8 +692,7 @@ class TransportLayerClient:
def invite_to_group(
self, destination, group_id, user_id, requester_user_id, content
):
- """Invite a user to a group
- """
+ """Invite a user to a group"""
path = _create_v1_path("/groups/%s/users/%s/invite", group_id, user_id)
return self.client.post_json(
@@ -730,8 +719,7 @@ class TransportLayerClient:
def remove_user_from_group(
self, destination, group_id, requester_user_id, user_id, content
):
- """Remove a user from a group
- """
+ """Remove a user from a group"""
path = _create_v1_path("/groups/%s/users/%s/remove", group_id, user_id)
return self.client.post_json(
@@ -772,8 +760,7 @@ class TransportLayerClient:
def update_group_summary_room(
self, destination, group_id, user_id, room_id, category_id, content
):
- """Update a room entry in a group summary
- """
+ """Update a room entry in a group summary"""
if category_id:
path = _create_v1_path(
"/groups/%s/summary/categories/%s/rooms/%s",
@@ -796,8 +783,7 @@ class TransportLayerClient:
def delete_group_summary_room(
self, destination, group_id, user_id, room_id, category_id
):
- """Delete a room entry in a group summary
- """
+ """Delete a room entry in a group summary"""
if category_id:
path = _create_v1_path(
"/groups/%s/summary/categories/%s/rooms/%s",
@@ -817,8 +803,7 @@ class TransportLayerClient:
@log_function
def get_group_categories(self, destination, group_id, requester_user_id):
- """Get all categories in a group
- """
+ """Get all categories in a group"""
path = _create_v1_path("/groups/%s/categories", group_id)
return self.client.get_json(
@@ -830,8 +815,7 @@ class TransportLayerClient:
@log_function
def get_group_category(self, destination, group_id, requester_user_id, category_id):
- """Get category info in a group
- """
+ """Get category info in a group"""
path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id)
return self.client.get_json(
@@ -845,8 +829,7 @@ class TransportLayerClient:
def update_group_category(
self, destination, group_id, requester_user_id, category_id, content
):
- """Update a category in a group
- """
+ """Update a category in a group"""
path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id)
return self.client.post_json(
@@ -861,8 +844,7 @@ class TransportLayerClient:
def delete_group_category(
self, destination, group_id, requester_user_id, category_id
):
- """Delete a category in a group
- """
+ """Delete a category in a group"""
path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id)
return self.client.delete_json(
@@ -874,8 +856,7 @@ class TransportLayerClient:
@log_function
def get_group_roles(self, destination, group_id, requester_user_id):
- """Get all roles in a group
- """
+ """Get all roles in a group"""
path = _create_v1_path("/groups/%s/roles", group_id)
return self.client.get_json(
@@ -887,8 +868,7 @@ class TransportLayerClient:
@log_function
def get_group_role(self, destination, group_id, requester_user_id, role_id):
- """Get a roles info
- """
+ """Get a roles info"""
path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id)
return self.client.get_json(
@@ -902,8 +882,7 @@ class TransportLayerClient:
def update_group_role(
self, destination, group_id, requester_user_id, role_id, content
):
- """Update a role in a group
- """
+ """Update a role in a group"""
path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id)
return self.client.post_json(
@@ -916,8 +895,7 @@ class TransportLayerClient:
@log_function
def delete_group_role(self, destination, group_id, requester_user_id, role_id):
- """Delete a role in a group
- """
+ """Delete a role in a group"""
path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id)
return self.client.delete_json(
@@ -931,8 +909,7 @@ class TransportLayerClient:
def update_group_summary_user(
self, destination, group_id, requester_user_id, user_id, role_id, content
):
- """Update a users entry in a group
- """
+ """Update a users entry in a group"""
if role_id:
path = _create_v1_path(
"/groups/%s/summary/roles/%s/users/%s", group_id, role_id, user_id
@@ -950,8 +927,7 @@ class TransportLayerClient:
@log_function
def set_group_join_policy(self, destination, group_id, requester_user_id, content):
- """Sets the join policy for a group
- """
+ """Sets the join policy for a group"""
path = _create_v1_path("/groups/%s/settings/m.join_policy", group_id)
return self.client.put_json(
@@ -966,8 +942,7 @@ class TransportLayerClient:
def delete_group_summary_user(
self, destination, group_id, requester_user_id, user_id, role_id
):
- """Delete a users entry in a group
- """
+ """Delete a users entry in a group"""
if role_id:
path = _create_v1_path(
"/groups/%s/summary/roles/%s/users/%s", group_id, role_id, user_id
@@ -983,8 +958,7 @@ class TransportLayerClient:
)
def bulk_get_publicised_groups(self, destination, user_ids):
- """Get the groups a list of users are publicising
- """
+ """Get the groups a list of users are publicising"""
path = _create_v1_path("/get_groups_publicised")
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 95c64510a9..cce83704d4 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -21,6 +21,7 @@ import re
from typing import Optional, Tuple, Type
import synapse
+from synapse.api.constants import MAX_GROUP_CATEGORYID_LENGTH, MAX_GROUP_ROLEID_LENGTH
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import (
@@ -364,7 +365,10 @@ class BaseFederationServlet:
continue
server.register_paths(
- method, (pattern,), self._wrap(code), self.__class__.__name__,
+ method,
+ (pattern,),
+ self._wrap(code),
+ self.__class__.__name__,
)
@@ -381,7 +385,7 @@ class FederationSendServlet(BaseFederationServlet):
# This is when someone is trying to send us a bunch of data.
async def on_PUT(self, origin, content, query, transaction_id):
- """ Called on PUT /send/<transaction_id>/
+ """Called on PUT /send/<transaction_id>/
Args:
request (twisted.web.http.Request): The HTTP request.
@@ -855,8 +859,7 @@ class FederationVersionServlet(BaseFederationServlet):
class FederationGroupsProfileServlet(BaseFederationServlet):
- """Get/set the basic profile of a group on behalf of a user
- """
+ """Get/set the basic profile of a group on behalf of a user"""
PATH = "/groups/(?P<group_id>[^/]*)/profile"
@@ -895,8 +898,7 @@ class FederationGroupsSummaryServlet(BaseFederationServlet):
class FederationGroupsRoomsServlet(BaseFederationServlet):
- """Get the rooms in a group on behalf of a user
- """
+ """Get the rooms in a group on behalf of a user"""
PATH = "/groups/(?P<group_id>[^/]*)/rooms"
@@ -911,8 +913,7 @@ class FederationGroupsRoomsServlet(BaseFederationServlet):
class FederationGroupsAddRoomsServlet(BaseFederationServlet):
- """Add/remove room from group
- """
+ """Add/remove room from group"""
PATH = "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)"
@@ -940,8 +941,7 @@ class FederationGroupsAddRoomsServlet(BaseFederationServlet):
class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet):
- """Update room config in group
- """
+ """Update room config in group"""
PATH = (
"/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)"
@@ -961,8 +961,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet):
class FederationGroupsUsersServlet(BaseFederationServlet):
- """Get the users in a group on behalf of a user
- """
+ """Get the users in a group on behalf of a user"""
PATH = "/groups/(?P<group_id>[^/]*)/users"
@@ -977,8 +976,7 @@ class FederationGroupsUsersServlet(BaseFederationServlet):
class FederationGroupsInvitedUsersServlet(BaseFederationServlet):
- """Get the users that have been invited to a group
- """
+ """Get the users that have been invited to a group"""
PATH = "/groups/(?P<group_id>[^/]*)/invited_users"
@@ -995,8 +993,7 @@ class FederationGroupsInvitedUsersServlet(BaseFederationServlet):
class FederationGroupsInviteServlet(BaseFederationServlet):
- """Ask a group server to invite someone to the group
- """
+ """Ask a group server to invite someone to the group"""
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite"
@@ -1013,8 +1010,7 @@ class FederationGroupsInviteServlet(BaseFederationServlet):
class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
- """Accept an invitation from the group server
- """
+ """Accept an invitation from the group server"""
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite"
@@ -1028,8 +1024,7 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
class FederationGroupsJoinServlet(BaseFederationServlet):
- """Attempt to join a group
- """
+ """Attempt to join a group"""
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join"
@@ -1043,8 +1038,7 @@ class FederationGroupsJoinServlet(BaseFederationServlet):
class FederationGroupsRemoveUserServlet(BaseFederationServlet):
- """Leave or kick a user from the group
- """
+ """Leave or kick a user from the group"""
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove"
@@ -1061,8 +1055,7 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet):
class FederationGroupsLocalInviteServlet(BaseFederationServlet):
- """A group server has invited a local user
- """
+ """A group server has invited a local user"""
PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite"
@@ -1076,8 +1069,7 @@ class FederationGroupsLocalInviteServlet(BaseFederationServlet):
class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet):
- """A group server has removed a local user
- """
+ """A group server has removed a local user"""
PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove"
@@ -1093,8 +1085,7 @@ class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet):
class FederationGroupsRenewAttestaionServlet(BaseFederationServlet):
- """A group or user's server renews their attestation
- """
+ """A group or user's server renews their attestation"""
PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)"
@@ -1128,7 +1119,17 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet):
raise SynapseError(403, "requester_user_id doesn't match origin")
if category_id == "":
- raise SynapseError(400, "category_id cannot be empty string")
+ raise SynapseError(
+ 400, "category_id cannot be empty string", Codes.INVALID_PARAM
+ )
+
+ if len(category_id) > MAX_GROUP_CATEGORYID_LENGTH:
+ raise SynapseError(
+ 400,
+ "category_id may not be longer than %s characters"
+ % (MAX_GROUP_CATEGORYID_LENGTH,),
+ Codes.INVALID_PARAM,
+ )
resp = await self.handler.update_group_summary_room(
group_id,
@@ -1156,8 +1157,7 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet):
class FederationGroupsCategoriesServlet(BaseFederationServlet):
- """Get all categories for a group
- """
+ """Get all categories for a group"""
PATH = "/groups/(?P<group_id>[^/]*)/categories/?"
@@ -1172,8 +1172,7 @@ class FederationGroupsCategoriesServlet(BaseFederationServlet):
class FederationGroupsCategoryServlet(BaseFederationServlet):
- """Add/remove/get a category in a group
- """
+ """Add/remove/get a category in a group"""
PATH = "/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)"
@@ -1196,6 +1195,14 @@ class FederationGroupsCategoryServlet(BaseFederationServlet):
if category_id == "":
raise SynapseError(400, "category_id cannot be empty string")
+ if len(category_id) > MAX_GROUP_CATEGORYID_LENGTH:
+ raise SynapseError(
+ 400,
+ "category_id may not be longer than %s characters"
+ % (MAX_GROUP_CATEGORYID_LENGTH,),
+ Codes.INVALID_PARAM,
+ )
+
resp = await self.handler.upsert_group_category(
group_id, requester_user_id, category_id, content
)
@@ -1218,8 +1225,7 @@ class FederationGroupsCategoryServlet(BaseFederationServlet):
class FederationGroupsRolesServlet(BaseFederationServlet):
- """Get roles in a group
- """
+ """Get roles in a group"""
PATH = "/groups/(?P<group_id>[^/]*)/roles/?"
@@ -1234,8 +1240,7 @@ class FederationGroupsRolesServlet(BaseFederationServlet):
class FederationGroupsRoleServlet(BaseFederationServlet):
- """Add/remove/get a role in a group
- """
+ """Add/remove/get a role in a group"""
PATH = "/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)"
@@ -1254,7 +1259,17 @@ class FederationGroupsRoleServlet(BaseFederationServlet):
raise SynapseError(403, "requester_user_id doesn't match origin")
if role_id == "":
- raise SynapseError(400, "role_id cannot be empty string")
+ raise SynapseError(
+ 400, "role_id cannot be empty string", Codes.INVALID_PARAM
+ )
+
+ if len(role_id) > MAX_GROUP_ROLEID_LENGTH:
+ raise SynapseError(
+ 400,
+ "role_id may not be longer than %s characters"
+ % (MAX_GROUP_ROLEID_LENGTH,),
+ Codes.INVALID_PARAM,
+ )
resp = await self.handler.update_group_role(
group_id, requester_user_id, role_id, content
@@ -1299,6 +1314,14 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet):
if role_id == "":
raise SynapseError(400, "role_id cannot be empty string")
+ if len(role_id) > MAX_GROUP_ROLEID_LENGTH:
+ raise SynapseError(
+ 400,
+ "role_id may not be longer than %s characters"
+ % (MAX_GROUP_ROLEID_LENGTH,),
+ Codes.INVALID_PARAM,
+ )
+
resp = await self.handler.update_group_summary_user(
group_id,
requester_user_id,
@@ -1325,8 +1348,7 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet):
class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
- """Get roles in a group
- """
+ """Get roles in a group"""
PATH = "/get_groups_publicised"
@@ -1339,8 +1361,7 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
- """Sets whether a group is joinable without an invite or knock
- """
+ """Sets whether a group is joinable without an invite or knock"""
PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy"
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 64d98fc8f6..b662c42621 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
@attr.s(slots=True)
class Edu(JsonEncodedObject):
- """ An Edu represents a piece of data sent from one homeserver to another.
+ """An Edu represents a piece of data sent from one homeserver to another.
In comparison to Pdus, Edus are not persisted for a long time on disk, are
not meaningful beyond a given pair of homeservers, and don't have an
@@ -63,7 +63,7 @@ class Edu(JsonEncodedObject):
class Transaction(JsonEncodedObject):
- """ A transaction is a list of Pdus and Edus to be sent to a remote home
+ """A transaction is a list of Pdus and Edus to be sent to a remote home
server with some extra metadata.
Example transaction::
@@ -99,7 +99,7 @@ class Transaction(JsonEncodedObject):
]
def __init__(self, transaction_id=None, pdus=[], **kwargs):
- """ If we include a list of pdus then we decode then as PDU's
+ """If we include a list of pdus then we decode then as PDU's
automatically.
"""
@@ -111,7 +111,7 @@ class Transaction(JsonEncodedObject):
@staticmethod
def create_new(pdus, **kwargs):
- """ Used to create a new transaction. Will auto fill out
+ """Used to create a new transaction. Will auto fill out
transaction_id and origin_server_ts keys.
"""
if "origin_server_ts" not in kwargs:
|