summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2024-04-04 12:47:59 +0100
committerGitHub <noreply@github.com>2024-04-04 12:47:59 +0100
commit05957ac70f5d634eafbea61bd79a9a89196507c2 (patch)
tree760e6a6310abfc7ca0c5a2822c5ca1e93a496e86
parentAdd missing index to `access_tokens` table (#17045) (diff)
downloadsynapse-05957ac70f5d634eafbea61bd79a9a89196507c2.tar.xz
Fix bug in `/sync` response for archived rooms (#16932)
This PR fixes a very, very niche edge-case, but I've got some more work
coming which will otherwise make the problem worse.

The bug happens when the syncing user leaves a room, and has a sync
filter which includes "left" rooms, but sets the timeline limit to 0. In
that case, the state returned in the `state` section is calculated
incorrectly.

The fix is to pass a token corresponding to the point that the user
leaves the room through to `compute_state_delta`.
-rw-r--r--changelog.d/16932.bugfix1
-rw-r--r--synapse/handlers/sync.py121
-rw-r--r--tests/handlers/test_sync.py208
-rw-r--r--tests/rest/client/utils.py18
4 files changed, 314 insertions, 34 deletions
diff --git a/changelog.d/16932.bugfix b/changelog.d/16932.bugfix
new file mode 100644
index 0000000000..624388ea8e
--- /dev/null
+++ b/changelog.d/16932.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug which could cause incorrect state to be returned from `/sync` for rooms where the user has left.
\ No newline at end of file
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3aa2e2b7ba..7fcd54ac55 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -953,7 +953,7 @@ class SyncHandler:
         batch: TimelineBatch,
         sync_config: SyncConfig,
         since_token: Optional[StreamToken],
-        now_token: StreamToken,
+        end_token: StreamToken,
         full_state: bool,
     ) -> MutableStateMap[EventBase]:
         """Works out the difference in state between the end of the previous sync and
@@ -964,7 +964,9 @@ class SyncHandler:
             batch: The timeline batch for the room that will be sent to the user.
             sync_config:
             since_token: Token of the end of the previous batch. May be `None`.
-            now_token: Token of the end of the current batch.
+            end_token: Token of the end of the current batch. Normally this will be
+                the same as the global "now_token", but if the user has left the room,
+                the point just after their leave event.
             full_state: Whether to force returning the full state.
                 `lazy_load_members` still applies when `full_state` is `True`.
 
@@ -1044,7 +1046,7 @@ class SyncHandler:
                     room_id,
                     sync_config.user,
                     batch,
-                    now_token,
+                    end_token,
                     members_to_fetch,
                     timeline_state,
                 )
@@ -1058,7 +1060,7 @@ class SyncHandler:
                     room_id,
                     batch,
                     since_token,
-                    now_token,
+                    end_token,
                     members_to_fetch,
                     timeline_state,
                 )
@@ -1130,7 +1132,7 @@ class SyncHandler:
         room_id: str,
         syncing_user: UserID,
         batch: TimelineBatch,
-        now_token: StreamToken,
+        end_token: StreamToken,
         members_to_fetch: Optional[Set[str]],
         timeline_state: StateMap[str],
     ) -> StateMap[str]:
@@ -1143,7 +1145,9 @@ class SyncHandler:
             room_id: The room we are calculating for.
             syncing_user: The user that is calling `/sync`.
             batch: The timeline batch for the room that will be sent to the user.
-            now_token: Token of the end of the current batch.
+            end_token: Token of the end of the current batch. Normally this will be
+                the same as the global "now_token", but if the user has left the room,
+                the point just after their leave event.
             members_to_fetch: If lazy-loading is enabled, the memberships needed for
                 events in the timeline.
             timeline_state: The contribution to the room state from state events in
@@ -1202,7 +1206,7 @@ class SyncHandler:
         else:
             state_at_timeline_end = await self.get_state_at(
                 room_id,
-                stream_position=now_token,
+                stream_position=end_token,
                 state_filter=state_filter,
                 await_full_state=await_full_state,
             )
@@ -1223,7 +1227,7 @@ class SyncHandler:
         room_id: str,
         batch: TimelineBatch,
         since_token: StreamToken,
-        now_token: StreamToken,
+        end_token: StreamToken,
         members_to_fetch: Optional[Set[str]],
         timeline_state: StateMap[str],
     ) -> StateMap[str]:
@@ -1239,7 +1243,9 @@ class SyncHandler:
             room_id: The room we are calculating for.
             batch: The timeline batch for the room that will be sent to the user.
             since_token: Token of the end of the previous batch.
-            now_token: Token of the end of the current batch.
+            end_token: Token of the end of the current batch. Normally this will be
+                the same as the global "now_token", but if the user has left the room,
+                the point just after their leave event.
             members_to_fetch: If lazy-loading is enabled, the memberships needed for
                 events in the timeline. Otherwise, `None`.
             timeline_state: The contribution to the room state from state events in
@@ -1273,7 +1279,7 @@ class SyncHandler:
                 # the recent events.
                 state_at_timeline_start = await self.get_state_at(
                     room_id,
-                    stream_position=now_token,
+                    stream_position=end_token,
                     state_filter=state_filter,
                     await_full_state=await_full_state,
                 )
@@ -1312,7 +1318,7 @@ class SyncHandler:
                 # the recent events.
                 state_at_timeline_end = await self.get_state_at(
                     room_id,
-                    stream_position=now_token,
+                    stream_position=end_token,
                     state_filter=state_filter,
                     await_full_state=await_full_state,
                 )
@@ -2344,6 +2350,7 @@ class SyncHandler:
                         full_state=False,
                         since_token=since_token,
                         upto_token=leave_token,
+                        end_token=leave_token,
                         out_of_band=leave_event.internal_metadata.is_out_of_band_membership(),
                     )
                 )
@@ -2381,6 +2388,7 @@ class SyncHandler:
                     full_state=False,
                     since_token=None if newly_joined else since_token,
                     upto_token=prev_batch_token,
+                    end_token=now_token,
                 )
             else:
                 entry = RoomSyncResultBuilder(
@@ -2391,6 +2399,7 @@ class SyncHandler:
                     full_state=False,
                     since_token=since_token,
                     upto_token=since_token,
+                    end_token=now_token,
                 )
 
             room_entries.append(entry)
@@ -2449,6 +2458,7 @@ class SyncHandler:
                         full_state=True,
                         since_token=since_token,
                         upto_token=now_token,
+                        end_token=now_token,
                     )
                 )
             elif event.membership == Membership.INVITE:
@@ -2478,6 +2488,7 @@ class SyncHandler:
                         full_state=True,
                         since_token=since_token,
                         upto_token=leave_token,
+                        end_token=leave_token,
                     )
                 )
 
@@ -2548,6 +2559,7 @@ class SyncHandler:
                 {
                     "since_token": since_token,
                     "upto_token": upto_token,
+                    "end_token": room_builder.end_token,
                 }
             )
 
@@ -2621,7 +2633,7 @@ class SyncHandler:
                     batch,
                     sync_config,
                     since_token,
-                    now_token,
+                    room_builder.end_token,
                     full_state=full_state,
                 )
             else:
@@ -2781,6 +2793,70 @@ def _calculate_state(
             e for t, e in timeline_start.items() if t[0] == EventTypes.Member
         )
 
+    # Naively, we would just return the difference between the state at the start
+    # of the timeline (`timeline_start_ids`) and that at the end of the previous sync
+    # (`previous_timeline_end_ids`). However, that fails in the presence of forks in
+    # the DAG.
+    #
+    # For example, consider a DAG such as the following:
+    #
+    #       E1
+    #     ↗    ↖
+    #    |      S2
+    #    |      ↑
+    #  --|------|----
+    #    |      |
+    #    E3     |
+    #     ↖    /
+    #       E4
+    #
+    # ... and a filter that means we only return 2 events, represented by the dashed
+    # horizontal line. Assuming S2 was *not* included in the previous sync, we need to
+    # include it in the `state` section.
+    #
+    # Note that the state at the start of the timeline (E3) does not include S2. So,
+    # to make sure it gets included in the calculation here, we actually look at
+    # the state at the *end* of the timeline, and subtract any events that are present
+    # in the timeline.
+    #
+    # ----------
+    #
+    # Aside 1: You may then wonder if we need to include `timeline_start` in the
+    # calculation. Consider a linear DAG:
+    #
+    #      E1
+    #      ↑
+    #      S2
+    #      ↑
+    #  ----|------
+    #      |
+    #      E3
+    #      ↑
+    #      S4
+    #      ↑
+    #      E5
+    #
+    # ... where S2 and S4 change the same piece of state; and where we have a filter
+    # that returns 3 events (E3, S4, E5). We still need to tell the client about S2,
+    # because it might affect the display of E3. However, the state at the end of the
+    # timeline only tells us about S4; if we don't inspect `timeline_start` we won't
+    # find out about S2.
+    #
+    # (There are yet more complicated cases in which a state event is excluded from the
+    # timeline, but whose effect actually lands in the DAG in the *middle* of the
+    # timeline. We have no way to represent that in the /sync response, and we don't
+    # even try; it is ether omitted or plonked into `state` as if it were at the start
+    # of the timeline, depending on what else is in the timeline.)
+    #
+    # ----------
+    #
+    # Aside 2: it's worth noting that `timeline_end`, as provided to us, is actually
+    # the state *before* the final event in the timeline. In other words: if the final
+    # event in the timeline is a state event, it won't be included in `timeline_end`.
+    # However, that doesn't matter here, because the only difference can be in that
+    # one piece of state, and by definition that event is in the timeline, so we
+    # don't need to include it in the `state` section.
+
     state_ids = (
         (timeline_end_ids | timeline_start_ids)
         - previous_timeline_end_ids
@@ -2883,13 +2959,30 @@ class RoomSyncResultBuilder:
 
     Attributes:
         room_id
+
         rtype: One of `"joined"` or `"archived"`
+
         events: List of events to include in the room (more events may be added
             when generating result).
+
         newly_joined: If the user has newly joined the room
+
         full_state: Whether the full state should be sent in result
+
         since_token: Earliest point to return events from, or None
-        upto_token: Latest point to return events from.
+
+        upto_token: Latest point to return events from. If `events` is populated,
+           this is set to the token at the start of `events`
+
+        end_token: The last point in the timeline that the client should see events
+           from. Normally this will be the same as the global `now_token`, but in
+           the case of rooms where the user has left the room, this will be the point
+           just after their leave event.
+
+           This is used in the calculation of the state which is returned in `state`:
+           any state changes *up to* `end_token` (and not beyond!) which are not
+           reflected in the timeline need to be returned in `state`.
+
         out_of_band: whether the events in the room are "out of band" events
             and the server isn't in the room.
     """
@@ -2901,5 +2994,5 @@ class RoomSyncResultBuilder:
     full_state: bool
     since_token: Optional[StreamToken]
     upto_token: StreamToken
-
+    end_token: StreamToken
     out_of_band: bool = False
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 1b36324b8f..897c52c785 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -17,14 +17,16 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
-from typing import Collection, List, Optional
+from typing import Collection, ContextManager, List, Optional
 from unittest.mock import AsyncMock, Mock, patch
 
+from parameterized import parameterized
+
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.api.constants import EventTypes, JoinRules
 from synapse.api.errors import Codes, ResourceLimitError
-from synapse.api.filtering import Filtering
+from synapse.api.filtering import FilterCollection, Filtering
 from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
@@ -33,7 +35,7 @@ from synapse.handlers.sync import SyncConfig, SyncResult
 from synapse.rest import admin
 from synapse.rest.client import knock, login, room
 from synapse.server import HomeServer
-from synapse.types import UserID, create_requester
+from synapse.types import JsonDict, UserID, create_requester
 from synapse.util import Clock
 
 import tests.unittest
@@ -258,13 +260,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
         # Eve tries to join the room. We monkey patch the internal logic which selects
         # the prev_events used when creating the join event, such that the ban does not
         # precede the join.
-        mocked_get_prev_events = patch.object(
-            self.hs.get_datastores().main,
-            "get_prev_events_for_room",
-            new_callable=AsyncMock,
-            return_value=[last_room_creation_event_id],
-        )
-        with mocked_get_prev_events:
+        with self._patch_get_latest_events([last_room_creation_event_id]):
             self.helper.join(room_id, eve, tok=eve_token)
 
         # Eve makes a second, incremental sync.
@@ -288,6 +284,180 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
         )
         self.assertEqual(eve_initial_sync_after_join.joined, [])
 
