summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2021-12-02 01:02:20 -0600
committerGitHub <noreply@github.com>2021-12-02 01:02:20 -0600
commita6f1a3abecf8e8fd3e1bff439a06b853df18f194 (patch)
treee754454d9ebd223535f8cd5c7003744a5ae907de /synapse/storage/databases/main
parentPort wiki pages to documentation website (#11402) (diff)
downloadsynapse-a6f1a3abecf8e8fd3e1bff439a06b853df18f194.tar.xz
Add MSC3030 experimental client and federation API endpoints to get the closest event to a given timestamp (#9445)
MSC3030: https://github.com/matrix-org/matrix-doc/pull/3030

Client API endpoint. This will also go and fetch from the federation API endpoint if unable to find an event locally or we found an extremity with possibly a closer event we don't know about.
```
GET /_matrix/client/unstable/org.matrix.msc3030/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
{
    "event_id": ...
    "origin_server_ts": ...
}
```

Federation API endpoint:
```
GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
{
    "event_id": ...
    "origin_server_ts": ...
}
```

Co-authored-by: Erik Johnston <erik@matrix.org>
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/events_worker.py195
1 files changed, 195 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 4cefc0a07e..fd19674f93 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1762,3 +1762,198 @@ class EventsWorkerStore(SQLBaseStore):
             "_cleanup_old_transaction_ids",
             _cleanup_old_transaction_ids_txn,
         )
+
+    async def is_event_next_to_backward_gap(self, event: EventBase) -> bool:
+        """Check if the given event is next to a backward gap of missing events.
+        <latest messages> A(False)--->B(False)--->C(True)--->  <gap, unknown events> <oldest messages>
+
+        Args:
+            room_id: room where the event lives
+            event_id: event to check
+
+        Returns:
+            Boolean indicating whether it's an extremity
+        """
+
+        def is_event_next_to_backward_gap_txn(txn: LoggingTransaction) -> bool:
+            # If the event in question has any of its prev_events listed as a
+            # backward extremity, it's next to a gap.
+            #
+            # We can't just check the backward edges in `event_edges` because
+            # when we persist events, we will also record the prev_events as
+            # edges to the event in question regardless of whether we have those
+            # prev_events yet. We need to check whether those prev_events are
+            # backward extremities, also known as gaps, that need to be
+            # backfilled.
+            backward_extremity_query = """
+                SELECT 1 FROM event_backward_extremities
+                WHERE
+                    room_id = ?
+                    AND %s
+                LIMIT 1
+            """
+
+            # If the event in question is a backward extremity or has any of its
+            # prev_events listed as a backward extremity, it's next to a
+            # backward gap.
+            clause, args = make_in_list_sql_clause(
+                self.database_engine,
+                "event_id",
+                [event.event_id] + list(event.prev_event_ids()),
+            )
+
+            txn.execute(backward_extremity_query % (clause,), [event.room_id] + args)
+            backward_extremities = txn.fetchall()
+
+            # We consider any backward extremity as a backward gap
+            if len(backward_extremities):
+                return True
+
+            return False
+
+        return await self.db_pool.runInteraction(
+            "is_event_next_to_backward_gap_txn",
+            is_event_next_to_backward_gap_txn,
+        )
+
+    async def is_event_next_to_forward_gap(self, event: EventBase) -> bool:
+        """Check if the given event is next to a forward gap of missing events.
+        The gap in front of the latest events is not considered a gap.
+        <latest messages> A(False)--->B(False)--->C(False)--->  <gap, unknown events> <oldest messages>
+        <latest messages> A(False)--->B(False)--->  <gap, unknown events>  --->D(True)--->E(False) <oldest messages>
+
+        Args:
+            room_id: room where the event lives
+            event_id: event to check
+
+        Returns:
+            Boolean indicating whether it's an extremity
+        """
+
+        def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool:
+            # If the event in question is a forward extremity, we will just
+            # consider any potential forward gap as not a gap since it's one of
+            # the latest events in the room.
+            #
+            # `event_forward_extremities` does not include backfilled or outlier
+            # events so we can't rely on it to find forward gaps. We can only
+            # use it to determine whether a message is the latest in the room.
+            #
+            # We can't combine this query with the `forward_edge_query` below
+            # because if the event in question has no forward edges (isn't
+            # referenced by any other event's prev_events) but is in
+            # `event_forward_extremities`, we don't want to return 0 rows and
+            # say it's next to a gap.
+            forward_extremity_query = """
+                SELECT 1 FROM event_forward_extremities
+                WHERE
+                    room_id = ?
+                    AND event_id = ?
+                LIMIT 1
+            """
+
+            # Check to see whether the event in question is already referenced
+            # by another event. If we don't see any edges, we're next to a
+            # forward gap.
+            forward_edge_query = """
+                SELECT 1 FROM event_edges
+                /* Check to make sure the event referencing our event in question is not rejected */
+                LEFT JOIN rejections ON event_edges.event_id == rejections.event_id
+                WHERE
+                    event_edges.room_id = ?
+                    AND event_edges.prev_event_id = ?
+                    /* It's not a valid edge if the event referencing our event in
+                     * question is rejected.
+                     */
+                    AND rejections.event_id IS NULL
+                LIMIT 1
+            """
+
+            # We consider any forward extremity as the latest in the room and
+            # not a forward gap.
+            #
+            # To expand, even though there is technically a gap at the front of
+            # the room where the forward extremities are, we consider those the
+            # latest messages in the room so asking other homeservers for more
+            # is useless. The new latest messages will just be federated as
+            # usual.
+            txn.execute(forward_extremity_query, (event.room_id, event.event_id))
+            forward_extremities = txn.fetchall()
+            if len(forward_extremities):
+                return False
+
+            # If there are no forward edges to the event in question (another
+            # event hasn't referenced this event in their prev_events), then we
+            # assume there is a forward gap in the history.
+            txn.execute(forward_edge_query, (event.room_id, event.event_id))
+            forward_edges = txn.fetchall()
+            if not len(forward_edges):
+                return True
+
+            return False
+
+        return await self.db_pool.runInteraction(
+            "is_event_next_to_gap_txn",
+            is_event_next_to_gap_txn,
+        )
+
+    async def get_event_id_for_timestamp(
+        self, room_id: str, timestamp: int, direction: str
+    ) -> Optional[str]:
+        """Find the closest event to the given timestamp in the given direction.
+
+        Args:
+            room_id: Room to fetch the event from
+            timestamp: The point in time (inclusive) we should navigate from in
+                the given direction to find the closest event.
+            direction: ["f"|"b"] to indicate whether we should navigate forward
+                or backward from the given timestamp to find the closest event.
+
+        Returns:
+            The closest event_id otherwise None if we can't find any event in
+            the given direction.
+        """
+
+        sql_template = """
+            SELECT event_id FROM events
+            LEFT JOIN rejections USING (event_id)
+            WHERE
+                origin_server_ts %s ?
+                AND room_id = ?
+                /* Make sure event is not rejected */
+                AND rejections.event_id IS NULL
+            ORDER BY origin_server_ts %s
+            LIMIT 1;
+        """
+
+        def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]:
+            if direction == "b":
+                # Find closest event *before* a given timestamp. We use descending
+                # (which gives values largest to smallest) because we want the
+                # largest possible timestamp *before* the given timestamp.
+                comparison_operator = "<="
+                order = "DESC"
+            else:
+                # Find closest event *after* a given timestamp. We use ascending
+                # (which gives values smallest to largest) because we want the
+                # closest possible timestamp *after* the given timestamp.
+                comparison_operator = ">="
+                order = "ASC"
+
+            txn.execute(
+                sql_template % (comparison_operator, order), (timestamp, room_id)
+            )
+            row = txn.fetchone()
+            if row:
+                (event_id,) = row
+                return event_id
+
+            return None
+
+        if direction not in ("f", "b"):
+            raise ValueError("Unknown direction: %s" % (direction,))
+
+        return await self.db_pool.runInteraction(
+            "get_event_id_for_timestamp_txn",
+            get_event_id_for_timestamp_txn,
+        )