summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/17066.bugfix1
-rw-r--r--synapse/handlers/sync.py20
-rw-r--r--tests/handlers/test_sync.py95
3 files changed, 109 insertions, 7 deletions
diff --git a/changelog.d/17066.bugfix b/changelog.d/17066.bugfix
new file mode 100644
index 0000000000..99ed435d75
--- /dev/null
+++ b/changelog.d/17066.bugfix
@@ -0,0 +1 @@
+Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 410805e806..a6d54ee4b8 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1266,17 +1266,23 @@ class SyncHandler:
         #
         # c.f. #16941 for an example of why we can't do this for all non-gappy
         # syncs.
-        is_linear_timeline = False
+        is_linear_timeline = True
         if batch.events:
-            prev_event_id = batch.events[0].event_id
-            for e in batch.events[1:]:
+            # We need to make sure the first event in our batch points to the
+            # last event in the previous batch.
+            last_event_id_prev_batch = (
+                await self.store.get_last_event_in_room_before_stream_ordering(
+                    room_id,
+                    end_token=since_token.room_key,
+                )
+            )
+
+            prev_event_id = last_event_id_prev_batch
+            for e in batch.events:
                 if e.prev_event_ids() != [prev_event_id]:
+                    is_linear_timeline = False
                     break
                 prev_event_id = e.event_id
-            else:
-                is_linear_timeline = True
-        else:
-            is_linear_timeline = True
 
         if is_linear_timeline and not batch.limited:
             state_ids: StateMap[str] = {}
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 57e14d79ca..2780d29cad 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -435,6 +435,101 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
             [s2_event],
         )
 
+    def test_state_includes_changes_on_long_lived_forks(self) -> None:
+        """State changes that happen on a fork of the DAG must be included in `state`
+
+        Given the following DAG:
+
+             E1
+           ↗    ↖
+          |      S2
+          |      ↑
+        --|------|----
+          E3     |
+        --|------|----
+          |      E4
+          |      |
+
+        ... and a filter that means we only return 1 event, represented by the dashed
+        horizontal lines: `S2` must be included in the `state` section on the second sync.
+        """
+        alice = self.register_user("alice", "password")
+        alice_tok = self.login(alice, "password")
+        alice_requester = create_requester(alice)
+        room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
+
+        # Do an initial sync as Alice to get a known starting point.
+        initial_sync_result = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester, generate_sync_config(alice)
+            )
+        )
+        last_room_creation_event_id = (
+            initial_sync_result.joined[0].timeline.events[-1].event_id
+        )
+
+        # Send a state event, and a regular event, both using the same prev ID
+        with self._patch_get_latest_events([last_room_creation_event_id]):
+            s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
+                "event_id"
+            ]
+            e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]
+
+        # Do an incremental sync, this will return E3 but *not* S2 at this
+        # point.
+        incremental_sync = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(
+                    alice,
+                    filter_collection=FilterCollection(
+                        self.hs, {"room": {"timeline": {"limit": 1}}}
+                    ),
+                ),
+                since_token=initial_sync_result.next_batch,
+            )
+        )
+        room_sync = incremental_sync.joined[0]
+        self.assertEqual(room_sync.room_id, room_id)
+        self.assertTrue(room_sync.timeline.limited)
+        self.assertEqual(
+            [e.event_id for e in room_sync.timeline.events],
+            [e3_event],
+        )
+        self.assertEqual(
+            [e.event_id for e in room_sync.state.values()],
+            [],
+        )
+
+        # Now send another event that points to S2, but not E3.
+        with self._patch_get_latest_events([s2_event]):
+            e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]
+
+        # Doing an incremental sync should return S2 in state.
+        incremental_sync = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(
+                    alice,
+                    filter_collection=FilterCollection(
+                        self.hs, {"room": {"timeline": {"limit": 1}}}
+                    ),
+                ),
+                since_token=incremental_sync.next_batch,
+            )
+        )
+        room_sync = incremental_sync.joined[0]
+        self.assertEqual(room_sync.room_id, room_id)
+        self.assertFalse(room_sync.timeline.limited)
+        self.assertEqual(
+            [e.event_id for e in room_sync.timeline.events],
+            [e4_event],
+        )
+        self.assertEqual(
+            [e.event_id for e in room_sync.state.values()],
+            [s2_event],
+        )
+
     def test_state_includes_changes_on_ungappy_syncs(self) -> None:
         """Test `state` where the sync is not gappy.