summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/12370.bugfix1
-rw-r--r--synapse/handlers/pagination.py9
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/storage/databases/main/stream.py20
-rw-r--r--synapse/streams/events.py4
-rw-r--r--tests/storage/test_stream.py4
6 files changed, 27 insertions, 15 deletions
diff --git a/changelog.d/12370.bugfix b/changelog.d/12370.bugfix
new file mode 100644
index 0000000000..97dca97829
--- /dev/null
+++ b/changelog.d/12370.bugfix
@@ -0,0 +1 @@
+Fix `/messages` returning backfilled and [MSC2716](https://github.com/matrix-org/synapse/pull/12319) historic messages our of order.
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 876b879483..7ee3340373 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -441,7 +441,14 @@ class PaginationHandler:
         if pagin_config.from_token:
             from_token = pagin_config.from_token
         else:
-            from_token = self.hs.get_event_sources().get_current_token_for_pagination()
+            from_token = (
+                await self.hs.get_event_sources().get_current_token_for_pagination(
+                    room_id
+                )
+            )
+            # We expect `/messages` to use historic pagination tokens by default but
+            # `/messages` should still works with live tokens when manually provided.
+            assert from_token.room_key.topological
 
         if pagin_config.limit is None:
             # This shouldn't happen as we've set a default limit before this
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 65d4aea9af..b31f00b517 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1444,8 +1444,8 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
     def get_current_key(self) -> RoomStreamToken:
         return self.store.get_room_max_token()
 
-    def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
-        return self.store.get_room_events_max_id(room_id)
+    def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]:
+        return self.store.get_current_room_stream_token_for_room_id(room_id)
 
 
 class ShutdownRoomResponse(TypedDict):
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 8e764790db..82e9ef02d2 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -748,21 +748,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "get_room_event_before_stream_ordering", _f
         )
 
-    async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
-        """Returns the current token for rooms stream.
-
-        By default, it returns the current global stream token. Specifying a
-        `room_id` causes it to return the current room specific topological
-        token.
+    async def get_current_room_stream_token_for_room_id(
+        self, room_id: Optional[str] = None
+    ) -> RoomStreamToken:
+        """Returns the current position of the rooms stream.
+
+        By default, it returns a live token with the current global stream
+        token. Specifying a `room_id` causes it to return a historic token with
+        the room specific topological token.
         """
-        token = self.get_room_max_stream_ordering()
+        stream_ordering = self.get_room_max_stream_ordering()
         if room_id is None:
-            return "s%d" % (token,)
+            return RoomStreamToken(None, stream_ordering)
         else:
             topo = await self.db_pool.runInteraction(
                 "_get_max_topological_txn", self._get_max_topological_txn, room_id
             )
-            return "t%d-%d" % (topo, token)
+            return RoomStreamToken(topo, stream_ordering)
 
     def get_stream_id_for_event_txn(
         self,
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index fb8fe17295..acf17ba623 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -69,7 +69,7 @@ class EventSources:
         )
         return token
 
-    def get_current_token_for_pagination(self) -> StreamToken:
+    async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
         """Get the current token for a given room to be used to paginate
         events.
 
@@ -80,7 +80,7 @@ class EventSources:
             The current token for pagination.
         """
         token = StreamToken(
-            room_key=self.sources.room.get_current_key(),
+            room_key=await self.sources.room.get_current_key_for_room(room_id),
             presence_key=0,
             typing_key=0,
             receipt_key=0,
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
index 52e41cdab4..78663a53fe 100644
--- a/tests/storage/test_stream.py
+++ b/tests/storage/test_stream.py
@@ -110,7 +110,9 @@ class PaginationTestCase(HomeserverTestCase):
     def _filter_messages(self, filter: JsonDict) -> List[EventBase]:
         """Make a request to /messages with a filter, returns the chunk of events."""
 
-        from_token = self.hs.get_event_sources().get_current_token_for_pagination()
+        from_token = self.get_success(
+            self.hs.get_event_sources().get_current_token_for_pagination(self.room_id)
+        )
 
         events, next_key = self.get_success(
             self.hs.get_datastores().main.paginate_room_events(