+    def test_state_includes_changes_on_forks(self) -> None:
+        """State changes that happen on a fork of the DAG must be included in `state`
+
+        Given the following DAG:
+
+             E1
+           ↗    ↖
+          |      S2
+          |      ↑
+        --|------|----
+          |      |
+          E3     |
+           ↖    /
+             E4
+
+        ... and a filter that means we only return 2 events, represented by the dashed
+        horizontal line: `S2` must be included in the `state` section.
+        """
+        alice = self.register_user("alice", "password")
+        alice_tok = self.login(alice, "password")
+        alice_requester = create_requester(alice)
+        room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
+
+        # Do an initial sync as Alice to get a known starting point.
+        initial_sync_result = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester, generate_sync_config(alice)
+            )
+        )
+        last_room_creation_event_id = (
+            initial_sync_result.joined[0].timeline.events[-1].event_id
+        )
+
+        # Send a state event, and a regular event, both using the same prev ID
+        with self._patch_get_latest_events([last_room_creation_event_id]):
+            s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
+                "event_id"
+            ]
+            e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]
+
+        # Send a final event, joining the two branches of the dag
+        e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]
+
+        # do an incremental sync, with a filter that will ensure we only get two of
+        # the three new events.
+        incremental_sync = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(
+                    alice,
+                    filter_collection=FilterCollection(
+                        self.hs, {"room": {"timeline": {"limit": 2}}}
+                    ),
+                ),
+                since_token=initial_sync_result.next_batch,
+            )
+        )
+
+        # The state event should appear in the 'state' section of the response.
+        room_sync = incremental_sync.joined[0]
+        self.assertEqual(room_sync.room_id, room_id)
+        self.assertTrue(room_sync.timeline.limited)
+        self.assertEqual(
+            [e.event_id for e in room_sync.timeline.events],
+            [e3_event, e4_event],
+        )
+        self.assertEqual(
+            [e.event_id for e in room_sync.state.values()],
+            [s2_event],
+        )
+
+    @parameterized.expand(
+        [
+            (False, False),
+            (True, False),
+            (False, True),
+            (True, True),
+        ]
+    )
+    def test_archived_rooms_do_not_include_state_after_leave(
+        self, initial_sync: bool, empty_timeline: bool
+    ) -> None:
+        """If the user leaves the room, state changes that happen after they leave are not returned.
+
+        We try with both a zero and a normal timeline limit,
+        and we try both an initial sync and an incremental sync for both.
+        """
+        if empty_timeline and not initial_sync:
+            # FIXME synapse doesn't return the room at all in this situation!
+            self.skipTest("Synapse does not correctly handle this case")
+
+        # Alice creates the room, and bob joins.
+        alice = self.register_user("alice", "password")
+        alice_tok = self.login(alice, "password")
+
+        bob = self.register_user("bob", "password")
+        bob_tok = self.login(bob, "password")
+        bob_requester = create_requester(bob)
+
+        room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
+        self.helper.join(room_id, bob, tok=bob_tok)
+
+        initial_sync_result = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                bob_requester, generate_sync_config(bob)
+            )
+        )
+
+        # Alice sends a message and a state
+        before_message_event = self.helper.send(room_id, "before", tok=alice_tok)[
+            "event_id"
+        ]
+        before_state_event = self.helper.send_state(
+            room_id, "test_state", {"body": "before"}, tok=alice_tok
+        )["event_id"]
+
+        # Bob leaves
+        leave_event = self.helper.leave(room_id, bob, tok=bob_tok)["event_id"]
+
+        # Alice sends some more stuff
+        self.helper.send(room_id, "after", tok=alice_tok)["event_id"]
+        self.helper.send_state(room_id, "test_state", {"body": "after"}, tok=alice_tok)[
+            "event_id"
+        ]
+
+        # And now, Bob resyncs.
+        filter_dict: JsonDict = {"room": {"include_leave": True}}
+        if empty_timeline:
+            filter_dict["room"]["timeline"] = {"limit": 0}
+        sync_room_result = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                bob_requester,
+                generate_sync_config(
+                    bob, filter_collection=FilterCollection(self.hs, filter_dict)
+                ),
+                since_token=None if initial_sync else initial_sync_result.next_batch,
+            )
+        ).archived[0]
+
+        if empty_timeline:
+            # The timeline should be empty
+            self.assertEqual(sync_room_result.timeline.events, [])
+
+            # And the state should include the leave event...
+            self.assertEqual(
+                sync_room_result.state[("m.room.member", bob)].event_id, leave_event
+            )
+            # ... and the state change before he left.
+            self.assertEqual(
+                sync_room_result.state[("test_state", "")].event_id, before_state_event
+            )
+        else:
+            # The last three events in the timeline should be those leading up to the
+            # leave
+            self.assertEqual(
+                [e.event_id for e in sync_room_result.timeline.events[-3:]],
+                [before_message_event, before_state_event, leave_event],
+            )
+            # ... And the state should be empty
+            self.assertEqual(sync_room_result.state, {})
+
+    def _patch_get_latest_events(self, latest_events: List[str]) -> ContextManager:
+        """Monkey-patch `get_prev_events_for_room`
+
+        Returns a context manager which will replace the implementation of
+        `get_prev_events_for_room` with one which returns `latest_events`.
+        """
+        return patch.object(
+            self.hs.get_datastores().main,
+            "get_prev_events_for_room",
+            new_callable=AsyncMock,
+            return_value=latest_events,
+        )
+
     def test_call_invite_in_public_room_not_returned(self) -> None:
         user = self.register_user("alice", "password")
         tok = self.login(user, "password")
