diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 970be3c846..61db3ccc43 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,21 +16,31 @@
# limitations under the License.
"""Contains functions for performing events on rooms."""
+
import itertools
import logging
import math
import string
from collections import OrderedDict
+from typing import Tuple
from six import iteritems, string_types
-from twisted.internet import defer
-
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
+from synapse.events.utils import copy_power_levels_contents
+from synapse.http.endpoint import parse_and_validate_server_name
from synapse.storage.state import StateFilter
-from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
+from synapse.types import (
+ Requester,
+ RoomAlias,
+ RoomID,
+ RoomStreamToken,
+ StateMap,
+ StreamToken,
+ UserID,
+)
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.response_cache import ResponseCache
@@ -52,18 +63,21 @@ class RoomCreationHandler(BaseHandler):
"history_visibility": "shared",
"original_invitees_have_ops": False,
"guest_can_join": True,
+ "power_level_content_override": {"invite": 0},
},
RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
"join_rules": JoinRules.INVITE,
"history_visibility": "shared",
"original_invitees_have_ops": True,
"guest_can_join": True,
+ "power_level_content_override": {"invite": 0},
},
RoomCreationPreset.PUBLIC_CHAT: {
"join_rules": JoinRules.PUBLIC,
"history_visibility": "shared",
"original_invitees_have_ops": False,
"guest_can_join": False,
+ "power_level_content_override": {},
},
}
@@ -75,6 +89,8 @@ class RoomCreationHandler(BaseHandler):
self.room_member_handler = hs.get_room_member_handler()
self.config = hs.config
+ self._replication = hs.get_replication_data_handler()
+
# linearizer to stop two upgrades happening at once
self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
@@ -88,19 +104,20 @@ class RoomCreationHandler(BaseHandler):
self.third_party_event_rules = hs.get_third_party_event_rules()
- @defer.inlineCallbacks
- def upgrade_room(self, requester, old_room_id, new_version):
+ async def upgrade_room(
+ self, requester: Requester, old_room_id: str, new_version: RoomVersion
+ ):
"""Replace a room with a new room with a different version
Args:
- requester (synapse.types.Requester): the user requesting the upgrade
- old_room_id (unicode): the id of the room to be replaced
- new_version (unicode): the new room version to use
+ requester: the user requesting the upgrade
+ old_room_id: the id of the room to be replaced
+ new_version: the new room version to use
Returns:
Deferred[unicode]: the new room id
"""
- yield self.ratelimit(requester)
+ await self.ratelimit(requester)
user_id = requester.user.to_string()
@@ -121,53 +138,56 @@ class RoomCreationHandler(BaseHandler):
# If this user has sent multiple upgrade requests for the same room
# and one of them is not complete yet, cache the response and
# return it to all subsequent requests
- ret = yield self._upgrade_response_cache.wrap(
+ ret = await self._upgrade_response_cache.wrap(
(old_room_id, user_id),
self._upgrade_room,
requester,
old_room_id,
new_version, # args for _upgrade_room
)
+
return ret
- @defer.inlineCallbacks
- def _upgrade_room(self, requester, old_room_id, new_version):
+ async def _upgrade_room(
+ self, requester: Requester, old_room_id: str, new_version: RoomVersion
+ ):
user_id = requester.user.to_string()
# start by allocating a new room id
- r = yield self.store.get_room(old_room_id)
+ r = await self.store.get_room(old_room_id)
if r is None:
raise NotFoundError("Unknown room id %s" % (old_room_id,))
- new_room_id = yield self._generate_room_id(
- creator_id=user_id, is_public=r["is_public"]
+ new_room_id = await self._generate_room_id(
+ creator_id=user_id, is_public=r["is_public"], room_version=new_version,
)
logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
# we create and auth the tombstone event before properly creating the new
# room, to check our user has perms in the old room.
- tombstone_event, tombstone_context = (
- yield self.event_creation_handler.create_event(
- requester,
- {
- "type": EventTypes.Tombstone,
- "state_key": "",
- "room_id": old_room_id,
- "sender": user_id,
- "content": {
- "body": "This room has been replaced",
- "replacement_room": new_room_id,
- },
+ (
+ tombstone_event,
+ tombstone_context,
+ ) = await self.event_creation_handler.create_event(
+ requester,
+ {
+ "type": EventTypes.Tombstone,
+ "state_key": "",
+ "room_id": old_room_id,
+ "sender": user_id,
+ "content": {
+ "body": "This room has been replaced",
+ "replacement_room": new_room_id,
},
- token_id=requester.access_token_id,
- )
+ },
+ token_id=requester.access_token_id,
)
- old_room_version = yield self.store.get_room_version(old_room_id)
- yield self.auth.check_from_context(
+ old_room_version = await self.store.get_room_version_id(old_room_id)
+ await self.auth.check_from_context(
old_room_version, tombstone_event, tombstone_context
)
- yield self.clone_existing_room(
+ await self.clone_existing_room(
requester,
old_room_id=old_room_id,
new_room_id=new_room_id,
@@ -176,36 +196,44 @@ class RoomCreationHandler(BaseHandler):
)
# now send the tombstone
- yield self.event_creation_handler.send_nonmember_event(
+ await self.event_creation_handler.send_nonmember_event(
requester, tombstone_event, tombstone_context
)
- old_room_state = yield tombstone_context.get_current_state_ids(self.store)
+ old_room_state = await tombstone_context.get_current_state_ids()
# update any aliases
- yield self._move_aliases_to_new_room(
+ await self._move_aliases_to_new_room(
requester, old_room_id, new_room_id, old_room_state
)
- # and finally, shut down the PLs in the old room, and update them in the new
+ # Copy over user push rules, tags and migrate room directory state
+ await self.room_member_handler.transfer_room_state_on_room_upgrade(
+ old_room_id, new_room_id
+ )
+
+ # finally, shut down the PLs in the old room, and update them in the new
# room.
- yield self._update_upgraded_room_pls(
- requester, old_room_id, new_room_id, old_room_state
+ await self._update_upgraded_room_pls(
+ requester, old_room_id, new_room_id, old_room_state,
)
return new_room_id
- @defer.inlineCallbacks
- def _update_upgraded_room_pls(
- self, requester, old_room_id, new_room_id, old_room_state
+ async def _update_upgraded_room_pls(
+ self,
+ requester: Requester,
+ old_room_id: str,
+ new_room_id: str,
+ old_room_state: StateMap[str],
):
"""Send updated power levels in both rooms after an upgrade
Args:
- requester (synapse.types.Requester): the user requesting the upgrade
- old_room_id (unicode): the id of the room to be replaced
- new_room_id (unicode): the id of the replacement room
- old_room_state (dict[tuple[str, str], str]): the state map for the old room
+ requester: the user requesting the upgrade
+ old_room_id: the id of the room to be replaced
+ new_room_id: the id of the replacement room
+ old_room_state: the state map for the old room
Returns:
Deferred
@@ -219,7 +247,7 @@ class RoomCreationHandler(BaseHandler):
)
return
- old_room_pl_state = yield self.store.get_event(old_room_pl_event_id)
+ old_room_pl_state = await self.store.get_event(old_room_pl_event_id)
# we try to stop regular users from speaking by setting the PL required
# to send regular events and invites to 'Moderator' level. That's normally
@@ -234,7 +262,7 @@ class RoomCreationHandler(BaseHandler):
for v in ("invite", "events_default"):
current = int(pl_content.get(v, 0))
if current < restricted_level:
- logger.info(
+ logger.debug(
"Setting level for %s in %s to %i (was %i)",
v,
old_room_id,
@@ -244,11 +272,11 @@ class RoomCreationHandler(BaseHandler):
pl_content[v] = restricted_level
updated = True
else:
- logger.info("Not setting level for %s (already %i)", v, current)
+ logger.debug("Not setting level for %s (already %i)", v, current)
if updated:
try:
- yield self.event_creation_handler.create_and_send_nonmember_event(
+ await self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.PowerLevels,
@@ -262,8 +290,7 @@ class RoomCreationHandler(BaseHandler):
except AuthError as e:
logger.warning("Unable to update PLs in old room: %s", e)
- logger.info("Setting correct PLs in new room")
- yield self.event_creation_handler.create_and_send_nonmember_event(
+ await self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.PowerLevels,
@@ -275,22 +302,25 @@ class RoomCreationHandler(BaseHandler):
ratelimit=False,
)
- @defer.inlineCallbacks
- def clone_existing_room(
- self, requester, old_room_id, new_room_id, new_room_version, tombstone_event_id
+ async def clone_existing_room(
+ self,
+ requester: Requester,
+ old_room_id: str,
+ new_room_id: str,
+ new_room_version: RoomVersion,
+ tombstone_event_id: str,
):
"""Populate a new room based on an old room
Args:
- requester (synapse.types.Requester): the user requesting the upgrade
- old_room_id (unicode): the id of the room to be replaced
- new_room_id (unicode): the id to give the new room (should already have been
+ requester: the user requesting the upgrade
+ old_room_id : the id of the room to be replaced
+ new_room_id: the id to give the new room (should already have been
created with _gemerate_room_id())
- new_room_version (unicode): the new room version to use
- tombstone_event_id (unicode|str): the ID of the tombstone event in the old
- room.
+ new_room_version: the new room version to use
+ tombstone_event_id: the ID of the tombstone event in the old room.
Returns:
- Deferred[None]
+ Deferred
"""
user_id = requester.user.to_string()
@@ -298,21 +328,21 @@ class RoomCreationHandler(BaseHandler):
raise SynapseError(403, "You are not permitted to create rooms")
creation_content = {
- "room_version": new_room_version,
+ "room_version": new_room_version.identifier,
"predecessor": {"room_id": old_room_id, "event_id": tombstone_event_id},
}
# Check if old room was non-federatable
# Get old room's create event
- old_room_create_event = yield self.store.get_create_event_for_room(old_room_id)
+ old_room_create_event = await self.store.get_create_event_for_room(old_room_id)
# Check if the create event specified a non-federatable room
if not old_room_create_event.content.get("m.federate", True):
# If so, mark the new room as non-federatable as well
creation_content["m.federate"] = False
- initial_state = dict()
+ initial_state = {}
# Replicate relevant room events
types_to_copy = (
@@ -322,23 +352,52 @@ class RoomCreationHandler(BaseHandler):
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.GuestAccess, ""),
(EventTypes.RoomAvatar, ""),
- (EventTypes.Encryption, ""),
+ (EventTypes.RoomEncryption, ""),
(EventTypes.ServerACL, ""),
(EventTypes.RelatedGroups, ""),
+ (EventTypes.PowerLevels, ""),
)
- old_room_state_ids = yield self.store.get_filtered_current_state_ids(
+ old_room_state_ids = await self.store.get_filtered_current_state_ids(
old_room_id, StateFilter.from_types(types_to_copy)
)
# map from event_id to BaseEvent
- old_room_state_events = yield self.store.get_events(old_room_state_ids.values())
+ old_room_state_events = await self.store.get_events(old_room_state_ids.values())
for k, old_event_id in iteritems(old_room_state_ids):
old_event = old_room_state_events.get(old_event_id)
if old_event:
initial_state[k] = old_event.content
- yield self._send_events_for_new_room(
+ # deep-copy the power-levels event before we start modifying it
+ # note that if frozen_dicts are enabled, `power_levels` will be a frozen
+ # dict so we can't just copy.deepcopy it.
+ initial_state[
+ (EventTypes.PowerLevels, "")
+ ] = power_levels = copy_power_levels_contents(
+ initial_state[(EventTypes.PowerLevels, "")]
+ )
+
+ # Resolve the minimum power level required to send any state event
+ # We will give the upgrading user this power level temporarily (if necessary) such that
+ # they are able to copy all of the state events over, then revert them back to their
+ # original power level afterwards in _update_upgraded_room_pls
+
+ # Copy over user power levels now as this will not be possible with >100PL users once
+ # the room has been created
+
+ # Calculate the minimum power level needed to clone the room
+ event_power_levels = power_levels.get("events", {})
+ state_default = power_levels.get("state_default", 0)
+ ban = power_levels.get("ban")
+ needed_power_level = max(state_default, ban, max(event_power_levels.values()))
+
+ # Raise the requester's power level in the new room if necessary
+ current_power_level = power_levels["users"][user_id]
+ if current_power_level < needed_power_level:
+ power_levels["users"][user_id] = needed_power_level
+
+ await self._send_events_for_new_room(
requester,
new_room_id,
# we expect to override all the presets with initial_state, so this is
@@ -350,12 +409,12 @@ class RoomCreationHandler(BaseHandler):
)
# Transfer membership events
- old_room_member_state_ids = yield self.store.get_filtered_current_state_ids(
+ old_room_member_state_ids = await self.store.get_filtered_current_state_ids(
old_room_id, StateFilter.from_types([(EventTypes.Member, None)])
)
# map from event_id to BaseEvent
- old_room_member_state_events = yield self.store.get_events(
+ old_room_member_state_events = await self.store.get_events(
old_room_member_state_ids.values()
)
for k, old_event in iteritems(old_room_member_state_events):
@@ -364,7 +423,7 @@ class RoomCreationHandler(BaseHandler):
"membership" in old_event.content
and old_event.content["membership"] == "ban"
):
- yield self.room_member_handler.update_membership(
+ await self.room_member_handler.update_membership(
requester,
UserID.from_string(old_event["state_key"]),
new_room_id,
@@ -376,102 +435,93 @@ class RoomCreationHandler(BaseHandler):
# XXX invites/joins
# XXX 3pid invites
- @defer.inlineCallbacks
- def _move_aliases_to_new_room(
- self, requester, old_room_id, new_room_id, old_room_state
+ async def _move_aliases_to_new_room(
+ self,
+ requester: Requester,
+ old_room_id: str,
+ new_room_id: str,
+ old_room_state: StateMap[str],
):
- directory_handler = self.hs.get_handlers().directory_handler
-
- aliases = yield self.store.get_aliases_for_room(old_room_id)
-
# check to see if we have a canonical alias.
- canonical_alias = None
+ canonical_alias_event = None
canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, ""))
if canonical_alias_event_id:
- canonical_alias_event = yield self.store.get_event(canonical_alias_event_id)
- if canonical_alias_event:
- canonical_alias = canonical_alias_event.content.get("alias", "")
+ canonical_alias_event = await self.store.get_event(canonical_alias_event_id)
- # first we try to remove the aliases from the old room (we suppress sending
- # the room_aliases event until the end).
- #
- # Note that we'll only be able to remove aliases that (a) aren't owned by an AS,
- # and (b) unless the user is a server admin, which the user created.
- #
- # This is probably correct - given we don't allow such aliases to be deleted
- # normally, it would be odd to allow it in the case of doing a room upgrade -
- # but it makes the upgrade less effective, and you have to wonder why a room
- # admin can't remove aliases that point to that room anyway.
- # (cf https://github.com/matrix-org/synapse/issues/2360)
- #
- removed_aliases = []
- for alias_str in aliases:
- alias = RoomAlias.from_string(alias_str)
- try:
- yield directory_handler.delete_association(
- requester, alias, send_event=False
- )
- removed_aliases.append(alias_str)
- except SynapseError as e:
- logger.warning("Unable to remove alias %s from old room: %s", alias, e)
+ await self.store.update_aliases_for_room(old_room_id, new_room_id)
- # if we didn't find any aliases, or couldn't remove anyway, we can skip the rest
- # of this.
- if not removed_aliases:
+ if not canonical_alias_event:
return
+ # If there is a canonical alias we need to update the one in the old
+ # room and set one in the new one.
+ old_canonical_alias_content = dict(canonical_alias_event.content)
+ new_canonical_alias_content = {}
+
+ canonical = canonical_alias_event.content.get("alias")
+ if canonical and self.hs.is_mine_id(canonical):
+ new_canonical_alias_content["alias"] = canonical
+ old_canonical_alias_content.pop("alias", None)
+
+ # We convert to a list as it will be a Tuple.
+ old_alt_aliases = list(old_canonical_alias_content.get("alt_aliases", []))
+ if old_alt_aliases:
+ old_canonical_alias_content["alt_aliases"] = old_alt_aliases
+ new_alt_aliases = new_canonical_alias_content.setdefault("alt_aliases", [])
+ for alias in canonical_alias_event.content.get("alt_aliases", []):
+ try:
+ if self.hs.is_mine_id(alias):
+ new_alt_aliases.append(alias)
+ old_alt_aliases.remove(alias)
+ except Exception:
+ logger.info(
+ "Invalid alias %s in canonical alias event %s",
+ alias,
+ canonical_alias_event_id,
+ )
+
+ if not old_alt_aliases:
+ old_canonical_alias_content.pop("alt_aliases")
+
+ # If a canonical alias event existed for the old room, fire a canonical
+ # alias event for the new room with a copy of the information.
try:
- # this can fail if, for some reason, our user doesn't have perms to send
- # m.room.aliases events in the old room (note that we've already checked that
- # they have perms to send a tombstone event, so that's not terribly likely).
- #
- # If that happens, it's regrettable, but we should carry on: it's the same
- # as when you remove an alias from the directory normally - it just means that
- # the aliases event gets out of sync with the directory
- # (cf https://github.com/vector-im/riot-web/issues/2369)
- yield directory_handler.send_room_alias_update_event(requester, old_room_id)
- except AuthError as e:
- logger.warning("Failed to send updated alias event on old room: %s", e)
-
- # we can now add any aliases we successfully removed to the new room.
- for alias in removed_aliases:
- try:
- yield directory_handler.create_association(
- requester,
- RoomAlias.from_string(alias),
- new_room_id,
- servers=(self.hs.hostname,),
- send_event=False,
- check_membership=False,
- )
- logger.info("Moved alias %s to new room", alias)
- except SynapseError as e:
- # I'm not really expecting this to happen, but it could if the spam
- # checking module decides it shouldn't, or similar.
- logger.error("Error adding alias %s to new room: %s", alias, e)
+ await self.event_creation_handler.create_and_send_nonmember_event(
+ requester,
+ {
+ "type": EventTypes.CanonicalAlias,
+ "state_key": "",
+ "room_id": old_room_id,
+ "sender": requester.user.to_string(),
+ "content": old_canonical_alias_content,
+ },
+ ratelimit=False,
+ )
+ except SynapseError as e:
+ # again I'm not really expecting this to fail, but if it does, I'd rather
+ # we returned the new room to the client at this point.
+ logger.error("Unable to send updated alias events in old room: %s", e)
try:
- if canonical_alias and (canonical_alias in removed_aliases):
- yield self.event_creation_handler.create_and_send_nonmember_event(
- requester,
- {
- "type": EventTypes.CanonicalAlias,
- "state_key": "",
- "room_id": new_room_id,
- "sender": requester.user.to_string(),
- "content": {"alias": canonical_alias},
- },
- ratelimit=False,
- )
-
- yield directory_handler.send_room_alias_update_event(requester, new_room_id)
+ await self.event_creation_handler.create_and_send_nonmember_event(
+ requester,
+ {
+ "type": EventTypes.CanonicalAlias,
+ "state_key": "",
+ "room_id": new_room_id,
+ "sender": requester.user.to_string(),
+ "content": new_canonical_alias_content,
+ },
+ ratelimit=False,
+ )
except SynapseError as e:
# again I'm not really expecting this to fail, but if it does, I'd rather
# we returned the new room to the client at this point.
logger.error("Unable to send updated alias events in new room: %s", e)
- @defer.inlineCallbacks
- def create_room(self, requester, config, ratelimit=True, creator_join_profile=None):
+ async def create_room(
+ self, requester, config, ratelimit=True, creator_join_profile=None
+ ) -> Tuple[dict, int]:
""" Creates a new room.
Args:
@@ -488,9 +538,9 @@ class RoomCreationHandler(BaseHandler):
`avatar_url` and/or `displayname`.
Returns:
- Deferred[dict]:
- a dict containing the keys `room_id` and, if an alias was
- requested, `room_alias`.
+ First, a dict containing the keys `room_id` and, if an alias
+ was, requested, `room_alias`. Secondly, the stream_id of the
+ last persisted event.
Raises:
SynapseError if the room ID couldn't be stored, or something went
horribly wrong.
@@ -499,7 +549,7 @@ class RoomCreationHandler(BaseHandler):
"""
user_id = requester.user.to_string()
- yield self.auth.check_auth_blocking(user_id)
+ await self.auth.check_auth_blocking(user_id)
if (
self._server_notices_mxid is not None
@@ -508,13 +558,17 @@ class RoomCreationHandler(BaseHandler):
# allow the server notices mxid to create rooms
is_requester_admin = True
else:
- is_requester_admin = yield self.auth.is_server_admin(requester.user)
+ is_requester_admin = await self.auth.is_server_admin(requester.user)
# Check whether the third party rules allows/changes the room create
# request.
- yield self.third_party_event_rules.on_create_room(
+ event_allowed = await self.third_party_event_rules.on_create_room(
requester, config, is_requester_admin=is_requester_admin
)
+ if not event_allowed:
+ raise SynapseError(
+ 403, "You are not permitted to create rooms", Codes.FORBIDDEN
+ )
if not is_requester_admin and not self.spam_checker.user_may_create_room(
user_id
@@ -522,16 +576,17 @@ class RoomCreationHandler(BaseHandler):
raise SynapseError(403, "You are not permitted to create rooms")
if ratelimit:
- yield self.ratelimit(requester)
+ await self.ratelimit(requester)
- room_version = config.get(
+ room_version_id = config.get(
"room_version", self.config.default_room_version.identifier
)
- if not isinstance(room_version, string_types):
+ if not isinstance(room_version_id, string_types):
raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON)
- if room_version not in KNOWN_ROOM_VERSIONS:
+ room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
+ if room_version is None:
raise SynapseError(
400,
"Your homeserver does not support this room version",
@@ -544,7 +599,7 @@ class RoomCreationHandler(BaseHandler):
raise SynapseError(400, "Invalid characters in room alias")
room_alias = RoomAlias(config["room_alias_name"], self.hs.hostname)
- mapping = yield self.store.get_association_from_room_alias(room_alias)
+ mapping = await self.store.get_association_from_room_alias(room_alias)
if mapping:
raise SynapseError(400, "Room alias already taken", Codes.ROOM_IN_USE)
@@ -554,11 +609,12 @@ class RoomCreationHandler(BaseHandler):
invite_list = config.get("invite", [])
for i in invite_list:
try:
- UserID.from_string(i)
+ uid = UserID.from_string(i)
+ parse_and_validate_server_name(uid.domain)
except Exception:
raise SynapseError(400, "Invalid user_id: %s" % (i,))
- yield self.event_creation_handler.assert_accepted_privacy_policy(requester)
+ await self.event_creation_handler.assert_accepted_privacy_policy(requester)
power_level_content_override = config.get("power_level_content_override")
if (
@@ -577,19 +633,27 @@ class RoomCreationHandler(BaseHandler):
visibility = config.get("visibility", None)
is_public = visibility == "public"
- room_id = yield self._generate_room_id(creator_id=user_id, is_public=is_public)
+ room_id = await self._generate_room_id(
+ creator_id=user_id, is_public=is_public, room_version=room_version,
+ )
directory_handler = self.hs.get_handlers().directory_handler
if room_alias:
- yield directory_handler.create_association(
+ await directory_handler.create_association(
requester=requester,
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
- send_event=False,
check_membership=False,
)
+ if is_public:
+ if not self.config.is_publishing_room_allowed(user_id, room_id, room_alias):
+ # Lets just return a generic message, as there may be all sorts of
+ # reasons why we said no. TODO: Allow configurable error messages
+ # per alias creation rule?
+ raise SynapseError(403, "Not allowed to publish room")
+
preset_config = config.get(
"preset",
RoomCreationPreset.PRIVATE_CHAT
@@ -606,9 +670,9 @@ class RoomCreationHandler(BaseHandler):
creation_content = config.get("creation_content", {})
# override any attempt to set room versions via the creation_content
- creation_content["room_version"] = room_version
+ creation_content["room_version"] = room_version.identifier
- yield self._send_events_for_new_room(
+ last_stream_id = await self._send_events_for_new_room(
requester,
room_id,
preset_config=preset_config,
@@ -622,7 +686,10 @@ class RoomCreationHandler(BaseHandler):
if "name" in config:
name = config["name"]
- yield self.event_creation_handler.create_and_send_nonmember_event(
+ (
+ _,
+ last_stream_id,
+ ) = await self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.Name,
@@ -636,7 +703,10 @@ class RoomCreationHandler(BaseHandler):
if "topic" in config:
topic = config["topic"]
- yield self.event_creation_handler.create_and_send_nonmember_event(
+ (
+ _,
+ last_stream_id,
+ ) = await self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.Topic,
@@ -654,7 +724,7 @@ class RoomCreationHandler(BaseHandler):
if is_direct:
content["is_direct"] = is_direct
- yield self.room_member_handler.update_membership(
+ _, last_stream_id = await self.room_member_handler.update_membership(
requester,
UserID.from_string(invitee),
room_id,
@@ -668,7 +738,7 @@ class RoomCreationHandler(BaseHandler):
id_access_token = invite_3pid.get("id_access_token") # optional
address = invite_3pid["address"]
medium = invite_3pid["medium"]
- yield self.hs.get_room_member_handler().do_3pid_invite(
+ last_stream_id = await self.hs.get_room_member_handler().do_3pid_invite(
room_id,
requester.user,
medium,
@@ -683,12 +753,15 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
result["room_alias"] = room_alias.to_string()
- yield directory_handler.send_room_alias_update_event(requester, room_id)
- return result
+ # Always wait for room creation to progate before returning
+ await self._replication.wait_for_stream_position(
+ self.hs.config.worker.writers.events, "events", last_stream_id
+ )
+
+ return result, last_stream_id
- @defer.inlineCallbacks
- def _send_events_for_new_room(
+ async def _send_events_for_new_room(
self,
creator, # A Requester object.
room_id,
@@ -697,9 +770,15 @@ class RoomCreationHandler(BaseHandler):
initial_state,
creation_content,
room_alias=None,
- power_level_content_override=None,
+ power_level_content_override=None, # Doesn't apply when initial state has power level state event content
creator_join_profile=None,
- ):
+ ) -> int:
+ """Sends the initial events into a new room.
+
+ Returns:
+ The stream_id of the last event persisted.
+ """
+
def create(etype, content, **kwargs):
e = {"type": etype, "content": content}
@@ -708,13 +787,16 @@ class RoomCreationHandler(BaseHandler):
return e
- @defer.inlineCallbacks
- def send(etype, content, **kwargs):
+ async def send(etype, content, **kwargs) -> int:
event = create(etype, content, **kwargs)
- logger.info("Sending %s in new room", etype)
- yield self.event_creation_handler.create_and_send_nonmember_event(
+ logger.debug("Sending %s in new room", etype)
+ (
+ _,
+ last_stream_id,
+ ) = await self.event_creation_handler.create_and_send_nonmember_event(
creator, event, ratelimit=False
)
+ return last_stream_id
config = RoomCreationHandler.PRESETS_DICT[preset_config]
@@ -723,10 +805,10 @@ class RoomCreationHandler(BaseHandler):
event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
creation_content.update({"creator": creator_id})
- yield send(etype=EventTypes.Create, content=creation_content)
+ await send(etype=EventTypes.Create, content=creation_content)
- logger.info("Sending %s in new room", EventTypes.Member)
- yield self.room_member_handler.update_membership(
+ logger.debug("Sending %s in new room", EventTypes.Member)
+ await self.room_member_handler.update_membership(
creator,
creator.user,
room_id,
@@ -739,7 +821,9 @@ class RoomCreationHandler(BaseHandler):
# of the first events that get sent into a room.
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
if pl_content is not None:
- yield send(etype=EventTypes.PowerLevels, content=pl_content)
+ last_sent_stream_id = await send(
+ etype=EventTypes.PowerLevels, content=pl_content
+ )
else:
power_level_content = {
"users": {creator_id: 100},
@@ -750,52 +834,65 @@ class RoomCreationHandler(BaseHandler):
EventTypes.RoomHistoryVisibility: 100,
EventTypes.CanonicalAlias: 50,
EventTypes.RoomAvatar: 50,
+ EventTypes.Tombstone: 100,
+ EventTypes.ServerACL: 100,
+ EventTypes.RoomEncryption: 100,
},
"events_default": 0,
"state_default": 50,
"ban": 50,
"kick": 50,
"redact": 50,
- "invite": 0,
+ "invite": 50,
}
if config["original_invitees_have_ops"]:
for invitee in invite_list:
power_level_content["users"][invitee] = 100
+ # Power levels overrides are defined per chat preset
+ power_level_content.update(config["power_level_content_override"])
+
if power_level_content_override:
power_level_content.update(power_level_content_override)
- yield send(etype=EventTypes.PowerLevels, content=power_level_content)
+ last_sent_stream_id = await send(
+ etype=EventTypes.PowerLevels, content=power_level_content
+ )
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
- yield send(
+ last_sent_stream_id = await send(
etype=EventTypes.CanonicalAlias,
content={"alias": room_alias.to_string()},
)
if (EventTypes.JoinRules, "") not in initial_state:
- yield send(
+ last_sent_stream_id = await send(
etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]}
)
if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
- yield send(
+ last_sent_stream_id = await send(
etype=EventTypes.RoomHistoryVisibility,
content={"history_visibility": config["history_visibility"]},
)
if config["guest_can_join"]:
if (EventTypes.GuestAccess, "") not in initial_state:
- yield send(
+ last_sent_stream_id = await send(
etype=EventTypes.GuestAccess, content={"guest_access": "can_join"}
)
for (etype, state_key), content in initial_state.items():
- yield send(etype=etype, state_key=state_key, content=content)
+ last_sent_stream_id = await send(
+ etype=etype, state_key=state_key, content=content
+ )
- @defer.inlineCallbacks
- def _generate_room_id(self, creator_id, is_public):
+ return last_sent_stream_id
+
+ async def _generate_room_id(
+ self, creator_id: str, is_public: str, room_version: RoomVersion,
+ ):
# autogen room IDs and try to create it. We may clash, so just
# try a few times till one goes through, giving up eventually.
attempts = 0
@@ -805,10 +902,11 @@ class RoomCreationHandler(BaseHandler):
gen_room_id = RoomID(random_string, self.hs.hostname).to_string()
if isinstance(gen_room_id, bytes):
gen_room_id = gen_room_id.decode("utf-8")
- yield self.store.store_room(
+ await self.store.store_room(
room_id=gen_room_id,
room_creator_user_id=creator_id,
is_public=is_public,
+ room_version=room_version,
)
return gen_room_id
except StoreError:
@@ -820,9 +918,10 @@ class RoomContextHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
+ self.storage = hs.get_storage()
+ self.state_store = self.storage.state
- @defer.inlineCallbacks
- def get_event_context(self, user, room_id, event_id, limit, event_filter):
+ async def get_event_context(self, user, room_id, event_id, limit, event_filter):
"""Retrieves events, pagination tokens and state around a given event
in a room.
@@ -841,31 +940,38 @@ class RoomContextHandler(object):
before_limit = math.floor(limit / 2.0)
after_limit = limit - before_limit
- users = yield self.store.get_users_in_room(room_id)
+ users = await self.store.get_users_in_room(room_id)
is_peeking = user.to_string() not in users
def filter_evts(events):
return filter_events_for_client(
- self.store, user.to_string(), events, is_peeking=is_peeking
+ self.storage, user.to_string(), events, is_peeking=is_peeking
)
- event = yield self.store.get_event(
+ event = await self.store.get_event(
event_id, get_prev_content=True, allow_none=True
)
if not event:
return None
- filtered = yield (filter_evts([event]))
+ filtered = await filter_evts([event])
if not filtered:
raise AuthError(403, "You don't have permission to access that event.")
- results = yield self.store.get_events_around(
+ results = await self.store.get_events_around(
room_id, event_id, before_limit, after_limit, event_filter
)
- results["events_before"] = yield filter_evts(results["events_before"])
- results["events_after"] = yield filter_evts(results["events_after"])
- results["event"] = event
+ if event_filter:
+ results["events_before"] = event_filter.filter(results["events_before"])
+ results["events_after"] = event_filter.filter(results["events_after"])
+
+ results["events_before"] = await filter_evts(results["events_before"])
+ results["events_after"] = await filter_evts(results["events_after"])
+ # filter_evts can return a pruned event in case the user is allowed to see that
+ # there's something there but not see the content, so use the event that's in
+ # `filtered` rather than the event we retrieved from the datastore.
+ results["event"] = filtered[0]
if results["events_after"]:
last_event_id = results["events_after"][-1].event_id
@@ -888,10 +994,15 @@ class RoomContextHandler(object):
# first? Shouldn't we be consistent with /sync?
# https://github.com/matrix-org/matrix-doc/issues/687
- state = yield self.store.get_state_for_events(
+ state = await self.state_store.get_state_for_events(
[last_event_id], state_filter=state_filter
)
- results["state"] = list(state[last_event_id].values())
+
+ state_events = list(state[last_event_id].values())
+ if event_filter:
+ state_events = event_filter.filter(state_events)
+
+ results["state"] = await filter_evts(state_events)
# We use a dummy token here as we only care about the room portion of
# the token, which we replace.
@@ -910,17 +1021,16 @@ class RoomEventSource(object):
def __init__(self, hs):
self.store = hs.get_datastore()
- @defer.inlineCallbacks
- def get_new_events(
+ async def get_new_events(
self, user, from_key, limit, room_ids, is_guest, explicit_room_id=None
):
# We just ignore the key for now.
- to_key = yield self.get_current_key()
+ to_key = await self.get_current_key()
from_token = RoomStreamToken.parse(from_key)
if from_token.topological:
- logger.warn("Stream has topological part!!!! %r", from_key)
+ logger.warning("Stream has topological part!!!! %r", from_key)
from_key = "s%s" % (from_token.stream,)
app_service = self.store.get_app_service_by_user_id(user.to_string())
@@ -929,11 +1039,11 @@ class RoomEventSource(object):
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
else:
- room_events = yield self.store.get_membership_changes_for_user(
+ room_events = await self.store.get_membership_changes_for_user(
user.to_string(), from_key, to_key
)
- room_to_events = yield self.store.get_room_events_stream_for_rooms(
+ room_to_events = await self.store.get_room_events_stream_for_rooms(
room_ids=room_ids,
from_key=from_key,
to_key=to_key,
@@ -961,15 +1071,3 @@ class RoomEventSource(object):
def get_current_key_for_room(self, room_id):
return self.store.get_room_events_max_id(room_id)
-
- @defer.inlineCallbacks
- def get_pagination_rows(self, user, config, key):
- events, next_key = yield self.store.paginate_room_events(
- room_id=key,
- from_key=config.from_key,
- to_key=config.to_key,
- direction=config.direction,
- limit=config.limit,
- )
-
- return (events, next_key)
|