diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 0e22183280..eea64c1c9f 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,9 +23,13 @@ from twisted.internet.defer import DeferredList
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
+from synapse.api.room_versions import (
+ KNOWN_ROOM_VERSIONS,
+ EventFormatVersions,
+ RoomVersion,
+)
from synapse.crypto.event_signing import check_event_content_hash
-from synapse.events import event_type_from_format_version
+from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
@@ -33,7 +38,7 @@ from synapse.logging.context import (
make_deferred_yieldable,
preserve_fn,
)
-from synapse.types import get_domain_from_id
+from synapse.types import JsonDict, get_domain_from_id
from synapse.util import unwrapFirstError
logger = logging.getLogger(__name__)
@@ -342,16 +347,15 @@ def _is_invite_via_3pid(event):
)
-def event_from_pdu_json(pdu_json, event_format_version, outlier=False):
- """Construct a FrozenEvent from an event json received over federation
+def event_from_pdu_json(
+ pdu_json: JsonDict, room_version: RoomVersion, outlier: bool = False
+) -> EventBase:
+ """Construct an EventBase from an event json received over federation
Args:
- pdu_json (object): pdu as received over federation
- event_format_version (int): The event format version
- outlier (bool): True to mark this event as an outlier
-
- Returns:
- FrozenEvent
+ pdu_json: pdu as received over federation
+ room_version: The version of the room this event belongs to
+ outlier: True to mark this event as an outlier
Raises:
SynapseError: if the pdu is missing required fields or is otherwise
@@ -370,8 +374,7 @@ def event_from_pdu_json(pdu_json, event_format_version, outlier=False):
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
- event = event_type_from_format_version(event_format_version)(pdu_json)
-
+ event = make_event_from_dict(pdu_json, room_version)
event.internal_metadata.outlier = outlier
return event
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f99d17a7de..4870e39652 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -17,7 +17,18 @@
import copy
import itertools
import logging
-from typing import Dict, Iterable
+from typing import (
+ Any,
+ Awaitable,
+ Callable,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Sequence,
+ Tuple,
+ TypeVar,
+)
from prometheus_client import Counter
@@ -35,12 +46,14 @@ from synapse.api.errors import (
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
+ RoomVersion,
RoomVersions,
)
-from synapse.events import builder, room_version_to_event_format
+from synapse.events import EventBase, builder
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.utils import log_function
+from synapse.types import JsonDict
from synapse.util import unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
@@ -52,6 +65,8 @@ sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["t
PDU_RETRY_TIME_MS = 1 * 60 * 1000
+T = TypeVar("T")
+
class InvalidResponseError(RuntimeError):
"""Helper for _try_destination_list: indicates that the server returned a response
@@ -170,21 +185,17 @@ class FederationClient(FederationBase):
sent_queries_counter.labels("client_one_time_keys").inc()
return self.transport_layer.claim_client_keys(destination, content, timeout)
- @defer.inlineCallbacks
- @log_function
- def backfill(self, dest, room_id, limit, extremities):
- """Requests some more historic PDUs for the given context from the
+ async def backfill(
+ self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
+ ) -> List[EventBase]:
+ """Requests some more historic PDUs for the given room from the
given destination server.
Args:
dest (str): The remote homeserver to ask.
room_id (str): The room_id to backfill.
- limit (int): The maximum number of PDUs to return.
- extremities (list): List of PDU id and origins of the first pdus
- we have seen from the context
-
- Returns:
- Deferred: Results in the received PDUs.
+ limit (int): The maximum number of events to return.
+ extremities (list): our current backwards extremities, to backfill from
"""
logger.debug("backfill extrem=%s", extremities)
@@ -192,34 +203,37 @@ class FederationClient(FederationBase):
if not extremities:
return
- transaction_data = yield self.transport_layer.backfill(
+ transaction_data = await self.transport_layer.backfill(
dest, room_id, extremities, limit
)
logger.debug("backfill transaction_data=%r", transaction_data)
- room_version = yield self.store.get_room_version_id(room_id)
- format_ver = room_version_to_event_format(room_version)
+ room_version = await self.store.get_room_version(room_id)
pdus = [
- event_from_pdu_json(p, format_ver, outlier=False)
+ event_from_pdu_json(p, room_version, outlier=False)
for p in transaction_data["pdus"]
]
# FIXME: We should handle signature failures more gracefully.
- pdus[:] = yield make_deferred_yieldable(
+ pdus[:] = await make_deferred_yieldable(
defer.gatherResults(
- self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True
+ self._check_sigs_and_hashes(room_version.identifier, pdus),
+ consumeErrors=True,
).addErrback(unwrapFirstError)
)
return pdus
- @defer.inlineCallbacks
- @log_function
- def get_pdu(
- self, destinations, event_id, room_version, outlier=False, timeout=None
- ):
+ async def get_pdu(
+ self,
+ destinations: Iterable[str],
+ event_id: str,
+ room_version: RoomVersion,
+ outlier: bool = False,
+ timeout: Optional[int] = None,
+ ) -> Optional[EventBase]:
"""Requests the PDU with given origin and ID from the remote home
servers.
@@ -227,18 +241,17 @@ class FederationClient(FederationBase):
one succeeds.
Args:
- destinations (list): Which homeservers to query
- event_id (str): event to fetch
- room_version (str): version of the room
- outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
+ destinations: Which homeservers to query
+ event_id: event to fetch
+ room_version: version of the room
+ outlier: Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
- timeout (int): How long to try (in ms) each destination for before
+ timeout: How long to try (in ms) each destination for before
moving to the next destination. None indicates no timeout.
Returns:
- Deferred: Results in the requested PDU, or None if we were unable to find
- it.
+ The requested PDU, or None if we were unable to find it.
"""
# TODO: Rate limit the number of times we try and get the same event.
@@ -249,8 +262,6 @@ class FederationClient(FederationBase):
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
- format_ver = room_version_to_event_format(room_version)
-
signed_pdu = None
for destination in destinations:
now = self._clock.time_msec()
@@ -259,7 +270,7 @@ class FederationClient(FederationBase):
continue
try:
- transaction_data = yield self.transport_layer.get_event(
+ transaction_data = await self.transport_layer.get_event(
destination, event_id, timeout=timeout
)
@@ -271,7 +282,7 @@ class FederationClient(FederationBase):
)
pdu_list = [
- event_from_pdu_json(p, format_ver, outlier=outlier)
+ event_from_pdu_json(p, room_version, outlier=outlier)
for p in transaction_data["pdus"]
]
@@ -279,7 +290,9 @@ class FederationClient(FederationBase):
pdu = pdu_list[0]
# Check signatures are correct.
- signed_pdu = yield self._check_sigs_and_hash(room_version, pdu)
+ signed_pdu = await self._check_sigs_and_hash(
+ room_version.identifier, pdu
+ )
break
@@ -309,15 +322,16 @@ class FederationClient(FederationBase):
return signed_pdu
- @defer.inlineCallbacks
- def get_room_state_ids(self, destination: str, room_id: str, event_id: str):
+ async def get_room_state_ids(
+ self, destination: str, room_id: str, event_id: str
+ ) -> Tuple[List[str], List[str]]:
"""Calls the /state_ids endpoint to fetch the state at a particular point
in the room, and the auth events for the given event
Returns:
- Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids)
+ a tuple of (state event_ids, auth event_ids)
"""
- result = yield self.transport_layer.get_room_state_ids(
+ result = await self.transport_layer.get_room_state_ids(
destination, room_id, event_id=event_id
)
@@ -331,37 +345,39 @@ class FederationClient(FederationBase):
return state_event_ids, auth_event_ids
- @defer.inlineCallbacks
- @log_function
- def get_event_auth(self, destination, room_id, event_id):
- res = yield self.transport_layer.get_event_auth(destination, room_id, event_id)
+ async def get_event_auth(self, destination, room_id, event_id):
+ res = await self.transport_layer.get_event_auth(destination, room_id, event_id)
- room_version = yield self.store.get_room_version_id(room_id)
- format_ver = room_version_to_event_format(room_version)
+ room_version = await self.store.get_room_version(room_id)
auth_chain = [
- event_from_pdu_json(p, format_ver, outlier=True) for p in res["auth_chain"]
+ event_from_pdu_json(p, room_version, outlier=True)
+ for p in res["auth_chain"]
]
- signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination, auth_chain, outlier=True, room_version=room_version
+ signed_auth = await self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True, room_version=room_version.identifier
)
signed_auth.sort(key=lambda e: e.depth)
return signed_auth
- @defer.inlineCallbacks
- def _try_destination_list(self, description, destinations, callback):
+ async def _try_destination_list(
+ self,
+ description: str,
+ destinations: Iterable[str],
+ callback: Callable[[str], Awaitable[T]],
+ ) -> T:
"""Try an operation on a series of servers, until it succeeds
Args:
- description (unicode): description of the operation we're doing, for logging
+ description: description of the operation we're doing, for logging
- destinations (Iterable[unicode]): list of server_names to try
+ destinations: list of server_names to try
- callback (callable): Function to run for each server. Passed a single
- argument: the server_name to try. May return a deferred.
+ callback: Function to run for each server. Passed a single
+ argument: the server_name to try.
If the callback raises a CodeMessageException with a 300/400 code,
attempts to perform the operation stop immediately and the exception is
@@ -372,7 +388,7 @@ class FederationClient(FederationBase):
suppressed if the exception is an InvalidResponseError.
Returns:
- The [Deferred] result of callback, if it succeeds
+ The result of callback, if it succeeds
Raises:
SynapseError if the chosen remote server returns a 300/400 code, or
@@ -383,7 +399,7 @@ class FederationClient(FederationBase):
continue
try:
- res = yield callback(destination)
+ res = await callback(destination)
return res
except InvalidResponseError as e:
logger.warning("Failed to %s via %s: %s", description, destination, e)
@@ -402,12 +418,12 @@ class FederationClient(FederationBase):
)
except Exception:
logger.warning(
- "Failed to %s via %s", description, destination, exc_info=1
+ "Failed to %s via %s", description, destination, exc_info=True
)
raise SynapseError(502, "Failed to %s via any server" % (description,))
- def make_membership_event(
+ async def make_membership_event(
self,
destinations: Iterable[str],
room_id: str,
@@ -415,7 +431,7 @@ class FederationClient(FederationBase):
membership: str,
content: dict,
params: Dict[str, str],
- ):
+ ) -> Tuple[str, EventBase, RoomVersion]:
"""
Creates an m.room.member event, with context, without participating in the room.
@@ -436,19 +452,19 @@ class FederationClient(FederationBase):
content: Any additional data to put into the content field of the
event.
params: Query parameters to include in the request.
- Return:
- Deferred[Tuple[str, FrozenEvent, RoomVersion]]: resolves to a tuple of
+
+ Returns:
`(origin, event, room_version)` where origin is the remote
homeserver which generated the event, and room_version is the
version of the room.
- Fails with a `UnsupportedRoomVersionError` if remote responds with
- a room version we don't understand.
+ Raises:
+ UnsupportedRoomVersionError: if remote responds with
+ a room version we don't understand.
- Fails with a ``SynapseError`` if the chosen remote server
- returns a 300/400 code.
+ SynapseError: if the chosen remote server returns a 300/400 code.
- Fails with a ``RuntimeError`` if no servers were reachable.
+ RuntimeError: if no servers were reachable.
"""
valid_memberships = {Membership.JOIN, Membership.LEAVE}
if membership not in valid_memberships:
@@ -457,9 +473,8 @@ class FederationClient(FederationBase):
% (membership, ",".join(valid_memberships))
)
- @defer.inlineCallbacks
- def send_request(destination):
- ret = yield self.transport_layer.make_membership_event(
+ async def send_request(destination: str) -> Tuple[str, EventBase, RoomVersion]:
+ ret = await self.transport_layer.make_membership_event(
destination, room_id, user_id, membership, params
)
@@ -492,88 +507,83 @@ class FederationClient(FederationBase):
event_dict=pdu_dict,
)
- return (destination, ev, room_version)
+ return destination, ev, room_version
- return self._try_destination_list(
+ return await self._try_destination_list(
"make_" + membership, destinations, send_request
)
- def send_join(self, destinations, pdu, event_format_version):
+ async def send_join(
+ self, destinations: Iterable[str], pdu: EventBase, room_version: RoomVersion
+ ) -> Dict[str, Any]:
"""Sends a join event to one of a list of homeservers.
Doing so will cause the remote server to add the event to the graph,
and send the event out to the rest of the federation.
Args:
- destinations (str): Candidate homeservers which are probably
+ destinations: Candidate homeservers which are probably
participating in the room.
- pdu (BaseEvent): event to be sent
- event_format_version (int): The event format version
+ pdu: event to be sent
+ room_version: the version of the room (according to the server that
+ did the make_join)
- Return:
- Deferred: resolves to a dict with members ``origin`` (a string
- giving the serer the event was sent to, ``state`` (?) and
+ Returns:
+ a dict with members ``origin`` (a string
+ giving the server the event was sent to, ``state`` (?) and
``auth_chain``.
- Fails with a ``SynapseError`` if the chosen remote server
- returns a 300/400 code.
+ Raises:
+ SynapseError: if the chosen remote server returns a 300/400 code.
- Fails with a ``RuntimeError`` if no servers were reachable.
+ RuntimeError: if no servers were reachable.
"""
- def check_authchain_validity(signed_auth_chain):
- for e in signed_auth_chain:
- if e.type == EventTypes.Create:
- create_event = e
- break
- else:
- raise InvalidResponseError("no %s in auth chain" % (EventTypes.Create,))
-
- # the room version should be sane.
- room_version = create_event.content.get("room_version", "1")
- if room_version not in KNOWN_ROOM_VERSIONS:
- # This shouldn't be possible, because the remote server should have
- # rejected the join attempt during make_join.
- raise InvalidResponseError(
- "room appears to have unsupported version %s" % (room_version,)
- )
-
- @defer.inlineCallbacks
- def send_request(destination):
- content = yield self._do_send_join(destination, pdu)
+ async def send_request(destination) -> Dict[str, Any]:
+ content = await self._do_send_join(destination, pdu)
logger.debug("Got content: %s", content)
state = [
- event_from_pdu_json(p, event_format_version, outlier=True)
+ event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("state", [])
]
auth_chain = [
- event_from_pdu_json(p, event_format_version, outlier=True)
+ event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("auth_chain", [])
]
pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}
- room_version = None
+ create_event = None
for e in state:
if (e.type, e.state_key) == (EventTypes.Create, ""):
- room_version = e.content.get(
- "room_version", RoomVersions.V1.identifier
- )
+ create_event = e
break
- if room_version is None:
+ if create_event is None:
# If the state doesn't have a create event then the room is
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")
- valid_pdus = yield self._check_sigs_and_hash_and_fetch(
+ # the room version should be sane.
+ create_room_version = create_event.content.get(
+ "room_version", RoomVersions.V1.identifier
+ )
+ if create_room_version != room_version.identifier:
+ # either the server that fulfilled the make_join, or the server that is
+ # handling the send_join, is lying.
+ raise InvalidResponseError(
+ "Unexpected room version %s in create event"
+ % (create_room_version,)
+ )
+
+ valid_pdus = await self._check_sigs_and_hash_and_fetch(
destination,
list(pdus.values()),
outlier=True,
- room_version=room_version,
+ room_version=room_version.identifier,
)
valid_pdus_map = {p.event_id: p for p in valid_pdus}
@@ -597,7 +607,17 @@ class FederationClient(FederationBase):
for s in signed_state:
s.internal_metadata = copy.deepcopy(s.internal_metadata)
- check_authchain_validity(signed_auth)
+ # double-check that the same create event has ended up in the auth chain
+ auth_chain_create_events = [
+ e.event_id
+ for e in signed_auth
+ if (e.type, e.state_key) == (EventTypes.Create, "")
+ ]
+ if auth_chain_create_events != [create_event.event_id]:
+ raise InvalidResponseError(
+ "Unexpected create event(s) in auth chain"
+ % (auth_chain_create_events,)
+ )
return {
"state": signed_state,
@@ -605,14 +625,13 @@ class FederationClient(FederationBase):
"origin": destination,
}
- return self._try_destination_list("send_join", destinations, send_request)
+ return await self._try_destination_list("send_join", destinations, send_request)
- @defer.inlineCallbacks
- def _do_send_join(self, destination, pdu):
+ async def _do_send_join(self, destination: str, pdu: EventBase):
time_now = self._clock.time_msec()
try:
- content = yield self.transport_layer.send_join_v2(
+ content = await self.transport_layer.send_join_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@@ -634,7 +653,7 @@ class FederationClient(FederationBase):
logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
- resp = yield self.transport_layer.send_join_v1(
+ resp = await self.transport_layer.send_join_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@@ -645,51 +664,45 @@ class FederationClient(FederationBase):
# content.
return resp[1]
- @defer.inlineCallbacks
- def send_invite(self, destination, room_id, event_id, pdu):
- room_version = yield self.store.get_room_version_id(room_id)
+ async def send_invite(
+ self, destination: str, room_id: str, event_id: str, pdu: EventBase,
+ ) -> EventBase:
+ room_version = await self.store.get_room_version(room_id)
- content = yield self._do_send_invite(destination, pdu, room_version)
+ content = await self._do_send_invite(destination, pdu, room_version)
pdu_dict = content["event"]
logger.debug("Got response to send_invite: %s", pdu_dict)
- room_version = yield self.store.get_room_version_id(room_id)
- format_ver = room_version_to_event_format(room_version)
-
- pdu = event_from_pdu_json(pdu_dict, format_ver)
+ pdu = event_from_pdu_json(pdu_dict, room_version)
# Check signatures are correct.
- pdu = yield self._check_sigs_and_hash(room_version, pdu)
+ pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
# FIXME: We should handle signature failures more gracefully.
return pdu
- @defer.inlineCallbacks
- def _do_send_invite(self, destination, pdu, room_version):
+ async def _do_send_invite(
+ self, destination: str, pdu: EventBase, room_version: RoomVersion
+ ) -> JsonDict:
"""Actually sends the invite, first trying v2 API and falling back to
v1 API if necessary.
- Args:
- destination (str): Target server
- pdu (FrozenEvent)
- room_version (str)
-
Returns:
- dict: The event as a dict as returned by the remote server
+ The event as a dict as returned by the remote server
"""
time_now = self._clock.time_msec()
try:
- content = yield self.transport_layer.send_invite_v2(
+ content = await self.transport_layer.send_invite_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content={
"event": pdu.get_pdu_json(time_now),
- "room_version": room_version,
+ "room_version": room_version.identifier,
"invite_room_state": pdu.unsigned.get("invite_room_state", []),
},
)
@@ -707,8 +720,7 @@ class FederationClient(FederationBase):
# Otherwise, we assume that the remote server doesn't understand
# the v2 invite API. That's ok provided the room uses old-style event
# IDs.
- v = KNOWN_ROOM_VERSIONS.get(room_version)
- if v.event_format != EventFormatVersions.V1:
+ if room_version.event_format != EventFormatVersions.V1:
raise SynapseError(
400,
"User's homeserver does not support this room version",
@@ -722,7 +734,7 @@ class FederationClient(FederationBase):
# Didn't work, try v1 API.
# Note the v1 API returns a tuple of `(200, content)`
- _, content = yield self.transport_layer.send_invite_v1(
+ _, content = await self.transport_layer.send_invite_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@@ -730,7 +742,7 @@ class FederationClient(FederationBase):
)
return content
- def send_leave(self, destinations, pdu):
+ async def send_leave(self, destinations: Iterable[str], pdu: EventBase) -> None:
"""Sends a leave event to one of a list of homeservers.
Doing so will cause the remote server to add the event to the graph,
@@ -739,34 +751,29 @@ class FederationClient(FederationBase):
This is mostly useful to reject received invites.
Args:
- destinations (str): Candidate homeservers which are probably
+ destinations: Candidate homeservers which are probably
participating in the room.
- pdu (BaseEvent): event to be sent
+ pdu: event to be sent
- Return:
- Deferred: resolves to None.
-
- Fails with a ``SynapseError`` if the chosen remote server
- returns a 300/400 code.
+ Raises:
+ SynapseError if the chosen remote server returns a 300/400 code.
- Fails with a ``RuntimeError`` if no servers were reachable.
+ RuntimeError if no servers were reachable.
"""
- @defer.inlineCallbacks
- def send_request(destination):
- content = yield self._do_send_leave(destination, pdu)
-
+ async def send_request(destination: str) -> None:
+ content = await self._do_send_leave(destination, pdu)
logger.debug("Got content: %s", content)
- return None
- return self._try_destination_list("send_leave", destinations, send_request)
+ return await self._try_destination_list(
+ "send_leave", destinations, send_request
+ )
- @defer.inlineCallbacks
- def _do_send_leave(self, destination, pdu):
+ async def _do_send_leave(self, destination, pdu):
time_now = self._clock.time_msec()
try:
- content = yield self.transport_layer.send_leave_v2(
+ content = await self.transport_layer.send_leave_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@@ -788,7 +795,7 @@ class FederationClient(FederationBase):
logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")
- resp = yield self.transport_layer.send_leave_v1(
+ resp = await self.transport_layer.send_leave_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@@ -820,34 +827,33 @@ class FederationClient(FederationBase):
third_party_instance_id=third_party_instance_id,
)
- @defer.inlineCallbacks
- def get_missing_events(
+ async def get_missing_events(
self,
- destination,
- room_id,
- earliest_events_ids,
- latest_events,
- limit,
- min_depth,
- timeout,
- ):
+ destination: str,
+ room_id: str,
+ earliest_events_ids: Sequence[str],
+ latest_events: Iterable[EventBase],
+ limit: int,
+ min_depth: int,
+ timeout: int,
+ ) -> List[EventBase]:
"""Tries to fetch events we are missing. This is called when we receive
an event without having received all of its ancestors.
Args:
- destination (str)
- room_id (str)
- earliest_events_ids (list): List of event ids. Effectively the
+ destination
+ room_id
+ earliest_events_ids: List of event ids. Effectively the
events we expected to receive, but haven't. `get_missing_events`
should only return events that didn't happen before these.
- latest_events (list): List of events we have received that we don't
+ latest_events: List of events we have received that we don't
have all previous events for.
- limit (int): Maximum number of events to return.
- min_depth (int): Minimum depth of events tor return.
- timeout (int): Max time to wait in ms
+ limit: Maximum number of events to return.
+ min_depth: Minimum depth of events to return.
+ timeout: Max time to wait in ms
"""
try:
- content = yield self.transport_layer.get_missing_events(
+ content = await self.transport_layer.get_missing_events(
destination=destination,
room_id=room_id,
earliest_events=earliest_events_ids,
@@ -857,15 +863,14 @@ class FederationClient(FederationBase):
timeout=timeout,
)
- room_version = yield self.store.get_room_version_id(room_id)
- format_ver = room_version_to_event_format(room_version)
+ room_version = await self.store.get_room_version(room_id)
events = [
- event_from_pdu_json(e, format_ver) for e in content.get("events", [])
+ event_from_pdu_json(e, room_version) for e in content.get("events", [])
]
- signed_events = yield self._check_sigs_and_hash_and_fetch(
- destination, events, outlier=False, room_version=room_version
+ signed_events = await self._check_sigs_and_hash_and_fetch(
+ destination, events, outlier=False, room_version=room_version.identifier
)
except HttpResponseException as e:
if not e.code == 400:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a4c97ed458..7f9da49326 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -38,7 +38,6 @@ from synapse.api.errors import (
UnsupportedRoomVersionError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.events import room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
@@ -54,7 +53,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
-from synapse.types import get_domain_from_id
+from synapse.types import JsonDict, get_domain_from_id
from synapse.util import glob_to_regex, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -82,6 +81,8 @@ class FederationServer(FederationBase):
self.handler = hs.get_handlers().federation_handler
self.state = hs.get_state_handler()
+ self.device_handler = hs.get_device_handler()
+
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
@@ -234,24 +235,17 @@ class FederationServer(FederationBase):
continue
try:
- room_version = await self.store.get_room_version_id(room_id)
+ room_version = await self.store.get_room_version(room_id)
except NotFoundError:
logger.info("Ignoring PDU for unknown room_id: %s", room_id)
continue
-
- try:
- format_ver = room_version_to_event_format(room_version)
- except UnsupportedRoomVersionError:
+ except UnsupportedRoomVersionError as e:
# this can happen if support for a given room version is withdrawn,
# so that we still get events for said room.
- logger.info(
- "Ignoring PDU for room %s with unknown version %s",
- room_id,
- room_version,
- )
+ logger.info("Ignoring PDU: %s", e)
continue
- event = event_from_pdu_json(p, format_ver)
+ event = event_from_pdu_json(p, room_version)
pdus_by_room.setdefault(room_id, []).append(event)
pdu_results = {}
@@ -302,7 +296,12 @@ class FederationServer(FederationBase):
async def _process_edu(edu_dict):
received_edus_counter.inc()
- edu = Edu(**edu_dict)
+ edu = Edu(
+ origin=origin,
+ destination=self.server_name,
+ edu_type=edu_dict["edu_type"],
+ content=edu_dict["content"],
+ )
await self.registry.on_edu(edu.edu_type, origin, edu.content)
await concurrently_execute(
@@ -396,20 +395,21 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
- async def on_invite_request(self, origin, content, room_version):
- if room_version not in KNOWN_ROOM_VERSIONS:
+ async def on_invite_request(
+ self, origin: str, content: JsonDict, room_version_id: str
+ ):
+ room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
+ if not room_version:
raise SynapseError(
400,
"Homeserver does not support this room version",
Codes.UNSUPPORTED_ROOM_VERSION,
)
- format_ver = room_version_to_event_format(room_version)
-
- pdu = event_from_pdu_json(content, format_ver)
+ pdu = event_from_pdu_json(content, room_version)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
- pdu = await self._check_sigs_and_hash(room_version, pdu)
+ pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version)
time_now = self._clock.time_msec()
return {"event": ret_pdu.get_pdu_json(time_now)}
@@ -417,16 +417,15 @@ class FederationServer(FederationBase):
async def on_send_join_request(self, origin, content, room_id):
logger.debug("on_send_join_request: content: %s", content)
- room_version = await self.store.get_room_version_id(room_id)
- format_ver = room_version_to_event_format(room_version)
- pdu = event_from_pdu_json(content, format_ver)
+ room_version = await self.store.get_room_version(room_id)
+ pdu = event_from_pdu_json(content, room_version)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
- pdu = await self._check_sigs_and_hash(room_version, pdu)
+ pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
res_pdus = await self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
@@ -448,16 +447,15 @@ class FederationServer(FederationBase):
async def on_send_leave_request(self, origin, content, room_id):
logger.debug("on_send_leave_request: content: %s", content)
- room_version = await self.store.get_room_version_id(room_id)
- format_ver = room_version_to_event_format(room_version)
- pdu = event_from_pdu_json(content, format_ver)
+ room_version = await self.store.get_room_version(room_id)
+ pdu = event_from_pdu_json(content, room_version)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
- pdu = await self._check_sigs_and_hash(room_version, pdu)
+ pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
await self.handler.on_send_leave_request(origin, pdu)
return {}
@@ -495,15 +493,14 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
- room_version = await self.store.get_room_version_id(room_id)
- format_ver = room_version_to_event_format(room_version)
+ room_version = await self.store.get_room_version(room_id)
auth_chain = [
- event_from_pdu_json(e, format_ver) for e in content["auth_chain"]
+ event_from_pdu_json(e, room_version) for e in content["auth_chain"]
]
signed_auth = await self._check_sigs_and_hash_and_fetch(
- origin, auth_chain, outlier=True, room_version=room_version
+ origin, auth_chain, outlier=True, room_version=room_version.identifier
)
ret = await self.handler.on_query_auth(
@@ -528,8 +525,9 @@ class FederationServer(FederationBase):
def on_query_client_keys(self, origin, content):
return self.on_query_request("client_keys", content)
- def on_query_user_devices(self, origin, user_id):
- return self.on_query_request("user_devices", user_id)
+ async def on_query_user_devices(self, origin: str, user_id: str):
+ keys = await self.device_handler.on_federation_query_user_devices(user_id)
+ return 200, keys
@trace
async def on_claim_client_keys(self, origin, content):
@@ -570,7 +568,7 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
- logger.info(
+ logger.debug(
"on_get_missing_events: earliest_events: %r, latest_events: %r,"
" limit: %d",
earliest_events,
@@ -583,11 +581,11 @@ class FederationServer(FederationBase):
)
if len(missing_events) < 5:
- logger.info(
+ logger.debug(
"Returning %d events: %r", len(missing_events), missing_events
)
else:
- logger.info("Returning %d events", len(missing_events))
+ logger.debug("Returning %d events", len(missing_events))
time_now = self._clock.time_msec()
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 36c83c3027..233cb33daf 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+from typing import Dict, Hashable, Iterable, List, Optional, Set
from six import itervalues
@@ -23,6 +24,7 @@ from twisted.internet import defer
import synapse
import synapse.metrics
+from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
@@ -39,6 +41,8 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.presence import UserPresenceState
+from synapse.types import ReadReceipt
from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__)
@@ -68,7 +72,7 @@ class FederationSender(object):
self._transaction_manager = TransactionManager(hs)
# map from destination to PerDestinationQueue
- self._per_destination_queues = {} # type: dict[str, PerDestinationQueue]
+ self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
@@ -84,7 +88,7 @@ class FederationSender(object):
# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
- self.pending_presence = {}
+ self.pending_presence = {} # type: Dict[str, UserPresenceState]
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
@@ -116,20 +120,17 @@ class FederationSender(object):
# and that there is a pending call to _flush_rrs_for_room in the system.
self._queues_awaiting_rr_flush_by_room = (
{}
- ) # type: dict[str, set[PerDestinationQueue]]
+ ) # type: Dict[str, Set[PerDestinationQueue]]
self._rr_txn_interval_per_room_ms = (
- 1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second
+ 1000.0 / hs.config.federation_rr_transactions_per_room_per_second
)
- def _get_per_destination_queue(self, destination):
+ def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
Args:
- destination (str): server_name of remote server
-
- Returns:
- PerDestinationQueue
+ destination: server_name of remote server
"""
queue = self._per_destination_queues.get(destination)
if not queue:
@@ -137,7 +138,7 @@ class FederationSender(object):
self._per_destination_queues[destination] = queue
return queue
- def notify_new_events(self, current_id):
+ def notify_new_events(self, current_id: int) -> None:
"""This gets called when we have some new events we might want to
send out to other servers.
"""
@@ -151,13 +152,12 @@ class FederationSender(object):
"process_event_queue_for_federation", self._process_event_queue_loop
)
- @defer.inlineCallbacks
- def _process_event_queue_loop(self):
+ async def _process_event_queue_loop(self) -> None:
try:
self._is_processing = True
while True:
- last_token = yield self.store.get_federation_out_pos("events")
- next_token, events = yield self.store.get_all_new_events_stream(
+ last_token = await self.store.get_federation_out_pos("events")
+ next_token, events = await self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
)
@@ -166,8 +166,7 @@ class FederationSender(object):
if not events and next_token >= self._last_poked_id:
break
- @defer.inlineCallbacks
- def handle_event(event):
+ async def handle_event(event: EventBase) -> None:
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender)
@@ -184,7 +183,7 @@ class FederationSender(object):
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
- destinations = yield self.state.get_hosts_in_room_at_events(
+ destinations = await self.state.get_hosts_in_room_at_events(
event.room_id, event_ids=event.prev_event_ids()
)
except Exception:
@@ -206,17 +205,16 @@ class FederationSender(object):
self._send_pdu(event, destinations)
- @defer.inlineCallbacks
- def handle_room_events(events):
+ async def handle_room_events(events: Iterable[EventBase]) -> None:
with Measure(self.clock, "handle_room_events"):
for event in events:
- yield handle_event(event)
+ await handle_event(event)
- events_by_room = {}
+ events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
- yield make_deferred_yieldable(
+ await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
@@ -226,11 +224,11 @@ class FederationSender(object):
)
)
- yield self.store.update_federation_out_pos("events", next_token)
+ await self.store.update_federation_out_pos("events", next_token)
if events:
now = self.clock.time_msec()
- ts = yield self.store.get_received_ts(events[-1].event_id)
+ ts = await self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.labels(
"federation_sender"
@@ -254,7 +252,7 @@ class FederationSender(object):
finally:
self._is_processing = False
- def _send_pdu(self, pdu, destinations):
+ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
@@ -276,11 +274,11 @@ class FederationSender(object):
self._get_per_destination_queue(destination).send_pdu(pdu, order)
@defer.inlineCallbacks
- def send_read_receipt(self, receipt):
+ def send_read_receipt(self, receipt: ReadReceipt):
"""Send a RR to any other servers in the room
Args:
- receipt (synapse.types.ReadReceipt): receipt to be sent
+ receipt: receipt to be sent
"""
# Some background on the rate-limiting going on here.
@@ -343,7 +341,7 @@ class FederationSender(object):
else:
queue.flush_read_receipts_for_room(room_id)
- def _schedule_rr_flush_for_room(self, room_id, n_domains):
+ def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
# that is going to cause approximately len(domains) transactions, so now back
# off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
@@ -352,7 +350,7 @@ class FederationSender(object):
self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
self._queues_awaiting_rr_flush_by_room[room_id] = set()
- def _flush_rrs_for_room(self, room_id):
+ def _flush_rrs_for_room(self, room_id: str) -> None:
queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
logger.debug("Flushing RRs in %s to %s", room_id, queues)
@@ -368,14 +366,11 @@ class FederationSender(object):
@preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
- def send_presence(self, states):
+ def send_presence(self, states: List[UserPresenceState]):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
-
- Args:
- states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
@@ -412,11 +407,10 @@ class FederationSender(object):
finally:
self._processing_pending_presence = False
- def send_presence_to_destinations(self, states, destinations):
+ def send_presence_to_destinations(
+ self, states: List[UserPresenceState], destinations: List[str]
+ ) -> None:
"""Send the given presence states to the given destinations.
-
- Args:
- states (list[UserPresenceState])
destinations (list[str])
"""
@@ -431,12 +425,9 @@ class FederationSender(object):
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
- def _process_presence_inner(self, states):
+ def _process_presence_inner(self, states: List[UserPresenceState]):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
-
- Args:
- states (list(UserPresenceState))
"""
hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
@@ -446,14 +437,20 @@ class FederationSender(object):
continue
self._get_per_destination_queue(destination).send_presence(states)
- def build_and_send_edu(self, destination, edu_type, content, key=None):
+ def build_and_send_edu(
+ self,
+ destination: str,
+ edu_type: str,
+ content: dict,
+ key: Optional[Hashable] = None,
+ ):
"""Construct an Edu object, and queue it for sending
Args:
- destination (str): name of server to send to
- edu_type (str): type of EDU to send
- content (dict): content of EDU
- key (Any|None): clobbering key for this edu
+ destination: name of server to send to
+ edu_type: type of EDU to send
+ content: content of EDU
+ key: clobbering key for this edu
"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
@@ -468,12 +465,12 @@ class FederationSender(object):
self.send_edu(edu, key)
- def send_edu(self, edu, key):
+ def send_edu(self, edu: Edu, key: Optional[Hashable]):
"""Queue an EDU for sending
Args:
- edu (Edu): edu to send
- key (Any|None): clobbering key for this edu
+ edu: edu to send
+ key: clobbering key for this edu
"""
queue = self._get_per_destination_queue(edu.destination)
if key:
@@ -481,7 +478,7 @@ class FederationSender(object):
else:
queue.send_edu(edu)
- def send_device_messages(self, destination):
+ def send_device_messages(self, destination: str):
if destination == self.server_name:
logger.warning("Not sending device update to ourselves")
return
@@ -501,5 +498,5 @@ class FederationSender(object):
self._get_per_destination_queue(destination).attempt_new_transaction()
- def get_current_token(self):
+ def get_current_token(self) -> int:
return 0
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 5012aaea35..e13cd20ffa 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,11 +15,11 @@
# limitations under the License.
import datetime
import logging
+from typing import Dict, Hashable, Iterable, List, Tuple
from prometheus_client import Counter
-from twisted.internet import defer
-
+import synapse.server
from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
@@ -31,7 +31,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
-from synapse.types import StateMap
+from synapse.types import ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
# This is defined in the Matrix spec and enforced by the receiver.
@@ -56,13 +56,18 @@ class PerDestinationQueue(object):
Manages the per-destination transmission queues.
Args:
- hs (synapse.HomeServer):
- transaction_sender (TransactionManager):
- destination (str): the server_name of the destination that we are managing
+ hs
+ transaction_sender
+ destination: the server_name of the destination that we are managing
transmission for.
"""
- def __init__(self, hs, transaction_manager, destination):
+ def __init__(
+ self,
+ hs: "synapse.server.HomeServer",
+ transaction_manager: "synapse.federation.sender.TransactionManager",
+ destination: str,
+ ):
self._server_name = hs.hostname
self._clock = hs.get_clock()
self._store = hs.get_datastore()
@@ -72,20 +77,20 @@ class PerDestinationQueue(object):
self.transmission_loop_running = False
# a list of tuples of (pending pdu, order)
- self._pending_pdus = [] # type: list[tuple[EventBase, int]]
- self._pending_edus = [] # type: list[Edu]
+ self._pending_pdus = [] # type: List[Tuple[EventBase, int]]
+ self._pending_edus = [] # type: List[Edu]
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
- self._pending_edus_keyed = {} # type: StateMap[Edu]
+ self._pending_edus_keyed = {} # type: Dict[Tuple[str, Hashable], Edu]
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
- self._pending_presence = {} # type: dict[str, UserPresenceState]
+ self._pending_presence = {} # type: Dict[str, UserPresenceState]
# room_id -> receipt_type -> user_id -> receipt_dict
- self._pending_rrs = {}
+ self._pending_rrs = {} # type: Dict[str, Dict[str, Dict[str, dict]]]
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
@@ -95,50 +100,50 @@ class PerDestinationQueue(object):
# stream_id of last successfully sent device list update.
self._last_device_list_stream_id = 0
- def __str__(self):
+ def __str__(self) -> str:
return "PerDestinationQueue[%s]" % self._destination
- def pending_pdu_count(self):
+ def pending_pdu_count(self) -> int:
return len(self._pending_pdus)
- def pending_edu_count(self):
+ def pending_edu_count(self) -> int:
return (
len(self._pending_edus)
+ len(self._pending_presence)
+ len(self._pending_edus_keyed)
)
- def send_pdu(self, pdu, order):
+ def send_pdu(self, pdu: EventBase, order: int) -> None:
"""Add a PDU to the queue, and start the transmission loop if neccessary
Args:
- pdu (EventBase): pdu to send
- order (int):
+ pdu: pdu to send
+ order
"""
self._pending_pdus.append((pdu, order))
self.attempt_new_transaction()
- def send_presence(self, states):
+ def send_presence(self, states: Iterable[UserPresenceState]) -> None:
"""Add presence updates to the queue. Start the transmission loop if neccessary.
Args:
- states (iterable[UserPresenceState]): presence to send
+ states: presence to send
"""
self._pending_presence.update({state.user_id: state for state in states})
self.attempt_new_transaction()
- def queue_read_receipt(self, receipt):
+ def queue_read_receipt(self, receipt: ReadReceipt) -> None:
"""Add a RR to the list to be sent. Doesn't start the transmission loop yet
(see flush_read_receipts_for_room)
Args:
- receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued
+ receipt: receipt to be queued
"""
self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
receipt.receipt_type, {}
)[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
- def flush_read_receipts_for_room(self, room_id):
+ def flush_read_receipts_for_room(self, room_id: str) -> None:
# if we don't have any read-receipts for this room, it may be that we've already
# sent them out, so we don't need to flush.
if room_id not in self._pending_rrs:
@@ -146,15 +151,15 @@ class PerDestinationQueue(object):
self._rrs_pending_flush = True
self.attempt_new_transaction()
- def send_keyed_edu(self, edu, key):
+ def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
self._pending_edus_keyed[(edu.edu_type, key)] = edu
self.attempt_new_transaction()
- def send_edu(self, edu):
+ def send_edu(self, edu) -> None:
self._pending_edus.append(edu)
self.attempt_new_transaction()
- def attempt_new_transaction(self):
+ def attempt_new_transaction(self) -> None:
"""Try to start a new transaction to this destination
If there is already a transaction in progress to this destination,
@@ -177,23 +182,22 @@ class PerDestinationQueue(object):
self._transaction_transmission_loop,
)
- @defer.inlineCallbacks
- def _transaction_transmission_loop(self):
- pending_pdus = []
+ async def _transaction_transmission_loop(self) -> None:
+ pending_pdus = [] # type: List[Tuple[EventBase, int]]
try:
self.transmission_loop_running = True
# This will throw if we wouldn't retry. We do this here so we fail
# quickly, but we will later check this again in the http client,
# hence why we throw the result away.
- yield get_retry_limiter(self._destination, self._clock, self._store)
+ await get_retry_limiter(self._destination, self._clock, self._store)
pending_pdus = []
while True:
# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2
- device_update_edus, dev_list_id = yield self._get_device_update_edus(
+ device_update_edus, dev_list_id = await self._get_device_update_edus(
limit
)
@@ -202,7 +206,7 @@ class PerDestinationQueue(object):
(
to_device_edus,
device_stream_id,
- ) = yield self._get_to_device_message_edus(limit)
+ ) = await self._get_to_device_message_edus(limit)
pending_edus = device_update_edus + to_device_edus
@@ -269,7 +273,7 @@ class PerDestinationQueue(object):
# END CRITICAL SECTION
- success = yield self._transaction_manager.send_new_transaction(
+ success = await self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
@@ -280,7 +284,7 @@ class PerDestinationQueue(object):
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
- yield self._store.delete_device_msgs_for_remote(
+ await self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)
@@ -289,7 +293,7 @@ class PerDestinationQueue(object):
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
- yield self._store.mark_as_sent_devices_by_remote(
+ await self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)
@@ -334,7 +338,7 @@ class PerDestinationQueue(object):
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
- def _get_rr_edus(self, force_flush):
+ def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
return
if not force_flush and not self._rrs_pending_flush:
@@ -351,17 +355,16 @@ class PerDestinationQueue(object):
self._rrs_pending_flush = False
yield edu
- def _pop_pending_edus(self, limit):
+ def _pop_pending_edus(self, limit: int) -> List[Edu]:
pending_edus = self._pending_edus
pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:]
return pending_edus
- @defer.inlineCallbacks
- def _get_device_update_edus(self, limit):
+ async def _get_device_update_edus(self, limit: int) -> Tuple[List[Edu], int]:
last_device_list = self._last_device_list_stream_id
# Retrieve list of new device updates to send to the destination
- now_stream_id, results = yield self._store.get_device_updates_by_remote(
+ now_stream_id, results = await self._store.get_device_updates_by_remote(
self._destination, last_device_list, limit=limit
)
edus = [
@@ -378,11 +381,10 @@ class PerDestinationQueue(object):
return (edus, now_stream_id)
- @defer.inlineCallbacks
- def _get_to_device_message_edus(self, limit):
+ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]:
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
- contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
+ contents, stream_id = await self._store.get_new_device_msgs_for_remote(
self._destination, last_device_stream_id, to_device_stream_id, limit
)
edus = [
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 5fed626d5b..3c2a02a3b3 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -13,14 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import List
from canonicaljson import json
-from twisted.internet import defer
-
+import synapse.server
from synapse.api.errors import HttpResponseException
+from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
-from synapse.federation.units import Transaction
+from synapse.federation.units import Edu, Transaction
from synapse.logging.opentracing import (
extract_text_map,
set_tag,
@@ -39,7 +40,7 @@ class TransactionManager(object):
shared between PerDestinationQueue objects
"""
- def __init__(self, hs):
+ def __init__(self, hs: "synapse.server.HomeServer"):
self._server_name = hs.hostname
self.clock = hs.get_clock() # nb must be called this for @measure_func
self._store = hs.get_datastore()
@@ -50,8 +51,9 @@ class TransactionManager(object):
self._next_txn_id = int(self.clock.time_msec())
@measure_func("_send_new_transaction")
- @defer.inlineCallbacks
- def send_new_transaction(self, destination, pending_pdus, pending_edus):
+ async def send_new_transaction(
+ self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
+ ):
# Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
@@ -127,7 +129,7 @@ class TransactionManager(object):
return data
try:
- response = yield self._transport_layer.send_transaction(
+ response = await self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 125eadd796..92a9ae2320 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -158,7 +158,7 @@ class Authenticator(object):
origin, json_request, now, "Incoming request"
)
- logger.info("Request from %s", origin)
+ logger.debug("Request from %s", origin)
request.authenticated_entity = origin
# If we get a valid signed request from the other side, its probably
@@ -579,7 +579,7 @@ class FederationV1InviteServlet(BaseFederationServlet):
# state resolution algorithm, and we don't use that for processing
# invites
content = await self.handler.on_invite_request(
- origin, content, room_version=RoomVersions.V1.identifier
+ origin, content, room_version_id=RoomVersions.V1.identifier
)
# V1 federation API is defined to return a content of `[200, {...}]`
@@ -606,7 +606,7 @@ class FederationV2InviteServlet(BaseFederationServlet):
event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state
content = await self.handler.on_invite_request(
- origin, event, room_version=room_version
+ origin, event, room_version_id=room_version
)
return 200, content
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index b4d743cde7..6b32e0dcbf 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -19,11 +19,15 @@ server protocol.
import logging
+import attr
+
+from synapse.types import JsonDict
from synapse.util.jsonobject import JsonEncodedObject
logger = logging.getLogger(__name__)
+@attr.s(slots=True)
class Edu(JsonEncodedObject):
""" An Edu represents a piece of data sent from one homeserver to another.
@@ -32,11 +36,24 @@ class Edu(JsonEncodedObject):
internal ID or previous references graph.
"""
- valid_keys = ["origin", "destination", "edu_type", "content"]
+ edu_type = attr.ib(type=str)
+ content = attr.ib(type=dict)
+ origin = attr.ib(type=str)
+ destination = attr.ib(type=str)
- required_keys = ["edu_type"]
+ def get_dict(self) -> JsonDict:
+ return {
+ "edu_type": self.edu_type,
+ "content": self.content,
+ }
- internal_keys = ["origin", "destination"]
+ def get_internal_dict(self) -> JsonDict:
+ return {
+ "edu_type": self.edu_type,
+ "content": self.content,
+ "origin": self.origin,
+ "destination": self.destination,
+ }
def get_context(self):
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
|