summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/12319.bugfix1
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/sync.py15
-rw-r--r--synapse/storage/databases/main/stream.py26
-rw-r--r--tests/rest/client/test_room_batch.py125
5 files changed, 162 insertions, 19 deletions
diff --git a/changelog.d/12319.bugfix b/changelog.d/12319.bugfix
new file mode 100644
index 0000000000..a50191feaa
--- /dev/null
+++ b/changelog.d/12319.bugfix
@@ -0,0 +1 @@
+Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 47a63005a9..1b092e900e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -175,17 +175,13 @@ class MessageHandler:
         state_filter = state_filter or StateFilter.all()
 
         if at_token:
-            # FIXME this claims to get the state at a stream position, but
-            # get_recent_events_for_room operates by topo ordering. This therefore
-            # does not reliably give you the state at the given stream position.
-            # (https://github.com/matrix-org/synapse/issues/3305)
-            last_events, _ = await self.store.get_recent_events_for_room(
-                room_id, end_token=at_token.room_key, limit=1
+            last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+                room_id,
+                end_token=at_token.room_key,
             )
 
-            if not last_events:
+            if not last_event:
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
-            last_event = last_events[0]
 
             # check whether the user is in the room at that time to determine
             # whether they should be treated as peeking.
@@ -204,7 +200,7 @@ class MessageHandler:
             visible_events = await filter_events_for_client(
                 self.storage,
                 user_id,
-                last_events,
+                [last_event],
                 filter_send_to_client=False,
                 is_peeking=is_peeking,
             )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6c8b17c420..5125126a80 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -661,16 +661,15 @@ class SyncHandler:
             stream_position: point at which to get state
             state_filter: The state filter used to fetch state from the database.
         """
-        # FIXME this claims to get the state at a stream position, but
-        # get_recent_events_for_room operates by topo ordering. This therefore
-        # does not reliably give you the state at the given stream position.
-        # (https://github.com/matrix-org/synapse/issues/3305)
-        last_events, _ = await self.store.get_recent_events_for_room(
-            room_id, end_token=stream_position.room_key, limit=1
+        # FIXME: This gets the state at the latest event before the stream ordering,
+        # which might not be the same as the "current state" of the room at the time
+        # of the stream token if there were multiple forward extremities at the time.
+        last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+            room_id,
+            end_token=stream_position.room_key,
         )
 
-        if last_events:
-            last_event = last_events[-1]
+        if last_event:
             state = await self.get_state_after_event(
                 last_event, state_filter=state_filter or StateFilter.all()
             )
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 6d45a8a9f6..793e906630 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -758,6 +758,32 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "get_room_event_before_stream_ordering", _f
         )
 
+    async def get_last_event_in_room_before_stream_ordering(
+        self,
+        room_id: str,
+        end_token: RoomStreamToken,
+    ) -> Optional[EventBase]:
+        """Returns the last event in a room at or before a stream ordering
+
+        Args:
+            room_id
+            end_token: The token used to stream from
+
+        Returns:
+            The most recent event.
+        """
+
+        last_row = await self.get_room_event_before_stream_ordering(
+            room_id=room_id,
+            stream_ordering=end_token.stream,
+        )
+        if last_row:
+            _, _, event_id = last_row
+            event = await self.get_event(event_id, get_prev_content=True)
+            return event
+
+        return None
+
     async def get_current_room_stream_token_for_room_id(
         self, room_id: Optional[str] = None
     ) -> RoomStreamToken:
diff --git a/tests/rest/client/test_room_batch.py b/tests/rest/client/test_room_batch.py
index 44f333a0ee..41a1bf6d89 100644
--- a/tests/rest/client/test_room_batch.py
+++ b/tests/rest/client/test_room_batch.py
@@ -7,9 +7,9 @@ from twisted.test.proto_helpers import MemoryReactor
 from synapse.api.constants import EventContentFields, EventTypes
 from synapse.appservice import ApplicationService
 from synapse.rest import admin
-from synapse.rest.client import login, register, room, room_batch
+from synapse.rest.client import login, register, room, room_batch, sync
 from synapse.server import HomeServer
-from synapse.types import JsonDict
+from synapse.types import JsonDict, RoomStreamToken
 from synapse.util import Clock
 
 from tests import unittest
@@ -63,6 +63,7 @@ class RoomBatchTestCase(unittest.HomeserverTestCase):
         room.register_servlets,
         register.register_servlets,
         login.register_servlets,
+        sync.register_servlets,
     ]
 
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
@@ -178,3 +179,123 @@ class RoomBatchTestCase(unittest.HomeserverTestCase):
             "Expected a single state_group to be returned by saw state_groups=%s"
             % (state_group_map.keys(),),
         )
+
+    @unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
+    def test_sync_while_batch_importing(self) -> None:
+        """
+        Make sure that /sync correctly returns full room state when a user joins
+        during ongoing batch backfilling.
+        See: https://github.com/matrix-org/synapse/issues/12281
+        """
+        # Create user who will be invited & join room
+        user_id = self.register_user("beep", "test")
+        user_tok = self.login("beep", "test")
+
+        time_before_room = int(self.clock.time_msec())
+
+        # Create a room with some events
+        room_id, _, _, _ = self._create_test_room()
+        # Invite the user
+        self.helper.invite(
+            room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id
+        )
+
+        # Create another room, send a bunch of events to advance the stream token
+        other_room_id = self.helper.create_room_as(
+            self.appservice.sender, tok=self.appservice.token
+        )
+        for _ in range(5):
+            self.helper.send_event(
+                room_id=other_room_id,
+                type=EventTypes.Message,
+                content={"msgtype": "m.text", "body": "C"},
+                tok=self.appservice.token,
+            )
+
+        # Join the room as the normal user
+        self.helper.join(room_id, user_id, tok=user_tok)
+
+        # Create an event to hang the historical batch from - In order to see
+        # the failure case originally reported in #12281, the historical batch
+        # must be hung from the most recent event in the room so the base
+        # insertion event ends up with the highest `topogological_ordering`
+        # (`depth`) in the room but will have a negative `stream_ordering`
+        # because it's a `historical` event. Previously, when assembling the
+        # `state` for the `/sync` response, the bugged logic would sort by
+        # `topological_ordering` descending and pick up the base insertion
+        # event because it has a negative `stream_ordering` below the given
+        # pagination token. Now we properly sort by `stream_ordering`
+        # descending which puts `historical` events with a negative
+        # `stream_ordering` way at the bottom and aren't selected as expected.
+        response = self.helper.send_event(
+            room_id=room_id,
+            type=EventTypes.Message,
+            content={
+                "msgtype": "m.text",
+                "body": "C",
+            },
+            tok=self.appservice.token,
+        )
+        event_to_hang_id = response["event_id"]
+
+        channel = self.make_request(
+            "POST",
+            "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
+            % (room_id, event_to_hang_id),
+            content={
+                "events": _create_message_events_for_batch_send_request(
+                    self.virtual_user_id, time_before_room, 3
+                ),
+                "state_events_at_start": _create_join_state_events_for_batch_send_request(
+                    [self.virtual_user_id], time_before_room
+                ),
+            },
+            access_token=self.appservice.token,
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+
+        # Now we need to find the invite + join events stream tokens so we can sync between
+        main_store = self.hs.get_datastores().main
+        events, next_key = self.get_success(
+            main_store.get_recent_events_for_room(
+                room_id,
+                50,
+                end_token=main_store.get_room_max_token(),
+            ),
+        )
+        invite_event_position = None
+        for event in events:
+            if (
+                event.type == "m.room.member"
+                and event.content["membership"] == "invite"
+            ):
+                invite_event_position = self.get_success(
+                    main_store.get_topological_token_for_event(event.event_id)
+                )
+                break
+
+        assert invite_event_position is not None, "No invite event found"
+
+        # Remove the topological order from the token by re-creating w/stream only
+        invite_event_position = RoomStreamToken(None, invite_event_position.stream)
+
+        # Sync everything after this token
+        since_token = self.get_success(invite_event_position.to_string(main_store))
+        sync_response = self.make_request(
+            "GET",
+            f"/sync?since={since_token}",
+            access_token=user_tok,
+        )
+
+        # Assert that, for this room, the user was considered to have joined and thus
+        # receives the full state history
+        state_event_types = [
+            event["type"]
+            for event in sync_response.json_body["rooms"]["join"][room_id]["state"][
+                "events"
+            ]
+        ]
+
+        assert (
+            "m.room.create" in state_event_types
+        ), "Missing room full state in sync response"