author     Erik Johnston <erik@matrix.org>  2024-07-11 15:35:31 +0100
committer  Erik Johnston <erik@matrix.org>  2024-07-11 15:40:46 +0100
commit     99cf1e7c1cc591cb155ebc13161a92a1584759c8
tree       799eb769c866c0682027f78f4055a101b0066735
parent     Handle to-device extensions to Sliding Sync (#17416)
Faster sliding sync sorting  (branches: github/erikj/ss_sort, erikj/ss_sort)
Diffstat (limited to 'synapse/storage/databases'):

 synapse/storage/databases/main/cache.py  |   3
 synapse/storage/databases/main/events.py |  24
 synapse/storage/databases/main/stream.py | 145

 3 files changed, 169 insertions(+), 3 deletions(-)
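The diff reads from and writes to a new sliding_sync_room_metadata table, but the schema delta that creates it lives under synapse/storage/schema/ and so falls outside this diffstat. Below is a minimal sketch of what that delta presumably looks like, written as a Synapse-style Python delta file; the file placement, the ordering value (8506), and the column constraints are assumptions inferred from the columns and background-update name used in the diff, not part of this commit.

from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.types import Cursor

def run_create(cur: Cursor, database_engine: BaseDatabaseEngine) -> None:
    # Hypothetical delta: create the table keyed on room_id, matching the
    # columns upserted from events.py and stream.py below.
    cur.execute(
        """
        CREATE TABLE sliding_sync_room_metadata (
            room_id TEXT NOT NULL PRIMARY KEY,
            instance_name TEXT NOT NULL,
            last_stream_ordering BIGINT NOT NULL
        )
        """
    )
    # Queue the background update registered in stream.py so existing rooms
    # get backfilled. The ordering value here is a guess.
    cur.execute(
        """
        INSERT INTO background_updates (ordering, update_name, progress_json)
        VALUES (8506, 'sliding_sync_room_metadata', '{}')
        """
    )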
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2d6b75e47e..2fcd927089 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -309,6 +309,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         if not backfilled:
             self._events_stream_cache.entity_has_changed(room_id, stream_ordering)  # type: ignore[attr-defined]
+            self._attempt_to_invalidate_cache(
+                "get_max_stream_ordering_in_room", (room_id,)
+            )
 
         if redacts:
             self._invalidate_local_get_event_cache(redacts)  # type: ignore[attr-defined]
 
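This invalidation keeps the new @cached get_max_stream_ordering_in_room (added in stream.py below) consistent: persisting an event upserts a newer last_stream_ordering for the room, so any cached position for that room is now stale. A toy model of the failure mode this avoids — plain Python, not Synapse's actual cache machinery:

from typing import Dict, Optional, Tuple

# room_id -> (instance_name, stream); stand-ins for the @cached entry and
# for the sliding_sync_room_metadata table respectively.
_cache: Dict[str, Tuple[str, int]] = {}
_table: Dict[str, Tuple[str, int]] = {}

def get_max_stream_ordering_in_room(room_id: str) -> Optional[Tuple[str, int]]:
    if room_id not in _cache:
        row = _table.get(room_id)
        if row is None:
            return None
        _cache[room_id] = row
    return _cache[room_id]

def on_event_persisted(room_id: str, instance_name: str, stream: int) -> None:
    _table[room_id] = (instance_name, stream)  # the simple_upsert_txn step
    # Without this pop (the _attempt_to_invalidate_cache step), readers would
    # keep seeing the pre-upsert position for this room.
    _cache.pop(room_id, None)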
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..0c7c2f9306 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -551,7 +551,7 @@ class PersistEventsStore:
         # From this point onwards the events are only events that we haven't
         # seen before.
 
-        self._store_event_txn(txn, events_and_contexts=events_and_contexts)
+        self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts)
 
         if new_forward_extremities:
             self._update_forward_extremities_txn(
@@ -1555,6 +1555,7 @@ class PersistEventsStore:
     def _store_event_txn(
         self,
         txn: LoggingTransaction,
+        room_id: str,
         events_and_contexts: Collection[Tuple[EventBase, EventContext]],
     ) -> None:
         """Insert new events into the event, event_json, redaction and
@@ -1629,6 +1630,27 @@ class PersistEventsStore:
             ],
         )
 
+        # Update `sliding_sync_room_metadata` with the latest
+        # (non-backfilled, i.e. positive) stream ordering.
+        #
+        # We know this list is sorted and non-empty, so we just take the
+        # last event.
+        max_stream_ordering: int
+        for e, _ in events_and_contexts:
+            assert e.internal_metadata.stream_ordering is not None
+            max_stream_ordering = e.internal_metadata.stream_ordering
+
+        if max_stream_ordering > 0:
+            self.db_pool.simple_upsert_txn(
+                txn,
+                table="sliding_sync_room_metadata",
+                keyvalues={"room_id": room_id},
+                values={
+                    "instance_name": self._instance_name,
+                    "last_stream_ordering": max_stream_ordering,
+                },
+            )
+
         # If we're persisting an unredacted event we go and ensure
         # that we mark any redactions that reference this event as
         # requiring censoring.
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index e74e0d2e91..d3e4340d4c 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -50,6 +50,7 @@ from typing import (
     Dict,
     Iterable,
     List,
+    Mapping,
     Optional,
     Set,
     Tuple,
@@ -78,8 +79,13 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import PersistedEventPosition, RoomStreamToken
-from synapse.util.caches.descriptors import cached
+from synapse.types import (
+    JsonDict,
+    PersistedEventPosition,
+    RoomStreamToken,
+    StrCollection,
+)
+from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
@@ -610,6 +616,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
+        database.updates.register_background_update_handler(
+            "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update
+        )
+
     def get_room_max_stream_ordering(self) -> int:
         """Get the stream_ordering of regular events that we have committed up to
@@ -1185,6 +1195,52 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return None
 
+    @cachedList(
+        cached_method_name="get_max_stream_ordering_in_room",
+        list_name="room_ids",
+    )
+    async def get_max_stream_ordering_in_rooms(
+        self, room_ids: StrCollection
+    ) -> Mapping[str, Optional[PersistedEventPosition]]:
+        """Get the positions for the latest event in each room.
+
+        A batched version of `get_max_stream_ordering_in_room`.
+        """
+        rows = await self.db_pool.simple_select_many_batch(
+            table="sliding_sync_room_metadata",
+            column="room_id",
+            iterable=room_ids,
+            retcols=("room_id", "instance_name", "last_stream_ordering"),
+            desc="get_max_stream_ordering_in_rooms",
+        )
+
+        return {
+            room_id: PersistedEventPosition(instance_name, stream)
+            for room_id, instance_name, stream in rows
+        }
+
+    @cached(max_entries=10000)
+    async def get_max_stream_ordering_in_room(
+        self,
+        room_id: str,
+    ) -> Optional[PersistedEventPosition]:
+        """Get the position for the latest event in a room.
+
+        Note: this may be after the current token for the room stream on this
+        process (e.g. due to replication lag).
+        """
+        row = await self.db_pool.simple_select_one(
+            table="sliding_sync_room_metadata",
+            retcols=("instance_name", "last_stream_ordering"),
+            keyvalues={"room_id": room_id},
+            allow_none=True,
+            desc="get_max_stream_ordering_in_room",
+        )
+        if not row:
+            return None
+
+        return PersistedEventPosition(instance_name=row[0], stream=row[1])
+
     async def get_last_event_pos_in_room_before_stream_ordering(
         self,
         room_id: str,
@@ -1983,3 +2039,88 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             return RoomStreamToken(stream=last_position.stream - 1)
 
         return None
+
+    async def _sliding_sync_room_metadata_bg_update(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Background update to fill out 'sliding_sync_room_metadata' table"""
+        previous_room = progress.get("previous_room", "")
+
+        def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int:
+            # Both these queries are just getting the most recent
+            # instance_name/stream ordering for the next N rooms.
+            if isinstance(self.database_engine, PostgresEngine):
+                sql = """
+                    SELECT room_id, instance_name, stream_ordering FROM rooms AS r,
+                    LATERAL (
+                        SELECT instance_name, stream_ordering
+                        FROM events WHERE events.room_id = r.room_id
+                        ORDER BY stream_ordering DESC
+                        LIMIT 1
+                    ) e
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+            else:
+                sql = """
+                    SELECT
+                        room_id,
+                        (
+                            SELECT instance_name
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        ),
+                        (
+                            SELECT stream_ordering
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        )
+                    FROM rooms AS r
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+
+            txn.execute(sql, (previous_room, batch_size))
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            self.db_pool.simple_upsert_many_txn(
+                txn,
+                table="sliding_sync_room_metadata",
+                key_names=("room_id",),
+                key_values=[(room_id,) for room_id, _, _ in rows],
+                value_names=(
+                    "instance_name",
+                    "last_stream_ordering",
+                ),
+                value_values=[
+                    (
+                        instance_name or "master",
+                        stream,
+                    )
+                    for _, instance_name, stream in rows
+                ],
+            )
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]}
+            )
+
+            return len(rows)
+
+        rows = await self.db_pool.runInteraction(
+            "_sliding_sync_room_metadata_bg_update",
+            _sliding_sync_room_metadata_bg_update_txn,
+        )
+
+        if rows == 0:
+            await self.db_pool.updates._end_background_update(
+                "sliding_sync_room_metadata"
+            )
+
+        return rows
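In the background update, the Postgres branch uses a LATERAL join so the top-1 lookup against events runs once per room, while the SQLite branch gets the same result with two correlated subqueries. Once the table is populated, sliding sync can sort a user's rooms by latest activity with one batched, mostly-cached lookup instead of a per-room walk of the events table. A sketch of how a caller might use the new API — the helper below is illustrative, not part of this commit:

from typing import List

from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.types import StrCollection

async def sort_rooms_by_recency(
    store: StreamWorkerStore, room_ids: StrCollection
) -> List[str]:
    # One cachedList call; only rooms missing from the cache hit
    # sliding_sync_room_metadata.
    positions = await store.get_max_stream_ordering_in_rooms(room_ids)
    return sorted(
        room_ids,
        key=lambda r: positions[r].stream if positions.get(r) is not None else 0,
        reverse=True,  # most recently active rooms first
    )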