summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/cache.py3
-rw-r--r--synapse/storage/databases/main/events.py24
-rw-r--r--synapse/storage/databases/main/stream.py145
-rw-r--r--synapse/storage/schema/main/delta/85/07_sliding_sync.sql24
4 files changed, 193 insertions, 3 deletions
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py

index 2d6b75e47e..2fcd927089 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py
@@ -309,6 +309,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore): if not backfilled: self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined] + self._attempt_to_invalidate_cache( + "get_max_stream_ordering_in_room", (room_id,) + ) if redacts: self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined] diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..0c7c2f9306 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -551,7 +551,7 @@ class PersistEventsStore: # From this point onwards the events are only events that we haven't # seen before. - self._store_event_txn(txn, events_and_contexts=events_and_contexts) + self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts) if new_forward_extremities: self._update_forward_extremities_txn( @@ -1555,6 +1555,7 @@ class PersistEventsStore: def _store_event_txn( self, txn: LoggingTransaction, + room_id: str, events_and_contexts: Collection[Tuple[EventBase, EventContext]], ) -> None: """Insert new events into the event, event_json, redaction and @@ -1629,6 +1630,27 @@ class PersistEventsStore: ], ) + # Update the `sliding_sync_room_metadata` with the latest + # (non-backfilled, ie positive) stream ordering. + # + # We know this list is sorted and non-empty, so we just take the last + # one event. + max_stream_ordering: int + for e, _ in events_and_contexts: + assert e.internal_metadata.stream_ordering is not None + max_stream_ordering = e.internal_metadata.stream_ordering + + if max_stream_ordering > 0: + self.db_pool.simple_upsert_txn( + txn, + table="sliding_sync_room_metadata", + keyvalues={"room_id": room_id}, + values={ + "instance_name": self._instance_name, + "last_stream_ordering": max_stream_ordering, + }, + ) + # If we're persisting an unredacted event we go and ensure # that we mark any redactions that reference this event as # requiring censoring. diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index e74e0d2e91..d3e4340d4c 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -50,6 +50,7 @@ from typing import ( Dict, Iterable, List, + Mapping, Optional, Set, Tuple, @@ -78,8 +79,13 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import PersistedEventPosition, RoomStreamToken -from synapse.util.caches.descriptors import cached +from synapse.types import ( + JsonDict, + PersistedEventPosition, + RoomStreamToken, + StrCollection, +) +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable @@ -610,6 +616,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self._stream_order_on_start = self.get_room_max_stream_ordering() self._min_stream_order_on_start = self.get_room_min_stream_ordering() + database.updates.register_background_update_handler( + "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update + ) + def get_room_max_stream_ordering(self) -> int: """Get the stream_ordering of regular events that we have committed up to @@ -1185,6 +1195,52 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return None + @cachedList( + cached_method_name="get_max_stream_ordering_in_room", + list_name="room_ids", + ) + async def get_max_stream_ordering_in_rooms( + self, room_ids: StrCollection + ) -> Mapping[str, Optional[PersistedEventPosition]]: + """Get the positions for the latest event in a room. + + A batched version of `get_max_stream_ordering_in_room`. + """ + rows = await self.db_pool.simple_select_many_batch( + table="sliding_sync_room_metadata", + column="room_id", + iterable=room_ids, + retcols=("room_id", "instance_name", "last_stream_ordering"), + desc="get_max_stream_ordering_in_rooms", + ) + + return { + room_id: PersistedEventPosition(instance_name, stream) + for room_id, instance_name, stream in rows + } + + @cached(max_entries=10000) + async def get_max_stream_ordering_in_room( + self, + room_id: str, + ) -> Optional[PersistedEventPosition]: + """Get the position for the latest event in a room. + + Note: this may be after the current token for the room stream on this + process (e.g. due to replication lag) + """ + row = await self.db_pool.simple_select_one( + table="sliding_sync_room_metadata", + retcols=("instance_name", "last_stream_ordering"), + keyvalues={"room_id": room_id}, + allow_none=True, + desc="get_max_stream_ordering_in_room", + ) + if not row: + return None + + return PersistedEventPosition(instance_name=row[0], stream=row[1]) + async def get_last_event_pos_in_room_before_stream_ordering( self, room_id: str, @@ -1983,3 +2039,88 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return RoomStreamToken(stream=last_position.stream - 1) return None + + async def _sliding_sync_room_metadata_bg_update( + self, progress: JsonDict, batch_size: int + ) -> int: + """Background update to fill out 'sliding_sync_room_metadata' table""" + previous_room = progress.get("previous_room", "") + + def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int: + # Both these queries are just getting the most recent + # instance_name/stream ordering for the next N rooms. + if isinstance(self.database_engine, PostgresEngine): + sql = """ + SELECT room_id, instance_name, stream_ordering FROM rooms AS r, + LATERAL ( + SELECT instance_name, stream_ordering + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ) e + WHERE r.room_id > ? + ORDER BY r.room_id ASC + LIMIT ? + """ + else: + sql = """ + SELECT + room_id, + ( + SELECT instance_name + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ), + ( + SELECT stream_ordering + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ) + FROM rooms AS r + WHERE r.room_id > ? + ORDER BY r.room_id ASC + LIMIT ? + """ + + txn.execute(sql, (previous_room, batch_size)) + rows = txn.fetchall() + if not rows: + return 0 + + self.db_pool.simple_upsert_many_txn( + txn, + table="sliding_sync_room_metadata", + key_names=("room_id",), + key_values=[(room_id,) for room_id, _, _ in rows], + value_names=( + "instance_name", + "last_stream_ordering", + ), + value_values=[ + ( + instance_name or "master", + stream, + ) + for _, instance_name, stream in rows + ], + ) + + self.db_pool.updates._background_update_progress_txn( + txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]} + ) + + return len(rows) + + rows = await self.db_pool.runInteraction( + "_sliding_sync_room_metadata_bg_update", + _sliding_sync_room_metadata_bg_update_txn, + ) + + if rows == 0: + await self.db_pool.updates._end_background_update( + "sliding_sync_room_metadata" + ) + + return rows diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql new file mode 100644
index 0000000000..e8bc33ff40 --- /dev/null +++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -0,0 +1,24 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + +-- A table that maps from room ID to metadata useful for sliding sync. +CREATE TABLE sliding_sync_room_metadata ( + room_id TEXT NOT NULL PRIMARY KEY, + + -- The instance_name / stream ordering of the last event in the room. + instance_name TEXT NOT NULL, + last_stream_ordering BIGINT NOT NULL +); + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8507, 'sliding_sync_room_metadata', '{}');