diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3112cc88b1..1ea837d082 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -68,6 +68,37 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
+ """Get joined domains from state
+
+ Args:
+ state: State map from type/state key to event.
+
+ Returns:
+ Returns a list of servers with the lowest depth of their joins.
+ Sorted by lowest depth first.
+ """
+ joined_users = [
+ (state_key, int(event.depth))
+ for (e_type, state_key), event in state.items()
+ if e_type == EventTypes.Member and event.membership == Membership.JOIN
+ ]
+
+ joined_domains: Dict[str, int] = {}
+ for u, d in joined_users:
+ try:
+ dom = get_domain_from_id(u)
+ old_d = joined_domains.get(dom)
+ if old_d:
+ joined_domains[dom] = min(d, old_d)
+ else:
+ joined_domains[dom] = d
+ except Exception:
+ pass
+
+ return sorted(joined_domains.items(), key=lambda d: d[1])
+
+
class FederationHandler:
"""Handles general incoming federation requests
@@ -268,36 +299,6 @@ class FederationHandler:
curr_state = await self.state_handler.get_current_state(room_id)
- def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
- """Get joined domains from state
-
- Args:
- state: State map from type/state key to event.
-
- Returns:
- Returns a list of servers with the lowest depth of their joins.
- Sorted by lowest depth first.
- """
- joined_users = [
- (state_key, int(event.depth))
- for (e_type, state_key), event in state.items()
- if e_type == EventTypes.Member and event.membership == Membership.JOIN
- ]
-
- joined_domains: Dict[str, int] = {}
- for u, d in joined_users:
- try:
- dom = get_domain_from_id(u)
- old_d = joined_domains.get(dom)
- if old_d:
- joined_domains[dom] = min(d, old_d)
- else:
- joined_domains[dom] = d
- except Exception:
- pass
-
- return sorted(joined_domains.items(), key=lambda d: d[1])
-
curr_domains = get_domains_from_state(curr_state)
likely_domains = [
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 88053f9869..2bcdf32dcc 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -46,6 +46,7 @@ from synapse.api.constants import (
from synapse.api.errors import (
AuthError,
Codes,
+ HttpResponseException,
LimitExceededError,
NotFoundError,
StoreError,
@@ -56,6 +57,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.event_auth import validate_event_for_room_version
from synapse.events import EventBase
from synapse.events.utils import copy_power_levels_contents
+from synapse.federation.federation_client import InvalidResponseError
+from synapse.handlers.federation import get_domains_from_state
from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.state import StateFilter
from synapse.streams import EventSource
@@ -1220,6 +1223,147 @@ class RoomContextHandler:
return results
+class TimestampLookupHandler:
+ def __init__(self, hs: "HomeServer"):
+ self.server_name = hs.hostname
+ self.store = hs.get_datastore()
+ self.state_handler = hs.get_state_handler()
+ self.federation_client = hs.get_federation_client()
+
+ async def get_event_for_timestamp(
+ self,
+ requester: Requester,
+ room_id: str,
+ timestamp: int,
+ direction: str,
+ ) -> Tuple[str, int]:
+ """Find the closest event to the given timestamp in the given direction.
+ If we can't find an event locally or the event we have locally is next to a gap,
+ it will ask other federated homeservers for an event.
+
+ Args:
+ requester: The user making the request according to the access token
+ room_id: Room to fetch the event from
+ timestamp: The point in time (inclusive) we should navigate from in
+ the given direction to find the closest event.
+ direction: ["f"|"b"] to indicate whether we should navigate forward
+ or backward from the given timestamp to find the closest event.
+
+ Returns:
+ A tuple containing the `event_id` closest to the given timestamp in
+ the given direction and the `origin_server_ts`.
+
+ Raises:
+ SynapseError if unable to find any event locally in the given direction
+ """
+
+ local_event_id = await self.store.get_event_id_for_timestamp(
+ room_id, timestamp, direction
+ )
+ logger.debug(
+ "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s",
+ local_event_id,
+ timestamp,
+ )
+
+ # Check for gaps in the history where events could be hiding in between
+ # the timestamp given and the event we were able to find locally
+ is_event_next_to_backward_gap = False
+ is_event_next_to_forward_gap = False
+ if local_event_id:
+ local_event = await self.store.get_event(
+ local_event_id, allow_none=False, allow_rejected=False
+ )
+
+ if direction == "f":
+ # We only need to check for a backward gap if we're looking forwards
+ # to ensure there is nothing in between.
+ is_event_next_to_backward_gap = (
+ await self.store.is_event_next_to_backward_gap(local_event)
+ )
+ elif direction == "b":
+ # We only need to check for a forward gap if we're looking backwards
+ # to ensure there is nothing in between
+ is_event_next_to_forward_gap = (
+ await self.store.is_event_next_to_forward_gap(local_event)
+ )
+
+ # If we found a gap, we should probably ask another homeserver first
+ # about more history in between
+ if (
+ not local_event_id
+ or is_event_next_to_backward_gap
+ or is_event_next_to_forward_gap
+ ):
+ logger.debug(
+ "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first",
+ local_event_id,
+ timestamp,
+ )
+
+ # Find other homeservers from the given state in the room
+ curr_state = await self.state_handler.get_current_state(room_id)
+ curr_domains = get_domains_from_state(curr_state)
+ likely_domains = [
+ domain for domain, depth in curr_domains if domain != self.server_name
+ ]
+
+ # Loop through each homeserver candidate until we get a succesful response
+ for domain in likely_domains:
+ try:
+ remote_response = await self.federation_client.timestamp_to_event(
+ domain, room_id, timestamp, direction
+ )
+ logger.debug(
+ "get_event_for_timestamp: response from domain(%s)=%s",
+ domain,
+ remote_response,
+ )
+
+ # TODO: Do we want to persist this as an extremity?
+ # TODO: I think ideally, we would try to backfill from
+ # this event and run this whole
+ # `get_event_for_timestamp` function again to make sure
+ # they didn't give us an event from their gappy history.
+ remote_event_id = remote_response.event_id
+ origin_server_ts = remote_response.origin_server_ts
+
+ # Only return the remote event if it's closer than the local event
+ if not local_event or (
+ abs(origin_server_ts - timestamp)
+ < abs(local_event.origin_server_ts - timestamp)
+ ):
+ return remote_event_id, origin_server_ts
+ except (HttpResponseException, InvalidResponseError) as ex:
+ # Let's not put a high priority on some other homeserver
+ # failing to respond or giving a random response
+ logger.debug(
+ "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
+ domain,
+ type(ex).__name__,
+ ex,
+ ex.args,
+ )
+ except Exception as ex:
+ # But we do want to see some exceptions in our code
+ logger.warning(
+ "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
+ domain,
+ type(ex).__name__,
+ ex,
+ ex.args,
+ )
+
+ if not local_event_id:
+ raise SynapseError(
+ 404,
+ "Unable to find event from %s in direction %s" % (timestamp, direction),
+ errcode=Codes.NOT_FOUND,
+ )
+
+ return local_event_id, local_event.origin_server_ts
+
+
class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
|