From 89f1092284c4a171918179a87fdd488b003c86b9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Apr 2024 15:01:12 +0100 Subject: Also check if first event matches the last in prev batch (#17066) Refinement of #17064 cc @richvdh --- changelog.d/17066.bugfix | 1 + synapse/handlers/sync.py | 20 ++++++---- tests/handlers/test_sync.py | 95 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 7 deletions(-) create mode 100644 changelog.d/17066.bugfix diff --git a/changelog.d/17066.bugfix b/changelog.d/17066.bugfix new file mode 100644 index 0000000000..99ed435d75 --- /dev/null +++ b/changelog.d/17066.bugfix @@ -0,0 +1 @@ +Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 410805e806..a6d54ee4b8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1266,17 +1266,23 @@ class SyncHandler: # # c.f. #16941 for an example of why we can't do this for all non-gappy # syncs. - is_linear_timeline = False + is_linear_timeline = True if batch.events: - prev_event_id = batch.events[0].event_id - for e in batch.events[1:]: + # We need to make sure the first event in our batch points to the + # last event in the previous batch. + last_event_id_prev_batch = ( + await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=since_token.room_key, + ) + ) + + prev_event_id = last_event_id_prev_batch + for e in batch.events: if e.prev_event_ids() != [prev_event_id]: + is_linear_timeline = False break prev_event_id = e.event_id - else: - is_linear_timeline = True - else: - is_linear_timeline = True if is_linear_timeline and not batch.limited: state_ids: StateMap[str] = {} diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 57e14d79ca..2780d29cad 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -435,6 +435,101 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): [s2_event], ) + def test_state_includes_changes_on_long_lived_forks(self) -> None: + """State changes that happen on a fork of the DAG must be included in `state` + + Given the following DAG: + + E1 + ↗ ↖ + | S2 + | ↑ + --|------|---- + E3 | + --|------|---- + | E4 + | | + + ... and a filter that means we only return 1 event, represented by the dashed + horizontal lines: `S2` must be included in the `state` section on the second sync. + """ + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + alice_requester = create_requester(alice) + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + + # Do an initial sync as Alice to get a known starting point. + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, generate_sync_config(alice) + ) + ) + last_room_creation_event_id = ( + initial_sync_result.joined[0].timeline.events[-1].event_id + ) + + # Send a state event, and a regular event, both using the same prev ID + with self._patch_get_latest_events([last_room_creation_event_id]): + s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[ + "event_id" + ] + e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"] + + # Do an incremental sync, this will return E3 but *not* S2 at this + # point. + incremental_sync = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config( + alice, + filter_collection=FilterCollection( + self.hs, {"room": {"timeline": {"limit": 1}}} + ), + ), + since_token=initial_sync_result.next_batch, + ) + ) + room_sync = incremental_sync.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertTrue(room_sync.timeline.limited) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e3_event], + ) + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [], + ) + + # Now send another event that points to S2, but not E3. + with self._patch_get_latest_events([s2_event]): + e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"] + + # Doing an incremental sync should return S2 in state. + incremental_sync = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config( + alice, + filter_collection=FilterCollection( + self.hs, {"room": {"timeline": {"limit": 1}}} + ), + ), + since_token=incremental_sync.next_batch, + ) + ) + room_sync = incremental_sync.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertFalse(room_sync.timeline.limited) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e4_event], + ) + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) + def test_state_includes_changes_on_ungappy_syncs(self) -> None: """Test `state` where the sync is not gappy. -- cgit 1.4.1