diff options
author | Nick Mills-Barrett <nick@fizzadar.com> | 2022-04-13 11:38:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-13 11:38:35 +0100 |
commit | e3a49f4784d5c915355ac9306e60b09433db60b5 (patch) | |
tree | 37a4bfca3764b4792738abc1d22d4ff4419af317 /tests/rest/client | |
parent | Use poetry lockfile in twisted trunk CI job (#12425) (diff) | |
download | synapse-e3a49f4784d5c915355ac9306e60b09433db60b5.tar.xz |
Fix missing sync events during historical batch imports (#12319)
Discovered after much in-depth investigation in #12281. Closes: #12281 Closes: #3305 Signed off by: Nick Mills-Barrett nick@beeper.com
Diffstat (limited to 'tests/rest/client')
-rw-r--r-- | tests/rest/client/test_room_batch.py | 125 |
1 files changed, 123 insertions, 2 deletions
diff --git a/tests/rest/client/test_room_batch.py b/tests/rest/client/test_room_batch.py index 44f333a0ee..41a1bf6d89 100644 --- a/tests/rest/client/test_room_batch.py +++ b/tests/rest/client/test_room_batch.py @@ -7,9 +7,9 @@ from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventContentFields, EventTypes from synapse.appservice import ApplicationService from synapse.rest import admin -from synapse.rest.client import login, register, room, room_batch +from synapse.rest.client import login, register, room, room_batch, sync from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import JsonDict, RoomStreamToken from synapse.util import Clock from tests import unittest @@ -63,6 +63,7 @@ class RoomBatchTestCase(unittest.HomeserverTestCase): room.register_servlets, register.register_servlets, login.register_servlets, + sync.register_servlets, ] def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: @@ -178,3 +179,123 @@ class RoomBatchTestCase(unittest.HomeserverTestCase): "Expected a single state_group to be returned by saw state_groups=%s" % (state_group_map.keys(),), ) + + @unittest.override_config({"experimental_features": {"msc2716_enabled": True}}) + def test_sync_while_batch_importing(self) -> None: + """ + Make sure that /sync correctly returns full room state when a user joins + during ongoing batch backfilling. + See: https://github.com/matrix-org/synapse/issues/12281 + """ + # Create user who will be invited & join room + user_id = self.register_user("beep", "test") + user_tok = self.login("beep", "test") + + time_before_room = int(self.clock.time_msec()) + + # Create a room with some events + room_id, _, _, _ = self._create_test_room() + # Invite the user + self.helper.invite( + room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id + ) + + # Create another room, send a bunch of events to advance the stream token + other_room_id = self.helper.create_room_as( + self.appservice.sender, tok=self.appservice.token + ) + for _ in range(5): + self.helper.send_event( + room_id=other_room_id, + type=EventTypes.Message, + content={"msgtype": "m.text", "body": "C"}, + tok=self.appservice.token, + ) + + # Join the room as the normal user + self.helper.join(room_id, user_id, tok=user_tok) + + # Create an event to hang the historical batch from - In order to see + # the failure case originally reported in #12281, the historical batch + # must be hung from the most recent event in the room so the base + # insertion event ends up with the highest `topogological_ordering` + # (`depth`) in the room but will have a negative `stream_ordering` + # because it's a `historical` event. Previously, when assembling the + # `state` for the `/sync` response, the bugged logic would sort by + # `topological_ordering` descending and pick up the base insertion + # event because it has a negative `stream_ordering` below the given + # pagination token. Now we properly sort by `stream_ordering` + # descending which puts `historical` events with a negative + # `stream_ordering` way at the bottom and aren't selected as expected. + response = self.helper.send_event( + room_id=room_id, + type=EventTypes.Message, + content={ + "msgtype": "m.text", + "body": "C", + }, + tok=self.appservice.token, + ) + event_to_hang_id = response["event_id"] + + channel = self.make_request( + "POST", + "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s" + % (room_id, event_to_hang_id), + content={ + "events": _create_message_events_for_batch_send_request( + self.virtual_user_id, time_before_room, 3 + ), + "state_events_at_start": _create_join_state_events_for_batch_send_request( + [self.virtual_user_id], time_before_room + ), + }, + access_token=self.appservice.token, + ) + self.assertEqual(channel.code, 200, channel.result) + + # Now we need to find the invite + join events stream tokens so we can sync between + main_store = self.hs.get_datastores().main + events, next_key = self.get_success( + main_store.get_recent_events_for_room( + room_id, + 50, + end_token=main_store.get_room_max_token(), + ), + ) + invite_event_position = None + for event in events: + if ( + event.type == "m.room.member" + and event.content["membership"] == "invite" + ): + invite_event_position = self.get_success( + main_store.get_topological_token_for_event(event.event_id) + ) + break + + assert invite_event_position is not None, "No invite event found" + + # Remove the topological order from the token by re-creating w/stream only + invite_event_position = RoomStreamToken(None, invite_event_position.stream) + + # Sync everything after this token + since_token = self.get_success(invite_event_position.to_string(main_store)) + sync_response = self.make_request( + "GET", + f"/sync?since={since_token}", + access_token=user_tok, + ) + + # Assert that, for this room, the user was considered to have joined and thus + # receives the full state history + state_event_types = [ + event["type"] + for event in sync_response.json_body["rooms"]["join"][room_id]["state"][ + "events" + ] + ] + + assert ( + "m.room.create" in state_event_types + ), "Missing room full state in sync response" |