summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py61
-rw-r--r--synapse/handlers/room.py144
2 files changed, 175 insertions, 30 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3112cc88b1..1ea837d082 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -68,6 +68,37 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
+    """Get joined domains from state
+
+    Args:
+        state: State map from type/state key to event.
+
+    Returns:
+        Returns a list of servers with the lowest depth of their joins.
+            Sorted by lowest depth first.
+    """
+    joined_users = [
+        (state_key, int(event.depth))
+        for (e_type, state_key), event in state.items()
+        if e_type == EventTypes.Member and event.membership == Membership.JOIN
+    ]
+
+    joined_domains: Dict[str, int] = {}
+    for u, d in joined_users:
+        try:
+            dom = get_domain_from_id(u)
+            old_d = joined_domains.get(dom)
+            if old_d:
+                joined_domains[dom] = min(d, old_d)
+            else:
+                joined_domains[dom] = d
+        except Exception:
+            pass
+
+    return sorted(joined_domains.items(), key=lambda d: d[1])
+
+
 class FederationHandler:
     """Handles general incoming federation requests
 
@@ -268,36 +299,6 @@ class FederationHandler:
 
         curr_state = await self.state_handler.get_current_state(room_id)
 
-        def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
-            """Get joined domains from state
-
-            Args:
-                state: State map from type/state key to event.
-
-            Returns:
-                Returns a list of servers with the lowest depth of their joins.
-                 Sorted by lowest depth first.
-            """
-            joined_users = [
-                (state_key, int(event.depth))
-                for (e_type, state_key), event in state.items()
-                if e_type == EventTypes.Member and event.membership == Membership.JOIN
-            ]
-
-            joined_domains: Dict[str, int] = {}
-            for u, d in joined_users:
-                try:
-                    dom = get_domain_from_id(u)
-                    old_d = joined_domains.get(dom)
-                    if old_d:
-                        joined_domains[dom] = min(d, old_d)
-                    else:
-                        joined_domains[dom] = d
-                except Exception:
-                    pass
-
-            return sorted(joined_domains.items(), key=lambda d: d[1])
-
         curr_domains = get_domains_from_state(curr_state)
 
         likely_domains = [
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 88053f9869..2bcdf32dcc 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -46,6 +46,7 @@ from synapse.api.constants import (
 from synapse.api.errors import (
     AuthError,
     Codes,
+    HttpResponseException,
     LimitExceededError,
     NotFoundError,
     StoreError,
@@ -56,6 +57,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.event_auth import validate_event_for_room_version
 from synapse.events import EventBase
 from synapse.events.utils import copy_power_levels_contents
+from synapse.federation.federation_client import InvalidResponseError
+from synapse.handlers.federation import get_domains_from_state
 from synapse.rest.admin._base import assert_user_is_admin
 from synapse.storage.state import StateFilter
 from synapse.streams import EventSource
@@ -1220,6 +1223,147 @@ class RoomContextHandler:
         return results
 
 
+class TimestampLookupHandler:
+    def __init__(self, hs: "HomeServer"):
+        self.server_name = hs.hostname
+        self.store = hs.get_datastore()
+        self.state_handler = hs.get_state_handler()
+        self.federation_client = hs.get_federation_client()
+
+    async def get_event_for_timestamp(
+        self,
+        requester: Requester,
+        room_id: str,
+        timestamp: int,
+        direction: str,
+    ) -> Tuple[str, int]:
+        """Find the closest event to the given timestamp in the given direction.
+        If we can't find an event locally or the event we have locally is next to a gap,
+        it will ask other federated homeservers for an event.
+
+        Args:
+            requester: The user making the request according to the access token
+            room_id: Room to fetch the event from
+            timestamp: The point in time (inclusive) we should navigate from in
+                the given direction to find the closest event.
+            direction: ["f"|"b"] to indicate whether we should navigate forward
+                or backward from the given timestamp to find the closest event.
+
+        Returns:
+            A tuple containing the `event_id` closest to the given timestamp in
+            the given direction and the `origin_server_ts`.
+
+        Raises:
+            SynapseError if unable to find any event locally in the given direction
+        """
+
+        local_event_id = await self.store.get_event_id_for_timestamp(
+            room_id, timestamp, direction
+        )
+        logger.debug(
+            "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s",
+            local_event_id,
+            timestamp,
+        )
+
+        # Check for gaps in the history where events could be hiding in between
+        # the timestamp given and the event we were able to find locally
+        is_event_next_to_backward_gap = False
+        is_event_next_to_forward_gap = False
+        if local_event_id:
+            local_event = await self.store.get_event(
+                local_event_id, allow_none=False, allow_rejected=False
+            )
+
+            if direction == "f":
+                # We only need to check for a backward gap if we're looking forwards
+                # to ensure there is nothing in between.
+                is_event_next_to_backward_gap = (
+                    await self.store.is_event_next_to_backward_gap(local_event)
+                )
+            elif direction == "b":
+                # We only need to check for a forward gap if we're looking backwards
+                # to ensure there is nothing in between
+                is_event_next_to_forward_gap = (
+                    await self.store.is_event_next_to_forward_gap(local_event)
+                )
+
+        # If we found a gap, we should probably ask another homeserver first
+        # about more history in between
+        if (
+            not local_event_id
+            or is_event_next_to_backward_gap
+            or is_event_next_to_forward_gap
+        ):
+            logger.debug(
+                "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first",
+                local_event_id,
+                timestamp,
+            )
+
+            # Find other homeservers from the given state in the room
+            curr_state = await self.state_handler.get_current_state(room_id)
+            curr_domains = get_domains_from_state(curr_state)
+            likely_domains = [
+                domain for domain, depth in curr_domains if domain != self.server_name
+            ]
+
+            # Loop through each homeserver candidate until we get a succesful response
+            for domain in likely_domains:
+                try:
+                    remote_response = await self.federation_client.timestamp_to_event(
+                        domain, room_id, timestamp, direction
+                    )
+                    logger.debug(
+                        "get_event_for_timestamp: response from domain(%s)=%s",
+                        domain,
+                        remote_response,
+                    )
+
+                    # TODO: Do we want to persist this as an extremity?
+                    # TODO: I think ideally, we would try to backfill from
+                    # this event and run this whole
+                    # `get_event_for_timestamp` function again to make sure
+                    # they didn't give us an event from their gappy history.
+                    remote_event_id = remote_response.event_id
+                    origin_server_ts = remote_response.origin_server_ts
+
+                    # Only return the remote event if it's closer than the local event
+                    if not local_event or (
+                        abs(origin_server_ts - timestamp)
+                        < abs(local_event.origin_server_ts - timestamp)
+                    ):
+                        return remote_event_id, origin_server_ts
+                except (HttpResponseException, InvalidResponseError) as ex:
+                    # Let's not put a high priority on some other homeserver
+                    # failing to respond or giving a random response
+                    logger.debug(
+                        "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
+                        domain,
+                        type(ex).__name__,
+                        ex,
+                        ex.args,
+                    )
+                except Exception as ex:
+                    # But we do want to see some exceptions in our code
+                    logger.warning(
+                        "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
+                        domain,
+                        type(ex).__name__,
+                        ex,
+                        ex.args,
+                    )
+
+        if not local_event_id:
+            raise SynapseError(
+                404,
+                "Unable to find event from %s in direction %s" % (timestamp, direction),
+                errcode=Codes.NOT_FOUND,
+            )
+
+        return local_event_id, local_event.origin_server_ts
+
+
 class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
     def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()