diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a4c97ed458..7f9da49326 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -38,7 +38,6 @@ from synapse.api.errors import (
UnsupportedRoomVersionError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.events import room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
@@ -54,7 +53,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
-from synapse.types import get_domain_from_id
+from synapse.types import JsonDict, get_domain_from_id
from synapse.util import glob_to_regex, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -82,6 +81,8 @@ class FederationServer(FederationBase):
self.handler = hs.get_handlers().federation_handler
self.state = hs.get_state_handler()
+ self.device_handler = hs.get_device_handler()
+
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
@@ -234,24 +235,17 @@ class FederationServer(FederationBase):
continue
try:
- room_version = await self.store.get_room_version_id(room_id)
+ room_version = await self.store.get_room_version(room_id)
except NotFoundError:
logger.info("Ignoring PDU for unknown room_id: %s", room_id)
continue
-
- try:
- format_ver = room_version_to_event_format(room_version)
- except UnsupportedRoomVersionError:
+ except UnsupportedRoomVersionError as e:
# this can happen if support for a given room version is withdrawn,
# so that we still get events for said room.
- logger.info(
- "Ignoring PDU for room %s with unknown version %s",
- room_id,
- room_version,
- )
+ logger.info("Ignoring PDU: %s", e)
continue
- event = event_from_pdu_json(p, format_ver)
+ event = event_from_pdu_json(p, room_version)
pdus_by_room.setdefault(room_id, []).append(event)
pdu_results = {}
@@ -302,7 +296,12 @@ class FederationServer(FederationBase):
async def _process_edu(edu_dict):
received_edus_counter.inc()
- edu = Edu(**edu_dict)
+ edu = Edu(
+ origin=origin,
+ destination=self.server_name,
+ edu_type=edu_dict["edu_type"],
+ content=edu_dict["content"],
+ )
await self.registry.on_edu(edu.edu_type, origin, edu.content)
await concurrently_execute(
@@ -396,20 +395,21 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
- async def on_invite_request(self, origin, content, room_version):
- if room_version not in KNOWN_ROOM_VERSIONS:
+ async def on_invite_request(
+ self, origin: str, content: JsonDict, room_version_id: str
+ ):
+ room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
+ if not room_version:
raise SynapseError(
400,
"Homeserver does not support this room version",
Codes.UNSUPPORTED_ROOM_VERSION,
)
- format_ver = room_version_to_event_format(room_version)
-
- pdu = event_from_pdu_json(content, format_ver)
+ pdu = event_from_pdu_json(content, room_version)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
- pdu = await self._check_sigs_and_hash(room_version, pdu)
+ pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version)
time_now = self._clock.time_msec()
return {"event": ret_pdu.get_pdu_json(time_now)}
@@ -417,16 +417,15 @@ class FederationServer(FederationBase):
async def on_send_join_request(self, origin, content, room_id):
logger.debug("on_send_join_request: content: %s", content)
- room_version = await self.store.get_room_version_id(room_id)
- format_ver = room_version_to_event_format(room_version)
- pdu = event_from_pdu_json(content, format_ver)
+ room_version = await self.store.get_room_version(room_id)
+ pdu = event_from_pdu_json(content, room_version)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
- pdu = await self._check_sigs_and_hash(room_version, pdu)
+ pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
res_pdus = await self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
@@ -448,16 +447,15 @@ class FederationServer(FederationBase):
async def on_send_leave_request(self, origin, content, room_id):
logger.debug("on_send_leave_request: content: %s", content)
- room_version = await self.store.get_room_version_id(room_id)
- format_ver = room_version_to_event_format(room_version)
- pdu = event_from_pdu_json(content, format_ver)
+ room_version = await self.store.get_room_version(room_id)
+ pdu = event_from_pdu_json(content, room_version)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
- pdu = await self._check_sigs_and_hash(room_version, pdu)
+ pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
await self.handler.on_send_leave_request(origin, pdu)
return {}
@@ -495,15 +493,14 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
- room_version = await self.store.get_room_version_id(room_id)
- format_ver = room_version_to_event_format(room_version)
+ room_version = await self.store.get_room_version(room_id)
auth_chain = [
- event_from_pdu_json(e, format_ver) for e in content["auth_chain"]
+ event_from_pdu_json(e, room_version) for e in content["auth_chain"]
]
signed_auth = await self._check_sigs_and_hash_and_fetch(
- origin, auth_chain, outlier=True, room_version=room_version
+ origin, auth_chain, outlier=True, room_version=room_version.identifier
)
ret = await self.handler.on_query_auth(
@@ -528,8 +525,9 @@ class FederationServer(FederationBase):
def on_query_client_keys(self, origin, content):
return self.on_query_request("client_keys", content)
- def on_query_user_devices(self, origin, user_id):
- return self.on_query_request("user_devices", user_id)
+ async def on_query_user_devices(self, origin: str, user_id: str):
+ keys = await self.device_handler.on_federation_query_user_devices(user_id)
+ return 200, keys
@trace
async def on_claim_client_keys(self, origin, content):
@@ -570,7 +568,7 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
- logger.info(
+ logger.debug(
"on_get_missing_events: earliest_events: %r, latest_events: %r,"
" limit: %d",
earliest_events,
@@ -583,11 +581,11 @@ class FederationServer(FederationBase):
)
if len(missing_events) < 5:
- logger.info(
+ logger.debug(
"Returning %d events: %r", len(missing_events), missing_events
)
else:
- logger.info("Returning %d events", len(missing_events))
+ logger.debug("Returning %d events", len(missing_events))
time_now = self._clock.time_msec()
|