diff options
Diffstat (limited to 'synapse/handlers/room.py')
-rw-r--r-- | synapse/handlers/room.py | 556 |
1 files changed, 327 insertions, 229 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 970be3c846..61db3ccc43 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd +# Copyright 2018-2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,21 +16,31 @@ # limitations under the License. """Contains functions for performing events on rooms.""" + import itertools import logging import math import string from collections import OrderedDict +from typing import Tuple from six import iteritems, string_types -from twisted.internet import defer - from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError -from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion +from synapse.events.utils import copy_power_levels_contents +from synapse.http.endpoint import parse_and_validate_server_name from synapse.storage.state import StateFilter -from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID +from synapse.types import ( + Requester, + RoomAlias, + RoomID, + RoomStreamToken, + StateMap, + StreamToken, + UserID, +) from synapse.util import stringutils from synapse.util.async_helpers import Linearizer from synapse.util.caches.response_cache import ResponseCache @@ -52,18 +63,21 @@ class RoomCreationHandler(BaseHandler): "history_visibility": "shared", "original_invitees_have_ops": False, "guest_can_join": True, + "power_level_content_override": {"invite": 0}, }, RoomCreationPreset.TRUSTED_PRIVATE_CHAT: { "join_rules": JoinRules.INVITE, "history_visibility": "shared", "original_invitees_have_ops": True, "guest_can_join": True, + "power_level_content_override": {"invite": 0}, }, RoomCreationPreset.PUBLIC_CHAT: { "join_rules": JoinRules.PUBLIC, "history_visibility": "shared", "original_invitees_have_ops": False, "guest_can_join": False, + "power_level_content_override": {}, }, } @@ -75,6 +89,8 @@ class RoomCreationHandler(BaseHandler): self.room_member_handler = hs.get_room_member_handler() self.config = hs.config + self._replication = hs.get_replication_data_handler() + # linearizer to stop two upgrades happening at once self._upgrade_linearizer = Linearizer("room_upgrade_linearizer") @@ -88,19 +104,20 @@ class RoomCreationHandler(BaseHandler): self.third_party_event_rules = hs.get_third_party_event_rules() - @defer.inlineCallbacks - def upgrade_room(self, requester, old_room_id, new_version): + async def upgrade_room( + self, requester: Requester, old_room_id: str, new_version: RoomVersion + ): """Replace a room with a new room with a different version Args: - requester (synapse.types.Requester): the user requesting the upgrade - old_room_id (unicode): the id of the room to be replaced - new_version (unicode): the new room version to use + requester: the user requesting the upgrade + old_room_id: the id of the room to be replaced + new_version: the new room version to use Returns: Deferred[unicode]: the new room id """ - yield self.ratelimit(requester) + await self.ratelimit(requester) user_id = requester.user.to_string() @@ -121,53 +138,56 @@ class RoomCreationHandler(BaseHandler): # If this user has sent multiple upgrade requests for the same room # and one of them is not complete yet, cache the response and # return it to all subsequent requests - ret = yield self._upgrade_response_cache.wrap( + ret = await self._upgrade_response_cache.wrap( (old_room_id, user_id), self._upgrade_room, requester, old_room_id, new_version, # args for _upgrade_room ) + return ret - @defer.inlineCallbacks - def _upgrade_room(self, requester, old_room_id, new_version): + async def _upgrade_room( + self, requester: Requester, old_room_id: str, new_version: RoomVersion + ): user_id = requester.user.to_string() # start by allocating a new room id - r = yield self.store.get_room(old_room_id) + r = await self.store.get_room(old_room_id) if r is None: raise NotFoundError("Unknown room id %s" % (old_room_id,)) - new_room_id = yield self._generate_room_id( - creator_id=user_id, is_public=r["is_public"] + new_room_id = await self._generate_room_id( + creator_id=user_id, is_public=r["is_public"], room_version=new_version, ) logger.info("Creating new room %s to replace %s", new_room_id, old_room_id) # we create and auth the tombstone event before properly creating the new # room, to check our user has perms in the old room. - tombstone_event, tombstone_context = ( - yield self.event_creation_handler.create_event( - requester, - { - "type": EventTypes.Tombstone, - "state_key": "", - "room_id": old_room_id, - "sender": user_id, - "content": { - "body": "This room has been replaced", - "replacement_room": new_room_id, - }, + ( + tombstone_event, + tombstone_context, + ) = await self.event_creation_handler.create_event( + requester, + { + "type": EventTypes.Tombstone, + "state_key": "", + "room_id": old_room_id, + "sender": user_id, + "content": { + "body": "This room has been replaced", + "replacement_room": new_room_id, }, - token_id=requester.access_token_id, - ) + }, + token_id=requester.access_token_id, ) - old_room_version = yield self.store.get_room_version(old_room_id) - yield self.auth.check_from_context( + old_room_version = await self.store.get_room_version_id(old_room_id) + await self.auth.check_from_context( old_room_version, tombstone_event, tombstone_context ) - yield self.clone_existing_room( + await self.clone_existing_room( requester, old_room_id=old_room_id, new_room_id=new_room_id, @@ -176,36 +196,44 @@ class RoomCreationHandler(BaseHandler): ) # now send the tombstone - yield self.event_creation_handler.send_nonmember_event( + await self.event_creation_handler.send_nonmember_event( requester, tombstone_event, tombstone_context ) - old_room_state = yield tombstone_context.get_current_state_ids(self.store) + old_room_state = await tombstone_context.get_current_state_ids() # update any aliases - yield self._move_aliases_to_new_room( + await self._move_aliases_to_new_room( requester, old_room_id, new_room_id, old_room_state ) - # and finally, shut down the PLs in the old room, and update them in the new + # Copy over user push rules, tags and migrate room directory state + await self.room_member_handler.transfer_room_state_on_room_upgrade( + old_room_id, new_room_id + ) + + # finally, shut down the PLs in the old room, and update them in the new # room. - yield self._update_upgraded_room_pls( - requester, old_room_id, new_room_id, old_room_state + await self._update_upgraded_room_pls( + requester, old_room_id, new_room_id, old_room_state, ) return new_room_id - @defer.inlineCallbacks - def _update_upgraded_room_pls( - self, requester, old_room_id, new_room_id, old_room_state + async def _update_upgraded_room_pls( + self, + requester: Requester, + old_room_id: str, + new_room_id: str, + old_room_state: StateMap[str], ): """Send updated power levels in both rooms after an upgrade Args: - requester (synapse.types.Requester): the user requesting the upgrade - old_room_id (unicode): the id of the room to be replaced - new_room_id (unicode): the id of the replacement room - old_room_state (dict[tuple[str, str], str]): the state map for the old room + requester: the user requesting the upgrade + old_room_id: the id of the room to be replaced + new_room_id: the id of the replacement room + old_room_state: the state map for the old room Returns: Deferred @@ -219,7 +247,7 @@ class RoomCreationHandler(BaseHandler): ) return - old_room_pl_state = yield self.store.get_event(old_room_pl_event_id) + old_room_pl_state = await self.store.get_event(old_room_pl_event_id) # we try to stop regular users from speaking by setting the PL required # to send regular events and invites to 'Moderator' level. That's normally @@ -234,7 +262,7 @@ class RoomCreationHandler(BaseHandler): for v in ("invite", "events_default"): current = int(pl_content.get(v, 0)) if current < restricted_level: - logger.info( + logger.debug( "Setting level for %s in %s to %i (was %i)", v, old_room_id, @@ -244,11 +272,11 @@ class RoomCreationHandler(BaseHandler): pl_content[v] = restricted_level updated = True else: - logger.info("Not setting level for %s (already %i)", v, current) + logger.debug("Not setting level for %s (already %i)", v, current) if updated: try: - yield self.event_creation_handler.create_and_send_nonmember_event( + await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.PowerLevels, @@ -262,8 +290,7 @@ class RoomCreationHandler(BaseHandler): except AuthError as e: logger.warning("Unable to update PLs in old room: %s", e) - logger.info("Setting correct PLs in new room") - yield self.event_creation_handler.create_and_send_nonmember_event( + await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.PowerLevels, @@ -275,22 +302,25 @@ class RoomCreationHandler(BaseHandler): ratelimit=False, ) - @defer.inlineCallbacks - def clone_existing_room( - self, requester, old_room_id, new_room_id, new_room_version, tombstone_event_id + async def clone_existing_room( + self, + requester: Requester, + old_room_id: str, + new_room_id: str, + new_room_version: RoomVersion, + tombstone_event_id: str, ): """Populate a new room based on an old room Args: - requester (synapse.types.Requester): the user requesting the upgrade - old_room_id (unicode): the id of the room to be replaced - new_room_id (unicode): the id to give the new room (should already have been + requester: the user requesting the upgrade + old_room_id : the id of the room to be replaced + new_room_id: the id to give the new room (should already have been created with _gemerate_room_id()) - new_room_version (unicode): the new room version to use - tombstone_event_id (unicode|str): the ID of the tombstone event in the old - room. + new_room_version: the new room version to use + tombstone_event_id: the ID of the tombstone event in the old room. Returns: - Deferred[None] + Deferred """ user_id = requester.user.to_string() @@ -298,21 +328,21 @@ class RoomCreationHandler(BaseHandler): raise SynapseError(403, "You are not permitted to create rooms") creation_content = { - "room_version": new_room_version, + "room_version": new_room_version.identifier, "predecessor": {"room_id": old_room_id, "event_id": tombstone_event_id}, } # Check if old room was non-federatable # Get old room's create event - old_room_create_event = yield self.store.get_create_event_for_room(old_room_id) + old_room_create_event = await self.store.get_create_event_for_room(old_room_id) # Check if the create event specified a non-federatable room if not old_room_create_event.content.get("m.federate", True): # If so, mark the new room as non-federatable as well creation_content["m.federate"] = False - initial_state = dict() + initial_state = {} # Replicate relevant room events types_to_copy = ( @@ -322,23 +352,52 @@ class RoomCreationHandler(BaseHandler): (EventTypes.RoomHistoryVisibility, ""), (EventTypes.GuestAccess, ""), (EventTypes.RoomAvatar, ""), - (EventTypes.Encryption, ""), + (EventTypes.RoomEncryption, ""), (EventTypes.ServerACL, ""), (EventTypes.RelatedGroups, ""), + (EventTypes.PowerLevels, ""), ) - old_room_state_ids = yield self.store.get_filtered_current_state_ids( + old_room_state_ids = await self.store.get_filtered_current_state_ids( old_room_id, StateFilter.from_types(types_to_copy) ) # map from event_id to BaseEvent - old_room_state_events = yield self.store.get_events(old_room_state_ids.values()) + old_room_state_events = await self.store.get_events(old_room_state_ids.values()) for k, old_event_id in iteritems(old_room_state_ids): old_event = old_room_state_events.get(old_event_id) if old_event: initial_state[k] = old_event.content - yield self._send_events_for_new_room( + # deep-copy the power-levels event before we start modifying it + # note that if frozen_dicts are enabled, `power_levels` will be a frozen + # dict so we can't just copy.deepcopy it. + initial_state[ + (EventTypes.PowerLevels, "") + ] = power_levels = copy_power_levels_contents( + initial_state[(EventTypes.PowerLevels, "")] + ) + + # Resolve the minimum power level required to send any state event + # We will give the upgrading user this power level temporarily (if necessary) such that + # they are able to copy all of the state events over, then revert them back to their + # original power level afterwards in _update_upgraded_room_pls + + # Copy over user power levels now as this will not be possible with >100PL users once + # the room has been created + + # Calculate the minimum power level needed to clone the room + event_power_levels = power_levels.get("events", {}) + state_default = power_levels.get("state_default", 0) + ban = power_levels.get("ban") + needed_power_level = max(state_default, ban, max(event_power_levels.values())) + + # Raise the requester's power level in the new room if necessary + current_power_level = power_levels["users"][user_id] + if current_power_level < needed_power_level: + power_levels["users"][user_id] = needed_power_level + + await self._send_events_for_new_room( requester, new_room_id, # we expect to override all the presets with initial_state, so this is @@ -350,12 +409,12 @@ class RoomCreationHandler(BaseHandler): ) # Transfer membership events - old_room_member_state_ids = yield self.store.get_filtered_current_state_ids( + old_room_member_state_ids = await self.store.get_filtered_current_state_ids( old_room_id, StateFilter.from_types([(EventTypes.Member, None)]) ) # map from event_id to BaseEvent - old_room_member_state_events = yield self.store.get_events( + old_room_member_state_events = await self.store.get_events( old_room_member_state_ids.values() ) for k, old_event in iteritems(old_room_member_state_events): @@ -364,7 +423,7 @@ class RoomCreationHandler(BaseHandler): "membership" in old_event.content and old_event.content["membership"] == "ban" ): - yield self.room_member_handler.update_membership( + await self.room_member_handler.update_membership( requester, UserID.from_string(old_event["state_key"]), new_room_id, @@ -376,102 +435,93 @@ class RoomCreationHandler(BaseHandler): # XXX invites/joins # XXX 3pid invites - @defer.inlineCallbacks - def _move_aliases_to_new_room( - self, requester, old_room_id, new_room_id, old_room_state + async def _move_aliases_to_new_room( + self, + requester: Requester, + old_room_id: str, + new_room_id: str, + old_room_state: StateMap[str], ): - directory_handler = self.hs.get_handlers().directory_handler - - aliases = yield self.store.get_aliases_for_room(old_room_id) - # check to see if we have a canonical alias. - canonical_alias = None + canonical_alias_event = None canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, "")) if canonical_alias_event_id: - canonical_alias_event = yield self.store.get_event(canonical_alias_event_id) - if canonical_alias_event: - canonical_alias = canonical_alias_event.content.get("alias", "") + canonical_alias_event = await self.store.get_event(canonical_alias_event_id) - # first we try to remove the aliases from the old room (we suppress sending - # the room_aliases event until the end). - # - # Note that we'll only be able to remove aliases that (a) aren't owned by an AS, - # and (b) unless the user is a server admin, which the user created. - # - # This is probably correct - given we don't allow such aliases to be deleted - # normally, it would be odd to allow it in the case of doing a room upgrade - - # but it makes the upgrade less effective, and you have to wonder why a room - # admin can't remove aliases that point to that room anyway. - # (cf https://github.com/matrix-org/synapse/issues/2360) - # - removed_aliases = [] - for alias_str in aliases: - alias = RoomAlias.from_string(alias_str) - try: - yield directory_handler.delete_association( - requester, alias, send_event=False - ) - removed_aliases.append(alias_str) - except SynapseError as e: - logger.warning("Unable to remove alias %s from old room: %s", alias, e) + await self.store.update_aliases_for_room(old_room_id, new_room_id) - # if we didn't find any aliases, or couldn't remove anyway, we can skip the rest - # of this. - if not removed_aliases: + if not canonical_alias_event: return + # If there is a canonical alias we need to update the one in the old + # room and set one in the new one. + old_canonical_alias_content = dict(canonical_alias_event.content) + new_canonical_alias_content = {} + + canonical = canonical_alias_event.content.get("alias") + if canonical and self.hs.is_mine_id(canonical): + new_canonical_alias_content["alias"] = canonical + old_canonical_alias_content.pop("alias", None) + + # We convert to a list as it will be a Tuple. + old_alt_aliases = list(old_canonical_alias_content.get("alt_aliases", [])) + if old_alt_aliases: + old_canonical_alias_content["alt_aliases"] = old_alt_aliases + new_alt_aliases = new_canonical_alias_content.setdefault("alt_aliases", []) + for alias in canonical_alias_event.content.get("alt_aliases", []): + try: + if self.hs.is_mine_id(alias): + new_alt_aliases.append(alias) + old_alt_aliases.remove(alias) + except Exception: + logger.info( + "Invalid alias %s in canonical alias event %s", + alias, + canonical_alias_event_id, + ) + + if not old_alt_aliases: + old_canonical_alias_content.pop("alt_aliases") + + # If a canonical alias event existed for the old room, fire a canonical + # alias event for the new room with a copy of the information. try: - # this can fail if, for some reason, our user doesn't have perms to send - # m.room.aliases events in the old room (note that we've already checked that - # they have perms to send a tombstone event, so that's not terribly likely). - # - # If that happens, it's regrettable, but we should carry on: it's the same - # as when you remove an alias from the directory normally - it just means that - # the aliases event gets out of sync with the directory - # (cf https://github.com/vector-im/riot-web/issues/2369) - yield directory_handler.send_room_alias_update_event(requester, old_room_id) - except AuthError as e: - logger.warning("Failed to send updated alias event on old room: %s", e) - - # we can now add any aliases we successfully removed to the new room. - for alias in removed_aliases: - try: - yield directory_handler.create_association( - requester, - RoomAlias.from_string(alias), - new_room_id, - servers=(self.hs.hostname,), - send_event=False, - check_membership=False, - ) - logger.info("Moved alias %s to new room", alias) - except SynapseError as e: - # I'm not really expecting this to happen, but it could if the spam - # checking module decides it shouldn't, or similar. - logger.error("Error adding alias %s to new room: %s", alias, e) + await self.event_creation_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.CanonicalAlias, + "state_key": "", + "room_id": old_room_id, + "sender": requester.user.to_string(), + "content": old_canonical_alias_content, + }, + ratelimit=False, + ) + except SynapseError as e: + # again I'm not really expecting this to fail, but if it does, I'd rather + # we returned the new room to the client at this point. + logger.error("Unable to send updated alias events in old room: %s", e) try: - if canonical_alias and (canonical_alias in removed_aliases): - yield self.event_creation_handler.create_and_send_nonmember_event( - requester, - { - "type": EventTypes.CanonicalAlias, - "state_key": "", - "room_id": new_room_id, - "sender": requester.user.to_string(), - "content": {"alias": canonical_alias}, - }, - ratelimit=False, - ) - - yield directory_handler.send_room_alias_update_event(requester, new_room_id) + await self.event_creation_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.CanonicalAlias, + "state_key": "", + "room_id": new_room_id, + "sender": requester.user.to_string(), + "content": new_canonical_alias_content, + }, + ratelimit=False, + ) except SynapseError as e: # again I'm not really expecting this to fail, but if it does, I'd rather # we returned the new room to the client at this point. logger.error("Unable to send updated alias events in new room: %s", e) - @defer.inlineCallbacks - def create_room(self, requester, config, ratelimit=True, creator_join_profile=None): + async def create_room( + self, requester, config, ratelimit=True, creator_join_profile=None + ) -> Tuple[dict, int]: """ Creates a new room. Args: @@ -488,9 +538,9 @@ class RoomCreationHandler(BaseHandler): `avatar_url` and/or `displayname`. Returns: - Deferred[dict]: - a dict containing the keys `room_id` and, if an alias was - requested, `room_alias`. + First, a dict containing the keys `room_id` and, if an alias + was, requested, `room_alias`. Secondly, the stream_id of the + last persisted event. Raises: SynapseError if the room ID couldn't be stored, or something went horribly wrong. @@ -499,7 +549,7 @@ class RoomCreationHandler(BaseHandler): """ user_id = requester.user.to_string() - yield self.auth.check_auth_blocking(user_id) + await self.auth.check_auth_blocking(user_id) if ( self._server_notices_mxid is not None @@ -508,13 +558,17 @@ class RoomCreationHandler(BaseHandler): # allow the server notices mxid to create rooms is_requester_admin = True else: - is_requester_admin = yield self.auth.is_server_admin(requester.user) + is_requester_admin = await self.auth.is_server_admin(requester.user) # Check whether the third party rules allows/changes the room create # request. - yield self.third_party_event_rules.on_create_room( + event_allowed = await self.third_party_event_rules.on_create_room( requester, config, is_requester_admin=is_requester_admin ) + if not event_allowed: + raise SynapseError( + 403, "You are not permitted to create rooms", Codes.FORBIDDEN + ) if not is_requester_admin and not self.spam_checker.user_may_create_room( user_id @@ -522,16 +576,17 @@ class RoomCreationHandler(BaseHandler): raise SynapseError(403, "You are not permitted to create rooms") if ratelimit: - yield self.ratelimit(requester) + await self.ratelimit(requester) - room_version = config.get( + room_version_id = config.get( "room_version", self.config.default_room_version.identifier ) - if not isinstance(room_version, string_types): + if not isinstance(room_version_id, string_types): raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON) - if room_version not in KNOWN_ROOM_VERSIONS: + room_version = KNOWN_ROOM_VERSIONS.get(room_version_id) + if room_version is None: raise SynapseError( 400, "Your homeserver does not support this room version", @@ -544,7 +599,7 @@ class RoomCreationHandler(BaseHandler): raise SynapseError(400, "Invalid characters in room alias") room_alias = RoomAlias(config["room_alias_name"], self.hs.hostname) - mapping = yield self.store.get_association_from_room_alias(room_alias) + mapping = await self.store.get_association_from_room_alias(room_alias) if mapping: raise SynapseError(400, "Room alias already taken", Codes.ROOM_IN_USE) @@ -554,11 +609,12 @@ class RoomCreationHandler(BaseHandler): invite_list = config.get("invite", []) for i in invite_list: try: - UserID.from_string(i) + uid = UserID.from_string(i) + parse_and_validate_server_name(uid.domain) except Exception: raise SynapseError(400, "Invalid user_id: %s" % (i,)) - yield self.event_creation_handler.assert_accepted_privacy_policy(requester) + await self.event_creation_handler.assert_accepted_privacy_policy(requester) power_level_content_override = config.get("power_level_content_override") if ( @@ -577,19 +633,27 @@ class RoomCreationHandler(BaseHandler): visibility = config.get("visibility", None) is_public = visibility == "public" - room_id = yield self._generate_room_id(creator_id=user_id, is_public=is_public) + room_id = await self._generate_room_id( + creator_id=user_id, is_public=is_public, room_version=room_version, + ) directory_handler = self.hs.get_handlers().directory_handler if room_alias: - yield directory_handler.create_association( + await directory_handler.create_association( requester=requester, room_id=room_id, room_alias=room_alias, servers=[self.hs.hostname], - send_event=False, check_membership=False, ) + if is_public: + if not self.config.is_publishing_room_allowed(user_id, room_id, room_alias): + # Lets just return a generic message, as there may be all sorts of + # reasons why we said no. TODO: Allow configurable error messages + # per alias creation rule? + raise SynapseError(403, "Not allowed to publish room") + preset_config = config.get( "preset", RoomCreationPreset.PRIVATE_CHAT @@ -606,9 +670,9 @@ class RoomCreationHandler(BaseHandler): creation_content = config.get("creation_content", {}) # override any attempt to set room versions via the creation_content - creation_content["room_version"] = room_version + creation_content["room_version"] = room_version.identifier - yield self._send_events_for_new_room( + last_stream_id = await self._send_events_for_new_room( requester, room_id, preset_config=preset_config, @@ -622,7 +686,10 @@ class RoomCreationHandler(BaseHandler): if "name" in config: name = config["name"] - yield self.event_creation_handler.create_and_send_nonmember_event( + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Name, @@ -636,7 +703,10 @@ class RoomCreationHandler(BaseHandler): if "topic" in config: topic = config["topic"] - yield self.event_creation_handler.create_and_send_nonmember_event( + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Topic, @@ -654,7 +724,7 @@ class RoomCreationHandler(BaseHandler): if is_direct: content["is_direct"] = is_direct - yield self.room_member_handler.update_membership( + _, last_stream_id = await self.room_member_handler.update_membership( requester, UserID.from_string(invitee), room_id, @@ -668,7 +738,7 @@ class RoomCreationHandler(BaseHandler): id_access_token = invite_3pid.get("id_access_token") # optional address = invite_3pid["address"] medium = invite_3pid["medium"] - yield self.hs.get_room_member_handler().do_3pid_invite( + last_stream_id = await self.hs.get_room_member_handler().do_3pid_invite( room_id, requester.user, medium, @@ -683,12 +753,15 @@ class RoomCreationHandler(BaseHandler): if room_alias: result["room_alias"] = room_alias.to_string() - yield directory_handler.send_room_alias_update_event(requester, room_id) - return result + # Always wait for room creation to progate before returning + await self._replication.wait_for_stream_position( + self.hs.config.worker.writers.events, "events", last_stream_id + ) + + return result, last_stream_id - @defer.inlineCallbacks - def _send_events_for_new_room( + async def _send_events_for_new_room( self, creator, # A Requester object. room_id, @@ -697,9 +770,15 @@ class RoomCreationHandler(BaseHandler): initial_state, creation_content, room_alias=None, - power_level_content_override=None, + power_level_content_override=None, # Doesn't apply when initial state has power level state event content creator_join_profile=None, - ): + ) -> int: + """Sends the initial events into a new room. + + Returns: + The stream_id of the last event persisted. + """ + def create(etype, content, **kwargs): e = {"type": etype, "content": content} @@ -708,13 +787,16 @@ class RoomCreationHandler(BaseHandler): return e - @defer.inlineCallbacks - def send(etype, content, **kwargs): + async def send(etype, content, **kwargs) -> int: event = create(etype, content, **kwargs) - logger.info("Sending %s in new room", etype) - yield self.event_creation_handler.create_and_send_nonmember_event( + logger.debug("Sending %s in new room", etype) + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( creator, event, ratelimit=False ) + return last_stream_id config = RoomCreationHandler.PRESETS_DICT[preset_config] @@ -723,10 +805,10 @@ class RoomCreationHandler(BaseHandler): event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""} creation_content.update({"creator": creator_id}) - yield send(etype=EventTypes.Create, content=creation_content) + await send(etype=EventTypes.Create, content=creation_content) - logger.info("Sending %s in new room", EventTypes.Member) - yield self.room_member_handler.update_membership( + logger.debug("Sending %s in new room", EventTypes.Member) + await self.room_member_handler.update_membership( creator, creator.user, room_id, @@ -739,7 +821,9 @@ class RoomCreationHandler(BaseHandler): # of the first events that get sent into a room. pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None) if pl_content is not None: - yield send(etype=EventTypes.PowerLevels, content=pl_content) + last_sent_stream_id = await send( + etype=EventTypes.PowerLevels, content=pl_content + ) else: power_level_content = { "users": {creator_id: 100}, @@ -750,52 +834,65 @@ class RoomCreationHandler(BaseHandler): EventTypes.RoomHistoryVisibility: 100, EventTypes.CanonicalAlias: 50, EventTypes.RoomAvatar: 50, + EventTypes.Tombstone: 100, + EventTypes.ServerACL: 100, + EventTypes.RoomEncryption: 100, }, "events_default": 0, "state_default": 50, "ban": 50, "kick": 50, "redact": 50, - "invite": 0, + "invite": 50, } if config["original_invitees_have_ops"]: for invitee in invite_list: power_level_content["users"][invitee] = 100 + # Power levels overrides are defined per chat preset + power_level_content.update(config["power_level_content_override"]) + if power_level_content_override: power_level_content.update(power_level_content_override) - yield send(etype=EventTypes.PowerLevels, content=power_level_content) + last_sent_stream_id = await send( + etype=EventTypes.PowerLevels, content=power_level_content + ) if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state: - yield send( + last_sent_stream_id = await send( etype=EventTypes.CanonicalAlias, content={"alias": room_alias.to_string()}, ) if (EventTypes.JoinRules, "") not in initial_state: - yield send( + last_sent_stream_id = await send( etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]} ) if (EventTypes.RoomHistoryVisibility, "") not in initial_state: - yield send( + last_sent_stream_id = await send( etype=EventTypes.RoomHistoryVisibility, content={"history_visibility": config["history_visibility"]}, ) if config["guest_can_join"]: if (EventTypes.GuestAccess, "") not in initial_state: - yield send( + last_sent_stream_id = await send( etype=EventTypes.GuestAccess, content={"guest_access": "can_join"} ) for (etype, state_key), content in initial_state.items(): - yield send(etype=etype, state_key=state_key, content=content) + last_sent_stream_id = await send( + etype=etype, state_key=state_key, content=content + ) - @defer.inlineCallbacks - def _generate_room_id(self, creator_id, is_public): + return last_sent_stream_id + + async def _generate_room_id( + self, creator_id: str, is_public: str, room_version: RoomVersion, + ): # autogen room IDs and try to create it. We may clash, so just # try a few times till one goes through, giving up eventually. attempts = 0 @@ -805,10 +902,11 @@ class RoomCreationHandler(BaseHandler): gen_room_id = RoomID(random_string, self.hs.hostname).to_string() if isinstance(gen_room_id, bytes): gen_room_id = gen_room_id.decode("utf-8") - yield self.store.store_room( + await self.store.store_room( room_id=gen_room_id, room_creator_user_id=creator_id, is_public=is_public, + room_version=room_version, ) return gen_room_id except StoreError: @@ -820,9 +918,10 @@ class RoomContextHandler(object): def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() + self.storage = hs.get_storage() + self.state_store = self.storage.state - @defer.inlineCallbacks - def get_event_context(self, user, room_id, event_id, limit, event_filter): + async def get_event_context(self, user, room_id, event_id, limit, event_filter): """Retrieves events, pagination tokens and state around a given event in a room. @@ -841,31 +940,38 @@ class RoomContextHandler(object): before_limit = math.floor(limit / 2.0) after_limit = limit - before_limit - users = yield self.store.get_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) is_peeking = user.to_string() not in users def filter_evts(events): return filter_events_for_client( - self.store, user.to_string(), events, is_peeking=is_peeking + self.storage, user.to_string(), events, is_peeking=is_peeking ) - event = yield self.store.get_event( + event = await self.store.get_event( event_id, get_prev_content=True, allow_none=True ) if not event: return None - filtered = yield (filter_evts([event])) + filtered = await filter_evts([event]) if not filtered: raise AuthError(403, "You don't have permission to access that event.") - results = yield self.store.get_events_around( + results = await self.store.get_events_around( room_id, event_id, before_limit, after_limit, event_filter ) - results["events_before"] = yield filter_evts(results["events_before"]) - results["events_after"] = yield filter_evts(results["events_after"]) - results["event"] = event + if event_filter: + results["events_before"] = event_filter.filter(results["events_before"]) + results["events_after"] = event_filter.filter(results["events_after"]) + + results["events_before"] = await filter_evts(results["events_before"]) + results["events_after"] = await filter_evts(results["events_after"]) + # filter_evts can return a pruned event in case the user is allowed to see that + # there's something there but not see the content, so use the event that's in + # `filtered` rather than the event we retrieved from the datastore. + results["event"] = filtered[0] if results["events_after"]: last_event_id = results["events_after"][-1].event_id @@ -888,10 +994,15 @@ class RoomContextHandler(object): # first? Shouldn't we be consistent with /sync? # https://github.com/matrix-org/matrix-doc/issues/687 - state = yield self.store.get_state_for_events( + state = await self.state_store.get_state_for_events( [last_event_id], state_filter=state_filter ) - results["state"] = list(state[last_event_id].values()) + + state_events = list(state[last_event_id].values()) + if event_filter: + state_events = event_filter.filter(state_events) + + results["state"] = await filter_evts(state_events) # We use a dummy token here as we only care about the room portion of # the token, which we replace. @@ -910,17 +1021,16 @@ class RoomEventSource(object): def __init__(self, hs): self.store = hs.get_datastore() - @defer.inlineCallbacks - def get_new_events( + async def get_new_events( self, user, from_key, limit, room_ids, is_guest, explicit_room_id=None ): # We just ignore the key for now. - to_key = yield self.get_current_key() + to_key = await self.get_current_key() from_token = RoomStreamToken.parse(from_key) if from_token.topological: - logger.warn("Stream has topological part!!!! %r", from_key) + logger.warning("Stream has topological part!!!! %r", from_key) from_key = "s%s" % (from_token.stream,) app_service = self.store.get_app_service_by_user_id(user.to_string()) @@ -929,11 +1039,11 @@ class RoomEventSource(object): # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() else: - room_events = yield self.store.get_membership_changes_for_user( + room_events = await self.store.get_membership_changes_for_user( user.to_string(), from_key, to_key ) - room_to_events = yield self.store.get_room_events_stream_for_rooms( + room_to_events = await self.store.get_room_events_stream_for_rooms( room_ids=room_ids, from_key=from_key, to_key=to_key, @@ -961,15 +1071,3 @@ class RoomEventSource(object): def get_current_key_for_room(self, room_id): return self.store.get_room_events_max_id(room_id) - - @defer.inlineCallbacks - def get_pagination_rows(self, user, config, key): - events, next_key = yield self.store.paginate_room_events( - room_id=key, - from_key=config.from_key, - to_key=config.to_key, - direction=config.direction, - limit=config.limit, - ) - - return (events, next_key) |