summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-10 15:54:46 +0100
committerErik Johnston <erik@matrix.org>2024-07-10 15:54:46 +0100
commita9c2e97ed4cd82fea0fcd210ee87155c3821ab93 (patch)
tree36d6c575e2aad94ce5ffab15a4d7196093ca476b
parentFixup (diff)
downloadsynapse-a9c2e97ed4cd82fea0fcd210ee87155c3821ab93.tar.xz
Fixup
-rw-r--r--synapse/handlers/sliding_sync.py34
-rw-r--r--synapse/storage/databases/main/cache.py3
-rw-r--r--synapse/storage/databases/main/stream.py23
3 files changed, 52 insertions, 8 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 4d56ba7b26..cd0ba7d1f6 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -1012,22 +1012,40 @@ class SlidingSyncHandler:
             A sorted list of room IDs by `stream_ordering` along with membership information.
         """
 
-        self.store._events_stream_cache._entity_to_key
-
-        last_activity_in_room_map = {}
-        to_fetch = []
+        # Assemble a map of room ID to the `stream_ordering` of the last activity that the
+        # user should see in the room (<= `to_token`)
+        last_activity_in_room_map: Dict[str, int] = {}
         for room_id, room_for_user in sync_room_map.items():
+            # If they are fully-joined to the room, let's find the latest activity
+            # at/before the `to_token`.
             if room_for_user.membership == Membership.JOIN:
                 stream_pos = self.store._events_stream_cache._entity_to_key.get(room_id)
                 if stream_pos is not None:
                     last_activity_in_room_map[room_id] = stream_pos
-                else:
-                    to_fetch.append(room_id)
+                    continue
+
+                last_event_result = await self.store.get_rough_stream_ordering_for_room(
+                    room_id
+                )
+
+                # If the room has no events at/before the `to_token`, this is probably a
+                # mistake in the code that generates the `sync_room_map` since that should
+                # only give us rooms that the user had membership in during the token range.
+                assert last_event_result is not None
+
+                _, event_pos = last_event_result
+
+                last_activity_in_room_map[room_id] = event_pos.stream
             else:
+                # Otherwise, if the user has left/been invited/knocked/been banned from
+                # a room, they shouldn't see anything past that point.
+                #
+                # FIXME: It's possible that people should see beyond this point in
+                # invited/knocked cases if for example the room has
+                # `invite`/`world_readable` history visibility, see
+                # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
                 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
 
-        last_activity_in_room_map.update(await self.store.rough_get_last_pos(to_fetch))
-
         return sorted(
             sync_room_map.values(),
             # Sort by the last activity (stream_ordering) in the room
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2d6b75e47e..0550ab73da 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -309,6 +309,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(room_id, stream_ordering)  # type: ignore[attr-defined]
+            self._attempt_to_invalidate_cache(
+                "get_rough_stream_ordering_for_room", (room_id,)
+            )
 
         if redacts:
             self._invalidate_local_get_event_cache(redacts)  # type: ignore[attr-defined]
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 5ab0fbc5d9..d6801accab 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -68,6 +68,7 @@ from synapse.api.filtering import Filter
 from synapse.events import EventBase
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import trace
+from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
@@ -1214,6 +1215,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return results
 
+    @cached(max_entries=10000)
+    async def get_rough_stream_ordering_for_room(
+        self,
+        room_id: str,
+    ) -> Optional[int]:
+        def get_rough_stream_ordering_for_room_txn(
+            txn: LoggingTransaction,
+        ) -> Dict[str, int]:
+            sql = f"""
+                SELECT DISTINCT ON (room_id) room_id, stream_ordering FROM events
+                WHERE room_id = ? AND stream_ordering IS NOT NULL
+                ORDER BY room_id, stream_ordering DESC
+            """
+
+            txn.execute(sql, (room_id,))
+
+            return {room_id: stream_ordering for room_id, stream_ordering in txn}
+
+        return await self.db_pool.runInteraction(
+            "get_rough_stream_ordering_for_room", get_rough_stream_ordering_for_room_txn
+        )
+
     async def get_last_event_pos_in_room_before_stream_ordering(
         self,
         room_id: str,