summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/sliding_sync.py23
-rw-r--r--synapse/storage/databases/main/cache.py2
-rw-r--r--synapse/storage/databases/main/events.py34
-rw-r--r--synapse/storage/databases/main/stream.py144
-rw-r--r--synapse/storage/schema/main/delta/85/07_sliding_sync.sql6
-rw-r--r--tests/storage/test_event_chain.py1
6 files changed, 127 insertions, 83 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 8788d13cc7..9ecef16dad 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -1033,10 +1033,11 @@ class SlidingSyncHandler:
             # If they are fully-joined to the room, let's find the latest activity
             # at/before the `to_token`.
             if room_for_user.membership == Membership.JOIN:
-                stream_pos = self.store._events_stream_cache._entity_to_key.get(room_id)
-                if stream_pos is not None:
-                    last_activity_in_room_map[room_id] = stream_pos
-                    continue
+                stream = self.store._events_stream_cache._entity_to_key.get(room_id)
+                if stream is not None:
+                    if stream <= to_token.room_key.stream:
+                        last_activity_in_room_map[room_id] = stream
+                        continue
 
                 to_fetch.append(room_id)
             else:
@@ -1049,11 +1050,15 @@ class SlidingSyncHandler:
                 # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
                 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
 
-        for room_id, stream_pos in (
-            await self.store.rough_get_last_pos(to_fetch)
-        ).items():
-            if stream_pos is not None:
-                last_activity_in_room_map[room_id] = stream_pos
+        ordering_map = await self.store.get_max_stream_ordering_in_rooms(to_fetch)
+        for room_id, stream_pos in ordering_map.items():
+            if stream_pos is None:
+                continue
+
+            if stream_pos.persisted_after(to_token.room_key):
+                continue
+
+            last_activity_in_room_map[room_id] = stream_pos.stream
 
         for room_id in sync_room_map.keys() - last_activity_in_room_map.keys():
             # TODO: Handle better
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 0550ab73da..2fcd927089 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -310,7 +310,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         if not backfilled:
             self._events_stream_cache.entity_has_changed(room_id, stream_ordering)  # type: ignore[attr-defined]
             self._attempt_to_invalidate_cache(
-                "get_rough_stream_ordering_for_room", (room_id,)
+                "get_max_stream_ordering_in_room", (room_id,)
             )
 
         if redacts:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 02bc7c3d5e..0c7c2f9306 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -551,7 +551,7 @@ class PersistEventsStore:
         # From this point onwards the events are only events that we haven't
         # seen before.
 
