diff options
-rw-r--r-- | synapse/handlers/sliding_sync.py | 23 | ||||
-rw-r--r-- | synapse/storage/databases/main/cache.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 34 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 144 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/85/07_sliding_sync.sql | 6 | ||||
-rw-r--r-- | tests/storage/test_event_chain.py | 1 |
6 files changed, 127 insertions, 83 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 8788d13cc7..9ecef16dad 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1033,10 +1033,11 @@ class SlidingSyncHandler: # If they are fully-joined to the room, let's find the latest activity # at/before the `to_token`. if room_for_user.membership == Membership.JOIN: - stream_pos = self.store._events_stream_cache._entity_to_key.get(room_id) - if stream_pos is not None: - last_activity_in_room_map[room_id] = stream_pos - continue + stream = self.store._events_stream_cache._entity_to_key.get(room_id) + if stream is not None: + if stream <= to_token.room_key.stream: + last_activity_in_room_map[room_id] = stream + continue to_fetch.append(room_id) else: @@ -1049,11 +1050,15 @@ class SlidingSyncHandler: # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream - for room_id, stream_pos in ( - await self.store.rough_get_last_pos(to_fetch) - ).items(): - if stream_pos is not None: - last_activity_in_room_map[room_id] = stream_pos + ordering_map = await self.store.get_max_stream_ordering_in_rooms(to_fetch) + for room_id, stream_pos in ordering_map.items(): + if stream_pos is None: + continue + + if stream_pos.persisted_after(to_token.room_key): + continue + + last_activity_in_room_map[room_id] = stream_pos.stream for room_id in sync_room_map.keys() - last_activity_in_room_map.keys(): # TODO: Handle better diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 0550ab73da..2fcd927089 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -310,7 +310,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): if not backfilled: self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined] self._attempt_to_invalidate_cache( - 
"get_rough_stream_ordering_for_room", (room_id,) + "get_max_stream_ordering_in_room", (room_id,) ) if redacts: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 02bc7c3d5e..0c7c2f9306 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -551,7 +551,7 @@ class PersistEventsStore: # From this point onwards the events are only events that we haven't # seen before. - self._store_event_txn(txn, events_and_contexts=events_and_contexts) + self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts) if new_forward_extremities: self._update_forward_extremities_txn( @@ -1555,6 +1555,7 @@ class PersistEventsStore: def _store_event_txn( self, txn: LoggingTransaction, + room_id: str, events_and_contexts: Collection[Tuple[EventBase, EventContext]], ) -> None: """Insert new events into the event, event_json, redaction and @@ -1629,17 +1630,26 @@ ], ) - self.db_pool.simple_upsert_many_txn( - txn, - table="sliding_sync_room_metadata", - key_names=("room_id",), - key_values=[(event.room_id,) for event, _ in events_and_contexts], - value_names=("last_stream_ordering",), - value_values=[ - (event.internal_metadata.stream_ordering,) - for event, _ in events_and_contexts - ], - ) + # Update the `sliding_sync_room_metadata` with the latest + # (non-backfilled, i.e. positive) stream ordering. + # + # We know this list is sorted and non-empty, so we just take the last + # event. 
+ max_stream_ordering: int + for e, _ in events_and_contexts: + assert e.internal_metadata.stream_ordering is not None + max_stream_ordering = e.internal_metadata.stream_ordering + + if max_stream_ordering > 0: + self.db_pool.simple_upsert_txn( + txn, + table="sliding_sync_room_metadata", + keyvalues={"room_id": room_id}, + values={ + "instance_name": self._instance_name, + "last_stream_ordering": max_stream_ordering, + }, + ) # If we're persisting an unredacted event we go and ensure # that we mark any redactions that reference this event as diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index ec0a2d4d16..e552b7d85f 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -79,7 +79,12 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken, StrSequence +from synapse.types import ( + JsonDict, + PersistedEventPosition, + RoomStreamToken, + StrCollection, +) from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable @@ -1192,63 +1197,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return None @cachedList( - cached_method_name="get_rough_stream_ordering_for_room", + cached_method_name="get_max_stream_ordering_in_room", list_name="room_ids", ) - async def rough_get_last_pos( - self, room_ids: StrSequence - ) -> Mapping[str, Optional[int]]: - def rough_get_last_pos_txn( - txn: LoggingTransaction, - batch: StrSequence, - ) -> Mapping[str, Optional[int]]: - clause, args = make_in_list_sql_clause( - self.database_engine, "room_id", batch - ) - sql = 
f""" - SELECT room_id, last_stream_ordering - FROM sliding_sync_room_metadata - WHERE {clause} - """ - - txn.execute(sql, args) - - return {room_id: stream_ordering for room_id, stream_ordering in txn} + async def get_max_stream_ordering_in_rooms( + self, room_ids: StrCollection + ) -> Mapping[str, Optional[PersistedEventPosition]]: + """Get the positions for the latest event in a room. - results = {} - for batch in batch_iter(room_ids, 100): - results.update( - await self.db_pool.runInteraction( - "rough_get_last_pos", rough_get_last_pos_txn, batch - ) - ) + A batched version of `get_max_stream_ordering_in_room`. + """ + rows = await self.db_pool.simple_select_many_batch( + table="sliding_sync_room_metadata", + column="room_id", + iterable=room_ids, + retcols=("room_id", "instance_name", "last_stream_ordering"), + desc="get_max_stream_ordering_in_rooms", + ) - return results + return { + room_id: PersistedEventPosition(instance_name, stream) + for room_id, instance_name, stream in rows + } @cached(max_entries=10000) - async def get_rough_stream_ordering_for_room( + async def get_max_stream_ordering_in_room( self, room_id: str, - ) -> Optional[int]: - def get_rough_stream_ordering_for_room_txn( - txn: LoggingTransaction, - ) -> Optional[int]: - sql = """ - SELECT last_stream_ordering - FROM sliding_sync_room_metadata - WHERE room_id = ? - """ - - txn.execute(sql, (room_id,)) + ) -> Optional[PersistedEventPosition]: + """Get the position for the latest event in a room. - row = txn.fetchone() - if row: - return row[0] + Note: this may be after the current token for the room stream on this + process (e.g. 
due to replication lag) + """ + row = await self.db_pool.simple_select_one( + table="sliding_sync_room_metadata", + retcols=("instance_name", "last_stream_ordering"), + keyvalues={"room_id": room_id}, + allow_none=True, + desc="get_max_stream_ordering_in_room", + ) + if not row: return None - return await self.db_pool.runInteraction( - "get_rough_stream_ordering_for_room", get_rough_stream_ordering_for_room_txn - ) + return PersistedEventPosition(instance_name=row[0], stream=row[1]) async def get_last_event_pos_in_room_before_stream_ordering( self, @@ -2056,17 +2048,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): previous_room = progress.get("previous_room", "") def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int: - sql = """ - SELECT room_id, MAX(stream_ordering) FROM events - WHERE stream_ordering IS NOT NULL - AND room_id IN ( - SELECT room_id FROM rooms - WHERE room_id > ? - ORDER BY room_id ASC - LIMIT ? - ) - GROUP BY room_id - """ + # Both these queries are just getting the most recent + # instance_name/stream ordering for the next N rooms. + if isinstance(self.database_engine, PostgresEngine): + sql = """ + SELECT room_id, instance_name, stream_ordering FROM rooms AS r, + LATERAL ( + SELECT instance_name, stream_ordering + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ) e + WHERE r.room_id > ? + ORDER BY r.room_id ASC + LIMIT ? + """ + else: + sql = """ + SELECT + room_id, + ( + SELECT instance_name + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ), + ( + SELECT stream_ordering + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ) + FROM rooms AS r + WHERE r.room_id > ? + ORDER BY r.room_id ASC + LIMIT ? 
+ """ txn.execute(sql, (previous_room, batch_size)) rows = txn.fetchall() if not rows: @@ -2076,9 +2094,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn, table="sliding_sync_room_metadata", key_names=("room_id",), - key_values=[(room_id,) for room_id, _ in rows], + key_values=[(room_id,) for room_id, _, _ in rows], - value_names=("last_stream_ordering",), + value_names=("instance_name", "last_stream_ordering"), - value_values=[(stream,) for _, stream in rows], + value_values=[ + ( + instance_name or "master", + stream, + ) + for _, instance_name, stream in rows + ], ) self.db_pool.updates._background_update_progress_txn( diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql index d8219aa922..e8bc33ff40 100644 --- a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql +++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql @@ -11,9 +11,13 @@ -- See the GNU Affero General Public License for more details: -- <https://www.gnu.org/licenses/agpl-3.0.html>. +-- A table that maps from room ID to metadata useful for sliding sync. CREATE TABLE sliding_sync_room_metadata ( room_id TEXT NOT NULL PRIMARY KEY, - last_stream_ordering BIGINT + + -- The instance_name / stream ordering of the last event in the room. + instance_name TEXT NOT NULL, + last_stream_ordering BIGINT NOT NULL ); INSERT INTO background_updates (ordering, update_name, progress_json) VALUES diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index c4e216c308..037bbca1ba 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -440,6 +440,7 @@ class EventChainStoreTestCase(HomeserverTestCase): assert persist_events_store is not None persist_events_store._store_event_txn( txn, + events[0].room_id, [ (e, EventContext(self.hs.get_storage_controllers(), {})) for e in events |