From a5a799722db0c33dc61fb2c6c7282ff7e82eb2e9 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 9 Feb 2023 22:33:39 +0000 Subject: Tag federation request spans with the worker name (#15042) * Systematically include worker name as process info * Changelog * don't bother with inner setdefault --- synapse/api/auth.py | 7 ------- 1 file changed, 7 deletions(-) (limited to 'synapse/api') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3d7f986ac7..66e869bc2d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -32,7 +32,6 @@ from synapse.appservice import ApplicationService from synapse.http import get_request_user_agent from synapse.http.site import SynapseRequest from synapse.logging.opentracing import ( - SynapseTags, active_span, force_tracing, start_active_span, @@ -162,12 +161,6 @@ class Auth: parent_span.set_tag( "authenticated_entity", requester.authenticated_entity ) - # We tag the Synapse instance name so that it's an easy jumping - # off point into the logs. Can also be used to filter for an - # instance that is under load. - parent_span.set_tag( - SynapseTags.INSTANCE_NAME, self.hs.get_instance_name() - ) parent_span.set_tag("user_id", requester.user.to_string()) if requester.device_id is not None: parent_span.set_tag("device_id", requester.device_id) -- cgit 1.5.1 From cf5233b783273efc84b991e7242fb4761ccc201a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 10 Feb 2023 09:22:16 -0500 Subject: Avoid fetching unused account data in sync. (#14973) The per-room account data is no longer unconditionally fetched, even if all rooms will be filtered out. Global account data will not be fetched if it will all be filtered out. --- changelog.d/14973.misc | 1 + synapse/api/filtering.py | 30 +++++- synapse/handlers/account_data.py | 10 +- synapse/handlers/initial_sync.py | 5 +- synapse/handlers/room_member.py | 2 +- synapse/handlers/sync.py | 88 +++++++++-------- synapse/rest/admin/users.py | 3 +- synapse/storage/databases/main/account_data.py | 127 ++++++++++++++++++------- 8 files changed, 176 insertions(+), 90 deletions(-) create mode 100644 changelog.d/14973.misc (limited to 'synapse/api') diff --git a/changelog.d/14973.misc b/changelog.d/14973.misc new file mode 100644 index 0000000000..3657623602 --- /dev/null +++ b/changelog.d/14973.misc @@ -0,0 +1 @@ +Improve performance of `/sync` in a few situations. diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 83c42fc25a..b9f432cc23 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -219,9 +219,13 @@ class FilterCollection: self._room_timeline_filter = Filter(hs, room_filter_json.get("timeline", {})) self._room_state_filter = Filter(hs, room_filter_json.get("state", {})) self._room_ephemeral_filter = Filter(hs, room_filter_json.get("ephemeral", {})) - self._room_account_data = Filter(hs, room_filter_json.get("account_data", {})) + self._room_account_data_filter = Filter( + hs, room_filter_json.get("account_data", {}) + ) self._presence_filter = Filter(hs, filter_json.get("presence", {})) - self._account_data = Filter(hs, filter_json.get("account_data", {})) + self._global_account_data_filter = Filter( + hs, filter_json.get("account_data", {}) + ) self.include_leave = filter_json.get("room", {}).get("include_leave", False) self.event_fields = filter_json.get("event_fields", []) @@ -256,8 +260,10 @@ class FilterCollection: ) -> List[UserPresenceState]: return await self._presence_filter.filter(presence_states) - async def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]: - return await self._account_data.filter(events) + async def filter_global_account_data( + self, events: Iterable[JsonDict] + ) -> List[JsonDict]: + return await self._global_account_data_filter.filter(events) async def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]: return await self._room_state_filter.filter( @@ -279,7 +285,7 @@ class FilterCollection: async def filter_room_account_data( self, events: Iterable[JsonDict] ) -> List[JsonDict]: - return await self._room_account_data.filter( + return await self._room_account_data_filter.filter( await self._room_filter.filter(events) ) @@ -292,6 +298,13 @@ class FilterCollection: or self._presence_filter.filters_all_senders() ) + def blocks_all_global_account_data(self) -> bool: + """True if all global acount data will be filtered out.""" + return ( + self._global_account_data_filter.filters_all_types() + or self._global_account_data_filter.filters_all_senders() + ) + def blocks_all_room_ephemeral(self) -> bool: return ( self._room_ephemeral_filter.filters_all_types() @@ -299,6 +312,13 @@ class FilterCollection: or self._room_ephemeral_filter.filters_all_rooms() ) + def blocks_all_room_account_data(self) -> bool: + return ( + self._room_account_data_filter.filters_all_types() + or self._room_account_data_filter.filters_all_senders() + or self._room_account_data_filter.filters_all_rooms() + ) + def blocks_all_room_timeline(self) -> bool: return ( self._room_timeline_filter.filters_all_types() diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 67e789eef7..797de46dbc 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -343,10 +343,12 @@ class AccountDataEventSource(EventSource[int, JsonDict]): } ) - ( - account_data, - room_account_data, - ) = await self.store.get_updated_account_data_for_user(user_id, last_stream_id) + account_data = await self.store.get_updated_global_account_data_for_user( + user_id, last_stream_id + ) + room_account_data = await self.store.get_updated_room_account_data_for_user( + user_id, last_stream_id + ) for account_data_type, content in account_data.items(): results.append({"type": account_data_type, "content": content}) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 191529bd8e..1a29abde98 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -154,9 +154,8 @@ class InitialSyncHandler: tags_by_room = await self.store.get_tags_for_user(user_id) - account_data, account_data_by_room = await self.store.get_account_data_for_user( - user_id - ) + account_data = await self.store.get_global_account_data_for_user(user_id) + account_data_by_room = await self.store.get_room_account_data_for_user(user_id) public_room_ids = await self.store.get_public_room_ids() diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index d236cc09b5..6e7141d2ef 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -484,7 +484,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): user_id: The user's ID. """ # Retrieve user account data for predecessor room - user_account_data, _ = await self.store.get_account_data_for_user(user_id) + user_account_data = await self.store.get_global_account_data_for_user(user_id) # Copy direct message state if applicable direct_rooms = user_account_data.get(AccountDataTypes.DIRECT, {}) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 202b35eee6..399685e5b7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1444,9 +1444,9 @@ class SyncHandler: logger.debug("Fetching account data") - account_data_by_room = await self._generate_sync_entry_for_account_data( - sync_result_builder - ) + # Global account data is included if it is not filtered out. + if not sync_config.filter_collection.blocks_all_global_account_data(): + await self._generate_sync_entry_for_account_data(sync_result_builder) # Presence data is included if the server has it enabled and not filtered out. include_presence_data = bool( @@ -1472,9 +1472,7 @@ class SyncHandler: ( newly_joined_rooms, newly_left_rooms, - ) = await self._generate_sync_entry_for_rooms( - sync_result_builder, account_data_by_room - ) + ) = await self._generate_sync_entry_for_rooms(sync_result_builder) # Work out which users have joined or left rooms we're in. We use this # to build the presence and device_list parts of the sync response in @@ -1717,35 +1715,29 @@ class SyncHandler: async def _generate_sync_entry_for_account_data( self, sync_result_builder: "SyncResultBuilder" - ) -> Dict[str, Dict[str, JsonDict]]: - """Generates the account data portion of the sync response. + ) -> None: + """Generates the global account data portion of the sync response. Account data (called "Client Config" in the spec) can be set either globally or for a specific room. Account data consists of a list of events which accumulate state, much like a room. - This function retrieves global and per-room account data. The former is written - to the given `sync_result_builder`. The latter is returned directly, to be - later written to the `sync_result_builder` on a room-by-room basis. + This function retrieves global account data and writes it to the given + `sync_result_builder`. See `_generate_sync_entry_for_rooms` for handling + of per-room account data. Args: sync_result_builder - - Returns: - A dictionary whose keys (room ids) map to the per room account data for that - room. """ sync_config = sync_result_builder.sync_config user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token if since_token and not sync_result_builder.full_state: - # TODO Do not fetch room account data if it will be unused. - ( - global_account_data, - account_data_by_room, - ) = await self.store.get_updated_account_data_for_user( - user_id, since_token.account_data_key + global_account_data = ( + await self.store.get_updated_global_account_data_for_user( + user_id, since_token.account_data_key + ) ) push_rules_changed = await self.store.have_push_rules_changed_for_user( @@ -1758,28 +1750,26 @@ class SyncHandler: sync_config.user ) else: - # TODO Do not fetch room account data if it will be unused. - ( - global_account_data, - account_data_by_room, - ) = await self.store.get_account_data_for_user(sync_config.user.to_string()) + all_global_account_data = await self.store.get_global_account_data_for_user( + user_id + ) - global_account_data = dict(global_account_data) + global_account_data = dict(all_global_account_data) global_account_data["m.push_rules"] = await self.push_rules_for_user( sync_config.user ) - account_data_for_user = await sync_config.filter_collection.filter_account_data( - [ - {"type": account_data_type, "content": content} - for account_data_type, content in global_account_data.items() - ] + account_data_for_user = ( + await sync_config.filter_collection.filter_global_account_data( + [ + {"type": account_data_type, "content": content} + for account_data_type, content in global_account_data.items() + ] + ) ) sync_result_builder.account_data = account_data_for_user - return account_data_by_room - async def _generate_sync_entry_for_presence( self, sync_result_builder: "SyncResultBuilder", @@ -1839,9 +1829,7 @@ class SyncHandler: sync_result_builder.presence = presence async def _generate_sync_entry_for_rooms( - self, - sync_result_builder: "SyncResultBuilder", - account_data_by_room: Dict[str, Dict[str, JsonDict]], + self, sync_result_builder: "SyncResultBuilder" ) -> Tuple[AbstractSet[str], AbstractSet[str]]: """Generates the rooms portion of the sync response. Populates the `sync_result_builder` with the result. @@ -1852,7 +1840,6 @@ class SyncHandler: Args: sync_result_builder - account_data_by_room: Dictionary of per room account data Returns: Returns a 2-tuple describing rooms the user has joined or left. @@ -1865,9 +1852,30 @@ class SyncHandler: since_token = sync_result_builder.since_token user_id = sync_result_builder.sync_config.user.to_string() + blocks_all_rooms = ( + sync_result_builder.sync_config.filter_collection.blocks_all_rooms() + ) + + # 0. Start by fetching room account data (if required). + if ( + blocks_all_rooms + or sync_result_builder.sync_config.filter_collection.blocks_all_room_account_data() + ): + account_data_by_room: Mapping[str, Mapping[str, JsonDict]] = {} + elif since_token and not sync_result_builder.full_state: + account_data_by_room = ( + await self.store.get_updated_room_account_data_for_user( + user_id, since_token.account_data_key + ) + ) + else: + account_data_by_room = await self.store.get_room_account_data_for_user( + user_id + ) + # 1. Start by fetching all ephemeral events in rooms we've joined (if required). block_all_room_ephemeral = ( - sync_result_builder.sync_config.filter_collection.blocks_all_rooms() + blocks_all_rooms or sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral() ) if block_all_room_ephemeral: @@ -2294,7 +2302,7 @@ class SyncHandler: room_builder: "RoomSyncResultBuilder", ephemeral: List[JsonDict], tags: Optional[Dict[str, Dict[str, Any]]], - account_data: Dict[str, JsonDict], + account_data: Mapping[str, JsonDict], always_include: bool = False, ) -> None: """Populates the `joined` and `archived` section of `sync_result_builder` diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index b9dca8ef3a..0c0bf540b9 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -1192,7 +1192,8 @@ class AccountDataRestServlet(RestServlet): if not await self._store.get_user_by_id(user_id): raise NotFoundError("User not found") - global_data, by_room_data = await self._store.get_account_data_for_user(user_id) + global_data = await self._store.get_global_account_data_for_user(user_id) + by_room_data = await self._store.get_room_account_data_for_user(user_id) return HTTPStatus.OK, { "account_data": { "global": global_data, diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 8a359d7eb8..2d6f02c14f 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -21,6 +21,7 @@ from typing import ( FrozenSet, Iterable, List, + Mapping, Optional, Tuple, cast, @@ -122,25 +123,25 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) return self._account_data_id_gen.get_current_token() @cached() - async def get_account_data_for_user( + async def get_global_account_data_for_user( self, user_id: str - ) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: + ) -> Mapping[str, JsonDict]: """ - Get all the client account_data for a user. + Get all the global client account_data for a user. If experimental MSC3391 support is enabled, any entries with an empty content body are excluded; as this means they have been deleted. Args: user_id: The user to get the account_data for. + Returns: - A 2-tuple of a dict of global account_data and a dict mapping from - room_id string to per room account_data dicts. + The global account_data. """ - def get_account_data_for_user_txn( + def get_global_account_data_for_user( txn: LoggingTransaction, - ) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: + ) -> Dict[str, JsonDict]: # The 'content != '{}' condition below prevents us from using # `simple_select_list_txn` here, as it doesn't support conditions # other than 'equals'. @@ -158,10 +159,34 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) txn.execute(sql, (user_id,)) rows = self.db_pool.cursor_to_dict(txn) - global_account_data = { + return { row["account_data_type"]: db_to_json(row["content"]) for row in rows } + return await self.db_pool.runInteraction( + "get_global_account_data_for_user", get_global_account_data_for_user + ) + + @cached() + async def get_room_account_data_for_user( + self, user_id: str + ) -> Mapping[str, Mapping[str, JsonDict]]: + """ + Get all of the per-room client account_data for a user. + + If experimental MSC3391 support is enabled, any entries with an empty + content body are excluded; as this means they have been deleted. + + Args: + user_id: The user to get the account_data for. + + Returns: + A dict mapping from room_id string to per-room account_data dicts. + """ + + def get_room_account_data_for_user_txn( + txn: LoggingTransaction, + ) -> Dict[str, Dict[str, JsonDict]]: # The 'content != '{}' condition below prevents us from using # `simple_select_list_txn` here, as it doesn't support conditions # other than 'equals'. @@ -185,10 +210,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) room_data[row["account_data_type"]] = db_to_json(row["content"]) - return global_account_data, by_room + return by_room return await self.db_pool.runInteraction( - "get_account_data_for_user", get_account_data_for_user_txn + "get_room_account_data_for_user_txn", get_room_account_data_for_user_txn ) @cached(num_args=2, max_entries=5000, tree=True) @@ -342,36 +367,61 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) "get_updated_room_account_data", get_updated_room_account_data_txn ) - async def get_updated_account_data_for_user( + async def get_updated_global_account_data_for_user( self, user_id: str, stream_id: int - ) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: - """Get all the client account_data for a that's changed for a user + ) -> Dict[str, JsonDict]: + """Get all the global account_data that's changed for a user. Args: user_id: The user to get the account_data for. stream_id: The point in the stream since which to get updates + Returns: - A deferred pair of a dict of global account_data and a dict - mapping from room_id string to per room account_data dicts. + A dict of global account_data. """ - def get_updated_account_data_for_user_txn( + def get_updated_global_account_data_for_user( txn: LoggingTransaction, - ) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: - sql = ( - "SELECT account_data_type, content FROM account_data" - " WHERE user_id = ? AND stream_id > ?" - ) - + ) -> Dict[str, JsonDict]: + sql = """ + SELECT account_data_type, content FROM account_data + WHERE user_id = ? AND stream_id > ? + """ txn.execute(sql, (user_id, stream_id)) - global_account_data = {row[0]: db_to_json(row[1]) for row in txn} + return {row[0]: db_to_json(row[1]) for row in txn} - sql = ( - "SELECT room_id, account_data_type, content FROM room_account_data" - " WHERE user_id = ? AND stream_id > ?" - ) + changed = self._account_data_stream_cache.has_entity_changed( + user_id, int(stream_id) + ) + if not changed: + return {} + + return await self.db_pool.runInteraction( + "get_updated_global_account_data_for_user", + get_updated_global_account_data_for_user, + ) + + async def get_updated_room_account_data_for_user( + self, user_id: str, stream_id: int + ) -> Dict[str, Dict[str, JsonDict]]: + """Get all the room account_data that's changed for a user. + Args: + user_id: The user to get the account_data for. + stream_id: The point in the stream since which to get updates + + Returns: + A dict mapping from room_id string to per room account_data dicts. + """ + + def get_updated_room_account_data_for_user_txn( + txn: LoggingTransaction, + ) -> Dict[str, Dict[str, JsonDict]]: + sql = """ + SELECT room_id, account_data_type, content FROM room_account_data + WHERE user_id = ? AND stream_id > ? + """ txn.execute(sql, (user_id, stream_id)) account_data_by_room: Dict[str, Dict[str, JsonDict]] = {} @@ -379,16 +429,17 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) room_account_data = account_data_by_room.setdefault(row[0], {}) room_account_data[row[1]] = db_to_json(row[2]) - return global_account_data, account_data_by_room + return account_data_by_room changed = self._account_data_stream_cache.has_entity_changed( user_id, int(stream_id) ) if not changed: - return {}, {} + return {} return await self.db_pool.runInteraction( - "get_updated_account_data_for_user", get_updated_account_data_for_user_txn + "get_updated_room_account_data_for_user", + get_updated_room_account_data_for_user_txn, ) @cached(max_entries=5000, iterable=True) @@ -444,7 +495,8 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) self.get_global_account_data_by_type_for_user.invalidate( (row.user_id, row.data_type) ) - self.get_account_data_for_user.invalidate((row.user_id,)) + self.get_global_account_data_for_user.invalidate((row.user_id,)) + self.get_room_account_data_for_user.invalidate((row.user_id,)) self.get_account_data_for_room.invalidate((row.user_id, row.room_id)) self.get_account_data_for_room_and_type.invalidate( (row.user_id, row.room_id, row.data_type) @@ -492,7 +544,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) self._account_data_stream_cache.entity_has_changed(user_id, next_id) - self.get_account_data_for_user.invalidate((user_id,)) + self.get_room_account_data_for_user.invalidate((user_id,)) self.get_account_data_for_room.invalidate((user_id, room_id)) self.get_account_data_for_room_and_type.prefill( (user_id, room_id, account_data_type), content @@ -558,7 +610,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) return None self._account_data_stream_cache.entity_has_changed(user_id, next_id) - self.get_account_data_for_user.invalidate((user_id,)) + self.get_room_account_data_for_user.invalidate((user_id,)) self.get_account_data_for_room.invalidate((user_id, room_id)) self.get_account_data_for_room_and_type.prefill( (user_id, room_id, account_data_type), {} @@ -593,7 +645,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) self._account_data_stream_cache.entity_has_changed(user_id, next_id) - self.get_account_data_for_user.invalidate((user_id,)) + self.get_global_account_data_for_user.invalidate((user_id,)) self.get_global_account_data_by_type_for_user.invalidate( (user_id, account_data_type) ) @@ -761,7 +813,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) return None self._account_data_stream_cache.entity_has_changed(user_id, next_id) - self.get_account_data_for_user.invalidate((user_id,)) + self.get_global_account_data_for_user.invalidate((user_id,)) self.get_global_account_data_by_type_for_user.prefill( (user_id, account_data_type), {} ) @@ -822,7 +874,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) txn, self.get_account_data_for_room_and_type, (user_id,) ) self._invalidate_cache_and_stream( - txn, self.get_account_data_for_user, (user_id,) + txn, self.get_global_account_data_for_user, (user_id,) + ) + self._invalidate_cache_and_stream( + txn, self.get_room_account_data_for_user, (user_id,) ) self._invalidate_cache_and_stream( txn, self.get_global_account_data_by_type_for_user, (user_id,) -- cgit 1.5.1 From 6cddf24e361fe43f086307c833cd814dc03363b6 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Sat, 11 Feb 2023 00:31:05 +0100 Subject: Faster joins: don't stall when a user joins during a fast join (#14606) Fixes #12801. Complement tests are at https://github.com/matrix-org/complement/pull/567. Avoid blocking on full state when handling a subsequent join into a partial state room. Also always perform a remote join into partial state rooms, since we do not know whether the joining user has been banned and want to avoid leaking history to banned users. Signed-off-by: Mathieu Velten Co-authored-by: Sean Quah Co-authored-by: David Robertson --- changelog.d/14606.misc | 1 + synapse/api/errors.py | 22 ++++++ synapse/federation/federation_server.py | 2 +- synapse/handlers/event_auth.py | 16 ++--- synapse/handlers/federation.py | 2 +- synapse/handlers/federation_event.py | 59 ++++++++++++++-- synapse/handlers/message.py | 2 +- synapse/handlers/room.py | 2 +- synapse/handlers/room_member.py | 118 ++++++++++++++++++++++--------- synapse/handlers/room_member_worker.py | 5 +- synapse/storage/databases/main/events.py | 21 +----- tests/handlers/test_federation.py | 40 +++++------ 12 files changed, 196 insertions(+), 94 deletions(-) create mode 100644 changelog.d/14606.misc (limited to 'synapse/api') diff --git a/changelog.d/14606.misc b/changelog.d/14606.misc new file mode 100644 index 0000000000..e2debc96d8 --- /dev/null +++ b/changelog.d/14606.misc @@ -0,0 +1 @@ +Faster joins: don't stall when another user joins during a fast join resync. diff --git a/synapse/api/errors.py b/synapse/api/errors.py index c2c177fd71..9235ce6536 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -751,3 +751,25 @@ class ModuleFailedException(Exception): Raised when a module API callback fails, for example because it raised an exception. """ + + +class PartialStateConflictError(SynapseError): + """An internal error raised when attempting to persist an event with partial state + after the room containing the event has been un-partial stated. + + This error should be handled by recomputing the event context and trying again. + + This error has an HTTP status code so that it can be transported over replication. + It should not be exposed to clients. + """ + + @staticmethod + def message() -> str: + return "Cannot persist partial state event in un-partial stated room" + + def __init__(self) -> None: + super().__init__( + HTTPStatus.CONFLICT, + msg=PartialStateConflictError.message(), + errcode=Codes.UNKNOWN, + ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 6addc0bb65..6d99845de5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -48,6 +48,7 @@ from synapse.api.errors import ( FederationError, IncompatibleRoomVersionError, NotFoundError, + PartialStateConflictError, SynapseError, UnsupportedRoomVersionError, ) @@ -81,7 +82,6 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, ReplicationGetQueryRestServlet, ) -from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.lock import Lock from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary from synapse.storage.roommember import MemberSummary diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py index a23a8ce2a1..46dd63c3f0 100644 --- a/synapse/handlers/event_auth.py +++ b/synapse/handlers/event_auth.py @@ -202,7 +202,7 @@ class EventAuthHandler: state_ids: StateMap[str], room_version: RoomVersion, user_id: str, - prev_member_event: Optional[EventBase], + prev_membership: Optional[str], ) -> None: """ Check whether a user can join a room without an invite due to restricted join rules. @@ -214,15 +214,14 @@ class EventAuthHandler: state_ids: The state of the room as it currently is. room_version: The room version of the room being joined. user_id: The user joining the room. - prev_member_event: The current membership event for this user. + prev_membership: The current membership state for this user. `None` if the + user has never joined the room (equivalent to "leave"). Raises: AuthError if the user cannot join the room. """ # If the member is invited or currently joined, then nothing to do. - if prev_member_event and ( - prev_member_event.membership in (Membership.JOIN, Membership.INVITE) - ): + if prev_membership in (Membership.JOIN, Membership.INVITE): return # This is not a room with a restricted join rule, so we don't need to do the @@ -255,13 +254,14 @@ class EventAuthHandler: ) async def has_restricted_join_rules( - self, state_ids: StateMap[str], room_version: RoomVersion + self, partial_state_ids: StateMap[str], room_version: RoomVersion ) -> bool: """ Return if the room has the proper join rules set for access via rooms. Args: - state_ids: The state of the room as it currently is. + state_ids: The state of the room as it currently is. May be full or partial + state. room_version: The room version of the room to query. Returns: @@ -272,7 +272,7 @@ class EventAuthHandler: return False # If there's no join rule, then it defaults to invite (so this doesn't apply). - join_rules_event_id = state_ids.get((EventTypes.JoinRules, ""), None) + join_rules_event_id = partial_state_ids.get((EventTypes.JoinRules, ""), None) if not join_rules_event_id: return False diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 43ed4a3dd1..08727e4857 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -49,6 +49,7 @@ from synapse.api.errors import ( FederationPullAttemptBackoffError, HttpResponseException, NotFoundError, + PartialStateConflictError, RequestSendFailed, SynapseError, ) @@ -68,7 +69,6 @@ from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, ReplicationStoreRoomOnOutlierMembershipRestServlet, ) -from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import JsonDict, StrCollection, get_domain_from_id from synapse.types.state import StateFilter diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 3561f2f1de..b7136f8d1c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -47,6 +47,7 @@ from synapse.api.errors import ( FederationError, FederationPullAttemptBackoffError, HttpResponseException, + PartialStateConflictError, RequestSendFailed, SynapseError, ) @@ -74,7 +75,6 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ) from synapse.state import StateResolutionStore -from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( PersistedEventPosition, @@ -441,16 +441,17 @@ class FederationEventHandler: # Check if the user is already in the room or invited to the room. user_id = event.state_key prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) - prev_member_event = None + prev_membership = None if prev_member_event_id: prev_member_event = await self._store.get_event(prev_member_event_id) + prev_membership = prev_member_event.membership # Check if the member should be allowed access via membership in a space. await self._event_auth_handler.check_restricted_join_rules( prev_state_ids, event.room_version, user_id, - prev_member_event, + prev_membership, ) @trace @@ -526,11 +527,57 @@ class FederationEventHandler: "Peristing join-via-remote %s (partial_state: %s)", event, partial_state ) with nested_logging_context(suffix=event.event_id): + if partial_state: + # When handling a second partial state join into a partial state room, + # the returned state will exclude the membership from the first join. To + # preserve prior memberships, we try to compute the partial state before + # the event ourselves if we know about any of the prev events. + # + # When we don't know about any of the prev events, it's fine to just use + # the returned state, since the new join will create a new forward + # extremity, and leave the forward extremity containing our prior + # memberships alone. + prev_event_ids = set(event.prev_event_ids()) + seen_event_ids = await self._store.have_events_in_timeline( + prev_event_ids + ) + missing_event_ids = prev_event_ids - seen_event_ids + + state_maps_to_resolve: List[StateMap[str]] = [] + + # Fetch the state after the prev events that we know about. + state_maps_to_resolve.extend( + ( + await self._state_storage_controller.get_state_groups_ids( + room_id, seen_event_ids, await_full_state=False + ) + ).values() + ) + + # When there are prev events we do not have the state for, we state + # resolve with the state returned by the remote homeserver. + if missing_event_ids or len(state_maps_to_resolve) == 0: + state_maps_to_resolve.append( + {(e.type, e.state_key): e.event_id for e in state} + ) + + state_ids_before_event = ( + await self._state_resolution_handler.resolve_events_with_store( + event.room_id, + room_version.identifier, + state_maps_to_resolve, + event_map=None, + state_res_store=StateResolutionStore(self._store), + ) + ) + else: + state_ids_before_event = { + (e.type, e.state_key): e.event_id for e in state + } + context = await self._state_handler.compute_event_context( event, - state_ids_before_event={ - (e.type, e.state_key): e.event_id for e in state - }, + state_ids_before_event=state_ids_before_event, partial_state=partial_state, ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3e30f52e4d..8f5b658d9d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -38,6 +38,7 @@ from synapse.api.errors import ( Codes, ConsentNotGivenError, NotFoundError, + PartialStateConflictError, ShadowBanError, SynapseError, UnstableSpecAuthError, @@ -57,7 +58,6 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.replication.http.send_events import ReplicationSendEventsRestServlet -from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( MutableStateMap, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 060bbcb181..837dabb3b7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -43,6 +43,7 @@ from synapse.api.errors import ( Codes, LimitExceededError, NotFoundError, + PartialStateConflictError, StoreError, SynapseError, ) @@ -54,7 +55,6 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents from synapse.handlers.relations import BundledAggregations from synapse.module_api import NOT_SPAM from synapse.rest.admin._base import assert_user_is_admin -from synapse.storage.databases.main.events import PartialStateConflictError from synapse.streams import EventSource from synapse.types import ( JsonDict, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 6e7141d2ef..a965c7ec76 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -26,7 +26,13 @@ from synapse.api.constants import ( GuestAccess, Membership, ) -from synapse.api.errors import AuthError, Codes, ShadowBanError, SynapseError +from synapse.api.errors import ( + AuthError, + Codes, + PartialStateConflictError, + ShadowBanError, + SynapseError, +) from synapse.api.ratelimiting import Ratelimiter from synapse.event_auth import get_named_level, get_power_level_event from synapse.events import EventBase @@ -34,7 +40,6 @@ from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN from synapse.logging import opentracing from synapse.module_api import NOT_SPAM -from synapse.storage.databases.main.events import PartialStateConflictError from synapse.types import ( JsonDict, Requester, @@ -56,6 +61,13 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +class NoKnownServersError(SynapseError): + """No server already resident to the room was provided to the join/knock operation.""" + + def __init__(self, msg: str = "No known servers"): + super().__init__(404, msg) + + class RoomMemberHandler(metaclass=abc.ABCMeta): # TODO(paul): This handler currently contains a messy conflation of # low-level API that works on UserID objects and so on, and REST-level @@ -185,6 +197,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): room_id: Room that we are trying to join user: User who is trying to join content: A dict that should be used as the content of the join event. + + Raises: + NoKnownServersError: if remote_room_hosts does not contain a server joined to + the room. """ raise NotImplementedError() @@ -823,14 +839,19 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): latest_event_ids = await self.store.get_prev_events_for_room(room_id) - state_before_join = await self.state_handler.compute_state_after_events( - room_id, latest_event_ids + is_partial_state_room = await self.store.is_partial_state_room(room_id) + partial_state_before_join = await self.state_handler.compute_state_after_events( + room_id, latest_event_ids, await_full_state=False ) + # `is_partial_state_room` also indicates whether `partial_state_before_join` is + # partial. # TODO: Refactor into dictionary of explicitly allowed transitions # between old and new state, with specific error messages for some # transitions and generic otherwise - old_state_id = state_before_join.get((EventTypes.Member, target.to_string())) + old_state_id = partial_state_before_join.get( + (EventTypes.Member, target.to_string()) + ) if old_state_id: old_state = await self.store.get_event(old_state_id, allow_none=True) old_membership = old_state.content.get("membership") if old_state else None @@ -881,11 +902,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if action == "kick": raise AuthError(403, "The target user is not in the room") - is_host_in_room = await self._is_host_in_room(state_before_join) + is_host_in_room = await self._is_host_in_room(partial_state_before_join) if effective_membership_state == Membership.JOIN: if requester.is_guest: - guest_can_join = await self._can_guest_join(state_before_join) + guest_can_join = await self._can_guest_join(partial_state_before_join) if not guest_can_join: # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. @@ -927,8 +948,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): room_id, remote_room_hosts, content, + is_partial_state_room, is_host_in_room, - state_before_join, + partial_state_before_join, ) if remote_join: if ratelimit: @@ -1073,8 +1095,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): room_id: str, remote_room_hosts: List[str], content: JsonDict, + is_partial_state_room: bool, is_host_in_room: bool, - state_before_join: StateMap[str], + partial_state_before_join: StateMap[str], ) -> Tuple[bool, List[str]]: """ Check whether the server should do a remote join (as opposed to a local @@ -1093,9 +1116,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): remote_room_hosts: A list of remote room hosts. content: The content to use as the event body of the join. This may be modified. - is_host_in_room: True if the host is in the room. - state_before_join: The state before the join event (i.e. the resolution of - the states after its parent events). + is_partial_state_room: `True` if the server currently doesn't hold the full + state of the room. + is_host_in_room: `True` if the host is in the room. + partial_state_before_join: The state before the join event (i.e. the + resolution of the states after its parent events). May be full or + partial state, depending on `is_partial_state_room`. Returns: A tuple of: @@ -1109,6 +1135,23 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if not is_host_in_room: return True, remote_room_hosts + prev_member_event_id = partial_state_before_join.get( + (EventTypes.Member, user_id), None + ) + previous_membership = None + if prev_member_event_id: + prev_member_event = await self.store.get_event(prev_member_event_id) + previous_membership = prev_member_event.membership + + # If we are not fully joined yet, and the target is not already in the room, + # let's do a remote join so another server with the full state can validate + # that the user has not been banned for example. + # We could just accept the join and wait for state res to resolve that later on + # but we would then leak room history to this person until then, which is pretty + # bad. + if is_partial_state_room and previous_membership != Membership.JOIN: + return True, remote_room_hosts + # If the host is in the room, but not one of the authorised hosts # for restricted join rules, a remote join must be used. room_version = await self.store.get_room_version(room_id) @@ -1116,21 +1159,19 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # If restricted join rules are not being used, a local join can always # be used. if not await self.event_auth_handler.has_restricted_join_rules( - state_before_join, room_version + partial_state_before_join, room_version ): return False, [] # If the user is invited to the room or already joined, the join # event can always be issued locally. - prev_member_event_id = state_before_join.get((EventTypes.Member, user_id), None) - prev_member_event = None - if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) - if prev_member_event.membership in ( - Membership.JOIN, - Membership.INVITE, - ): - return False, [] + if previous_membership in (Membership.JOIN, Membership.INVITE): + return False, [] + + # All the partial state cases are covered above. We have been given the full + # state of the room. + assert not is_partial_state_room + state_before_join = partial_state_before_join # If the local host has a user who can issue invites, then a local # join can be done. @@ -1154,7 +1195,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # Ensure the member should be allowed access via membership in a room. await self.event_auth_handler.check_restricted_join_rules( - state_before_join, room_version, user_id, prev_member_event + state_before_join, room_version, user_id, previous_membership ) # If this is going to be a local join, additional information must @@ -1304,11 +1345,17 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if prev_member_event.membership == Membership.JOIN: await self._user_left_room(target_user, room_id) - async def _can_guest_join(self, current_state_ids: StateMap[str]) -> bool: + async def _can_guest_join(self, partial_current_state_ids: StateMap[str]) -> bool: """ Returns whether a guest can join a room based on its current state. + + Args: + partial_current_state_ids: The current state of the room. May be full or + partial state. """ - guest_access_id = current_state_ids.get((EventTypes.GuestAccess, ""), None) + guest_access_id = partial_current_state_ids.get( + (EventTypes.GuestAccess, ""), None + ) if not guest_access_id: return False @@ -1634,19 +1681,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ) return event, stream_id - async def _is_host_in_room(self, current_state_ids: StateMap[str]) -> bool: + async def _is_host_in_room(self, partial_current_state_ids: StateMap[str]) -> bool: + """Returns whether the homeserver is in the room based on its current state. + + Args: + partial_current_state_ids: The current state of the room. May be full or + partial state. + """ # Have we just created the room, and is this about to be the very # first member event? - create_event_id = current_state_ids.get(("m.room.create", "")) - if len(current_state_ids) == 1 and create_event_id: + create_event_id = partial_current_state_ids.get(("m.room.create", "")) + if len(partial_current_state_ids) == 1 and create_event_id: # We can only get here if we're in the process of creating the room return True - for etype, state_key in current_state_ids: + for etype, state_key in partial_current_state_ids: if etype != EventTypes.Member or not self.hs.is_mine_id(state_key): continue - event_id = current_state_ids[(etype, state_key)] + event_id = partial_current_state_ids[(etype, state_key)] event = await self.store.get_event(event_id, allow_none=True) if not event: continue @@ -1715,8 +1768,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): ] if len(remote_room_hosts) == 0: - raise SynapseError( - 404, + raise NoKnownServersError( "Can't join remote room because no servers " "that are in the room have been provided.", ) @@ -1947,7 +1999,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): ] if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") + raise NoKnownServersError() return await self.federation_handler.do_knock( remote_room_hosts, room_id, user.to_string(), content=content diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 221552a2a6..ba261702d4 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -15,8 +15,7 @@ import logging from typing import TYPE_CHECKING, List, Optional, Tuple -from synapse.api.errors import SynapseError -from synapse.handlers.room_member import RoomMemberHandler +from synapse.handlers.room_member import NoKnownServersError, RoomMemberHandler from synapse.replication.http.membership import ( ReplicationRemoteJoinRestServlet as ReplRemoteJoin, ReplicationRemoteKnockRestServlet as ReplRemoteKnock, @@ -52,7 +51,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): ) -> Tuple[str, int]: """Implements RoomMemberHandler._remote_join""" if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") + raise NoKnownServersError() ret = await self._remote_join_client( requester=requester, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index cb66376fb4..ffe766fd56 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -16,7 +16,6 @@ import itertools import logging from collections import OrderedDict -from http import HTTPStatus from typing import ( TYPE_CHECKING, Any, @@ -36,7 +35,7 @@ from prometheus_client import Counter import synapse.metrics from synapse.api.constants import EventContentFields, EventTypes, RelationTypes -from synapse.api.errors import Codes, SynapseError +from synapse.api.errors import PartialStateConflictError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext @@ -72,24 +71,6 @@ event_counter = Counter( ) -class PartialStateConflictError(SynapseError): - """An internal error raised when attempting to persist an event with partial state - after the room containing the event has been un-partial stated. - - This error should be handled by recomputing the event context and trying again. - - This error has an HTTP status code so that it can be transported over replication. - It should not be exposed to clients. - """ - - def __init__(self) -> None: - super().__init__( - HTTPStatus.CONFLICT, - msg="Cannot persist partial state event in un-partial stated room", - errcode=Codes.UNKNOWN, - ) - - @attr.s(slots=True, auto_attribs=True) class DeltaState: """Deltas to use to update the `current_state_events` table. diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 57675fa407..5868eb2da7 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -575,26 +575,6 @@ class PartialJoinTestCase(unittest.FederatingHomeserverTestCase): fed_client = fed_handler.federation_client room_id = "!room:example.com" - membership_event = make_event_from_dict( - { - "room_id": room_id, - "type": "m.room.member", - "sender": "@alice:test", - "state_key": "@alice:test", - "content": {"membership": "join"}, - }, - RoomVersions.V10, - ) - - mock_make_membership_event = Mock( - return_value=make_awaitable( - ( - "example.com", - membership_event, - RoomVersions.V10, - ) - ) - ) EVENT_CREATE = make_event_from_dict( { @@ -640,6 +620,26 @@ class PartialJoinTestCase(unittest.FederatingHomeserverTestCase): }, room_version=RoomVersions.V10, ) + membership_event = make_event_from_dict( + { + "room_id": room_id, + "type": "m.room.member", + "sender": "@alice:test", + "state_key": "@alice:test", + "content": {"membership": "join"}, + "prev_events": [EVENT_INVITATION_MEMBERSHIP.event_id], + }, + RoomVersions.V10, + ) + mock_make_membership_event = Mock( + return_value=make_awaitable( + ( + "example.com", + membership_event, + RoomVersions.V10, + ) + ) + ) mock_send_join = Mock( return_value=make_awaitable( SendJoinResult( -- cgit 1.5.1 From 5febf88b6c5194582f427142dc0850625547c0d9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 15 Feb 2023 11:47:57 +0000 Subject: Update the error code for duplicate annotation (#15075) --- changelog.d/15075.feature | 2 ++ synapse/api/errors.py | 4 ++++ synapse/handlers/message.py | 6 +++++- 3 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 changelog.d/15075.feature (limited to 'synapse/api') diff --git a/changelog.d/15075.feature b/changelog.d/15075.feature new file mode 100644 index 0000000000..d25a7567a4 --- /dev/null +++ b/changelog.d/15075.feature @@ -0,0 +1,2 @@ +Update the error code returned when user sends a duplicate annotation. + diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 9235ce6536..e1737de59b 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -108,6 +108,10 @@ class Codes(str, Enum): USER_AWAITING_APPROVAL = "ORG.MATRIX.MSC3866_USER_AWAITING_APPROVAL" + # Attempt to send a second annotation with the same event type & annotation key + # MSC2677 + DUPLICATE_ANNOTATION = "M_DUPLICATE_ANNOTATION" + class CodeMessageException(RuntimeError): """An exception with integer code and message string attributes. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8f5b658d9d..aa90d0000d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1337,7 +1337,11 @@ class EventCreationHandler: relation.parent_id, event.type, aggregation_key, event.sender ) if already_exists: - raise SynapseError(400, "Can't send same reaction twice") + raise SynapseError( + 400, + "Can't send same reaction twice", + errcode=Codes.DUPLICATE_ANNOTATION, + ) # Don't attempt to start a thread if the parent event is a relation. elif relation.rel_type == RelationTypes.THREAD: -- cgit 1.5.1