diff options
-rw-r--r-- | changelog.d/12370.bugfix | 1 | ||||
-rw-r--r-- | synapse/handlers/pagination.py | 9 | ||||
-rw-r--r-- | synapse/handlers/room.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 20 | ||||
-rw-r--r-- | synapse/streams/events.py | 4 | ||||
-rw-r--r-- | tests/storage/test_stream.py | 4 |
6 files changed, 27 insertions, 15 deletions
diff --git a/changelog.d/12370.bugfix b/changelog.d/12370.bugfix new file mode 100644 index 0000000000..97dca97829 --- /dev/null +++ b/changelog.d/12370.bugfix @@ -0,0 +1 @@ +Fix `/messages` returning backfilled and [MSC2716](https://github.com/matrix-org/synapse/pull/12319) historic messages our of order. diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 876b879483..7ee3340373 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -441,7 +441,14 @@ class PaginationHandler: if pagin_config.from_token: from_token = pagin_config.from_token else: - from_token = self.hs.get_event_sources().get_current_token_for_pagination() + from_token = ( + await self.hs.get_event_sources().get_current_token_for_pagination( + room_id + ) + ) + # We expect `/messages` to use historic pagination tokens by default but + # `/messages` should still works with live tokens when manually provided. + assert from_token.room_key.topological if pagin_config.limit is None: # This shouldn't happen as we've set a default limit before this diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 65d4aea9af..b31f00b517 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1444,8 +1444,8 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]): def get_current_key(self) -> RoomStreamToken: return self.store.get_room_max_token() - def get_current_key_for_room(self, room_id: str) -> Awaitable[str]: - return self.store.get_room_events_max_id(room_id) + def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]: + return self.store.get_current_room_stream_token_for_room_id(room_id) class ShutdownRoomResponse(TypedDict): diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 8e764790db..82e9ef02d2 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -748,21 +748,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_room_event_before_stream_ordering", _f ) - async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str: - """Returns the current token for rooms stream. - - By default, it returns the current global stream token. Specifying a - `room_id` causes it to return the current room specific topological - token. + async def get_current_room_stream_token_for_room_id( + self, room_id: Optional[str] = None + ) -> RoomStreamToken: + """Returns the current position of the rooms stream. + + By default, it returns a live token with the current global stream + token. Specifying a `room_id` causes it to return a historic token with + the room specific topological token. """ - token = self.get_room_max_stream_ordering() + stream_ordering = self.get_room_max_stream_ordering() if room_id is None: - return "s%d" % (token,) + return RoomStreamToken(None, stream_ordering) else: topo = await self.db_pool.runInteraction( "_get_max_topological_txn", self._get_max_topological_txn, room_id ) - return "t%d-%d" % (topo, token) + return RoomStreamToken(topo, stream_ordering) def get_stream_id_for_event_txn( self, diff --git a/synapse/streams/events.py b/synapse/streams/events.py index fb8fe17295..acf17ba623 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -69,7 +69,7 @@ class EventSources: ) return token - def get_current_token_for_pagination(self) -> StreamToken: + async def get_current_token_for_pagination(self, room_id: str) -> StreamToken: """Get the current token for a given room to be used to paginate events. @@ -80,7 +80,7 @@ class EventSources: The current token for pagination. """ token = StreamToken( - room_key=self.sources.room.get_current_key(), + room_key=await self.sources.room.get_current_key_for_room(room_id), presence_key=0, typing_key=0, receipt_key=0, diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 52e41cdab4..78663a53fe 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -110,7 +110,9 @@ class PaginationTestCase(HomeserverTestCase): def _filter_messages(self, filter: JsonDict) -> List[EventBase]: """Make a request to /messages with a filter, returns the chunk of events.""" - from_token = self.hs.get_event_sources().get_current_token_for_pagination() + from_token = self.get_success( + self.hs.get_event_sources().get_current_token_for_pagination(self.room_id) + ) events, next_key = self.get_success( self.hs.get_datastores().main.paginate_room_events( |