summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/sliding_sync.py61
-rw-r--r--synapse/storage/databases/main/cache.py3
-rw-r--r--synapse/storage/databases/main/events.py24
-rw-r--r--synapse/storage/databases/main/stream.py145
-rw-r--r--synapse/storage/schema/main/delta/85/07_sliding_sync.sql24
5 files changed, 234 insertions, 23 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 818b13621c..1135a2a436 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -34,6 +34,7 @@ from synapse.api.constants import (
 from synapse.events import EventBase
 from synapse.events.utils import strip_event
 from synapse.handlers.relations import BundledAggregations
+from synapse.logging.opentracing import trace
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
 from synapse.types import (
     JsonDict,
@@ -988,6 +989,7 @@ class SlidingSyncHandler:
         # Assemble a new sync room map but only with the `filtered_room_id_set`
         return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
 
+    @trace
     async def sort_rooms(
         self,
         sync_room_map: Dict[str, _RoomMembershipForUser],
@@ -1009,24 +1011,18 @@ class SlidingSyncHandler:
         # Assemble a map of room ID to the `stream_ordering` of the last activity that the
         # user should see in the room (<= `to_token`)
         last_activity_in_room_map: Dict[str, int] = {}
+        to_fetch = []
         for room_id, room_for_user in sync_room_map.items():
             # If they are fully-joined to the room, let's find the latest activity
             # at/before the `to_token`.
             if room_for_user.membership == Membership.JOIN:
-                last_event_result = (
-                    await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                        room_id, to_token.room_key
-                    )
-                )
-
-                # If the room has no events at/before the `to_token`, this is probably a
-                # mistake in the code that generates the `sync_room_map` since that should
-                # only give us rooms that the user had membership in during the token range.
-                assert last_event_result is not None
-
-                _, event_pos = last_event_result
+                stream = self.store._events_stream_cache._entity_to_key.get(room_id)
+                if stream is not None:
+                    if stream <= to_token.room_key.stream:
+                        last_activity_in_room_map[room_id] = stream
+                        continue
 
-                last_activity_in_room_map[room_id] = event_pos.stream
+                to_fetch.append(room_id)
             else:
                 # Otherwise, if the user has left/been invited/knocked/been banned from
                 # a room, they shouldn't see anything past that point.
@@ -1037,6 +1033,20 @@ class SlidingSyncHandler:
                 # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
                 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
 
+        ordering_map = await self.store.get_max_stream_ordering_in_rooms(to_fetch)
+        for room_id, stream_pos in ordering_map.items():
+            if stream_pos is None:
+                continue
+
+            if stream_pos.persisted_after(to_token.room_key):
+                continue
+
+            last_activity_in_room_map[room_id] = stream_pos.stream
+
+        for room_id in sync_room_map.keys() - last_activity_in_room_map.keys():
+            # TODO: Handle better
+            last_activity_in_room_map[room_id] = sync_room_map[room_id].event_pos.stream
+
         return sorted(
             sync_room_map.values(),
             # Sort by the last activity (stream_ordering) in the room
@@ -1409,18 +1419,29 @@ class SlidingSyncHandler:
                     break
 
         # Figure out the last bump event in the room
-        last_bump_event_result = (
-            await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+        last_bump_event_stream_ordering = None
+        if timeline_events:
+            for e in reversed(timeline_events):
+                if e.type in DEFAULT_BUMP_EVENT_TYPES:
+                    last_bump_event_stream_ordering = (
+                        e.internal_metadata.stream_ordering
+                    )
+                    break
+
+        if last_bump_event_stream_ordering is None:
+            last_bump_event_result = (
+                await self.store.get_last_event_pos_in_room_before_stream_ordering(
+                    room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+                )
             )
-        )
+            if last_bump_event_result is not None:
+                last_bump_event_stream_ordering = last_bump_event_result[1].stream
 
         # By default, just choose the membership event position
         bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
         # But if we found a bump event, use that instead
-        if last_bump_event_result is not None:
-            _, bump_event_pos = last_bump_event_result
-            bump_stamp = bump_event_pos.stream
+        if last_bump_event_stream_ordering is not None:
+            bump_stamp = last_bump_event_stream_ordering
 
         return SlidingSyncResult.RoomResult(
             name=room_name,
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2d6b75e47e..2fcd927089 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -309,6 +309,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(room_id, stream_ordering)  # type: ignore[attr-defined]
+            self._attempt_to_invalidate_cache(
+                "get_max_stream_ordering_in_room", (room_id,)
+            )
 
         if redacts:
             self._invalidate_local_get_event_cache(redacts)  # type: ignore[attr-defined]
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..0c7c2f9306 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -551,7 +551,7 @@ class PersistEventsStore:
         # From this point onwards the events are only events that we haven't
         # seen before.
 
-        self._store_event_txn(txn, events_and_contexts=events_and_contexts)
+        self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts)
 
         if new_forward_extremities:
             self._update_forward_extremities_txn(
@@ -1555,6 +1555,7 @@ class PersistEventsStore:
     def _store_event_txn(
         self,
         txn: LoggingTransaction,
+        room_id: str,
         events_and_contexts: Collection[Tuple[EventBase, EventContext]],
     ) -> None:
         """Insert new events into the event, event_json, redaction and
@@ -1629,6 +1630,27 @@ class PersistEventsStore:
             ],
         )
 
+        # Update the `sliding_sync_room_metadata` with the latest
+        # (non-backfilled, ie positive) stream ordering.
+        #
+        # We know this list is sorted and non-empty, so we just take the last
+        # one event.
+        max_stream_ordering: int
+        for e, _ in events_and_contexts:
+            assert e.internal_metadata.stream_ordering is not None
+            max_stream_ordering = e.internal_metadata.stream_ordering
+
+        if max_stream_ordering > 0:
+            self.db_pool.simple_upsert_txn(
+                txn,
+                table="sliding_sync_room_metadata",
+                keyvalues={"room_id": room_id},
+                values={
+                    "instance_name": self._instance_name,
+                    "last_stream_ordering": max_stream_ordering,
+                },
+            )
+
         # If we're persisting an unredacted event we go and ensure
         # that we mark any redactions that reference this event as
         # requiring censoring.
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index e74e0d2e91..d3e4340d4c 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -50,6 +50,7 @@ from typing import (
     Dict,
     Iterable,
     List,
+    Mapping,
     Optional,
     Set,
     Tuple,
@@ -78,8 +79,13 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import PersistedEventPosition, RoomStreamToken
-from synapse.util.caches.descriptors import cached
+from synapse.types import (
+    JsonDict,
+    PersistedEventPosition,
+    RoomStreamToken,
+    StrCollection,
+)
+from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
 
@@ -610,6 +616,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
+        database.updates.register_background_update_handler(
+            "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update
+        )
+
     def get_room_max_stream_ordering(self) -> int:
         """Get the stream_ordering of regular events that we have committed up to
 
@@ -1185,6 +1195,52 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return None
 
+    @cachedList(
+        cached_method_name="get_max_stream_ordering_in_room",
+        list_name="room_ids",
+    )
+    async def get_max_stream_ordering_in_rooms(
+        self, room_ids: StrCollection
+    ) -> Mapping[str, Optional[PersistedEventPosition]]:
+        """Get the positions for the latest event in a room.
+
+        A batched version of `get_max_stream_ordering_in_room`.
+        """
+        rows = await self.db_pool.simple_select_many_batch(
+            table="sliding_sync_room_metadata",
+            column="room_id",
+            iterable=room_ids,
+            retcols=("room_id", "instance_name", "last_stream_ordering"),
+            desc="get_max_stream_ordering_in_rooms",
+        )
+
+        return {
+            room_id: PersistedEventPosition(instance_name, stream)
+            for room_id, instance_name, stream in rows
+        }
+
+    @cached(max_entries=10000)
+    async def get_max_stream_ordering_in_room(
+        self,
+        room_id: str,
+    ) -> Optional[PersistedEventPosition]:
+        """Get the position for the latest event in a room.
+
+        Note: this may be after the current token for the room stream on this
+        process (e.g. due to replication lag)
+        """
+        row = await self.db_pool.simple_select_one(
+            table="sliding_sync_room_metadata",
+            retcols=("instance_name", "last_stream_ordering"),
+            keyvalues={"room_id": room_id},
+            allow_none=True,
+            desc="get_max_stream_ordering_in_room",
+        )
+        if not row:
+            return None
+
+        return PersistedEventPosition(instance_name=row[0], stream=row[1])
+
     async def get_last_event_pos_in_room_before_stream_ordering(
         self,
         room_id: str,
@@ -1983,3 +2039,88 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             return RoomStreamToken(stream=last_position.stream - 1)
 
         return None
+
+    async def _sliding_sync_room_metadata_bg_update(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Background update to fill out 'sliding_sync_room_metadata' table"""
+        previous_room = progress.get("previous_room", "")
+
+        def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int:
+            # Both these queries are just getting the most recent
+            # instance_name/stream ordering for the next N rooms.
+            if isinstance(self.database_engine, PostgresEngine):
+                sql = """
+                    SELECT room_id, instance_name, stream_ordering FROM rooms AS r,
+                    LATERAL (
+                        SELECT instance_name, stream_ordering
+                        FROM events WHERE events.room_id = r.room_id
+                        ORDER BY stream_ordering DESC
+                        LIMIT 1
+                    ) e
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+            else:
+                sql = """
+                    SELECT
+                        room_id,
+                        (
+                            SELECT instance_name
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        ),
+                        (
+                            SELECT stream_ordering
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        )
+                    FROM rooms AS r
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+
+            txn.execute(sql, (previous_room, batch_size))
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            self.db_pool.simple_upsert_many_txn(
+                txn,
+                table="sliding_sync_room_metadata",
+                key_names=("room_id",),
+                key_values=[(room_id,) for room_id, _, _ in rows],
+                value_names=(
+                    "instance_name",
+                    "last_stream_ordering",
+                ),
+                value_values=[
+                    (
+                        instance_name or "master",
+                        stream,
+                    )
+                    for _, instance_name, stream in rows
+                ],
+            )
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]}
+            )
+
+            return len(rows)
+
+        rows = await self.db_pool.runInteraction(
+            "_sliding_sync_room_metadata_bg_update",
+            _sliding_sync_room_metadata_bg_update_txn,
+        )
+
+        if rows == 0:
+            await self.db_pool.updates._end_background_update(
+                "sliding_sync_room_metadata"
+            )
+
+        return rows
diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
new file mode 100644
index 0000000000..e8bc33ff40
--- /dev/null
+++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -0,0 +1,24 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- A table that maps from room ID to metadata useful for sliding sync.
+CREATE TABLE sliding_sync_room_metadata (
+    room_id TEXT NOT NULL PRIMARY KEY,
+
+    -- The instance_name / stream ordering of the last event in the room.
+    instance_name TEXT NOT NULL,
+    last_stream_ordering BIGINT NOT NULL
+);
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8507, 'sliding_sync_room_metadata', '{}');