diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 27a7fd8eca..3cf42bd14d 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -1343,8 +1343,8 @@ class SlidingSyncHandler:
and state_key == StateValues.ME
):
required_state_types.append(
- (EventTypes.Member, user.to_string())
- )
+ (EventTypes.Member, user.to_string())
+ )
else:
required_state_types.append((state_type, state_key))
@@ -1434,10 +1434,10 @@ class SlidingSyncHandler:
# Figure out the last bump event in the room
last_bump_event_stream_ordering = None
if timeline_events:
- for event in reversed(timeline_events):
- if event.type in DEFAULT_BUMP_EVENT_TYPES:
+ for e in reversed(timeline_events):
+ if e.type in DEFAULT_BUMP_EVENT_TYPES:
last_bump_event_stream_ordering = (
- event.internal_metadata.stream_ordering
+ e.internal_metadata.stream_ordering
)
break
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..02bc7c3d5e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1629,6 +1629,32 @@ class PersistEventsStore:
             ],
         )
+        # Record the latest stream ordering for each affected room in
+        # `sliding_sync_room_metadata`. Deduplicate rooms first: a native
+        # (ON CONFLICT) upsert cannot affect the same row twice in one
+        # statement, and a batch may contain several events per room.
+        latest_stream_ordering_by_room: Dict[str, int] = {}
+        for event, _ in events_and_contexts:
+            stream_ordering = event.internal_metadata.stream_ordering
+            if stream_ordering is None:
+                continue
+            existing = latest_stream_ordering_by_room.get(event.room_id)
+            if existing is None or stream_ordering > existing:
+                latest_stream_ordering_by_room[event.room_id] = stream_ordering
+
+        self.db_pool.simple_upsert_many_txn(
+            txn,
+            table="sliding_sync_room_metadata",
+            key_names=("room_id",),
+            key_values=[
+                (room_id,) for room_id in latest_stream_ordering_by_room
+            ],
+            value_names=("last_stream_ordering",),
+            value_values=[
+                (stream_ordering,)
+                for stream_ordering in latest_stream_ordering_by_room.values()
+            ],
+        )
+
# If we're persisting an unredacted event we go and ensure
# that we mark any redactions that reference this event as
# requiring censoring.
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index f7e1a1ad1e..f5645dbbff 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -50,6 +50,7 @@ from typing import (
Dict,
Iterable,
List,
+ Mapping,
Optional,
Set,
Tuple,
@@ -68,7 +69,6 @@ from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import trace
-from synapse.replication.tcp.streams.events import EventsStream
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -79,7 +79,7 @@ from synapse.storage.database import (
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import PersistedEventPosition, RoomStreamToken, StrSequence
+from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken, StrSequence
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
@@ -612,6 +612,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self._stream_order_on_start = self.get_room_max_stream_ordering()
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
+ database.updates.register_background_update_handler(
+ "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update
+ )
+
def get_room_max_stream_ordering(self) -> int:
"""Get the stream_ordering of regular events that we have committed up to
@@ -1188,22 +1192,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
@cachedList(
- cached_method_name="get_rough_stream_ordering_for_room", list_name="room_ids",
+ cached_method_name="get_rough_stream_ordering_for_room",
+ list_name="room_ids",
)
async def rough_get_last_pos(
self, room_ids: StrSequence
- ) -> Dict[str, Optional[int]]:
+ ) -> Mapping[str, Optional[int]]:
def rough_get_last_pos_txn(
txn: LoggingTransaction,
batch: StrSequence,
- ) -> Dict[str, int]:
+ ) -> Mapping[str, Optional[int]]:
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", batch
)
sql = f"""
- SELECT DISTINCT ON (room_id) room_id, stream_ordering FROM events
- WHERE {clause} AND stream_ordering IS NOT NULL
- ORDER BY room_id, stream_ordering DESC
+ SELECT room_id, last_stream_ordering
+ FROM sliding_sync_room_metadata
+ WHERE {clause}
"""
txn.execute(sql, args)
@@ -1227,9 +1232,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
) -> Optional[int]:
def get_rough_stream_ordering_for_room_txn(
txn: LoggingTransaction,
- ) -> Dict[str, int]:
- sql = f"""
- SELECT MAX(stream_ordering) FROM events
+ ) -> Optional[int]:
+ sql = """
+ SELECT last_stream_ordering
+ FROM sliding_sync_room_metadata
WHERE room_id = ?
"""
@@ -2042,3 +2048,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return RoomStreamToken(stream=last_position.stream - 1)
return None
+
+ async def _sliding_sync_room_metadata_bg_update(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """Background update to fill out 'sliding_sync_room_metadata' table"""
+ previous_room = progress.get("previous_room", "")
+
+ def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int:
+ sql = """
+ SELECT room_id, MAX(stream_ordering) FROM rooms
+ INNER JOIN events USING (room_id)
+ WHERE room_id > ? AND stream_ordering IS NOT NULL
+ GROUP BY room_id
+ ORDER BY room_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (previous_room, batch_size))
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="sliding_sync_room_metadata",
+ key_names=("room_id",),
+ key_values=[(room_id,) for room_id, _ in rows],
+ value_names=("last_stream_ordering",),
+ value_values=[(stream,) for _, stream in rows],
+ )
+
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]}
+ )
+
+ return len(rows)
+
+        num_rows = await self.db_pool.runInteraction(
+            "_sliding_sync_room_metadata_bg_update",
+            _sliding_sync_room_metadata_bg_update_txn,
+        )
+
+        if num_rows == 0:
+            await self.db_pool.updates._end_background_update(
+                "sliding_sync_room_metadata"
+            )
+
+        return num_rows
diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
new file mode 100644
index 0000000000..0c24d6ca31
--- /dev/null
+++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -0,0 +1,20 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+CREATE TABLE sliding_sync_room_metadata (
+    room_id TEXT NOT NULL PRIMARY KEY,
+ last_stream_ordering BIGINT
+);
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (8507, 'sliding_sync_room_metadata', '{}');
|