diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 8788d13cc7..9ecef16dad 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -1033,10 +1033,11 @@ class SlidingSyncHandler:
# If they are fully-joined to the room, let's find the latest activity
# at/before the `to_token`.
if room_for_user.membership == Membership.JOIN:
- stream_pos = self.store._events_stream_cache._entity_to_key.get(room_id)
- if stream_pos is not None:
- last_activity_in_room_map[room_id] = stream_pos
- continue
+ stream = self.store._events_stream_cache._entity_to_key.get(room_id)
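+ # The stream change cache tracks the latest stream position per room,
+ # which may be ahead of the `to_token` (e.g. events persisted after the
+ # token was issued), so only use it when it is at/before the token.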
+ if stream is not None:
+ if stream <= to_token.room_key.stream:
+ last_activity_in_room_map[room_id] = stream
+ continue
to_fetch.append(room_id)
else:
@@ -1049,11 +1050,15 @@ class SlidingSyncHandler:
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
- for room_id, stream_pos in (
- await self.store.rough_get_last_pos(to_fetch)
- ).items():
- if stream_pos is not None:
- last_activity_in_room_map[room_id] = stream_pos
+ ordering_map = await self.store.get_max_stream_ordering_in_rooms(to_fetch)
+ for room_id, stream_pos in ordering_map.items():
+ if stream_pos is None:
+ continue
+
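+ # The position may be ahead of the `to_token` (e.g. due to
+ # replication lag); we can't use it in that case, so leave the
+ # room to the catch-all loop below.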
+ if stream_pos.persisted_after(to_token.room_key):
+ continue
+
+ last_activity_in_room_map[room_id] = stream_pos.stream
for room_id in sync_room_map.keys() - last_activity_in_room_map.keys():
# TODO: Handle better
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 0550ab73da..2fcd927089 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -310,7 +310,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined]
self._attempt_to_invalidate_cache(
- "get_rough_stream_ordering_for_room", (room_id,)
+ "get_max_stream_ordering_in_room", (room_id,)
)
if redacts:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 02bc7c3d5e..0c7c2f9306 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -551,7 +551,7 @@ class PersistEventsStore:
# From this point onwards the events are only events that we haven't
# seen before.
- self._store_event_txn(txn, events_and_contexts=events_and_contexts)
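+ # Events are persisted a room at a time, so every event in this batch
+ # shares the same `room_id`.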
+ self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts)
if new_forward_extremities:
self._update_forward_extremities_txn(
@@ -1555,6 +1555,7 @@ class PersistEventsStore:
def _store_event_txn(
self,
txn: LoggingTransaction,
+ room_id: str,
events_and_contexts: Collection[Tuple[EventBase, EventContext]],
) -> None:
"""Insert new events into the event, event_json, redaction and
@@ -1629,17 +1630,26 @@ class PersistEventsStore:
],
)
- self.db_pool.simple_upsert_many_txn(
- txn,
- table="sliding_sync_room_metadata",
- key_names=("room_id",),
- key_values=[(event.room_id,) for event, _ in events_and_contexts],
- value_names=("last_stream_ordering",),
- value_values=[
- (event.internal_metadata.stream_ordering,)
- for event, _ in events_and_contexts
- ],
- )
+ # Update `sliding_sync_room_metadata` with the latest
+ # (non-backfilled, i.e. positive) stream ordering.
+ #
+ # We know this list is sorted by stream ordering and non-empty, so we
+ # just take the stream ordering of the last event.
+ max_stream_ordering: int
+ for e, _ in events_and_contexts:
+ assert e.internal_metadata.stream_ordering is not None
+ max_stream_ordering = e.internal_metadata.stream_ordering
+
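+ # Backfilled events have negative stream orderings and must not
+ # advance the room's last event position.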
+ if max_stream_ordering > 0:
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="sliding_sync_room_metadata",
+ keyvalues={"room_id": room_id},
+ values={
+ "instance_name": self._instance_name,
+ "last_stream_ordering": max_stream_ordering,
+ },
+ )
# If we're persisting an unredacted event we go and ensure
# that we mark any redactions that reference this event as
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index ec0a2d4d16..e552b7d85f 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -79,7 +79,12 @@ from synapse.storage.database import (
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken, StrSequence
+from synapse.types import (
+ JsonDict,
+ PersistedEventPosition,
+ RoomStreamToken,
+ StrCollection,
+)
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
@@ -1192,63 +1197,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
@cachedList(
- cached_method_name="get_rough_stream_ordering_for_room",
+ cached_method_name="get_max_stream_ordering_in_room",
list_name="room_ids",
)
- async def rough_get_last_pos(
- self, room_ids: StrSequence
- ) -> Mapping[str, Optional[int]]:
- def rough_get_last_pos_txn(
- txn: LoggingTransaction,
- batch: StrSequence,
- ) -> Mapping[str, Optional[int]]:
- clause, args = make_in_list_sql_clause(
- self.database_engine, "room_id", batch
- )
- sql = f"""
- SELECT room_id, last_stream_ordering
- FROM sliding_sync_room_metadata
- WHERE {clause}
- """
-
- txn.execute(sql, args)
-
- return {room_id: stream_ordering for room_id, stream_ordering in txn}
+ async def get_max_stream_ordering_in_rooms(
+ self, room_ids: StrCollection
+ ) -> Mapping[str, Optional[PersistedEventPosition]]:
+ """Get the positions for the latest event in a room.
- results = {}
- for batch in batch_iter(room_ids, 100):
- results.update(
- await self.db_pool.runInteraction(
- "rough_get_last_pos", rough_get_last_pos_txn, batch
- )
- )
+ A batched version of `get_max_stream_ordering_in_room`.
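+
+ Note: as with the single-room variant, the returned positions may be
+ after the current token for the room stream on this process (e.g. due
+ to replication lag).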
+ """
+ rows = await self.db_pool.simple_select_many_batch(
+ table="sliding_sync_room_metadata",
+ column="room_id",
+ iterable=room_ids,
+ retcols=("room_id", "instance_name", "last_stream_ordering"),
+ desc="get_max_stream_ordering_in_rooms",
+ )
- return results
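+ # Rooms with no metadata row are simply absent from the result; the
+ # `@cachedList` decorator treats missing entries as None.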
+ return {
+ room_id: PersistedEventPosition(instance_name, stream)
+ for room_id, instance_name, stream in rows
+ }
@cached(max_entries=10000)
- async def get_rough_stream_ordering_for_room(
+ async def get_max_stream_ordering_in_room(
self,
room_id: str,
- ) -> Optional[int]:
- def get_rough_stream_ordering_for_room_txn(
- txn: LoggingTransaction,
- ) -> Optional[int]:
- sql = """
- SELECT last_stream_ordering
- FROM sliding_sync_room_metadata
- WHERE room_id = ?
- """
-
- txn.execute(sql, (room_id,))
+ ) -> Optional[PersistedEventPosition]:
+ """Get the position for the latest event in a room.
- row = txn.fetchone()
- if row:
- return row[0]
+ Note: this may be after the current token for the room stream on this
+ process (e.g. due to replication lag).
+ """
+ row = await self.db_pool.simple_select_one(
+ table="sliding_sync_room_metadata",
+ retcols=("instance_name", "last_stream_ordering"),
+ keyvalues={"room_id": room_id},
+ allow_none=True,
+ desc="get_max_stream_ordering_in_room",
+ )
+ if not row:
return None
- return await self.db_pool.runInteraction(
- "get_rough_stream_ordering_for_room", get_rough_stream_ordering_for_room_txn
- )
+ return PersistedEventPosition(instance_name=row[0], stream=row[1])
async def get_last_event_pos_in_room_before_stream_ordering(
self,
@@ -2056,17 +2048,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
previous_room = progress.get("previous_room", "")
def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int:
- sql = """
- SELECT room_id, MAX(stream_ordering) FROM events
- WHERE stream_ordering IS NOT NULL
- AND room_id IN (
- SELECT room_id FROM rooms
- WHERE room_id > ?
- ORDER BY room_id ASC
- LIMIT ?
- )
- GROUP BY room_id
- """
+ # Both of these queries fetch the most recent instance_name/stream
+ # ordering for each of the next N rooms.
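+ #
+ # Postgres supports LATERAL joins, which let us pull out the latest
+ # event for each room in a single pass; SQLite does not, so there we
+ # use a pair of correlated sub-selects instead.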
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = """
+ SELECT room_id, instance_name, stream_ordering FROM rooms AS r,
+ LATERAL (
+ SELECT instance_name, stream_ordering
+ FROM events WHERE events.room_id = r.room_id
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ ) e
+ WHERE r.room_id > ?
+ ORDER BY r.room_id ASC
+ LIMIT ?
+ """
+ else:
+ sql = """
+ SELECT
+ room_id,
+ (
+ SELECT instance_name
+ FROM events WHERE events.room_id = r.room_id
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ ),
+ (
+ SELECT stream_ordering
+ FROM events WHERE events.room_id = r.room_id
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ )
+ FROM rooms AS r
+ WHERE r.room_id > ?
+ ORDER BY r.room_id ASC
+ LIMIT ?
+ """
+
txn.execute(sql, (previous_room, batch_size))
rows = txn.fetchall()
if not rows:
@@ -2076,9 +2094,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn,
table="sliding_sync_room_metadata",
key_names=("room_id",),
- key_values=[(room_id,) for room_id, _ in rows],
+ key_values=[(room_id,) for room_id, _, _ in rows],
- value_names=("last_stream_ordering",),
+ value_names=("instance_name", "last_stream_ordering"),
- value_values=[(stream,) for _, stream in rows],
+ value_values=[
+ (
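+ # Old rows may have a NULL instance_name; those events were
+ # persisted by the main ("master") process.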
+ instance_name or "master",
+ stream,
+ )
+ for _, instance_name, stream in rows
+ ],
)
self.db_pool.updates._background_update_progress_txn(
diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
index d8219aa922..e8bc33ff40 100644
--- a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
+++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -11,9 +11,13 @@
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+-- A table that maps from room ID to metadata useful for sliding sync.
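+--
+-- It is kept up to date as new events are persisted, and is populated for
+-- existing rooms by the background update registered below.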
CREATE TABLE sliding_sync_room_metadata (
room_id TEXT NOT NULL PRIMARY KEY,
- last_stream_ordering BIGINT
+
+ -- The instance_name / stream ordering of the last event in the room.
+ instance_name TEXT NOT NULL,
+ last_stream_ordering BIGINT NOT NULL
);
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES