summary refs log tree commit diff
diff options
context:
space:
mode:
author Erik Johnston <erik@matrix.org> 2024-07-11 11:35:45 +0100
committer Erik Johnston <erik@matrix.org> 2024-07-11 11:35:45 +0100
commit 99a0925fa9f3b998fa1e6e6bb723cc0f3588dafc (patch)
tree 8a858e5501b805b444a864ac690c138054686d62
parent Handle $ME correctly (diff)
download synapse-99a0925fa9f3b998fa1e6e6bb723cc0f3588dafc.tar.xz
New table
-rw-r--r-- synapse/handlers/sliding_sync.py 10
-rw-r--r-- synapse/storage/databases/main/events.py 12
-rw-r--r-- synapse/storage/databases/main/stream.py 75
-rw-r--r-- synapse/storage/schema/main/delta/85/07_sliding_sync.sql 20
4 files changed, 101 insertions, 16 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py

index 27a7fd8eca..3cf42bd14d 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py
@@ -1343,8 +1343,8 @@ class SlidingSyncHandler: and state_key == StateValues.ME ): required_state_types.append( - (EventTypes.Member, user.to_string()) - ) + (EventTypes.Member, user.to_string()) + ) else: required_state_types.append((state_type, state_key)) @@ -1434,10 +1434,10 @@ class SlidingSyncHandler: # Figure out the last bump event in the room last_bump_event_stream_ordering = None if timeline_events: - for event in reversed(timeline_events): - if event.type in DEFAULT_BUMP_EVENT_TYPES: + for e in reversed(timeline_events): + if e.type in DEFAULT_BUMP_EVENT_TYPES: last_bump_event_stream_ordering = ( - event.internal_metadata.stream_ordering + e.internal_metadata.stream_ordering ) break diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..02bc7c3d5e 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -1629,6 +1629,18 @@ class PersistEventsStore: ], ) + self.db_pool.simple_upsert_many_txn( + txn, + table="sliding_sync_room_metadata", + key_names=("room_id",), + key_values=[(event.room_id,) for event, _ in events_and_contexts], + value_names=("last_stream_ordering",), + value_values=[ + (event.internal_metadata.stream_ordering,) + for event, _ in events_and_contexts + ], + ) + # If we're persisting an unredacted event we go and ensure # that we mark any redactions that reference this event as # requiring censoring. diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index f7e1a1ad1e..f5645dbbff 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -50,6 +50,7 @@ from typing import ( Dict, Iterable, List, + Mapping, Optional, Set, Tuple, @@ -68,7 +69,6 @@ from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import trace -from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -79,7 +79,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import PersistedEventPosition, RoomStreamToken, StrSequence +from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken, StrSequence from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable @@ -612,6 +612,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self._stream_order_on_start = self.get_room_max_stream_ordering() self._min_stream_order_on_start = self.get_room_min_stream_ordering() + database.updates.register_background_update_handler( + "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update + ) + def get_room_max_stream_ordering(self) -> int: """Get the stream_ordering of regular events that we have committed up to @@ -1188,22 +1192,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return None @cachedList( - cached_method_name="get_rough_stream_ordering_for_room", list_name="room_ids", + cached_method_name="get_rough_stream_ordering_for_room", + list_name="room_ids", ) async def rough_get_last_pos( self, room_ids: StrSequence - ) -> Dict[str, Optional[int]]: + ) -> Mapping[str, Optional[int]]: def 
rough_get_last_pos_txn( txn: LoggingTransaction, batch: StrSequence, - ) -> Dict[str, int]: + ) -> Mapping[str, Optional[int]]: clause, args = make_in_list_sql_clause( self.database_engine, "room_id", batch ) sql = f""" - SELECT DISTINCT ON (room_id) room_id, stream_ordering FROM events - WHERE {clause} AND stream_ordering IS NOT NULL - ORDER BY room_id, stream_ordering DESC + SELECT room_id, last_stream_ordering + FROM sliding_sync_room_metadata + WHERE {clause} """ txn.execute(sql, args) @@ -1227,9 +1232,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) -> Optional[int]: def get_rough_stream_ordering_for_room_txn( txn: LoggingTransaction, - ) -> Dict[str, int]: - sql = f""" - SELECT MAX(stream_ordering) FROM events + ) -> Optional[int]: + sql = """ + SELECT last_stream_ordering + FROM sliding_sync_room_metadata WHERE room_id = ? """ @@ -2042,3 +2048,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return RoomStreamToken(stream=last_position.stream - 1) return None + + async def _sliding_sync_room_metadata_bg_update( + self, progress: JsonDict, batch_size: int + ) -> int: + """Background update to fill out 'sliding_sync_room_metadata' table""" + previous_room = progress.get("previous_room", "") + + def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int: + sql = """ + SELECT room_id, MAX(stream_ordering) FROM rooms + INNER JOIN events USING (room_id) + WHERE room_id > ? AND stream_ordering IS NOT NULL + GROUP BY room_id + ORDER BY room_id ASC + LIMIT ? 
+ """ + txn.execute(sql, (previous_room, batch_size)) + rows = txn.fetchall() + if not rows: + return 0 + + self.db_pool.simple_upsert_many_txn( + txn, + table="sliding_sync_room_metadata", + key_names=("room_id",), + key_values=[(room_id,) for room_id, _ in rows], + value_names=("last_stream_ordering",), + value_values=[(stream,) for _, stream in rows], + ) + + self.db_pool.updates._background_update_progress_txn( + txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]} + ) + + return len(rows) + + rows = await self.db_pool.runInteraction( + "_sliding_sync_room_metadata_bg_update", + _sliding_sync_room_metadata_bg_update_txn, + ) + + if rows == 0: + await self.db_pool.updates._end_background_update( + "sliding_sync_room_metadata" + ) + + return rows diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql new file mode 100644
index 0000000000..0c24d6ca31 --- /dev/null +++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -0,0 +1,20 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + +CREATE TABLE sliding_sync_room_metadata ( + room_id BIGINT NOT NULL PRIMARY KEY, + last_stream_ordering BIGINT +); + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8507, 'sliding_sync_room_metadata', '{}');