-        self._store_event_txn(txn, events_and_contexts=events_and_contexts)
+        self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts)
 
         if new_forward_extremities:
             self._update_forward_extremities_txn(
@@ -1555,6 +1555,7 @@ class PersistEventsStore:
     def _store_event_txn(
         self,
         txn: LoggingTransaction,
+        room_id: str,
         events_and_contexts: Collection[Tuple[EventBase, EventContext]],
     ) -> None:
         """Insert new events into the event, event_json, redaction and
@@ -1629,17 +1630,26 @@ class PersistEventsStore:
             ],
         )
 
-        self.db_pool.simple_upsert_many_txn(
-            txn,
-            table="sliding_sync_room_metadata",
-            key_names=("room_id",),
-            key_values=[(event.room_id,) for event, _ in events_and_contexts],
-            value_names=("last_stream_ordering",),
-            value_values=[
-                (event.internal_metadata.stream_ordering,)
-                for event, _ in events_and_contexts
-            ],
-        )
+        # Update the `sliding_sync_room_metadata` with the latest
+        # (non-backfilled, i.e. positive) stream ordering.
+        #
+        # We know this list is sorted and non-empty, so we just take the
+        # last event.
+        max_stream_ordering: int
+        for e, _ in events_and_contexts:
+            assert e.internal_metadata.stream_ordering is not None
+            max_stream_ordering = e.internal_metadata.stream_ordering
+
+        if max_stream_ordering > 0:
+            self.db_pool.simple_upsert_txn(
+                txn,
+                table="sliding_sync_room_metadata",
+                keyvalues={"room_id": room_id},
+                values={
+                    "instance_name": self._instance_name,
+                    "last_stream_ordering": max_stream_ordering,
+                },
+            )
 
         # If we're persisting an unredacted event we go and ensure
         # that we mark any redactions that reference this event as
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index ec0a2d4d16..e552b7d85f 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -79,7 +79,12 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken, StrSequence
+from synapse.types import (
+    JsonDict,
+    PersistedEventPosition,
+    RoomStreamToken,
+    StrCollection,
+)
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
@@ -1192,63 +1197,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         return None
 
     @cachedList(
-        cached_method_name="get_rough_stream_ordering_for_room",
+        cached_method_name="get_max_stream_ordering_in_room",
         list_name="room_ids",
     )
-    async def rough_get_last_pos(
-        self, room_ids: StrSequence
-    ) -> Mapping[str, Optional[int]]:
-        def rough_get_last_pos_txn(
-            txn: LoggingTransaction,
-            batch: StrSequence,
-        ) -> Mapping[str, Optional[int]]:
-            clause, args = make_in_list_sql_clause(
-                self.database_engine, "room_id", batch
-            )
-            sql = f"""
-                SELECT room_id, last_stream_ordering
-                FROM sliding_sync_room_metadata
-                WHERE {clause}
-            """
-
-            txn.execute(sql, args)
-
-            return {room_id: stream_ordering for room_id, stream_ordering in txn}
+    async def get_max_stream_ordering_in_rooms(
+        self, room_ids: StrCollection
+    ) -> Mapping[str, Optional[PersistedEventPosition]]:
+        """Get the positions for the latest event in a room.
 
-        results = {}
-        for batch in batch_iter(room_ids, 100):
-            results.update(
-                await self.db_pool.runInteraction(
-                    "rough_get_last_pos", rough_get_last_pos_txn, batch
-                )
-            )
+        A batched version of `get_max_stream_ordering_in_room`.
+        """
+        rows = await self.db_pool.simple_select_many_batch(
+            table="sliding_sync_room_metadata",
+            column="room_id",
+            iterable=room_ids,
+            retcols=("room_id", "instance_name", "last_stream_ordering"),
+            desc="get_max_stream_ordering_in_rooms",
+        )
 
-        return results
+        return {
+            room_id: PersistedEventPosition(instance_name, stream)
+            for room_id, instance_name, stream in rows
+        }
 
     @cached(max_entries=10000)
-    async def get_rough_stream_ordering_for_room(
+    async def get_max_stream_ordering_in_room(
         self,
         room_id: str,
-    ) -> Optional[int]:
-        def get_rough_stream_ordering_for_room_txn(
-            txn: LoggingTransaction,
-        ) -> Optional[int]:
-            sql = """
-                SELECT last_stream_ordering
-                FROM sliding_sync_room_metadata
-                WHERE room_id = ?
-            """
-
-            txn.execute(sql, (room_id,))
+    ) -> Optional[PersistedEventPosition]:
+        """Get the position for the latest event in a room.
 
-            row = txn.fetchone()
-            if row:
-                return row[0]
+        Note: this may be after the current token for the room stream on this
+        process (e.g. due to replication lag)
+        """
+        row = await self.db_pool.simple_select_one(
+            table="sliding_sync_room_metadata",
+            retcols=("instance_name", "last_stream_ordering"),
+            keyvalues={"room_id": room_id},
+            allow_none=True,
+            desc="get_max_stream_ordering_in_room",
+        )
+        if not row:
             return None
 
-        return await self.db_pool.runInteraction(
-            "get_rough_stream_ordering_for_room", get_rough_stream_ordering_for_room_txn
-        )
+        return PersistedEventPosition(instance_name=row[0], stream=row[1])
 
     async def get_last_event_pos_in_room_before_stream_ordering(
         self,
@@ -2056,17 +2048,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         previous_room = progress.get("previous_room", "")
 
         def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int:
-            sql = """
-                SELECT room_id, MAX(stream_ordering) FROM events
-                WHERE stream_ordering IS NOT NULL
-                    AND room_id IN (
-                        SELECT room_id FROM rooms
-                        WHERE room_id > ?
-                        ORDER BY room_id ASC
-                        LIMIT ?
-                    )
-                GROUP BY room_id
-            """
+            # Both these queries are just getting the most recent
+            # instance_name/stream ordering for the next N rooms.
+            if isinstance(self.database_engine, PostgresEngine):
+                sql = """
+                    SELECT room_id, instance_name, stream_ordering FROM rooms AS r,
+                    LATERAL (
+                        SELECT instance_name, stream_ordering
+                        FROM events WHERE events.room_id = r.room_id
+                        ORDER BY stream_ordering DESC
+                        LIMIT 1
+                    ) e
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+            else:
+                sql = """
+                    SELECT
+                        room_id,
+                        (
+                            SELECT instance_name
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        ),
+                        (
+                            SELECT stream_ordering
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        )
+                    FROM rooms AS r
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+
             txn.execute(sql, (previous_room, batch_size))
             rows = txn.fetchall()
             if not rows:
@@ -2076,9 +2094,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 txn,
                 table="sliding_sync_room_metadata",
                 key_names=("room_id",),
-                key_values=[(room_id,) for room_id, _ in rows],
+                key_values=[(room_id,) for room_id, _, _ in rows],
-                value_names=("last_stream_ordering",),
+                value_names=("instance_name", "last_stream_ordering"),
-                value_values=[(stream,) for _, stream in rows],
+                value_values=[
+                    (
+                        instance_name or "master",
+                        stream,
+                    )
+                    for _, instance_name, stream in rows
+                ],
             )
 
             self.db_pool.updates._background_update_progress_txn(
diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
index d8219aa922..e8bc33ff40 100644
--- a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
+++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -11,9 +11,13 @@
 -- See the GNU Affero General Public License for more details:
 -- <https://www.gnu.org/licenses/agpl-3.0.html>.
 
+-- A table that maps from room ID to metadata useful for sliding sync.
 CREATE TABLE sliding_sync_room_metadata (
     room_id TEXT NOT NULL PRIMARY KEY,
-    last_stream_ordering BIGINT
+
+    -- The instance_name / stream ordering of the last event in the room.
+    instance_name TEXT NOT NULL,
+    last_stream_ordering BIGINT NOT NULL
 );
 
 INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index c4e216c308..037bbca1ba 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -440,6 +440,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
             assert persist_events_store is not None
             persist_events_store._store_event_txn(
                 txn,
+                events[0].room_id,
                 [
                     (e, EventContext(self.hs.get_storage_controllers(), {}))
                     for e in events