@@ -401,14 +571,26 @@ _request_key = 0
 
 
 def generate_sync_config(
-    user_id: str, device_id: Optional[str] = "device_id"
+    user_id: str,
+    device_id: Optional[str] = "device_id",
+    filter_collection: Optional[FilterCollection] = None,
 ) -> SyncConfig:
-    """Generate a sync config (with a unique request key)."""
+    """Generate a sync config (with a unique request key).
+
+    Args:
+        user_id: user who is syncing.
+        device_id: device that is syncing. Defaults to "device_id".
+        filter_collection: filter to apply. Defaults to the default filter (ie,
+           return everything, with a default limit)
+    """
+    if filter_collection is None:
+        filter_collection = Filtering(Mock()).DEFAULT_FILTER_COLLECTION
+
     global _request_key
     _request_key += 1
     return SyncConfig(
         user=UserID.from_string(user_id),
-        filter_collection=Filtering(Mock()).DEFAULT_FILTER_COLLECTION,
+        filter_collection=filter_collection,
         is_guest=False,
         request_key=("request_key", _request_key),
         device_id=device_id,
diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py
index daa68d78b9..fe00afe198 100644
--- a/tests/rest/client/utils.py
+++ b/tests/rest/client/utils.py
@@ -170,8 +170,8 @@ class RestHelper:
         targ: Optional[str] = None,
         expect_code: int = HTTPStatus.OK,
         tok: Optional[str] = None,
-    ) -> None:
-        self.change_membership(
+    ) -> JsonDict:
+        return self.change_membership(
             room=room,
             src=src,
             targ=targ,
@@ -189,8 +189,8 @@ class RestHelper:
         appservice_user_id: Optional[str] = None,
         expect_errcode: Optional[Codes] = None,
         expect_additional_fields: Optional[dict] = None,
-    ) -> None:
-        self.change_membership(
+    ) -> JsonDict:
+        return self.change_membership(
             room=room,
             src=user,
             targ=user,
@@ -242,8 +242,8 @@ class RestHelper:
         user: Optional[str] = None,
         expect_code: int = HTTPStatus.OK,
         tok: Optional[str] = None,
-    ) -> None:
-        self.change_membership(
+    ) -> JsonDict:
+        return self.change_membership(
             room=room,
             src=user,
             targ=user,
@@ -282,7 +282,7 @@ class RestHelper:
         expect_code: int = HTTPStatus.OK,
         expect_errcode: Optional[str] = None,
         expect_additional_fields: Optional[dict] = None,
-    ) -> None:
+    ) -> JsonDict:
         """
         Send a membership state event into a room.
 
@@ -298,6 +298,9 @@ class RestHelper:
                 using an application service access token in `tok`.
             expect_code: The expected HTTP response code
             expect_errcode: The expected Matrix error code
+
+        Returns:
+            The JSON response
         """
         temp_id = self.auth_user_id
         self.auth_user_id = src
@@ -356,6 +359,7 @@ class RestHelper:
                 )
 
         self.auth_user_id = temp_id
+        return channel.json_body
 
     def send(
         self,