diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 6ee6216660..687cd841ac 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -17,12 +17,23 @@
import copy
import itertools
import logging
-
-from six.moves import range
+from typing import (
+ Any,
+ Awaitable,
+ Callable,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Sequence,
+ Tuple,
+ TypeVar,
+)
from prometheus_client import Counter
from twisted.internet import defer
+from twisted.internet.defer import Deferred
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
@@ -31,16 +42,19 @@ from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
SynapseError,
+ UnsupportedRoomVersionError,
)
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
+ RoomVersion,
RoomVersions,
)
-from synapse.events import builder, room_version_to_event_format
+from synapse.events import EventBase, builder
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
-from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.context import make_deferred_yieldable, preserve_fn
from synapse.logging.utils import log_function
+from synapse.types import JsonDict
from synapse.util import unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
@@ -52,6 +66,8 @@ sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["t
PDU_RETRY_TIME_MS = 1 * 60 * 1000
+T = TypeVar("T")
+
class InvalidResponseError(RuntimeError):
"""Helper for _try_destination_list: indicates that the server returned a response
@@ -170,56 +186,54 @@ class FederationClient(FederationBase):
sent_queries_counter.labels("client_one_time_keys").inc()
return self.transport_layer.claim_client_keys(destination, content, timeout)
- @defer.inlineCallbacks
- @log_function
- def backfill(self, dest, room_id, limit, extremities):
- """Requests some more historic PDUs for the given context from the
+ async def backfill(
+ self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
+ ) -> Optional[List[EventBase]]:
+ """Requests some more historic PDUs for the given room from the
given destination server.
Args:
- dest (str): The remote home server to ask.
+ dest (str): The remote homeserver to ask.
room_id (str): The room_id to backfill.
- limit (int): The maximum number of PDUs to return.
- extremities (list): List of PDU id and origins of the first pdus
- we have seen from the context
-
- Returns:
- Deferred: Results in the received PDUs.
+ limit (int): The maximum number of events to return.
+ extremities (list): our current backwards extremities, to backfill from
"""
logger.debug("backfill extrem=%s", extremities)
- # If there are no extremeties then we've (probably) reached the start.
+ # If there are no extremities then we've (probably) reached the start.
if not extremities:
- return
+ return None
- transaction_data = yield self.transport_layer.backfill(
+ transaction_data = await self.transport_layer.backfill(
dest, room_id, extremities, limit
)
- logger.debug("backfill transaction_data=%s", repr(transaction_data))
+ logger.debug("backfill transaction_data=%r", transaction_data)
- room_version = yield self.store.get_room_version(room_id)
- format_ver = room_version_to_event_format(room_version)
+ room_version = await self.store.get_room_version(room_id)
pdus = [
- event_from_pdu_json(p, format_ver, outlier=False)
+ event_from_pdu_json(p, room_version, outlier=False)
for p in transaction_data["pdus"]
]
# FIXME: We should handle signature failures more gracefully.
- pdus[:] = yield make_deferred_yieldable(
+ pdus[:] = await make_deferred_yieldable(
defer.gatherResults(
- self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True
+ self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True,
).addErrback(unwrapFirstError)
)
return pdus
- @defer.inlineCallbacks
- @log_function
- def get_pdu(
- self, destinations, event_id, room_version, outlier=False, timeout=None
- ):
+ async def get_pdu(
+ self,
+ destinations: Iterable[str],
+ event_id: str,
+ room_version: RoomVersion,
+ outlier: bool = False,
+ timeout: Optional[int] = None,
+ ) -> Optional[EventBase]:
"""Requests the PDU with given origin and ID from the remote home
servers.
@@ -227,18 +241,17 @@ class FederationClient(FederationBase):
one succeeds.
Args:
- destinations (list): Which home servers to query
- event_id (str): event to fetch
- room_version (str): version of the room
- outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
+ destinations: Which homeservers to query
+ event_id: event to fetch
+ room_version: version of the room
+ outlier: Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
- timeout (int): How long to try (in ms) each destination for before
+ timeout: How long to try (in ms) each destination for before
moving to the next destination. None indicates no timeout.
Returns:
- Deferred: Results in the requested PDU, or None if we were unable to find
- it.
+ The requested PDU, or None if we were unable to find it.
"""
# TODO: Rate limit the number of times we try and get the same event.
@@ -249,8 +262,6 @@ class FederationClient(FederationBase):
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
- format_ver = room_version_to_event_format(room_version)
-
signed_pdu = None
for destination in destinations:
now = self._clock.time_msec()
@@ -259,7 +270,7 @@ class FederationClient(FederationBase):
continue
try:
- transaction_data = yield self.transport_layer.get_event(
+ transaction_data = await self.transport_layer.get_event(
destination, event_id, timeout=timeout
)
@@ -271,15 +282,15 @@ class FederationClient(FederationBase):
)
pdu_list = [
- event_from_pdu_json(p, format_ver, outlier=outlier)
+ event_from_pdu_json(p, room_version, outlier=outlier)
for p in transaction_data["pdus"]
- ]
+ ] # type: List[EventBase]
if pdu_list and pdu_list[0]:
pdu = pdu_list[0]
# Check signatures are correct.
- signed_pdu = yield self._check_sigs_and_hash(room_version, pdu)
+ signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
break
@@ -309,177 +320,117 @@ class FederationClient(FederationBase):
return signed_pdu
- @defer.inlineCallbacks
- @log_function
- def get_state_for_room(self, destination, room_id, event_id):
- """Requests all of the room state at a given event from a remote home server.
-
- Args:
- destination (str): The remote homeserver to query for the state.
- room_id (str): The id of the room we're interested in.
- event_id (str): The id of the event we want the state at.
+ async def get_room_state_ids(
+ self, destination: str, room_id: str, event_id: str
+ ) -> Tuple[List[str], List[str]]:
+ """Calls the /state_ids endpoint to fetch the state at a particular point
+ in the room, and the auth events for the given event
Returns:
- Deferred[Tuple[List[EventBase], List[EventBase]]]:
- A list of events in the state, and a list of events in the auth chain
- for the given event.
+ a tuple of (state event_ids, auth event_ids)
"""
- try:
- # First we try and ask for just the IDs, as thats far quicker if
- # we have most of the state and auth_chain already.
- # However, this may 404 if the other side has an old synapse.
- result = yield self.transport_layer.get_room_state_ids(
- destination, room_id, event_id=event_id
- )
-
- state_event_ids = result["pdu_ids"]
- auth_event_ids = result.get("auth_chain_ids", [])
-
- fetched_events, failed_to_fetch = yield self.get_events_from_store_or_dest(
- destination, room_id, set(state_event_ids + auth_event_ids)
- )
-
- if failed_to_fetch:
- logger.warning(
- "Failed to fetch missing state/auth events for %s: %s",
- room_id,
- failed_to_fetch,
- )
-
- event_map = {ev.event_id: ev for ev in fetched_events}
-
- pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
- auth_chain = [
- event_map[e_id] for e_id in auth_event_ids if e_id in event_map
- ]
-
- auth_chain.sort(key=lambda e: e.depth)
-
- return pdus, auth_chain
- except HttpResponseException as e:
- if e.code == 400 or e.code == 404:
- logger.info("Failed to use get_room_state_ids API, falling back")
- else:
- raise e
-
- result = yield self.transport_layer.get_room_state(
+ result = await self.transport_layer.get_room_state_ids(
destination, room_id, event_id=event_id
)
- room_version = yield self.store.get_room_version(room_id)
- format_ver = room_version_to_event_format(room_version)
+ state_event_ids = result["pdu_ids"]
+ auth_event_ids = result.get("auth_chain_ids", [])
- pdus = [
- event_from_pdu_json(p, format_ver, outlier=True) for p in result["pdus"]
- ]
+ if not isinstance(state_event_ids, list) or not isinstance(
+ auth_event_ids, list
+ ):
+ raise Exception("invalid response from /state_ids")
- auth_chain = [
- event_from_pdu_json(p, format_ver, outlier=True)
- for p in result.get("auth_chain", [])
- ]
-
- seen_events = yield self.store.get_events(
- [ev.event_id for ev in itertools.chain(pdus, auth_chain)]
- )
-
- signed_pdus = yield self._check_sigs_and_hash_and_fetch(
- destination,
- [p for p in pdus if p.event_id not in seen_events],
- outlier=True,
- room_version=room_version,
- )
- signed_pdus.extend(
- seen_events[p.event_id] for p in pdus if p.event_id in seen_events
- )
-
- signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination,
- [p for p in auth_chain if p.event_id not in seen_events],
- outlier=True,
- room_version=room_version,
- )
- signed_auth.extend(
- seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events
- )
-
- signed_auth.sort(key=lambda e: e.depth)
+ return state_event_ids, auth_event_ids
- return signed_pdus, signed_auth
-
- @defer.inlineCallbacks
- def get_events_from_store_or_dest(self, destination, room_id, event_ids):
- """Fetch events from a remote destination, checking if we already have them.
+ async def _check_sigs_and_hash_and_fetch(
+ self,
+ origin: str,
+ pdus: List[EventBase],
+ room_version: RoomVersion,
+ outlier: bool = False,
+ include_none: bool = False,
+ ) -> List[EventBase]:
+ """Takes a list of PDUs and checks the signatures and hashs of each
+ one. If a PDU fails its signature check then we check if we have it in
+ the database and if not then request if from the originating server of
+ that PDU.
+
+ If a PDU fails its content hash check then it is redacted.
+
+ The given list of PDUs are not modified, instead the function returns
+ a new list.
Args:
- destination (str)
- room_id (str)
- event_ids (list)
+ origin
+ pdu
+ room_version
+ outlier: Whether the events are outliers or not
+ include_none: Whether to include None in the returned list
+ for events that have failed their checks
Returns:
- Deferred: A deferred resolving to a 2-tuple where the first is a list of
- events and the second is a list of event ids that we failed to fetch.
+ Deferred : A list of PDUs that have valid signatures and hashes.
"""
- seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
- signed_events = list(seen_events.values())
-
- failed_to_fetch = set()
+ deferreds = self._check_sigs_and_hashes(room_version, pdus)
- missing_events = set(event_ids)
- for k in seen_events:
- missing_events.discard(k)
-
- if not missing_events:
- return signed_events, failed_to_fetch
-
- logger.debug(
- "Fetching unknown state/auth events %s for room %s",
- missing_events,
- event_ids,
- )
-
- room_version = yield self.store.get_room_version(room_id)
+ @defer.inlineCallbacks
+ def handle_check_result(pdu: EventBase, deferred: Deferred):
+ try:
+ res = yield make_deferred_yieldable(deferred)
+ except SynapseError:
+ res = None
+
+ if not res:
+ # Check local db.
+ res = yield self.store.get_event(
+ pdu.event_id, allow_rejected=True, allow_none=True
+ )
- batch_size = 20
- missing_events = list(missing_events)
- for i in range(0, len(missing_events), batch_size):
- batch = set(missing_events[i : i + batch_size])
+ if not res and pdu.origin != origin:
+ try:
+ res = yield defer.ensureDeferred(
+ self.get_pdu(
+ destinations=[pdu.origin],
+ event_id=pdu.event_id,
+ room_version=room_version,
+ outlier=outlier,
+ timeout=10000,
+ )
+ )
+ except SynapseError:
+ pass
- deferreds = [
- run_in_background(
- self.get_pdu,
- destinations=[destination],
- event_id=e_id,
- room_version=room_version,
+ if not res:
+ logger.warning(
+ "Failed to find copy of %s with valid signature", pdu.event_id
)
- for e_id in batch
- ]
- res = yield make_deferred_yieldable(
- defer.DeferredList(deferreds, consumeErrors=True)
- )
- for success, result in res:
- if success and result:
- signed_events.append(result)
- batch.discard(result.event_id)
+ return res
- # We removed all events we successfully fetched from `batch`
- failed_to_fetch.update(batch)
+ handle = preserve_fn(handle_check_result)
+ deferreds2 = [handle(pdu, deferred) for pdu, deferred in zip(pdus, deferreds)]
- return signed_events, failed_to_fetch
+ valid_pdus = await make_deferred_yieldable(
+ defer.gatherResults(deferreds2, consumeErrors=True)
+ ).addErrback(unwrapFirstError)
- @defer.inlineCallbacks
- @log_function
- def get_event_auth(self, destination, room_id, event_id):
- res = yield self.transport_layer.get_event_auth(destination, room_id, event_id)
+ if include_none:
+ return valid_pdus
+ else:
+ return [p for p in valid_pdus if p]
+
+ async def get_event_auth(self, destination, room_id, event_id):
+ res = await self.transport_layer.get_event_auth(destination, room_id, event_id)
- room_version = yield self.store.get_room_version(room_id)
- format_ver = room_version_to_event_format(room_version)
+ room_version = await self.store.get_room_version(room_id)
auth_chain = [
- event_from_pdu_json(p, format_ver, outlier=True) for p in res["auth_chain"]
+ event_from_pdu_json(p, room_version, outlier=True)
+ for p in res["auth_chain"]
]
- signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ signed_auth = await self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True, room_version=room_version
)
@@ -487,17 +438,21 @@ class FederationClient(FederationBase):
return signed_auth
- @defer.inlineCallbacks
- def _try_destination_list(self, description, destinations, callback):
+ async def _try_destination_list(
+ self,
+ description: str,
+ destinations: Iterable[str],
+ callback: Callable[[str], Awaitable[T]],
+ ) -> T:
"""Try an operation on a series of servers, until it succeeds
Args:
- description (unicode): description of the operation we're doing, for logging
+ description: description of the operation we're doing, for logging
- destinations (Iterable[unicode]): list of server_names to try
+ destinations: list of server_names to try
- callback (callable): Function to run for each server. Passed a single
- argument: the server_name to try. May return a deferred.
+ callback: Function to run for each server. Passed a single
+ argument: the server_name to try.
If the callback raises a CodeMessageException with a 300/400 code,
attempts to perform the operation stop immediately and the exception is
@@ -508,7 +463,7 @@ class FederationClient(FederationBase):
suppressed if the exception is an InvalidResponseError.
Returns:
- The [Deferred] result of callback, if it succeeds
+ The result of callback, if it succeeds
Raises:
SynapseError if the chosen remote server returns a 300/400 code, or
@@ -519,15 +474,17 @@ class FederationClient(FederationBase):
continue
try:
- res = yield callback(destination)
+ res = await callback(destination)
return res
except InvalidResponseError as e:
- logger.warn("Failed to %s via %s: %s", description, destination, e)
+ logger.warning("Failed to %s via %s: %s", description, destination, e)
+ except UnsupportedRoomVersionError:
+ raise
except HttpResponseException as e:
if not 500 <= e.code < 600:
raise e.to_synapse_error()
else:
- logger.warn(
+ logger.warning(
"Failed to %s via %s: %i %s",
description,
destination,
@@ -535,13 +492,21 @@ class FederationClient(FederationBase):
e.args[0],
)
except Exception:
- logger.warn("Failed to %s via %s", description, destination, exc_info=1)
+ logger.warning(
+ "Failed to %s via %s", description, destination, exc_info=True
+ )
raise SynapseError(502, "Failed to %s via any server" % (description,))
- def make_membership_event(
- self, destinations, room_id, user_id, membership, content, params
- ):
+ async def make_membership_event(
+ self,
+ destinations: Iterable[str],
+ room_id: str,
+ user_id: str,
+ membership: str,
+ content: dict,
+ params: Dict[str, str],
+ ) -> Tuple[str, EventBase, RoomVersion]:
"""
Creates an m.room.member event, with context, without participating in the room.
@@ -553,26 +518,28 @@ class FederationClient(FederationBase):
Note that this does not append any events to any graphs.
Args:
- destinations (str): Candidate homeservers which are probably
+ destinations: Candidate homeservers which are probably
participating in the room.
- room_id (str): The room in which the event will happen.
- user_id (str): The user whose membership is being evented.
- membership (str): The "membership" property of the event. Must be
- one of "join" or "leave".
- content (dict): Any additional data to put into the content field
- of the event.
- params (dict[str, str|Iterable[str]]): Query parameters to include in the
- request.
- Return:
- Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of
- `(origin, event, event_format)` where origin is the remote
- homeserver which generated the event, and event_format is one of
- `synapse.api.room_versions.EventFormatVersions`.
-
- Fails with a ``SynapseError`` if the chosen remote server
- returns a 300/400 code.
-
- Fails with a ``RuntimeError`` if no servers were reachable.
+ room_id: The room in which the event will happen.
+ user_id: The user whose membership is being evented.
+ membership: The "membership" property of the event. Must be one of
+ "join" or "leave".
+ content: Any additional data to put into the content field of the
+ event.
+ params: Query parameters to include in the request.
+
+ Returns:
+ `(origin, event, room_version)` where origin is the remote
+ homeserver which generated the event, and room_version is the
+ version of the room.
+
+ Raises:
+ UnsupportedRoomVersionError: if remote responds with
+ a room version we don't understand.
+
+ SynapseError: if the chosen remote server returns a 300/400 code.
+
+ RuntimeError: if no servers were reachable.
"""
valid_memberships = {Membership.JOIN, Membership.LEAVE}
if membership not in valid_memberships:
@@ -581,16 +548,17 @@ class FederationClient(FederationBase):
% (membership, ",".join(valid_memberships))
)
- @defer.inlineCallbacks
- def send_request(destination):
- ret = yield self.transport_layer.make_membership_event(
+ async def send_request(destination: str) -> Tuple[str, EventBase, RoomVersion]:
+ ret = await self.transport_layer.make_membership_event(
destination, room_id, user_id, membership, params
)
# Note: If not supplied, the room version may be either v1 or v2,
# however either way the event format version will be v1.
- room_version = ret.get("room_version", RoomVersions.V1.identifier)
- event_format = room_version_to_event_format(room_version)
+ room_version_id = ret.get("room_version", RoomVersions.V1.identifier)
+ room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
+ if not room_version:
+ raise UnsupportedRoomVersionError()
pdu_dict = ret.get("event", None)
if not isinstance(pdu_dict, dict):
@@ -610,94 +578,83 @@ class FederationClient(FederationBase):
self._clock,
self.hostname,
self.signing_key,
- format_version=event_format,
+ room_version=room_version,
event_dict=pdu_dict,
)
- return (destination, ev, event_format)
+ return destination, ev, room_version
- return self._try_destination_list(
+ return await self._try_destination_list(
"make_" + membership, destinations, send_request
)
- def send_join(self, destinations, pdu, event_format_version):
+ async def send_join(
+ self, destinations: Iterable[str], pdu: EventBase, room_version: RoomVersion
+ ) -> Dict[str, Any]:
"""Sends a join event to one of a list of homeservers.
Doing so will cause the remote server to add the event to the graph,
and send the event out to the rest of the federation.
Args:
- destinations (str): Candidate homeservers which are probably
+ destinations: Candidate homeservers which are probably
participating in the room.
- pdu (BaseEvent): event to be sent
- event_format_version (int): The event format version
+ pdu: event to be sent
+ room_version: the version of the room (according to the server that
+ did the make_join)
- Return:
- Deferred: resolves to a dict with members ``origin`` (a string
- giving the serer the event was sent to, ``state`` (?) and
+ Returns:
+ a dict with members ``origin`` (a string
+ giving the server the event was sent to, ``state`` (?) and
``auth_chain``.
- Fails with a ``SynapseError`` if the chosen remote server
- returns a 300/400 code.
+ Raises:
+ SynapseError: if the chosen remote server returns a 300/400 code.
- Fails with a ``RuntimeError`` if no servers were reachable.
+ RuntimeError: if no servers were reachable.
"""
- def check_authchain_validity(signed_auth_chain):
- for e in signed_auth_chain:
- if e.type == EventTypes.Create:
- create_event = e
- break
- else:
- raise InvalidResponseError("no %s in auth chain" % (EventTypes.Create,))
-
- # the room version should be sane.
- room_version = create_event.content.get("room_version", "1")
- if room_version not in KNOWN_ROOM_VERSIONS:
- # This shouldn't be possible, because the remote server should have
- # rejected the join attempt during make_join.
- raise InvalidResponseError(
- "room appears to have unsupported version %s" % (room_version,)
- )
-
- @defer.inlineCallbacks
- def send_request(destination):
- time_now = self._clock.time_msec()
- _, content = yield self.transport_layer.send_join(
- destination=destination,
- room_id=pdu.room_id,
- event_id=pdu.event_id,
- content=pdu.get_pdu_json(time_now),
- )
+ async def send_request(destination) -> Dict[str, Any]:
+ content = await self._do_send_join(destination, pdu)
logger.debug("Got content: %s", content)
state = [
- event_from_pdu_json(p, event_format_version, outlier=True)
+ event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("state", [])
]
auth_chain = [
- event_from_pdu_json(p, event_format_version, outlier=True)
+ event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("auth_chain", [])
]
pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}
- room_version = None
+ create_event = None
for e in state:
if (e.type, e.state_key) == (EventTypes.Create, ""):
- room_version = e.content.get(
- "room_version", RoomVersions.V1.identifier
- )
+ create_event = e
break
- if room_version is None:
+ if create_event is None:
# If the state doesn't have a create event then the room is
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")
- valid_pdus = yield self._check_sigs_and_hash_and_fetch(
+ # the room version should be sane.
+ create_room_version = create_event.content.get(
+ "room_version", RoomVersions.V1.identifier
+ )
+ if create_room_version != room_version.identifier:
+ # either the server that fulfilled the make_join, or the server that is
+ # handling the send_join, is lying.
+ raise InvalidResponseError(
+ "Unexpected room version %s in create event"
+ % (create_room_version,)
+ )
+
+ valid_pdus = await self._check_sigs_and_hash_and_fetch(
destination,
list(pdus.values()),
outlier=True,
@@ -725,7 +682,17 @@ class FederationClient(FederationBase):
for s in signed_state:
s.internal_metadata = copy.deepcopy(s.internal_metadata)
- check_authchain_validity(signed_auth)
+ # double-check that the same create event has ended up in the auth chain
+ auth_chain_create_events = [
+ e.event_id
+ for e in signed_auth
+ if (e.type, e.state_key) == (EventTypes.Create, "")
+ ]
+ if auth_chain_create_events != [create_event.event_id]:
+ raise InvalidResponseError(
+ "Unexpected create event(s) in auth chain: %s"
+ % (auth_chain_create_events,)
+ )
return {
"state": signed_state,
@@ -733,53 +700,84 @@ class FederationClient(FederationBase):
"origin": destination,
}
- return self._try_destination_list("send_join", destinations, send_request)
+ return await self._try_destination_list("send_join", destinations, send_request)
- @defer.inlineCallbacks
- def send_invite(self, destination, room_id, event_id, pdu):
- room_version = yield self.store.get_room_version(room_id)
+ async def _do_send_join(self, destination: str, pdu: EventBase):
+ time_now = self._clock.time_msec()
+
+ try:
+ content = await self.transport_layer.send_join_v2(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
+
+ return content
+ except HttpResponseException as e:
+ if e.code in [400, 404]:
+ err = e.to_synapse_error()
+
+ # If we receive an error response that isn't a generic error, or an
+ # unrecognised endpoint error, we assume that the remote understands
+ # the v2 invite API and this is a legitimate error.
+ if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
+ raise err
+ else:
+ raise e.to_synapse_error()
- content = yield self._do_send_invite(destination, pdu, room_version)
+ logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
+
+ resp = await self.transport_layer.send_join_v1(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
+
+ # We expect the v1 API to respond with [200, content], so we only return the
+ # content.
+ return resp[1]
+
+ async def send_invite(
+ self, destination: str, room_id: str, event_id: str, pdu: EventBase,
+ ) -> EventBase:
+ room_version = await self.store.get_room_version(room_id)
+
+ content = await self._do_send_invite(destination, pdu, room_version)
pdu_dict = content["event"]
logger.debug("Got response to send_invite: %s", pdu_dict)
- room_version = yield self.store.get_room_version(room_id)
- format_ver = room_version_to_event_format(room_version)
-
- pdu = event_from_pdu_json(pdu_dict, format_ver)
+ pdu = event_from_pdu_json(pdu_dict, room_version)
# Check signatures are correct.
- pdu = yield self._check_sigs_and_hash(room_version, pdu)
+ pdu = await self._check_sigs_and_hash(room_version, pdu)
# FIXME: We should handle signature failures more gracefully.
return pdu
- @defer.inlineCallbacks
- def _do_send_invite(self, destination, pdu, room_version):
+ async def _do_send_invite(
+ self, destination: str, pdu: EventBase, room_version: RoomVersion
+ ) -> JsonDict:
"""Actually sends the invite, first trying v2 API and falling back to
v1 API if necessary.
- Args:
- destination (str): Target server
- pdu (FrozenEvent)
- room_version (str)
-
Returns:
- dict: The event as a dict as returned by the remote server
+ The event as a dict as returned by the remote server
"""
time_now = self._clock.time_msec()
try:
- content = yield self.transport_layer.send_invite_v2(
+ content = await self.transport_layer.send_invite_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content={
"event": pdu.get_pdu_json(time_now),
- "room_version": room_version,
+ "room_version": room_version.identifier,
"invite_room_state": pdu.unsigned.get("invite_room_state", []),
},
)
@@ -797,8 +795,7 @@ class FederationClient(FederationBase):
# Otherwise, we assume that the remote server doesn't understand
# the v2 invite API. That's ok provided the room uses old-style event
# IDs.
- v = KNOWN_ROOM_VERSIONS.get(room_version)
- if v.event_format != EventFormatVersions.V1:
+ if room_version.event_format != EventFormatVersions.V1:
raise SynapseError(
400,
"User's homeserver does not support this room version",
@@ -812,7 +809,7 @@ class FederationClient(FederationBase):
# Didn't work, try v1 API.
# Note the v1 API returns a tuple of `(200, content)`
- _, content = yield self.transport_layer.send_invite_v1(
+ _, content = await self.transport_layer.send_invite_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@@ -820,7 +817,7 @@ class FederationClient(FederationBase):
)
return content
- def send_leave(self, destinations, pdu):
+ async def send_leave(self, destinations: Iterable[str], pdu: EventBase) -> None:
"""Sends a leave event to one of a list of homeservers.
Doing so will cause the remote server to add the event to the graph,
@@ -829,48 +826,94 @@ class FederationClient(FederationBase):
This is mostly useful to reject received invites.
Args:
- destinations (str): Candidate homeservers which are probably
+ destinations: Candidate homeservers which are probably
participating in the room.
- pdu (BaseEvent): event to be sent
+ pdu: event to be sent
- Return:
- Deferred: resolves to None.
-
- Fails with a ``SynapseError`` if the chosen remote server
- returns a 300/400 code.
+ Raises:
+ SynapseError if the chosen remote server returns a 300/400 code.
- Fails with a ``RuntimeError`` if no servers were reachable.
+ RuntimeError if no servers were reachable.
"""
- @defer.inlineCallbacks
- def send_request(destination):
- time_now = self._clock.time_msec()
- _, content = yield self.transport_layer.send_leave(
+ async def send_request(destination: str) -> None:
+ content = await self._do_send_leave(destination, pdu)
+ logger.debug("Got content: %s", content)
+
+ return await self._try_destination_list(
+ "send_leave", destinations, send_request
+ )
+
+ async def _do_send_leave(self, destination, pdu):
+ time_now = self._clock.time_msec()
+
+ try:
+ content = await self.transport_layer.send_leave_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
- logger.debug("Got content: %s", content)
- return None
+ return content
+ except HttpResponseException as e:
+ if e.code in [400, 404]:
+ err = e.to_synapse_error()
- return self._try_destination_list("send_leave", destinations, send_request)
+ # If we receive an error response that isn't a generic error, or an
+ # unrecognised endpoint error, we assume that the remote understands
+ # the v2 invite API and this is a legitimate error.
+ if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
+ raise err
+ else:
+ raise e.to_synapse_error()
+
+ logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")
+
+ resp = await self.transport_layer.send_leave_v1(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
+
+ # We expect the v1 API to respond with [200, content], so we only return the
+ # content.
+ return resp[1]
def get_public_rooms(
self,
- destination,
- limit=None,
- since_token=None,
- search_filter=None,
- include_all_networks=False,
- third_party_instance_id=None,
+ remote_server: str,
+ limit: Optional[int] = None,
+ since_token: Optional[str] = None,
+ search_filter: Optional[Dict] = None,
+ include_all_networks: bool = False,
+ third_party_instance_id: Optional[str] = None,
):
- if destination == self.server_name:
- return
+ """Get the list of public rooms from a remote homeserver
+
+ Args:
+ remote_server: The name of the remote server
+ limit: Maximum amount of rooms to return
+ since_token: Used for result pagination
+ search_filter: A filter dictionary to send the remote homeserver
+ and filter the result set
+ include_all_networks: Whether to include results from all third party instances
+ third_party_instance_id: Whether to only include results from a specific third
+ party instance
+
+ Returns:
+ Deferred[Dict[str, Any]]: The response from the remote server, or None if
+ `remote_server` is the same as the local server_name
+
+ Raises:
+ HttpResponseException: There was an exception returned from the remote server
+ SynapseException: M_FORBIDDEN when the remote server has disallowed publicRoom
+ requests over federation
+ """
return self.transport_layer.get_public_rooms(
- destination,
+ remote_server,
limit,
since_token,
search_filter,
@@ -878,72 +921,33 @@ class FederationClient(FederationBase):
third_party_instance_id=third_party_instance_id,
)
- @defer.inlineCallbacks
- def query_auth(self, destination, room_id, event_id, local_auth):
- """
- Params:
- destination (str)
- event_it (str)
- local_auth (list)
- """
- time_now = self._clock.time_msec()
-
- send_content = {"auth_chain": [e.get_pdu_json(time_now) for e in local_auth]}
-
- code, content = yield self.transport_layer.send_query_auth(
- destination=destination,
- room_id=room_id,
- event_id=event_id,
- content=send_content,
- )
-
- room_version = yield self.store.get_room_version(room_id)
- format_ver = room_version_to_event_format(room_version)
-
- auth_chain = [event_from_pdu_json(e, format_ver) for e in content["auth_chain"]]
-
- signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination, auth_chain, outlier=True, room_version=room_version
- )
-
- signed_auth.sort(key=lambda e: e.depth)
-
- ret = {
- "auth_chain": signed_auth,
- "rejects": content.get("rejects", []),
- "missing": content.get("missing", []),
- }
-
- return ret
-
- @defer.inlineCallbacks
- def get_missing_events(
+ async def get_missing_events(
self,
- destination,
- room_id,
- earliest_events_ids,
- latest_events,
- limit,
- min_depth,
- timeout,
- ):
+ destination: str,
+ room_id: str,
+ earliest_events_ids: Sequence[str],
+ latest_events: Iterable[EventBase],
+ limit: int,
+ min_depth: int,
+ timeout: int,
+ ) -> List[EventBase]:
"""Tries to fetch events we are missing. This is called when we receive
an event without having received all of its ancestors.
Args:
- destination (str)
- room_id (str)
- earliest_events_ids (list): List of event ids. Effectively the
+ destination
+ room_id
+ earliest_events_ids: List of event ids. Effectively the
events we expected to receive, but haven't. `get_missing_events`
should only return events that didn't happen before these.
- latest_events (list): List of events we have received that we don't
+ latest_events: List of events we have received that we don't
have all previous events for.
- limit (int): Maximum number of events to return.
- min_depth (int): Minimum depth of events tor return.
- timeout (int): Max time to wait in ms
+ limit: Maximum number of events to return.
+ min_depth: Minimum depth of events to return.
+ timeout: Max time to wait in ms
"""
try:
- content = yield self.transport_layer.get_missing_events(
+ content = await self.transport_layer.get_missing_events(
destination=destination,
room_id=room_id,
earliest_events=earliest_events_ids,
@@ -953,14 +957,13 @@ class FederationClient(FederationBase):
timeout=timeout,
)
- room_version = yield self.store.get_room_version(room_id)
- format_ver = room_version_to_event_format(room_version)
+ room_version = await self.store.get_room_version(room_id)
events = [
- event_from_pdu_json(e, format_ver) for e in content.get("events", [])
+ event_from_pdu_json(e, room_version) for e in content.get("events", [])
]
- signed_events = yield self._check_sigs_and_hash_and_fetch(
+ signed_events = await self._check_sigs_and_hash_and_fetch(
destination, events, outlier=False, room_version=room_version
)
except HttpResponseException as e:
@@ -973,14 +976,13 @@ class FederationClient(FederationBase):
return signed_events
- @defer.inlineCallbacks
- def forward_third_party_invite(self, destinations, room_id, event_dict):
+ async def forward_third_party_invite(self, destinations, room_id, event_dict):
for destination in destinations:
if destination == self.server_name:
continue
try:
- yield self.transport_layer.exchange_third_party_invite(
+ await self.transport_layer.exchange_third_party_invite(
destination=destination, room_id=room_id, event_dict=event_dict
)
return None
|