summary refs log tree commit diff
path: root/synapse/handlers/room.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/room.py')
-rw-r--r--synapse/handlers/room.py187
1 files changed, 112 insertions, 75 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py

index 73f9eeb399..61db3ccc43 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py
@@ -22,6 +22,7 @@ import logging import math import string from collections import OrderedDict +from typing import Tuple from six import iteritems, string_types @@ -88,6 +89,8 @@ class RoomCreationHandler(BaseHandler): self.room_member_handler = hs.get_room_member_handler() self.config = hs.config + self._replication = hs.get_replication_data_handler() + # linearizer to stop two upgrades happening at once self._upgrade_linearizer = Linearizer("room_upgrade_linearizer") @@ -439,73 +442,78 @@ class RoomCreationHandler(BaseHandler): new_room_id: str, old_room_state: StateMap[str], ): - directory_handler = self.hs.get_handlers().directory_handler - - aliases = await self.store.get_aliases_for_room(old_room_id) - # check to see if we have a canonical alias. canonical_alias_event = None canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, "")) if canonical_alias_event_id: canonical_alias_event = await self.store.get_event(canonical_alias_event_id) - # first we try to remove the aliases from the old room (we suppress sending - # the room_aliases event until the end). - # - # Note that we'll only be able to remove aliases that (a) aren't owned by an AS, - # and (b) unless the user is a server admin, which the user created. - # - # This is probably correct - given we don't allow such aliases to be deleted - # normally, it would be odd to allow it in the case of doing a room upgrade - - # but it makes the upgrade less effective, and you have to wonder why a room - # admin can't remove aliases that point to that room anyway. - # (cf https://github.com/matrix-org/synapse/issues/2360) - # - removed_aliases = [] - for alias_str in aliases: - alias = RoomAlias.from_string(alias_str) - try: - await directory_handler.delete_association(requester, alias) - removed_aliases.append(alias_str) - except SynapseError as e: - logger.warning("Unable to remove alias %s from old room: %s", alias, e) - - # if we didn't find any aliases, or couldn't remove anyway, we can skip the rest - # of this. - if not removed_aliases: + await self.store.update_aliases_for_room(old_room_id, new_room_id) + + if not canonical_alias_event: return - # we can now add any aliases we successfully removed to the new room. - for alias in removed_aliases: - try: - await directory_handler.create_association( - requester, - RoomAlias.from_string(alias), - new_room_id, - servers=(self.hs.hostname,), - check_membership=False, - ) - logger.info("Moved alias %s to new room", alias) - except SynapseError as e: - # I'm not really expecting this to happen, but it could if the spam - # checking module decides it shouldn't, or similar. - logger.error("Error adding alias %s to new room: %s", alias, e) + # If there is a canonical alias we need to update the one in the old + # room and set one in the new one. + old_canonical_alias_content = dict(canonical_alias_event.content) + new_canonical_alias_content = {} + + canonical = canonical_alias_event.content.get("alias") + if canonical and self.hs.is_mine_id(canonical): + new_canonical_alias_content["alias"] = canonical + old_canonical_alias_content.pop("alias", None) + + # We convert to a list as it will be a Tuple. + old_alt_aliases = list(old_canonical_alias_content.get("alt_aliases", [])) + if old_alt_aliases: + old_canonical_alias_content["alt_aliases"] = old_alt_aliases + new_alt_aliases = new_canonical_alias_content.setdefault("alt_aliases", []) + for alias in canonical_alias_event.content.get("alt_aliases", []): + try: + if self.hs.is_mine_id(alias): + new_alt_aliases.append(alias) + old_alt_aliases.remove(alias) + except Exception: + logger.info( + "Invalid alias %s in canonical alias event %s", + alias, + canonical_alias_event_id, + ) + + if not old_alt_aliases: + old_canonical_alias_content.pop("alt_aliases") # If a canonical alias event existed for the old room, fire a canonical # alias event for the new room with a copy of the information. try: - if canonical_alias_event: - await self.event_creation_handler.create_and_send_nonmember_event( - requester, - { - "type": EventTypes.CanonicalAlias, - "state_key": "", - "room_id": new_room_id, - "sender": requester.user.to_string(), - "content": canonical_alias_event.content, - }, - ratelimit=False, - ) + await self.event_creation_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.CanonicalAlias, + "state_key": "", + "room_id": old_room_id, + "sender": requester.user.to_string(), + "content": old_canonical_alias_content, + }, + ratelimit=False, + ) + except SynapseError as e: + # again I'm not really expecting this to fail, but if it does, I'd rather + # we returned the new room to the client at this point. + logger.error("Unable to send updated alias events in old room: %s", e) + + try: + await self.event_creation_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.CanonicalAlias, + "state_key": "", + "room_id": new_room_id, + "sender": requester.user.to_string(), + "content": new_canonical_alias_content, + }, + ratelimit=False, + ) except SynapseError as e: # again I'm not really expecting this to fail, but if it does, I'd rather # we returned the new room to the client at this point. @@ -513,7 +521,7 @@ class RoomCreationHandler(BaseHandler): async def create_room( self, requester, config, ratelimit=True, creator_join_profile=None - ): + ) -> Tuple[dict, int]: """ Creates a new room. Args: @@ -530,9 +538,9 @@ class RoomCreationHandler(BaseHandler): `avatar_url` and/or `displayname`. Returns: - Deferred[dict]: - a dict containing the keys `room_id` and, if an alias was - requested, `room_alias`. + First, a dict containing the keys `room_id` and, if an alias + was, requested, `room_alias`. Secondly, the stream_id of the + last persisted event. Raises: SynapseError if the room ID couldn't be stored, or something went horribly wrong. @@ -664,7 +672,7 @@ class RoomCreationHandler(BaseHandler): # override any attempt to set room versions via the creation_content creation_content["room_version"] = room_version.identifier - await self._send_events_for_new_room( + last_stream_id = await self._send_events_for_new_room( requester, room_id, preset_config=preset_config, @@ -678,7 +686,10 @@ class RoomCreationHandler(BaseHandler): if "name" in config: name = config["name"] - await self.event_creation_handler.create_and_send_nonmember_event( + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Name, @@ -692,7 +703,10 @@ class RoomCreationHandler(BaseHandler): if "topic" in config: topic = config["topic"] - await self.event_creation_handler.create_and_send_nonmember_event( + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Topic, @@ -710,7 +724,7 @@ class RoomCreationHandler(BaseHandler): if is_direct: content["is_direct"] = is_direct - await self.room_member_handler.update_membership( + _, last_stream_id = await self.room_member_handler.update_membership( requester, UserID.from_string(invitee), room_id, @@ -724,7 +738,7 @@ class RoomCreationHandler(BaseHandler): id_access_token = invite_3pid.get("id_access_token") # optional address = invite_3pid["address"] medium = invite_3pid["medium"] - await self.hs.get_room_member_handler().do_3pid_invite( + last_stream_id = await self.hs.get_room_member_handler().do_3pid_invite( room_id, requester.user, medium, @@ -740,7 +754,12 @@ class RoomCreationHandler(BaseHandler): if room_alias: result["room_alias"] = room_alias.to_string() - return result + # Always wait for room creation to progate before returning + await self._replication.wait_for_stream_position( + self.hs.config.worker.writers.events, "events", last_stream_id + ) + + return result, last_stream_id async def _send_events_for_new_room( self, @@ -753,7 +772,13 @@ class RoomCreationHandler(BaseHandler): room_alias=None, power_level_content_override=None, # Doesn't apply when initial state has power level state event content creator_join_profile=None, - ): + ) -> int: + """Sends the initial events into a new room. + + Returns: + The stream_id of the last event persisted. + """ + def create(etype, content, **kwargs): e = {"type": etype, "content": content} @@ -762,12 +787,16 @@ class RoomCreationHandler(BaseHandler): return e - async def send(etype, content, **kwargs): + async def send(etype, content, **kwargs) -> int: event = create(etype, content, **kwargs) logger.debug("Sending %s in new room", etype) - await self.event_creation_handler.create_and_send_nonmember_event( + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( creator, event, ratelimit=False ) + return last_stream_id config = RoomCreationHandler.PRESETS_DICT[preset_config] @@ -792,7 +821,9 @@ class RoomCreationHandler(BaseHandler): # of the first events that get sent into a room. pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None) if pl_content is not None: - await send(etype=EventTypes.PowerLevels, content=pl_content) + last_sent_stream_id = await send( + etype=EventTypes.PowerLevels, content=pl_content + ) else: power_level_content = { "users": {creator_id: 100}, @@ -825,33 +856,39 @@ class RoomCreationHandler(BaseHandler): if power_level_content_override: power_level_content.update(power_level_content_override) - await send(etype=EventTypes.PowerLevels, content=power_level_content) + last_sent_stream_id = await send( + etype=EventTypes.PowerLevels, content=power_level_content + ) if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.CanonicalAlias, content={"alias": room_alias.to_string()}, ) if (EventTypes.JoinRules, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]} ) if (EventTypes.RoomHistoryVisibility, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.RoomHistoryVisibility, content={"history_visibility": config["history_visibility"]}, ) if config["guest_can_join"]: if (EventTypes.GuestAccess, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.GuestAccess, content={"guest_access": "can_join"} ) for (etype, state_key), content in initial_state.items(): - await send(etype=etype, state_key=state_key, content=content) + last_sent_stream_id = await send( + etype=etype, state_key=state_key, content=content + ) + + return last_sent_stream_id async def _generate_room_id( self, creator_id: str, is_public: str, room_version: RoomVersion,