diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 8b098ad48d..d78a15097c 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -46,3 +46,6 @@ class ExperimentalConfig(Config):
# MSC3266 (room summary api)
self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False)
+
+ # MSC3030 (Jump to date API endpoint)
+ self.msc3030_enabled: bool = experimental.get("msc3030_enabled", False)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index bc3f96c1fc..be1423da24 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -1517,6 +1517,83 @@ class FederationClient(FederationBase):
self._get_room_hierarchy_cache[(room_id, suggested_only)] = result
return result
+ async def timestamp_to_event(
+ self, destination: str, room_id: str, timestamp: int, direction: str
+ ) -> "TimestampToEventResponse":
+ """
+ Calls a remote federating server at `destination` asking for their
+ closest event to the given timestamp in the given direction. Also
+ validates the response to always return the expected keys or raises an
+ error.
+
+ Args:
+ destination: Domain name of the remote homeserver
+ room_id: Room to fetch the event from
+ timestamp: The point in time (inclusive) we should navigate from in
+ the given direction to find the closest event.
+ direction: ["f"|"b"] to indicate whether we should navigate forward
+ or backward from the given timestamp to find the closest event.
+
+ Returns:
+ A parsed TimestampToEventResponse including the closest event_id
+ and origin_server_ts
+
+ Raises:
+ Various exceptions when the request fails
+ InvalidResponseError when the response does not have the correct
+ keys or wrong types
+ """
+ remote_response = await self.transport_layer.timestamp_to_event(
+ destination, room_id, timestamp, direction
+ )
+
+ if not isinstance(remote_response, dict):
+ raise InvalidResponseError(
+ "Response must be a JSON dictionary but received %r" % remote_response
+ )
+
+ try:
+ return TimestampToEventResponse.from_json_dict(remote_response)
+ except ValueError as e:
+ raise InvalidResponseError(str(e))
+
+
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class TimestampToEventResponse:
+ """Typed response dictionary for the federation /timestamp_to_event endpoint"""
+
+ event_id: str
+ origin_server_ts: int
+
+ # the raw data, including the above keys
+ data: JsonDict
+
+ @classmethod
+ def from_json_dict(cls, d: JsonDict) -> "TimestampToEventResponse":
+ """Parsed response from the federation /timestamp_to_event endpoint
+
+ Args:
+ d: JSON object response to be parsed
+
+ Raises:
+ ValueError if d does not the correct keys or they are the wrong types
+ """
+
+ event_id = d.get("event_id")
+ if not isinstance(event_id, str):
+ raise ValueError(
+ "Invalid response: 'event_id' must be a str but received %r" % event_id
+ )
+
+ origin_server_ts = d.get("origin_server_ts")
+ if not isinstance(origin_server_ts, int):
+ raise ValueError(
+ "Invalid response: 'origin_server_ts' must be a int but received %r"
+ % origin_server_ts
+ )
+
+ return cls(event_id, origin_server_ts, d)
+
@attr.s(frozen=True, slots=True, auto_attribs=True)
class FederationSpaceSummaryEventResult:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8fbc75aa65..cce85526e7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -110,6 +110,7 @@ class FederationServer(FederationBase):
super().__init__(hs)
self.handler = hs.get_federation_handler()
+ self.storage = hs.get_storage()
self._federation_event_handler = hs.get_federation_event_handler()
self.state = hs.get_state_handler()
self._event_auth_handler = hs.get_event_auth_handler()
@@ -200,6 +201,48 @@ class FederationServer(FederationBase):
return 200, res
+ async def on_timestamp_to_event_request(
+ self, origin: str, room_id: str, timestamp: int, direction: str
+ ) -> Tuple[int, Dict[str, Any]]:
+ """When we receive a federated `/timestamp_to_event` request,
+ handle all of the logic for validating and fetching the event.
+
+ Args:
+ origin: The server we received the event from
+ room_id: Room to fetch the event from
+ timestamp: The point in time (inclusive) we should navigate from in
+ the given direction to find the closest event.
+ direction: ["f"|"b"] to indicate whether we should navigate forward
+ or backward from the given timestamp to find the closest event.
+
+ Returns:
+ Tuple indicating the response status code and dictionary response
+ body including `event_id`.
+ """
+ with (await self._server_linearizer.queue((origin, room_id))):
+ origin_host, _ = parse_server_name(origin)
+ await self.check_server_matches_acl(origin_host, room_id)
+
+ # We only try to fetch data from the local database
+ event_id = await self.store.get_event_id_for_timestamp(
+ room_id, timestamp, direction
+ )
+ if event_id:
+ event = await self.store.get_event(
+ event_id, allow_none=False, allow_rejected=False
+ )
+
+ return 200, {
+ "event_id": event_id,
+ "origin_server_ts": event.origin_server_ts,
+ }
+
+ raise SynapseError(
+ 404,
+ "Unable to find event from %s in direction %s" % (timestamp, direction),
+ errcode=Codes.NOT_FOUND,
+ )
+
async def on_incoming_transaction(
self,
origin: str,
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index fe29bcfd4b..d1f4be641d 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -149,6 +149,42 @@ class TransportLayerClient:
)
@log_function
+ async def timestamp_to_event(
+ self, destination: str, room_id: str, timestamp: int, direction: str
+ ) -> Union[JsonDict, List]:
+ """
+ Calls a remote federating server at `destination` asking for their
+ closest event to the given timestamp in the given direction.
+
+ Args:
+ destination: Domain name of the remote homeserver
+ room_id: Room to fetch the event from
+ timestamp: The point in time (inclusive) we should navigate from in
+ the given direction to find the closest event.
+ direction: ["f"|"b"] to indicate whether we should navigate forward
+ or backward from the given timestamp to find the closest event.
+
+ Returns:
+ Response dict received from the remote homeserver.
+
+ Raises:
+ Various exceptions when the request fails
+ """
+ path = _create_path(
+ FEDERATION_UNSTABLE_PREFIX,
+ "/org.matrix.msc3030/timestamp_to_event/%s",
+ room_id,
+ )
+
+ args = {"ts": [str(timestamp)], "dir": [direction]}
+
+ remote_response = await self.client.get_json(
+ destination, path=path, args=args, try_trailing_slash_on_400=True
+ )
+
+ return remote_response
+
+ @log_function
async def send_transaction(
self,
transaction: Transaction,
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index c32539bf5a..abcb8728f5 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -22,7 +22,10 @@ from synapse.federation.transport.server._base import (
Authenticator,
BaseFederationServlet,
)
-from synapse.federation.transport.server.federation import FEDERATION_SERVLET_CLASSES
+from synapse.federation.transport.server.federation import (
+ FEDERATION_SERVLET_CLASSES,
+ FederationTimestampLookupServlet,
+)
from synapse.federation.transport.server.groups_local import GROUP_LOCAL_SERVLET_CLASSES
from synapse.federation.transport.server.groups_server import (
GROUP_SERVER_SERVLET_CLASSES,
@@ -324,6 +327,13 @@ def register_servlets(
)
for servletclass in DEFAULT_SERVLET_GROUPS[servlet_group]:
+ # Only allow the `/timestamp_to_event` servlet if msc3030 is enabled
+ if (
+ servletclass == FederationTimestampLookupServlet
+ and not hs.config.experimental.msc3030_enabled
+ ):
+ continue
+
servletclass(
hs=hs,
authenticator=authenticator,
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 66e915228c..77bfd88ad0 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -174,6 +174,46 @@ class FederationBackfillServlet(BaseFederationServerServlet):
return await self.handler.on_backfill_request(origin, room_id, versions, limit)
+class FederationTimestampLookupServlet(BaseFederationServerServlet):
+ """
+ API endpoint to fetch the `event_id` of the closest event to the given
+ timestamp (`ts` query parameter) in the given direction (`dir` query
+ parameter).
+
+ Useful for other homeservers when they're unable to find an event locally.
+
+ `ts` is a timestamp in milliseconds where we will find the closest event in
+ the given direction.
+
+ `dir` can be `f` or `b` to indicate forwards and backwards in time from the
+ given timestamp.
+
+ GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
+ {
+ "event_id": ...
+ }
+ """
+
+ PATH = "/timestamp_to_event/(?P<room_id>[^/]*)/?"
+ PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030"
+
+ async def on_GET(
+ self,
+ origin: str,
+ content: Literal[None],
+ query: Dict[bytes, List[bytes]],
+ room_id: str,
+ ) -> Tuple[int, JsonDict]:
+ timestamp = parse_integer_from_args(query, "ts", required=True)
+ direction = parse_string_from_args(
+ query, "dir", default="f", allowed_values=["f", "b"], required=True
+ )
+
+ return await self.handler.on_timestamp_to_event_request(
+ origin, room_id, timestamp, direction
+ )
+
+
class FederationQueryServlet(BaseFederationServerServlet):
PATH = "/query/(?P<query_type>[^/]*)"
@@ -683,6 +723,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationStateV1Servlet,
FederationStateIdsServlet,
FederationBackfillServlet,
+ FederationTimestampLookupServlet,
FederationQueryServlet,
FederationMakeJoinServlet,
FederationMakeLeaveServlet,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3112cc88b1..1ea837d082 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -68,6 +68,37 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
+ """Get joined domains from state
+
+ Args:
+ state: State map from type/state key to event.
+
+ Returns:
+ Returns a list of servers with the lowest depth of their joins.
+ Sorted by lowest depth first.
+ """
+ joined_users = [
+ (state_key, int(event.depth))
+ for (e_type, state_key), event in state.items()
+ if e_type == EventTypes.Member and event.membership == Membership.JOIN
+ ]
+
+ joined_domains: Dict[str, int] = {}
+ for u, d in joined_users:
+ try:
+ dom = get_domain_from_id(u)
+ old_d = joined_domains.get(dom)
+ if old_d:
+ joined_domains[dom] = min(d, old_d)
+ else:
+ joined_domains[dom] = d
+ except Exception:
+ pass
+
+ return sorted(joined_domains.items(), key=lambda d: d[1])
+
+
class FederationHandler:
"""Handles general incoming federation requests
@@ -268,36 +299,6 @@ class FederationHandler:
curr_state = await self.state_handler.get_current_state(room_id)
- def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
- """Get joined domains from state
-
- Args:
- state: State map from type/state key to event.
-
- Returns:
- Returns a list of servers with the lowest depth of their joins.
- Sorted by lowest depth first.
- """
- joined_users = [
- (state_key, int(event.depth))
- for (e_type, state_key), event in state.items()
- if e_type == EventTypes.Member and event.membership == Membership.JOIN
- ]
-
- joined_domains: Dict[str, int] = {}
- for u, d in joined_users:
- try:
- dom = get_domain_from_id(u)
- old_d = joined_domains.get(dom)
- if old_d:
- joined_domains[dom] = min(d, old_d)
- else:
- joined_domains[dom] = d
- except Exception:
- pass
-
- return sorted(joined_domains.items(), key=lambda d: d[1])
-
curr_domains = get_domains_from_state(curr_state)
likely_domains = [
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 88053f9869..2bcdf32dcc 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -46,6 +46,7 @@ from synapse.api.constants import (
from synapse.api.errors import (
AuthError,
Codes,
+ HttpResponseException,
LimitExceededError,
NotFoundError,
StoreError,
@@ -56,6 +57,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.event_auth import validate_event_for_room_version
from synapse.events import EventBase
from synapse.events.utils import copy_power_levels_contents
+from synapse.federation.federation_client import InvalidResponseError
+from synapse.handlers.federation import get_domains_from_state
from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.state import StateFilter
from synapse.streams import EventSource
@@ -1220,6 +1223,147 @@ class RoomContextHandler:
return results
+class TimestampLookupHandler:
+ def __init__(self, hs: "HomeServer"):
+ self.server_name = hs.hostname
+ self.store = hs.get_datastore()
+ self.state_handler = hs.get_state_handler()
+ self.federation_client = hs.get_federation_client()
+
+ async def get_event_for_timestamp(
+ self,
+ requester: Requester,
+ room_id: str,
+ timestamp: int,
+ direction: str,
+ ) -> Tuple[str, int]:
+ """Find the closest event to the given timestamp in the given direction.
+ If we can't find an event locally or the event we have locally is next to a gap,
+ it will ask other federated homeservers for an event.
+
+ Args:
+ requester: The user making the request according to the access token
+ room_id: Room to fetch the event from
+ timestamp: The point in time (inclusive) we should navigate from in
+ the given direction to find the closest event.
+ direction: ["f"|"b"] to indicate whether we should navigate forward
+ or backward from the given timestamp to find the closest event.
+
+ Returns:
+ A tuple containing the `event_id` closest to the given timestamp in
+ the given direction and the `origin_server_ts`.
+
+ Raises:
+ SynapseError if unable to find any event locally in the given direction
+ """
+
+ local_event_id = await self.store.get_event_id_for_timestamp(
+ room_id, timestamp, direction
+ )
+ logger.debug(
+ "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s",
+ local_event_id,
+ timestamp,
+ )
+
+ # Check for gaps in the history where events could be hiding in between
+ # the timestamp given and the event we were able to find locally
+ is_event_next_to_backward_gap = False
+ is_event_next_to_forward_gap = False
+ if local_event_id:
+ local_event = await self.store.get_event(
+ local_event_id, allow_none=False, allow_rejected=False
+ )
+
+ if direction == "f":
+ # We only need to check for a backward gap if we're looking forwards
+ # to ensure there is nothing in between.
+ is_event_next_to_backward_gap = (
+ await self.store.is_event_next_to_backward_gap(local_event)
+ )
+ elif direction == "b":
+ # We only need to check for a forward gap if we're looking backwards
+ # to ensure there is nothing in between
+ is_event_next_to_forward_gap = (
+ await self.store.is_event_next_to_forward_gap(local_event)
+ )
+
+ # If we found a gap, we should probably ask another homeserver first
+ # about more history in between
+ if (
+ not local_event_id
+ or is_event_next_to_backward_gap
+ or is_event_next_to_forward_gap
+ ):
+ logger.debug(
+ "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first",
+ local_event_id,
+ timestamp,
+ )
+
+ # Find other homeservers from the given state in the room
+ curr_state = await self.state_handler.get_current_state(room_id)
+ curr_domains = get_domains_from_state(curr_state)
+ likely_domains = [
+ domain for domain, depth in curr_domains if domain != self.server_name
+ ]
+
+ # Loop through each homeserver candidate until we get a succesful response
+ for domain in likely_domains:
+ try:
+ remote_response = await self.federation_client.timestamp_to_event(
+ domain, room_id, timestamp, direction
+ )
+ logger.debug(
+ "get_event_for_timestamp: response from domain(%s)=%s",
+ domain,
+ remote_response,
+ )
+
+ # TODO: Do we want to persist this as an extremity?
+ # TODO: I think ideally, we would try to backfill from
+ # this event and run this whole
+ # `get_event_for_timestamp` function again to make sure
+ # they didn't give us an event from their gappy history.
+ remote_event_id = remote_response.event_id
+ origin_server_ts = remote_response.origin_server_ts
+
+ # Only return the remote event if it's closer than the local event
+ if not local_event or (
+ abs(origin_server_ts - timestamp)
+ < abs(local_event.origin_server_ts - timestamp)
+ ):
+ return remote_event_id, origin_server_ts
+ except (HttpResponseException, InvalidResponseError) as ex:
+ # Let's not put a high priority on some other homeserver
+ # failing to respond or giving a random response
+ logger.debug(
+ "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
+ domain,
+ type(ex).__name__,
+ ex,
+ ex.args,
+ )
+ except Exception as ex:
+ # But we do want to see some exceptions in our code
+ logger.warning(
+ "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
+ domain,
+ type(ex).__name__,
+ ex,
+ ex.args,
+ )
+
+ if not local_event_id:
+ raise SynapseError(
+ 404,
+ "Unable to find event from %s in direction %s" % (timestamp, direction),
+ errcode=Codes.NOT_FOUND,
+ )
+
+ return local_event_id, local_event.origin_server_ts
+
+
class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 91ba93372c..6dd9b9ad03 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -79,6 +79,35 @@ def parse_integer(
return parse_integer_from_args(args, name, default, required)
+@overload
+def parse_integer_from_args(
+ args: Mapping[bytes, Sequence[bytes]],
+ name: str,
+ default: Optional[int] = None,
+) -> Optional[int]:
+ ...
+
+
+@overload
+def parse_integer_from_args(
+ args: Mapping[bytes, Sequence[bytes]],
+ name: str,
+ *,
+ required: Literal[True],
+) -> int:
+ ...
+
+
+@overload
+def parse_integer_from_args(
+ args: Mapping[bytes, Sequence[bytes]],
+ name: str,
+ default: Optional[int] = None,
+ required: bool = False,
+) -> Optional[int]:
+ ...
+
+
def parse_integer_from_args(
args: Mapping[bytes, Sequence[bytes]],
name: str,
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 99f303c88e..3598967be0 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -1070,6 +1070,62 @@ def register_txn_path(
)
+class TimestampLookupRestServlet(RestServlet):
+ """
+ API endpoint to fetch the `event_id` of the closest event to the given
+ timestamp (`ts` query parameter) in the given direction (`dir` query
+ parameter).
+
+ Useful for cases like jump to date so you can start paginating messages from
+ a given date in the archive.
+
+ `ts` is a timestamp in milliseconds where we will find the closest event in
+ the given direction.
+
+ `dir` can be `f` or `b` to indicate forwards and backwards in time from the
+ given timestamp.
+
+ GET /_matrix/client/unstable/org.matrix.msc3030/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
+ {
+ "event_id": ...
+ }
+ """
+
+ PATTERNS = (
+ re.compile(
+ "^/_matrix/client/unstable/org.matrix.msc3030"
+ "/rooms/(?P<room_id>[^/]*)/timestamp_to_event$"
+ ),
+ )
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__()
+ self._auth = hs.get_auth()
+ self._store = hs.get_datastore()
+ self.timestamp_lookup_handler = hs.get_timestamp_lookup_handler()
+
+ async def on_GET(
+ self, request: SynapseRequest, room_id: str
+ ) -> Tuple[int, JsonDict]:
+ requester = await self._auth.get_user_by_req(request)
+ await self._auth.check_user_in_room(room_id, requester.user.to_string())
+
+ timestamp = parse_integer(request, "ts", required=True)
+ direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])
+
+ (
+ event_id,
+ origin_server_ts,
+ ) = await self.timestamp_lookup_handler.get_event_for_timestamp(
+ requester, room_id, timestamp, direction
+ )
+
+ return 200, {
+ "event_id": event_id,
+ "origin_server_ts": origin_server_ts,
+ }
+
+
class RoomSpaceSummaryRestServlet(RestServlet):
PATTERNS = (
re.compile(
@@ -1239,6 +1295,8 @@ def register_servlets(
RoomAliasListServlet(hs).register(http_server)
SearchRestServlet(hs).register(http_server)
RoomCreateRestServlet(hs).register(http_server)
+ if hs.config.experimental.msc3030_enabled:
+ TimestampLookupRestServlet(hs).register(http_server)
# Some servlets only get registered for the main process.
if not is_worker:
diff --git a/synapse/server.py b/synapse/server.py
index 877eba6c08..185e40e4da 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -97,6 +97,7 @@ from synapse.handlers.room import (
RoomContextHandler,
RoomCreationHandler,
RoomShutdownHandler,
+ TimestampLookupHandler,
)
from synapse.handlers.room_batch import RoomBatchHandler
from synapse.handlers.room_list import RoomListHandler
@@ -729,6 +730,10 @@ class HomeServer(metaclass=abc.ABCMeta):
return RoomContextHandler(self)
@cache_in_self
+ def get_timestamp_lookup_handler(self) -> TimestampLookupHandler:
+ return TimestampLookupHandler(self)
+
+ @cache_in_self
def get_registration_handler(self) -> RegistrationHandler:
return RegistrationHandler(self)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 4cefc0a07e..fd19674f93 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1762,3 +1762,198 @@ class EventsWorkerStore(SQLBaseStore):
"_cleanup_old_transaction_ids",
_cleanup_old_transaction_ids_txn,
)
+
+ async def is_event_next_to_backward_gap(self, event: EventBase) -> bool:
+ """Check if the given event is next to a backward gap of missing events.
+ <latest messages> A(False)--->B(False)--->C(True)---> <gap, unknown events> <oldest messages>
+
+ Args:
+ room_id: room where the event lives
+ event_id: event to check
+
+ Returns:
+ Boolean indicating whether it's an extremity
+ """
+
+ def is_event_next_to_backward_gap_txn(txn: LoggingTransaction) -> bool:
+ # If the event in question has any of its prev_events listed as a
+ # backward extremity, it's next to a gap.
+ #
+ # We can't just check the backward edges in `event_edges` because
+ # when we persist events, we will also record the prev_events as
+ # edges to the event in question regardless of whether we have those
+ # prev_events yet. We need to check whether those prev_events are
+ # backward extremities, also known as gaps, that need to be
+ # backfilled.
+ backward_extremity_query = """
+ SELECT 1 FROM event_backward_extremities
+ WHERE
+ room_id = ?
+ AND %s
+ LIMIT 1
+ """
+
+ # If the event in question is a backward extremity or has any of its
+ # prev_events listed as a backward extremity, it's next to a
+ # backward gap.
+ clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "event_id",
+ [event.event_id] + list(event.prev_event_ids()),
+ )
+
+ txn.execute(backward_extremity_query % (clause,), [event.room_id] + args)
+ backward_extremities = txn.fetchall()
+
+ # We consider any backward extremity as a backward gap
+ if len(backward_extremities):
+ return True
+
+ return False
+
+ return await self.db_pool.runInteraction(
+ "is_event_next_to_backward_gap_txn",
+ is_event_next_to_backward_gap_txn,
+ )
+
+ async def is_event_next_to_forward_gap(self, event: EventBase) -> bool:
+ """Check if the given event is next to a forward gap of missing events.
+ The gap in front of the latest events is not considered a gap.
+ <latest messages> A(False)--->B(False)--->C(False)---> <gap, unknown events> <oldest messages>
+ <latest messages> A(False)--->B(False)---> <gap, unknown events> --->D(True)--->E(False) <oldest messages>
+
+ Args:
+ room_id: room where the event lives
+ event_id: event to check
+
+ Returns:
+ Boolean indicating whether it's an extremity
+ """
+
+ def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool:
+ # If the event in question is a forward extremity, we will just
+ # consider any potential forward gap as not a gap since it's one of
+ # the latest events in the room.
+ #
+ # `event_forward_extremities` does not include backfilled or outlier
+ # events so we can't rely on it to find forward gaps. We can only
+ # use it to determine whether a message is the latest in the room.
+ #
+ # We can't combine this query with the `forward_edge_query` below
+ # because if the event in question has no forward edges (isn't
+ # referenced by any other event's prev_events) but is in
+ # `event_forward_extremities`, we don't want to return 0 rows and
+ # say it's next to a gap.
+ forward_extremity_query = """
+ SELECT 1 FROM event_forward_extremities
+ WHERE
+ room_id = ?
+ AND event_id = ?
+ LIMIT 1
+ """
+
+ # Check to see whether the event in question is already referenced
+ # by another event. If we don't see any edges, we're next to a
+ # forward gap.
+ forward_edge_query = """
+ SELECT 1 FROM event_edges
+ /* Check to make sure the event referencing our event in question is not rejected */
+ LEFT JOIN rejections ON event_edges.event_id == rejections.event_id
+ WHERE
+ event_edges.room_id = ?
+ AND event_edges.prev_event_id = ?
+ /* It's not a valid edge if the event referencing our event in
+ * question is rejected.
+ */
+ AND rejections.event_id IS NULL
+ LIMIT 1
+ """
+
+ # We consider any forward extremity as the latest in the room and
+ # not a forward gap.
+ #
+ # To expand, even though there is technically a gap at the front of
+ # the room where the forward extremities are, we consider those the
+ # latest messages in the room so asking other homeservers for more
+ # is useless. The new latest messages will just be federated as
+ # usual.
+ txn.execute(forward_extremity_query, (event.room_id, event.event_id))
+ forward_extremities = txn.fetchall()
+ if len(forward_extremities):
+ return False
+
+ # If there are no forward edges to the event in question (another
+ # event hasn't referenced this event in their prev_events), then we
+ # assume there is a forward gap in the history.
+ txn.execute(forward_edge_query, (event.room_id, event.event_id))
+ forward_edges = txn.fetchall()
+ if not len(forward_edges):
+ return True
+
+ return False
+
+ return await self.db_pool.runInteraction(
+ "is_event_next_to_gap_txn",
+ is_event_next_to_gap_txn,
+ )
+
+ async def get_event_id_for_timestamp(
+ self, room_id: str, timestamp: int, direction: str
+ ) -> Optional[str]:
+ """Find the closest event to the given timestamp in the given direction.
+
+ Args:
+ room_id: Room to fetch the event from
+ timestamp: The point in time (inclusive) we should navigate from in
+ the given direction to find the closest event.
+ direction: ["f"|"b"] to indicate whether we should navigate forward
+ or backward from the given timestamp to find the closest event.
+
+ Returns:
+ The closest event_id otherwise None if we can't find any event in
+ the given direction.
+ """
+
+ sql_template = """
+ SELECT event_id FROM events
+ LEFT JOIN rejections USING (event_id)
+ WHERE
+ origin_server_ts %s ?
+ AND room_id = ?
+ /* Make sure event is not rejected */
+ AND rejections.event_id IS NULL
+ ORDER BY origin_server_ts %s
+ LIMIT 1;
+ """
+
+ def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]:
+ if direction == "b":
+ # Find closest event *before* a given timestamp. We use descending
+ # (which gives values largest to smallest) because we want the
+ # largest possible timestamp *before* the given timestamp.
+ comparison_operator = "<="
+ order = "DESC"
+ else:
+ # Find closest event *after* a given timestamp. We use ascending
+ # (which gives values smallest to largest) because we want the
+ # closest possible timestamp *after* the given timestamp.
+ comparison_operator = ">="
+ order = "ASC"
+
+ txn.execute(
+ sql_template % (comparison_operator, order), (timestamp, room_id)
+ )
+ row = txn.fetchone()
+ if row:
+ (event_id,) = row
+ return event_id
+
+ return None
+
+ if direction not in ("f", "b"):
+ raise ValueError("Unknown direction: %s" % (direction,))
+
+ return await self.db_pool.runInteraction(
+ "get_event_id_for_timestamp_txn",
+ get_event_id_for_timestamp_txn,
+ )
|