summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorNick Mills-Barrett <nick@fizzadar.com>2022-04-13 11:38:35 +0100
committerGitHub <noreply@github.com>2022-04-13 11:38:35 +0100
commite3a49f4784d5c915355ac9306e60b09433db60b5 (patch)
tree37a4bfca3764b4792738abc1d22d4ff4419af317 /synapse
parentUse poetry lockfile in twisted trunk CI job (#12425) (diff)
downloadsynapse-e3a49f4784d5c915355ac9306e60b09433db60b5.tar.xz
Fix missing sync events during historical batch imports (#12319)
Discovered after much in-depth investigation in #12281.

Closes: #12281
Closes: #3305

Signed off by: Nick Mills-Barrett nick@beeper.com
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/sync.py15
-rw-r--r--synapse/storage/databases/main/stream.py26
3 files changed, 38 insertions, 17 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 47a63005a9..1b092e900e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -175,17 +175,13 @@ class MessageHandler:
         state_filter = state_filter or StateFilter.all()
 
         if at_token:
-            # FIXME this claims to get the state at a stream position, but
-            # get_recent_events_for_room operates by topo ordering. This therefore
-            # does not reliably give you the state at the given stream position.
-            # (https://github.com/matrix-org/synapse/issues/3305)
-            last_events, _ = await self.store.get_recent_events_for_room(
-                room_id, end_token=at_token.room_key, limit=1
+            last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+                room_id,
+                end_token=at_token.room_key,
             )
 
-            if not last_events:
+            if not last_event:
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
-            last_event = last_events[0]
 
             # check whether the user is in the room at that time to determine
             # whether they should be treated as peeking.
@@ -204,7 +200,7 @@ class MessageHandler:
             visible_events = await filter_events_for_client(
                 self.storage,
                 user_id,
-                last_events,
+                [last_event],
                 filter_send_to_client=False,
                 is_peeking=is_peeking,
             )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6c8b17c420..5125126a80 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -661,16 +661,15 @@ class SyncHandler:
             stream_position: point at which to get state
             state_filter: The state filter used to fetch state from the database.
         """
-        # FIXME this claims to get the state at a stream position, but
-        # get_recent_events_for_room operates by topo ordering. This therefore
-        # does not reliably give you the state at the given stream position.
-        # (https://github.com/matrix-org/synapse/issues/3305)
-        last_events, _ = await self.store.get_recent_events_for_room(
-            room_id, end_token=stream_position.room_key, limit=1
+        # FIXME: This gets the state at the latest event before the stream ordering,
+        # which might not be the same as the "current state" of the room at the time
+        # of the stream token if there were multiple forward extremities at the time.
+        last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+            room_id,
+            end_token=stream_position.room_key,
         )
 
-        if last_events:
-            last_event = last_events[-1]
+        if last_event:
             state = await self.get_state_after_event(
                 last_event, state_filter=state_filter or StateFilter.all()
             )
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 6d45a8a9f6..793e906630 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -758,6 +758,32 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "get_room_event_before_stream_ordering", _f
         )
 
+    async def get_last_event_in_room_before_stream_ordering(
+        self,
+        room_id: str,
+        end_token: RoomStreamToken,
+    ) -> Optional[EventBase]:
+        """Returns the last event in a room at or before a stream ordering
+
+        Args:
+            room_id
+            end_token: The token used to stream from
+
+        Returns:
+            The most recent event.
+        """
+
+        last_row = await self.get_room_event_before_stream_ordering(
+            room_id=room_id,
+            stream_ordering=end_token.stream,
+        )
+        if last_row:
+            _, _, event_id = last_row
+            event = await self.get_event(event_id, get_prev_content=True)
+            return event
+
+        return None
+
     async def get_current_room_stream_token_for_room_id(
         self, room_id: Optional[str] = None
     ) -> RoomStreamToken: