diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 47a63005a9..1b092e900e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -175,17 +175,13 @@ class MessageHandler:
state_filter = state_filter or StateFilter.all()
if at_token:
- # FIXME this claims to get the state at a stream position, but
- # get_recent_events_for_room operates by topo ordering. This therefore
- # does not reliably give you the state at the given stream position.
- # (https://github.com/matrix-org/synapse/issues/3305)
- last_events, _ = await self.store.get_recent_events_for_room(
- room_id, end_token=at_token.room_key, limit=1
+ last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+ room_id,
+ end_token=at_token.room_key,
)
- if not last_events:
+ if not last_event:
raise NotFoundError("Can't find event for token %s" % (at_token,))
- last_event = last_events[0]
# check whether the user is in the room at that time to determine
# whether they should be treated as peeking.
@@ -204,7 +200,7 @@ class MessageHandler:
visible_events = await filter_events_for_client(
self.storage,
user_id,
- last_events,
+ [last_event],
filter_send_to_client=False,
is_peeking=is_peeking,
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6c8b17c420..5125126a80 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -661,16 +661,15 @@ class SyncHandler:
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
"""
- # FIXME this claims to get the state at a stream position, but
- # get_recent_events_for_room operates by topo ordering. This therefore
- # does not reliably give you the state at the given stream position.
- # (https://github.com/matrix-org/synapse/issues/3305)
- last_events, _ = await self.store.get_recent_events_for_room(
- room_id, end_token=stream_position.room_key, limit=1
+ # FIXME: This gets the state at the latest event before the stream ordering,
+ # which might not be the same as the "current state" of the room at the time
+ # of the stream token if there were multiple forward extremities at the time.
+ last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+ room_id,
+ end_token=stream_position.room_key,
)
- if last_events:
- last_event = last_events[-1]
+ if last_event:
state = await self.get_state_after_event(
last_event, state_filter=state_filter or StateFilter.all()
)
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 6d45a8a9f6..793e906630 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -758,6 +758,32 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"get_room_event_before_stream_ordering", _f
)
+ async def get_last_event_in_room_before_stream_ordering(
+ self,
+ room_id: str,
+ end_token: RoomStreamToken,
+ ) -> Optional[EventBase]:
+ """Returns the last event in a room at or before a stream ordering
+
+ Args:
+ room_id
+ end_token: The token used to stream from
+
+ Returns:
+ The most recent event.
+ """
+
+ last_row = await self.get_room_event_before_stream_ordering(
+ room_id=room_id,
+ stream_ordering=end_token.stream,
+ )
+ if last_row:
+ _, _, event_id = last_row
+ event = await self.get_event(event_id, get_prev_content=True)
+ return event
+
+ return None
+
async def get_current_room_stream_token_for_room_id(
self, room_id: Optional[str] = None
) -> RoomStreamToken:
|