diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 3b85b135e0..fee1477ab6 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -128,7 +128,7 @@ class FederationClient(FederationBase):
reset_expiry_on_get=False,
)
- def _clear_tried_cache(self):
+ def _clear_tried_cache(self) -> None:
"""Clear pdu_destination_tried cache"""
now = self._clock.time_msec()
@@ -800,7 +800,7 @@ class FederationClient(FederationBase):
no servers successfully handle the request.
"""
- async def send_request(destination) -> SendJoinResult:
+ async def send_request(destination: str) -> SendJoinResult:
response = await self._do_send_join(room_version, destination, pdu)
# If an event was returned (and expected to be returned):
@@ -1395,11 +1395,28 @@ class FederationClient(FederationBase):
async def send_request(
destination: str,
) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]:
- res = await self.transport_layer.get_room_hierarchy(
- destination=destination,
- room_id=room_id,
- suggested_only=suggested_only,
- )
+ try:
+ res = await self.transport_layer.get_room_hierarchy(
+ destination=destination,
+ room_id=room_id,
+ suggested_only=suggested_only,
+ )
+ except HttpResponseException as e:
+ # If an error is received that is due to an unrecognised endpoint,
+ # fallback to the unstable endpoint. Otherwise consider it a
+ # legitmate error and raise.
+ if not self._is_unknown_endpoint(e):
+ raise
+
+ logger.debug(
+ "Couldn't fetch room hierarchy with the v1 API, falling back to the unstable API"
+ )
+
+ res = await self.transport_layer.get_room_hierarchy_unstable(
+ destination=destination,
+ room_id=room_id,
+ suggested_only=suggested_only,
+ )
room = res.get("room")
if not isinstance(room, dict):
@@ -1449,6 +1466,10 @@ class FederationClient(FederationBase):
if e.code != 502:
raise
+ logger.debug(
+ "Couldn't fetch room hierarchy, falling back to the spaces API"
+ )
+
# Fallback to the old federation API and translate the results if
# no servers implement the new API.
#
@@ -1496,6 +1517,83 @@ class FederationClient(FederationBase):
self._get_room_hierarchy_cache[(room_id, suggested_only)] = result
return result
+ async def timestamp_to_event(
+ self, destination: str, room_id: str, timestamp: int, direction: str
+ ) -> "TimestampToEventResponse":
+ """
+ Calls a remote federating server at `destination` asking for their
+ closest event to the given timestamp in the given direction. Also
+ validates the response to always return the expected keys or raises an
+ error.
+
+ Args:
+ destination: Domain name of the remote homeserver
+ room_id: Room to fetch the event from
+ timestamp: The point in time (inclusive) we should navigate from in
+ the given direction to find the closest event.
+ direction: ["f"|"b"] to indicate whether we should navigate forward
+ or backward from the given timestamp to find the closest event.
+
+ Returns:
+ A parsed TimestampToEventResponse including the closest event_id
+ and origin_server_ts
+
+ Raises:
+ Various exceptions when the request fails
+ InvalidResponseError when the response does not have the correct
+ keys or wrong types
+ """
+ remote_response = await self.transport_layer.timestamp_to_event(
+ destination, room_id, timestamp, direction
+ )
+
+ if not isinstance(remote_response, dict):
+ raise InvalidResponseError(
+ "Response must be a JSON dictionary but received %r" % remote_response
+ )
+
+ try:
+ return TimestampToEventResponse.from_json_dict(remote_response)
+ except ValueError as e:
+ raise InvalidResponseError(str(e))
+
+
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class TimestampToEventResponse:
+ """Typed response dictionary for the federation /timestamp_to_event endpoint"""
+
+ event_id: str
+ origin_server_ts: int
+
+ # the raw data, including the above keys
+ data: JsonDict
+
+ @classmethod
+ def from_json_dict(cls, d: JsonDict) -> "TimestampToEventResponse":
+ """Parsed response from the federation /timestamp_to_event endpoint
+
+ Args:
+ d: JSON object response to be parsed
+
+ Raises:
+ ValueError if d does not the correct keys or they are the wrong types
+ """
+
+ event_id = d.get("event_id")
+ if not isinstance(event_id, str):
+ raise ValueError(
+ "Invalid response: 'event_id' must be a str but received %r" % event_id
+ )
+
+ origin_server_ts = d.get("origin_server_ts")
+ if not isinstance(origin_server_ts, int):
+ raise ValueError(
+ "Invalid response: 'origin_server_ts' must be a int but received %r"
+ % origin_server_ts
+ )
+
+ return cls(event_id, origin_server_ts, d)
+
@attr.s(frozen=True, slots=True, auto_attribs=True)
class FederationSpaceSummaryEventResult:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9a8758e9a6..8e37e76206 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1,6 +1,6 @@
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
-# Copyright 2019 Matrix.org Federation C.I.C
+# Copyright 2019-2021 Matrix.org Federation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -110,6 +110,7 @@ class FederationServer(FederationBase):
super().__init__(hs)
self.handler = hs.get_federation_handler()
+ self.storage = hs.get_storage()
self._federation_event_handler = hs.get_federation_event_handler()
self.state = hs.get_state_handler()
self._event_auth_handler = hs.get_event_auth_handler()
@@ -200,6 +201,48 @@ class FederationServer(FederationBase):
return 200, res
+ async def on_timestamp_to_event_request(
+ self, origin: str, room_id: str, timestamp: int, direction: str
+ ) -> Tuple[int, Dict[str, Any]]:
+ """When we receive a federated `/timestamp_to_event` request,
+ handle all of the logic for validating and fetching the event.
+
+ Args:
+ origin: The server we received the event from
+ room_id: Room to fetch the event from
+ timestamp: The point in time (inclusive) we should navigate from in
+ the given direction to find the closest event.
+ direction: ["f"|"b"] to indicate whether we should navigate forward
+ or backward from the given timestamp to find the closest event.
+
+ Returns:
+ Tuple indicating the response status code and dictionary response
+ body including `event_id`.
+ """
+ with (await self._server_linearizer.queue((origin, room_id))):
+ origin_host, _ = parse_server_name(origin)
+ await self.check_server_matches_acl(origin_host, room_id)
+
+ # We only try to fetch data from the local database
+ event_id = await self.store.get_event_id_for_timestamp(
+ room_id, timestamp, direction
+ )
+ if event_id:
+ event = await self.store.get_event(
+ event_id, allow_none=False, allow_rejected=False
+ )
+
+ return 200, {
+ "event_id": event_id,
+ "origin_server_ts": event.origin_server_ts,
+ }
+
+ raise SynapseError(
+ 404,
+ "Unable to find event from %s in direction %s" % (timestamp, direction),
+ errcode=Codes.NOT_FOUND,
+ )
+
async def on_incoming_transaction(
self,
origin: str,
@@ -407,7 +450,7 @@ class FederationServer(FederationBase):
# require callouts to other servers to fetch missing events), but
# impose a limit to avoid going too crazy with ram/cpu.
- async def process_pdus_for_room(room_id: str):
+ async def process_pdus_for_room(room_id: str) -> None:
with nested_logging_context(room_id):
logger.debug("Processing PDUs for %s", room_id)
@@ -504,7 +547,7 @@ class FederationServer(FederationBase):
async def on_state_ids_request(
self, origin: str, room_id: str, event_id: str
- ) -> Tuple[int, Dict[str, Any]]:
+ ) -> Tuple[int, JsonDict]:
if not event_id:
raise NotImplementedError("Specify an event")
@@ -524,7 +567,9 @@ class FederationServer(FederationBase):
return 200, resp
- async def _on_state_ids_request_compute(self, room_id, event_id):
+ async def _on_state_ids_request_compute(
+ self, room_id: str, event_id: str
+ ) -> JsonDict:
state_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id)
auth_chain_ids = await self.store.get_auth_chain_ids(room_id, state_ids)
return {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
@@ -613,8 +658,11 @@ class FederationServer(FederationBase):
state = await self.store.get_events(state_ids)
time_now = self._clock.time_msec()
+ event_json = event.get_pdu_json()
return {
- "org.matrix.msc3083.v2.event": event.get_pdu_json(),
+ # TODO Remove the unstable prefix when servers have updated.
+ "org.matrix.msc3083.v2.event": event_json,
+ "event": event_json,
"state": [p.get_pdu_json(time_now) for p in state.values()],
"auth_chain": [p.get_pdu_json(time_now) for p in auth_chain],
}
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 4fead6ca29..523ab1c51e 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -1,4 +1,5 @@
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,6 +24,7 @@ from typing import Optional, Tuple
from synapse.federation.units import Transaction
from synapse.logging.utils import log_function
+from synapse.storage.databases.main import DataStore
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@@ -31,7 +33,7 @@ logger = logging.getLogger(__name__)
class TransactionActions:
"""Defines persistence actions that relate to handling Transactions."""
- def __init__(self, datastore):
+ def __init__(self, datastore: DataStore):
self.store = datastore
@log_function
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 1fbf325fdc..63289a5a33 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -1,4 +1,5 @@
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -350,7 +351,7 @@ class BaseFederationRow:
TypeId = "" # Unique string that ids the type. Must be overridden in sub classes.
@staticmethod
- def from_data(data):
+ def from_data(data: JsonDict) -> "BaseFederationRow":
"""Parse the data from the federation stream into a row.
Args:
@@ -359,7 +360,7 @@ class BaseFederationRow:
"""
raise NotImplementedError()
- def to_data(self):
+ def to_data(self) -> JsonDict:
"""Serialize this row to be sent over the federation stream.
Returns:
@@ -368,7 +369,7 @@ class BaseFederationRow:
"""
raise NotImplementedError()
- def add_to_buffer(self, buff):
+ def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None:
"""Add this row to the appropriate field in the buffer ready for this
to be sent over federation.
@@ -391,15 +392,15 @@ class PresenceDestinationsRow(
TypeId = "pd"
@staticmethod
- def from_data(data):
+ def from_data(data: JsonDict) -> "PresenceDestinationsRow":
return PresenceDestinationsRow(
state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
)
- def to_data(self):
+ def to_data(self) -> JsonDict:
return {"state": self.state.as_dict(), "dests": self.destinations}
- def add_to_buffer(self, buff):
+ def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None:
buff.presence_destinations.append((self.state, self.destinations))
@@ -417,13 +418,13 @@ class KeyedEduRow(
TypeId = "k"
@staticmethod
- def from_data(data):
+ def from_data(data: JsonDict) -> "KeyedEduRow":
return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))
- def to_data(self):
+ def to_data(self) -> JsonDict:
return {"key": self.key, "edu": self.edu.get_internal_dict()}
- def add_to_buffer(self, buff):
+ def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None:
buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
@@ -433,13 +434,13 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
TypeId = "e"
@staticmethod
- def from_data(data):
+ def from_data(data: JsonDict) -> "EduRow":
return EduRow(Edu(**data))
- def to_data(self):
+ def to_data(self) -> JsonDict:
return self.edu.get_internal_dict()
- def add_to_buffer(self, buff):
+ def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None:
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index afe35e72b6..391b30fbb5 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -1,5 +1,6 @@
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
+# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +15,8 @@
# limitations under the License.
import datetime
import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
+from types import TracebackType
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type
import attr
from prometheus_client import Counter
@@ -213,7 +215,7 @@ class PerDestinationQueue:
self._pending_edus_keyed[(edu.edu_type, key)] = edu
self.attempt_new_transaction()
- def send_edu(self, edu) -> None:
+ def send_edu(self, edu: Edu) -> None:
self._pending_edus.append(edu)
self.attempt_new_transaction()
@@ -701,7 +703,12 @@ class _TransactionQueueManager:
return self._pdus, pending_edus
- async def __aexit__(self, exc_type, exc, tb):
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc: Optional[BaseException],
+ tb: Optional[TracebackType],
+ ) -> None:
if exc_type is not None:
# Failed to send transaction, so we bail out.
return
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 10b5aa5af8..9fc4c31c93 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -21,6 +21,7 @@ from typing import (
Callable,
Collection,
Dict,
+ Generator,
Iterable,
List,
Mapping,
@@ -149,6 +150,42 @@ class TransportLayerClient:
)
@log_function
+ async def timestamp_to_event(
+ self, destination: str, room_id: str, timestamp: int, direction: str
+ ) -> Union[JsonDict, List]:
+ """
+ Calls a remote federating server at `destination` asking for their
+ closest event to the given timestamp in the given direction.
+
+ Args:
+ destination: Domain name of the remote homeserver
+ room_id: Room to fetch the event from
+ timestamp: The point in time (inclusive) we should navigate from in
+ the given direction to find the closest event.
+ direction: ["f"|"b"] to indicate whether we should navigate forward
+ or backward from the given timestamp to find the closest event.
+
+ Returns:
+ Response dict received from the remote homeserver.
+
+ Raises:
+ Various exceptions when the request fails
+ """
+ path = _create_path(
+ FEDERATION_UNSTABLE_PREFIX,
+ "/org.matrix.msc3030/timestamp_to_event/%s",
+ room_id,
+ )
+
+ args = {"ts": [str(timestamp)], "dir": [direction]}
+
+ remote_response = await self.client.get_json(
+ destination, path=path, args=args, try_trailing_slash_on_400=True
+ )
+
+ return remote_response
+
+ @log_function
async def send_transaction(
self,
transaction: Transaction,
@@ -199,11 +236,16 @@ class TransportLayerClient:
@log_function
async def make_query(
- self, destination, query_type, args, retry_on_dns_fail, ignore_backoff=False
- ):
+ self,
+ destination: str,
+ query_type: str,
+ args: dict,
+ retry_on_dns_fail: bool,
+ ignore_backoff: bool = False,
+ ) -> JsonDict:
path = _create_v1_path("/query/%s", query_type)
- content = await self.client.get_json(
+ return await self.client.get_json(
destination=destination,
path=path,
args=args,
@@ -212,8 +254,6 @@ class TransportLayerClient:
ignore_backoff=ignore_backoff,
)
- return content
-
@log_function
async def make_membership_event(
self,
@@ -1192,10 +1232,24 @@ class TransportLayerClient:
)
async def get_room_hierarchy(
- self,
- destination: str,
- room_id: str,
- suggested_only: bool,
+ self, destination: str, room_id: str, suggested_only: bool
+ ) -> JsonDict:
+ """
+ Args:
+ destination: The remote server
+ room_id: The room ID to ask about.
+ suggested_only: if True, only suggested rooms will be returned
+ """
+ path = _create_v1_path("/hierarchy/%s", room_id)
+
+ return await self.client.get_json(
+ destination=destination,
+ path=path,
+ args={"suggested_only": "true" if suggested_only else "false"},
+ )
+
+ async def get_room_hierarchy_unstable(
+ self, destination: str, room_id: str, suggested_only: bool
) -> JsonDict:
"""
Args:
@@ -1267,7 +1321,7 @@ class SendJoinResponse:
@ijson.coroutine
-def _event_parser(event_dict: JsonDict):
+def _event_parser(event_dict: JsonDict) -> Generator[None, Tuple[str, Any], None]:
"""Helper function for use with `ijson.kvitems_coro` to parse key-value pairs
to add them to a given dictionary.
"""
@@ -1278,7 +1332,9 @@ def _event_parser(event_dict: JsonDict):
@ijson.coroutine
-def _event_list_parser(room_version: RoomVersion, events: List[EventBase]):
+def _event_list_parser(
+ room_version: RoomVersion, events: List[EventBase]
+) -> Generator[None, JsonDict, None]:
"""Helper function for use with `ijson.items_coro` to parse an array of
events and add them to the given list.
"""
@@ -1317,15 +1373,26 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
prefix + "auth_chain.item",
use_float=True,
)
- self._coro_event = ijson.kvitems_coro(
+ # TODO Remove the unstable prefix when servers have updated.
+ #
+ # By re-using the same event dictionary this will cause the parsing of
+ # org.matrix.msc3083.v2.event and event to stomp over each other.
+ # Generally this should be fine.
+ self._coro_unstable_event = ijson.kvitems_coro(
_event_parser(self._response.event_dict),
prefix + "org.matrix.msc3083.v2.event",
use_float=True,
)
+ self._coro_event = ijson.kvitems_coro(
+ _event_parser(self._response.event_dict),
+ prefix + "event",
+ use_float=True,
+ )
def write(self, data: bytes) -> int:
self._coro_state.send(data)
self._coro_auth.send(data)
+ self._coro_unstable_event.send(data)
self._coro_event.send(data)
return len(data)
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index c32539bf5a..77b936361a 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -22,7 +22,10 @@ from synapse.federation.transport.server._base import (
Authenticator,
BaseFederationServlet,
)
-from synapse.federation.transport.server.federation import FEDERATION_SERVLET_CLASSES
+from synapse.federation.transport.server.federation import (
+ FEDERATION_SERVLET_CLASSES,
+ FederationTimestampLookupServlet,
+)
from synapse.federation.transport.server.groups_local import GROUP_LOCAL_SERVLET_CLASSES
from synapse.federation.transport.server.groups_server import (
GROUP_SERVER_SERVLET_CLASSES,
@@ -299,7 +302,7 @@ def register_servlets(
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
servlet_groups: Optional[Iterable[str]] = None,
-):
+) -> None:
"""Initialize and register servlet classes.
Will by default register all servlets. For custom behaviour, pass in
@@ -324,6 +327,13 @@ def register_servlets(
)
for servletclass in DEFAULT_SERVLET_GROUPS[servlet_group]:
+ # Only allow the `/timestamp_to_event` servlet if msc3030 is enabled
+ if (
+ servletclass == FederationTimestampLookupServlet
+ and not hs.config.experimental.msc3030_enabled
+ ):
+ continue
+
servletclass(
hs=hs,
authenticator=authenticator,
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index cef65929c5..dc39e3537b 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -15,10 +15,13 @@
import functools
import logging
import re
+from typing import Any, Awaitable, Callable, Optional, Tuple, cast
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
from synapse.api.urls import FEDERATION_V1_PREFIX
+from synapse.http.server import HttpServer, ServletCallback
from synapse.http.servlet import parse_json_object_from_request
+from synapse.http.site import SynapseRequest
from synapse.logging import opentracing
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
@@ -29,6 +32,7 @@ from synapse.logging.opentracing import (
whitelisted_homeserver,
)
from synapse.server import HomeServer
+from synapse.types import JsonDict
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import parse_and_validate_server_name
@@ -59,9 +63,11 @@ class Authenticator:
self.replication_client = hs.get_tcp_replication()
# A method just so we can pass 'self' as the authenticator to the Servlets
- async def authenticate_request(self, request, content):
+ async def authenticate_request(
+ self, request: SynapseRequest, content: Optional[JsonDict]
+ ) -> str:
now = self._clock.time_msec()
- json_request = {
+ json_request: JsonDict = {
"method": request.method.decode("ascii"),
"uri": request.uri.decode("ascii"),
"destination": self.server_name,
@@ -114,7 +120,7 @@ class Authenticator:
return origin
- async def _reset_retry_timings(self, origin):
+ async def _reset_retry_timings(self, origin: str) -> None:
try:
logger.info("Marking origin %r as up", origin)
await self.store.set_destination_retry_timings(origin, None, 0, 0)
@@ -133,14 +139,14 @@ class Authenticator:
logger.exception("Error resetting retry timings on %s", origin)
-def _parse_auth_header(header_bytes):
+def _parse_auth_header(header_bytes: bytes) -> Tuple[str, str, str]:
"""Parse an X-Matrix auth header
Args:
- header_bytes (bytes): header value
+ header_bytes: header value
Returns:
- Tuple[str, str, str]: origin, key id, signature.
+ origin, key id, signature.
Raises:
AuthenticationError if the header could not be parsed
@@ -148,9 +154,9 @@ def _parse_auth_header(header_bytes):
try:
header_str = header_bytes.decode("utf-8")
params = header_str.split(" ")[1].split(",")
- param_dict = dict(kv.split("=") for kv in params)
+ param_dict = {k: v for k, v in (kv.split("=", maxsplit=1) for kv in params)}
- def strip_quotes(value):
+ def strip_quotes(value: str) -> str:
if value.startswith('"'):
return value[1:-1]
else:
@@ -233,23 +239,25 @@ class BaseFederationServlet:
self.ratelimiter = ratelimiter
self.server_name = server_name
- def _wrap(self, func):
+ def _wrap(self, func: Callable[..., Awaitable[Tuple[int, Any]]]) -> ServletCallback:
authenticator = self.authenticator
ratelimiter = self.ratelimiter
@functools.wraps(func)
- async def new_func(request, *args, **kwargs):
+ async def new_func(
+ request: SynapseRequest, *args: Any, **kwargs: str
+ ) -> Optional[Tuple[int, Any]]:
"""A callback which can be passed to HttpServer.RegisterPaths
Args:
- request (twisted.web.http.Request):
+ request:
*args: unused?
- **kwargs (dict[unicode, unicode]): the dict mapping keys to path
- components as specified in the path match regexp.
+ **kwargs: the dict mapping keys to path components as specified
+ in the path match regexp.
Returns:
- Tuple[int, object]|None: (response code, response object) as returned by
- the callback method. None if the request has already been handled.
+ (response code, response object) as returned by the callback method.
+ None if the request has already been handled.
"""
content = None
if request.method in [b"PUT", b"POST"]:
@@ -257,7 +265,9 @@ class BaseFederationServlet:
content = parse_json_object_from_request(request)
try:
- origin = await authenticator.authenticate_request(request, content)
+ origin: Optional[str] = await authenticator.authenticate_request(
+ request, content
+ )
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
@@ -301,7 +311,7 @@ class BaseFederationServlet:
"client disconnected before we started processing "
"request"
)
- return -1, None
+ return None
response = await func(
origin, content, request.args, *args, **kwargs
)
@@ -312,9 +322,9 @@ class BaseFederationServlet:
return response
- return new_func
+ return cast(ServletCallback, new_func)
- def register(self, server):
+ def register(self, server: HttpServer) -> None:
pattern = re.compile("^" + self.PREFIX + self.PATH + "$")
for method in ("GET", "PUT", "POST"):
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 2fdf6cc99e..77bfd88ad0 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -174,6 +174,46 @@ class FederationBackfillServlet(BaseFederationServerServlet):
return await self.handler.on_backfill_request(origin, room_id, versions, limit)
+class FederationTimestampLookupServlet(BaseFederationServerServlet):
+ """
+ API endpoint to fetch the `event_id` of the closest event to the given
+ timestamp (`ts` query parameter) in the given direction (`dir` query
+ parameter).
+
+ Useful for other homeservers when they're unable to find an event locally.
+
+ `ts` is a timestamp in milliseconds where we will find the closest event in
+ the given direction.
+
+ `dir` can be `f` or `b` to indicate forwards and backwards in time from the
+ given timestamp.
+
+ GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
+ {
+ "event_id": ...
+ }
+ """
+
+ PATH = "/timestamp_to_event/(?P<room_id>[^/]*)/?"
+ PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030"
+
+ async def on_GET(
+ self,
+ origin: str,
+ content: Literal[None],
+ query: Dict[bytes, List[bytes]],
+ room_id: str,
+ ) -> Tuple[int, JsonDict]:
+ timestamp = parse_integer_from_args(query, "ts", required=True)
+ direction = parse_string_from_args(
+ query, "dir", default="f", allowed_values=["f", "b"], required=True
+ )
+
+ return await self.handler.on_timestamp_to_event_request(
+ origin, room_id, timestamp, direction
+ )
+
+
class FederationQueryServlet(BaseFederationServerServlet):
PATH = "/query/(?P<query_type>[^/]*)"
@@ -611,7 +651,6 @@ class FederationSpaceSummaryServlet(BaseFederationServlet):
class FederationRoomHierarchyServlet(BaseFederationServlet):
- PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946"
PATH = "/hierarchy/(?P<room_id>[^/]*)"
def __init__(
@@ -637,6 +676,10 @@ class FederationRoomHierarchyServlet(BaseFederationServlet):
)
+class FederationRoomHierarchyUnstableServlet(FederationRoomHierarchyServlet):
+ PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946"
+
+
class RoomComplexityServlet(BaseFederationServlet):
"""
Indicates to other servers how complex (and therefore likely
@@ -680,6 +723,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationStateV1Servlet,
FederationStateIdsServlet,
FederationBackfillServlet,
+ FederationTimestampLookupServlet,
FederationQueryServlet,
FederationMakeJoinServlet,
FederationMakeLeaveServlet,
@@ -701,6 +745,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
RoomComplexityServlet,
FederationSpaceSummaryServlet,
FederationRoomHierarchyServlet,
+ FederationRoomHierarchyUnstableServlet,
FederationV1SendKnockServlet,
FederationMakeKnockServlet,
)
|