diff --git a/changelog.d/16930.bugfix b/changelog.d/16930.bugfix
index 21f964ef97..99ed435d75 100644
--- a/changelog.d/16930.bugfix
+++ b/changelog.d/16930.bugfix
@@ -1 +1 @@
-Fix a long-standing bug which could cause state to be omitted from `/sync` responses when certain events are filtered out of the timeline.
+Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
diff --git a/changelog.d/16932.bugfix b/changelog.d/16932.bugfix
index 624388ea8e..99ed435d75 100644
--- a/changelog.d/16932.bugfix
+++ b/changelog.d/16932.bugfix
@@ -1 +1 @@
-Fix a long-standing bug which could cause incorrect state to be returned from `/sync` for rooms where the user has left.
\ No newline at end of file
+Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
diff --git a/changelog.d/16942.bugfix b/changelog.d/16942.bugfix
new file mode 100644
index 0000000000..99ed435d75
--- /dev/null
+++ b/changelog.d/16942.bugfix
@@ -0,0 +1 @@
+Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 773e291aa8..554c820f79 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1195,6 +1195,8 @@ class SyncHandler:
)
if batch:
+ # Strictly speaking, this returns the state *after* the first event in the
+ # timeline, but that is good enough here.
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
@@ -1257,25 +1259,25 @@ class SyncHandler:
await_full_state = True
lazy_load_members = False
- if batch.limited:
- if batch:
- state_at_timeline_start = (
- await self._state_storage_controller.get_state_ids_for_event(
- batch.events[0].event_id,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
- )
- else:
- # We can get here if the user has ignored the senders of all
- # the recent events.
- state_at_timeline_start = await self.get_state_at(
- room_id,
- stream_position=end_token,
+ if batch:
+ state_at_timeline_start = (
+ await self._state_storage_controller.get_state_ids_for_event(
+ batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
+ )
+ else:
+ # We can get here if the user has ignored the senders of all
+ # the recent events.
+ state_at_timeline_start = await self.get_state_at(
+ room_id,
+ stream_position=end_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+ if batch.limited:
# for now, we disable LL for gappy syncs - see
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
# N.B. this slows down incr syncs as we are now processing way
@@ -1290,47 +1292,28 @@ class SyncHandler:
# about them).
state_filter = StateFilter.all()
- state_at_previous_sync = await self.get_state_at(
- room_id,
- stream_position=since_token,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
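+ # Whether or not the timeline is limited, fetch the state at the previous
+ # sync token and at the end of the timeline: both are needed to calculate
+ # the state delta below.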
+ state_at_previous_sync = await self.get_state_at(
+ room_id,
+ stream_position=since_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
- state_at_timeline_end = await self.get_state_at(
- room_id,
- stream_position=end_token,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
+ state_at_timeline_end = await self.get_state_at(
+ room_id,
+ stream_position=end_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+
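+ # Work out which state events to return, given the state at the start and
+ # end of the timeline, the state at the previous sync, and the state events
+ # already present in the timeline itself.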
+ state_ids = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state_at_timeline_start,
+ timeline_end=state_at_timeline_end,
+ previous_timeline_end=state_at_previous_sync,
+ lazy_load_members=lazy_load_members,
+ )
- state_ids = _calculate_state(
- timeline_contains=timeline_state,
- timeline_start=state_at_timeline_start,
- timeline_end=state_at_timeline_end,
- previous_timeline_end=state_at_previous_sync,
- lazy_load_members=lazy_load_members,
- )
- else:
- state_ids = {}
- if lazy_load_members:
- if members_to_fetch and batch.events:
- # We're returning an incremental sync, with no
- # "gap" since the previous sync, so normally there would be
- # no state to return.
- # But we're lazy-loading, so the client might need some more
- # member events to understand the events in this timeline.
- # So we fish out all the member events corresponding to the
- # timeline here. The caller will then dedupe any redundant ones.
-
- state_ids = await self._state_storage_controller.get_state_ids_for_event(
- batch.events[0].event_id,
- # we only want members!
- state_filter=StateFilter.from_types(
- (EventTypes.Member, member) for member in members_to_fetch
- ),
- await_full_state=False,
- )
return state_ids
async def _find_missing_partial_state_memberships(
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 5d8e886541..57e14d79ca 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -435,6 +435,111 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
[s2_event],
)
+ def test_state_includes_changes_on_ungappy_syncs(self) -> None:
+ """Test `state` where the sync is not gappy.
+
+ We start with a DAG like this:
+
+      E1
+     ↗  ↖
+    |    S2
+    |
+ ---|---
+    |
+    E3
+
+ ... and do an initial sync with `limit=1`, represented by the horizontal dashed line.
+ At this point, we do not expect S2 to appear in the response at all (since
+ it is excluded from the timeline by the `limit`, and the state is based on the
+ state after the most recent event before the sync token (E3), which doesn't
+ include S2).
+
+ Now more events arrive, and we do an incremental sync:
+
+      E1
+     ↗  ↖
+    |    S2
+    |    ↑
+    E3   |
+    ↑    |
+ ---|----|---
+    |    |
+    E4   |
+     ↖  /
+      E5
+
+ This is the last chance for us to tell the client about S2, so it *must* be
+ included in the response.
+ """
+ alice = self.register_user("alice", "password")
+ alice_tok = self.login(alice, "password")
+ alice_requester = create_requester(alice)
+ room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
+
+ # Do an initial sync to get a known starting point.
+ initial_sync_result = self.get_success(
+ self.sync_handler.wait_for_sync_for_user(
+ alice_requester, generate_sync_config(alice)
+ )
+ )
+ last_room_creation_event_id = (
+ initial_sync_result.joined[0].timeline.events[-1].event_id
+ )
+
+ # Send a state event, and a regular event, both using the same prev ID
+ with self._patch_get_latest_events([last_room_creation_event_id]):
+ s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
+ "event_id"
+ ]
+ e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]
+
+ # Another initial sync, with limit=1
+ initial_sync_result = self.get_success(
+ self.sync_handler.wait_for_sync_for_user(
+ alice_requester,
+ generate_sync_config(
+ alice,
+ filter_collection=FilterCollection(
+ self.hs, {"room": {"timeline": {"limit": 1}}}
+ ),
+ ),
+ )
+ )
+ room_sync = initial_sync_result.joined[0]
+ self.assertEqual(room_sync.room_id, room_id)
+ self.assertEqual(
+ [e.event_id for e in room_sync.timeline.events],
+ [e3_event],
+ )
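+ # S2 should not appear anywhere in the response: it is excluded from the
+ # (limited) timeline, and the state is based on E3, whose state does not
+ # include S2.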
+ self.assertNotIn(s2_event, [e.event_id for e in room_sync.state.values()])
+
+ # More events, E4 and E5
+ with self._patch_get_latest_events([e3_event]):
+ e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]
+ e5_event = self.helper.send(room_id, "e5", tok=alice_tok)["event_id"]
+
+ # Now incremental sync
+ incremental_sync = self.get_success(
+ self.sync_handler.wait_for_sync_for_user(
+ alice_requester,
+ generate_sync_config(alice),
+ since_token=initial_sync_result.next_batch,
+ )
+ )
+
+ # The state event should appear in the 'state' section of the response.
+ room_sync = incremental_sync.joined[0]
+ self.assertEqual(room_sync.room_id, room_id)
+ self.assertFalse(room_sync.timeline.limited)
+ self.assertEqual(
+ [e.event_id for e in room_sync.timeline.events],
+ [e4_event, e5_event],
+ )
+ self.assertEqual(
+ [e.event_id for e in room_sync.state.values()],
+ [s2_event],
+ )
+
@parameterized.expand(
[
(False, False),