diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/sliding_sync.py | 61 | ||||
-rw-r--r-- | synapse/storage/databases/main/cache.py | 3 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 24 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 145 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/85/07_sliding_sync.sql | 24 |
5 files changed, 234 insertions, 23 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 818b13621c..1135a2a436 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -34,6 +34,7 @@ from synapse.api.constants import ( from synapse.events import EventBase from synapse.events.utils import strip_event from synapse.handlers.relations import BundledAggregations +from synapse.logging.opentracing import trace from synapse.storage.databases.main.stream import CurrentStateDeltaMembership from synapse.types import ( JsonDict, @@ -988,6 +989,7 @@ class SlidingSyncHandler: # Assemble a new sync room map but only with the `filtered_room_id_set` return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} + @trace async def sort_rooms( self, sync_room_map: Dict[str, _RoomMembershipForUser], @@ -1009,24 +1011,18 @@ class SlidingSyncHandler: # Assemble a map of room ID to the `stream_ordering` of the last activity that the # user should see in the room (<= `to_token`) last_activity_in_room_map: Dict[str, int] = {} + to_fetch = [] for room_id, room_for_user in sync_room_map.items(): # If they are fully-joined to the room, let's find the latest activity # at/before the `to_token`. if room_for_user.membership == Membership.JOIN: - last_event_result = ( - await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, to_token.room_key - ) - ) - - # If the room has no events at/before the `to_token`, this is probably a - # mistake in the code that generates the `sync_room_map` since that should - # only give us rooms that the user had membership in during the token range. - assert last_event_result is not None - - _, event_pos = last_event_result + stream = self.store._events_stream_cache._entity_to_key.get(room_id) + if stream is not None: + if stream <= to_token.room_key.stream: + last_activity_in_room_map[room_id] = stream + continue - last_activity_in_room_map[room_id] = event_pos.stream + to_fetch.append(room_id) else: # Otherwise, if the user has left/been invited/knocked/been banned from # a room, they shouldn't see anything past that point. @@ -1037,6 +1033,20 @@ class SlidingSyncHandler: # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream + ordering_map = await self.store.get_max_stream_ordering_in_rooms(to_fetch) + for room_id, stream_pos in ordering_map.items(): + if stream_pos is None: + continue + + if stream_pos.persisted_after(to_token.room_key): + continue + + last_activity_in_room_map[room_id] = stream_pos.stream + + for room_id in sync_room_map.keys() - last_activity_in_room_map.keys(): + # TODO: Handle better + last_activity_in_room_map[room_id] = sync_room_map[room_id].event_pos.stream + return sorted( sync_room_map.values(), # Sort by the last activity (stream_ordering) in the room @@ -1409,18 +1419,29 @@ class SlidingSyncHandler: break # Figure out the last bump event in the room - last_bump_event_result = ( - await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES + last_bump_event_stream_ordering = None + if timeline_events: + for e in reversed(timeline_events): + if e.type in DEFAULT_BUMP_EVENT_TYPES: + last_bump_event_stream_ordering = ( + e.internal_metadata.stream_ordering + ) + break + + if last_bump_event_stream_ordering is None: + last_bump_event_result = ( + await self.store.get_last_event_pos_in_room_before_stream_ordering( + room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES + ) ) - ) + if last_bump_event_result is not None: + last_bump_event_stream_ordering = last_bump_event_result[1].stream # By default, just choose the membership event position bump_stamp = room_membership_for_user_at_to_token.event_pos.stream # But if we found a bump event, use that instead - if last_bump_event_result is not None: - _, bump_event_pos = last_bump_event_result - bump_stamp = bump_event_pos.stream + if last_bump_event_stream_ordering is not None: + bump_stamp = last_bump_event_stream_ordering return SlidingSyncResult.RoomResult( name=room_name, diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 2d6b75e47e..2fcd927089 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -309,6 +309,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore): if not backfilled: self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined] + self._attempt_to_invalidate_cache( + "get_max_stream_ordering_in_room", (room_id,) + ) if redacts: self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined] diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1f7acdb859..0c7c2f9306 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -551,7 +551,7 @@ class PersistEventsStore: # From this point onwards the events are only events that we haven't # seen before. - self._store_event_txn(txn, events_and_contexts=events_and_contexts) + self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts) if new_forward_extremities: self._update_forward_extremities_txn( @@ -1555,6 +1555,7 @@ class PersistEventsStore: def _store_event_txn( self, txn: LoggingTransaction, + room_id: str, events_and_contexts: Collection[Tuple[EventBase, EventContext]], ) -> None: """Insert new events into the event, event_json, redaction and @@ -1629,6 +1630,27 @@ class PersistEventsStore: ], ) + # Update the `sliding_sync_room_metadata` with the latest + # (non-backfilled, ie positive) stream ordering. + # + # We know this list is sorted and non-empty, so we just take the last + # one event. + max_stream_ordering: int + for e, _ in events_and_contexts: + assert e.internal_metadata.stream_ordering is not None + max_stream_ordering = e.internal_metadata.stream_ordering + + if max_stream_ordering > 0: + self.db_pool.simple_upsert_txn( + txn, + table="sliding_sync_room_metadata", + keyvalues={"room_id": room_id}, + values={ + "instance_name": self._instance_name, + "last_stream_ordering": max_stream_ordering, + }, + ) + # If we're persisting an unredacted event we go and ensure # that we mark any redactions that reference this event as # requiring censoring. diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index e74e0d2e91..d3e4340d4c 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -50,6 +50,7 @@ from typing import ( Dict, Iterable, List, + Mapping, Optional, Set, Tuple, @@ -78,8 +79,13 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import PersistedEventPosition, RoomStreamToken -from synapse.util.caches.descriptors import cached +from synapse.types import ( + JsonDict, + PersistedEventPosition, + RoomStreamToken, + StrCollection, +) +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable @@ -610,6 +616,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self._stream_order_on_start = self.get_room_max_stream_ordering() self._min_stream_order_on_start = self.get_room_min_stream_ordering() + database.updates.register_background_update_handler( + "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update + ) + def get_room_max_stream_ordering(self) -> int: """Get the stream_ordering of regular events that we have committed up to @@ -1185,6 +1195,52 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return None + @cachedList( + cached_method_name="get_max_stream_ordering_in_room", + list_name="room_ids", + ) + async def get_max_stream_ordering_in_rooms( + self, room_ids: StrCollection + ) -> Mapping[str, Optional[PersistedEventPosition]]: + """Get the positions for the latest event in a room. + + A batched version of `get_max_stream_ordering_in_room`. + """ + rows = await self.db_pool.simple_select_many_batch( + table="sliding_sync_room_metadata", + column="room_id", + iterable=room_ids, + retcols=("room_id", "instance_name", "last_stream_ordering"), + desc="get_max_stream_ordering_in_rooms", + ) + + return { + room_id: PersistedEventPosition(instance_name, stream) + for room_id, instance_name, stream in rows + } + + @cached(max_entries=10000) + async def get_max_stream_ordering_in_room( + self, + room_id: str, + ) -> Optional[PersistedEventPosition]: + """Get the position for the latest event in a room. + + Note: this may be after the current token for the room stream on this + process (e.g. due to replication lag) + """ + row = await self.db_pool.simple_select_one( + table="sliding_sync_room_metadata", + retcols=("instance_name", "last_stream_ordering"), + keyvalues={"room_id": room_id}, + allow_none=True, + desc="get_max_stream_ordering_in_room", + ) + if not row: + return None + + return PersistedEventPosition(instance_name=row[0], stream=row[1]) + async def get_last_event_pos_in_room_before_stream_ordering( self, room_id: str, @@ -1983,3 +2039,88 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return RoomStreamToken(stream=last_position.stream - 1) return None + + async def _sliding_sync_room_metadata_bg_update( + self, progress: JsonDict, batch_size: int + ) -> int: + """Background update to fill out 'sliding_sync_room_metadata' table""" + previous_room = progress.get("previous_room", "") + + def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int: + # Both these queries are just getting the most recent + # instance_name/stream ordering for the next N rooms. + if isinstance(self.database_engine, PostgresEngine): + sql = """ + SELECT room_id, instance_name, stream_ordering FROM rooms AS r, + LATERAL ( + SELECT instance_name, stream_ordering + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ) e + WHERE r.room_id > ? + ORDER BY r.room_id ASC + LIMIT ? + """ + else: + sql = """ + SELECT + room_id, + ( + SELECT instance_name + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ), + ( + SELECT stream_ordering + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ) + FROM rooms AS r + WHERE r.room_id > ? + ORDER BY r.room_id ASC + LIMIT ? + """ + + txn.execute(sql, (previous_room, batch_size)) + rows = txn.fetchall() + if not rows: + return 0 + + self.db_pool.simple_upsert_many_txn( + txn, + table="sliding_sync_room_metadata", + key_names=("room_id",), + key_values=[(room_id,) for room_id, _, _ in rows], + value_names=( + "instance_name", + "last_stream_ordering", + ), + value_values=[ + ( + instance_name or "master", + stream, + ) + for _, instance_name, stream in rows + ], + ) + + self.db_pool.updates._background_update_progress_txn( + txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]} + ) + + return len(rows) + + rows = await self.db_pool.runInteraction( + "_sliding_sync_room_metadata_bg_update", + _sliding_sync_room_metadata_bg_update_txn, + ) + + if rows == 0: + await self.db_pool.updates._end_background_update( + "sliding_sync_room_metadata" + ) + + return rows diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql new file mode 100644 index 0000000000..e8bc33ff40 --- /dev/null +++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql @@ -0,0 +1,24 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + +-- A table that maps from room ID to metadata useful for sliding sync. +CREATE TABLE sliding_sync_room_metadata ( + room_id TEXT NOT NULL PRIMARY KEY, + + -- The instance_name / stream ordering of the last event in the room. + instance_name TEXT NOT NULL, + last_stream_ordering BIGINT NOT NULL +); + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8507, 'sliding_sync_room_metadata', '{}'); |