diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2d6b75e47e..2fcd927089 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -309,6 +309,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined]
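+ # A new (non-backfilled) event has been persisted in the room, so the
+ # cached `get_max_stream_ordering_in_room` entry is now stale.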
+ self._attempt_to_invalidate_cache(
+ "get_max_stream_ordering_in_room", (room_id,)
+ )
if redacts:
self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined]
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..0c7c2f9306 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -551,7 +551,7 @@ class PersistEventsStore:
# From this point onwards the events are only events that we haven't
# seen before.
- self._store_event_txn(txn, events_and_contexts=events_and_contexts)
+ self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts)
if new_forward_extremities:
self._update_forward_extremities_txn(
@@ -1555,6 +1555,7 @@ class PersistEventsStore:
def _store_event_txn(
self,
txn: LoggingTransaction,
+ room_id: str,
events_and_contexts: Collection[Tuple[EventBase, EventContext]],
) -> None:
"""Insert new events into the event, event_json, redaction and
@@ -1629,6 +1630,27 @@ class PersistEventsStore:
],
)
+ # Update the `sliding_sync_room_metadata` table with the latest
+ # (non-backfilled, i.e. positive) stream ordering.
+ #
+ # We know this list is sorted by stream ordering and non-empty, so we
+ # just take the stream ordering of the last event.
+ max_stream_ordering: int
+ for e, _ in events_and_contexts:
+ assert e.internal_metadata.stream_ordering is not None
+ max_stream_ordering = e.internal_metadata.stream_ordering
+
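+ # Backfilled events are assigned negative stream orderings, so only
+ # record a position if we persisted at least one non-backfilled event.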
+ if max_stream_ordering > 0:
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="sliding_sync_room_metadata",
+ keyvalues={"room_id": room_id},
+ values={
+ "instance_name": self._instance_name,
+ "last_stream_ordering": max_stream_ordering,
+ },
+ )
+
# If we're persisting an unredacted event we go and ensure
# that we mark any redactions that reference this event as
# requiring censoring.
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index e74e0d2e91..d3e4340d4c 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -50,6 +50,7 @@ from typing import (
Dict,
Iterable,
List,
+ Mapping,
Optional,
Set,
Tuple,
@@ -78,8 +79,13 @@ from synapse.storage.database import (
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import PersistedEventPosition, RoomStreamToken
-from synapse.util.caches.descriptors import cached
+from synapse.types import (
+ JsonDict,
+ PersistedEventPosition,
+ RoomStreamToken,
+ StrCollection,
+)
+from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
@@ -610,6 +616,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self._stream_order_on_start = self.get_room_max_stream_ordering()
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
+ database.updates.register_background_update_handler(
+ "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update
+ )
+
def get_room_max_stream_ordering(self) -> int:
"""Get the stream_ordering of regular events that we have committed up to
@@ -1185,6 +1195,52 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
+ @cachedList(
+ cached_method_name="get_max_stream_ordering_in_room",
+ list_name="room_ids",
+ )
+ async def get_max_stream_ordering_in_rooms(
+ self, room_ids: StrCollection
+ ) -> Mapping[str, Optional[PersistedEventPosition]]:
+ """Get the positions for the latest event in a room.
+
+ A batched version of `get_max_stream_ordering_in_room`.
+ """
+ rows = await self.db_pool.simple_select_many_batch(
+ table="sliding_sync_room_metadata",
+ column="room_id",
+ iterable=room_ids,
+ retcols=("room_id", "instance_name", "last_stream_ordering"),
+ desc="get_max_stream_ordering_in_rooms",
+ )
+
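+ # Rooms with no entry in the table are omitted here; the `cachedList`
+ # wrapper returns `None` for them, hence the `Optional` return type.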
+ return {
+ room_id: PersistedEventPosition(instance_name, stream)
+ for room_id, instance_name, stream in rows
+ }
+
+ @cached(max_entries=10000)
+ async def get_max_stream_ordering_in_room(
+ self,
+ room_id: str,
+ ) -> Optional[PersistedEventPosition]:
+ """Get the position for the latest event in a room.
+
+ Note: this may be after the current token for the room stream on this
+ process (e.g. due to replication lag)
+ """
+ row = await self.db_pool.simple_select_one(
+ table="sliding_sync_room_metadata",
+ retcols=("instance_name", "last_stream_ordering"),
+ keyvalues={"room_id": room_id},
+ allow_none=True,
+ desc="get_max_stream_ordering_in_room",
+ )
+ if not row:
+ return None
+
+ return PersistedEventPosition(instance_name=row[0], stream=row[1])
+
async def get_last_event_pos_in_room_before_stream_ordering(
self,
room_id: str,
@@ -1983,3 +2039,88 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return RoomStreamToken(stream=last_position.stream - 1)
return None
+
+ async def _sliding_sync_room_metadata_bg_update(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """Background update to fill out 'sliding_sync_room_metadata' table"""
+ previous_room = progress.get("previous_room", "")
+
+ def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int:
+ # Both queries fetch the instance_name and stream ordering of the most
+ # recent event in each of the next batch of rooms.
+ if isinstance(self.database_engine, PostgresEngine):
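+ # Postgres supports LATERAL joins, so the per-room subquery can
+ # reference `r.room_id` and is evaluated once for each room.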
+ sql = """
+ SELECT room_id, instance_name, stream_ordering FROM rooms AS r,
+ LATERAL (
+ SELECT instance_name, stream_ordering
+ FROM events WHERE events.room_id = r.room_id
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ ) e
+ WHERE r.room_id > ?
+ ORDER BY r.room_id ASC
+ LIMIT ?
+ """
+ else:
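+ # SQLite doesn't support LATERAL joins, so use two correlated
+ # subqueries (one per column) to pick the latest event instead.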
+ sql = """
+ SELECT
+ room_id,
+ (
+ SELECT instance_name
+ FROM events WHERE events.room_id = r.room_id
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ ),
+ (
+ SELECT stream_ordering
+ FROM events WHERE events.room_id = r.room_id
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ )
+ FROM rooms AS r
+ WHERE r.room_id > ?
+ ORDER BY r.room_id ASC
+ LIMIT ?
+ """
+
+ txn.execute(sql, (previous_room, batch_size))
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
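+ # Events persisted before multi-writer support may have a NULL
+ # instance_name; fall back to "master" for those rows.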
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="sliding_sync_room_metadata",
+ key_names=("room_id",),
+ key_values=[(room_id,) for room_id, _, _ in rows],
+ value_names=(
+ "instance_name",
+ "last_stream_ordering",
+ ),
+ value_values=[
+ (
+ instance_name or "master",
+ stream,
+ )
+ for _, instance_name, stream in rows
+ ],
+ )
+
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]}
+ )
+
+ return len(rows)
+
+ rows = await self.db_pool.runInteraction(
+ "_sliding_sync_room_metadata_bg_update",
+ _sliding_sync_room_metadata_bg_update_txn,
+ )
+
+ if rows == 0:
+ await self.db_pool.updates._end_background_update(
+ "sliding_sync_room_metadata"
+ )
+
+ return rows
diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
new file mode 100644
index 0000000000..e8bc33ff40
--- /dev/null
+++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -0,0 +1,24 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- A table that maps from room ID to metadata useful for sliding sync.
+CREATE TABLE sliding_sync_room_metadata (
+ room_id TEXT NOT NULL PRIMARY KEY,
+
+ -- The instance_name / stream ordering of the last event in the room.
+ instance_name TEXT NOT NULL,
+ last_stream_ordering BIGINT NOT NULL
+);
+
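+-- Populate the table for existing rooms via a background update.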
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (8507, 'sliding_sync_room_metadata', '{}');