diff --git a/changelog.d/17066.bugfix b/changelog.d/17066.bugfix
new file mode 100644
index 0000000000..99ed435d75
--- /dev/null
+++ b/changelog.d/17066.bugfix
@@ -0,0 +1 @@
+Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 410805e806..a6d54ee4b8 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1266,17 +1266,23 @@ class SyncHandler:
#
# c.f. #16941 for an example of why we can't do this for all non-gappy
# syncs.
- is_linear_timeline = False
+ is_linear_timeline = True
if batch.events:
- prev_event_id = batch.events[0].event_id
- for e in batch.events[1:]:
+ # We need to make sure the first event in our batch points to the
+ # last event in the previous batch.
+ last_event_id_prev_batch = (
+ await self.store.get_last_event_in_room_before_stream_ordering(
+ room_id,
+ end_token=since_token.room_key,
+ )
+ )
+
+ prev_event_id = last_event_id_prev_batch
+ for e in batch.events:
if e.prev_event_ids() != [prev_event_id]:
+ is_linear_timeline = False
break
prev_event_id = e.event_id
- else:
- is_linear_timeline = True
- else:
- is_linear_timeline = True
if is_linear_timeline and not batch.limited:
state_ids: StateMap[str] = {}
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 57e14d79ca..2780d29cad 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -435,6 +435,101 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
[s2_event],
)
+ def test_state_includes_changes_on_long_lived_forks(self) -> None:
+ """State changes that happen on a fork of the DAG must be included in `state`
+
+ Given the following DAG:
+
+ E1
+ ↗ ↖
+ | S2
+ | ↑
+ --|------|----
+ E3 |
+ --|------|----
+ | E4
+ | |
+
+ ... and a filter that means we only return 1 event, represented by the dashed
+ horizontal lines: `S2` must be included in the `state` section on the second sync.
+ """
+ alice = self.register_user("alice", "password")
+ alice_tok = self.login(alice, "password")
+ alice_requester = create_requester(alice)
+ room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
+
+ # Do an initial sync as Alice to get a known starting point.
+ initial_sync_result = self.get_success(
+ self.sync_handler.wait_for_sync_for_user(
+ alice_requester, generate_sync_config(alice)
+ )
+ )
+ last_room_creation_event_id = (
+ initial_sync_result.joined[0].timeline.events[-1].event_id
+ )
+
+ # Send a state event, and a regular event, both using the same prev ID
+ with self._patch_get_latest_events([last_room_creation_event_id]):
+ s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
+ "event_id"
+ ]
+ e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]
+
+ # Do an incremental sync, this will return E3 but *not* S2 at this
+ # point.
+ incremental_sync = self.get_success(
+ self.sync_handler.wait_for_sync_for_user(
+ alice_requester,
+ generate_sync_config(
+ alice,
+ filter_collection=FilterCollection(
+ self.hs, {"room": {"timeline": {"limit": 1}}}
+ ),
+ ),
+ since_token=initial_sync_result.next_batch,
+ )
+ )
+ room_sync = incremental_sync.joined[0]
+ self.assertEqual(room_sync.room_id, room_id)
+ self.assertTrue(room_sync.timeline.limited)
+ self.assertEqual(
+ [e.event_id for e in room_sync.timeline.events],
+ [e3_event],
+ )
+ self.assertEqual(
+ [e.event_id for e in room_sync.state.values()],
+ [],
+ )
+
+ # Now send another event that points to S2, but not E3.
+ with self._patch_get_latest_events([s2_event]):
+ e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]
+
+ # Doing an incremental sync should return S2 in state.
+ incremental_sync = self.get_success(
+ self.sync_handler.wait_for_sync_for_user(
+ alice_requester,
+ generate_sync_config(
+ alice,
+ filter_collection=FilterCollection(
+ self.hs, {"room": {"timeline": {"limit": 1}}}
+ ),
+ ),
+ since_token=incremental_sync.next_batch,
+ )
+ )
+ room_sync = incremental_sync.joined[0]
+ self.assertEqual(room_sync.room_id, room_id)
+ self.assertFalse(room_sync.timeline.limited)
+ self.assertEqual(
+ [e.event_id for e in room_sync.timeline.events],
+ [e4_event],
+ )
+ self.assertEqual(
+ [e.event_id for e in room_sync.state.values()],
+ [s2_event],
+ )
+
def test_state_includes_changes_on_ungappy_syncs(self) -> None:
"""Test `state` where the sync is not gappy.
|