diff options
Diffstat (limited to 'synapse/handlers/room_member.py')
-rw-r--r-- | synapse/handlers/room_member.py | 962 |
1 files changed, 369 insertions, 593 deletions
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 35450feb6f..0f7af982f0 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -17,30 +17,26 @@ import abc import logging +from typing import Dict, Iterable, List, Optional, Tuple from six.moves import http_client -from signedjson.key import decode_verify_key_bytes -from signedjson.sign import verify_signed_json -from unpaddedbase64 import decode_base64 - -from twisted.internet import defer - from synapse import types from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError -from synapse.handlers.identity import LookupAlgorithm, create_id_access_token_header -from synapse.types import RoomID, UserID +from synapse.api.errors import AuthError, Codes, SynapseError +from synapse.events import EventBase +from synapse.events.snapshot import EventContext +from synapse.replication.http.membership import ( + ReplicationLocallyRejectInviteRestServlet, +) +from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room, user_left_room -from synapse.util.hash import sha256_and_url_safe_base64 from ._base import BaseHandler logger = logging.getLogger(__name__) -id_server_scheme = "https://" - class RoomMemberHandler(object): # TODO(paul): This handler currently contains a messy conflation of @@ -51,20 +47,15 @@ class RoomMemberHandler(object): __metaclass__ = abc.ABCMeta def __init__(self, hs): - """ - - Args: - hs (synapse.server.HomeServer): - """ self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() self.state_handler = hs.get_state_handler() self.config = hs.config - self.simple_http_client = hs.get_simple_http_client() self.federation_handler = hs.get_handlers().federation_handler self.directory_handler = hs.get_handlers().directory_handler + self.identity_handler = hs.get_handlers().identity_handler self.registration_handler = hs.get_registration_handler() self.profile_handler = hs.get_profile_handler() self.event_creation_handler = hs.get_event_creation_handler() @@ -78,88 +69,117 @@ class RoomMemberHandler(object): self._enable_lookup = hs.config.enable_3pid_lookup self.allow_per_room_profiles = self.config.allow_per_room_profiles + self._event_stream_writer_instance = hs.config.worker.writers.events + self._is_on_event_persistence_instance = ( + self._event_stream_writer_instance == hs.get_instance_name() + ) + if self._is_on_event_persistence_instance: + self.persist_event_storage = hs.get_storage().persistence + else: + self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client( + hs + ) + # This is only used to get at ratelimit function, and # maybe_kick_guest_users. It's fine there are multiple of these as # it doesn't store state. self.base_handler = BaseHandler(hs) @abc.abstractmethod - def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + async def _remote_join( + self, + requester: Requester, + remote_room_hosts: List[str], + room_id: str, + user: UserID, + content: dict, + ) -> Tuple[str, int]: """Try and join a room that this server is not in Args: - requester (Requester) - remote_room_hosts (list[str]): List of servers that can be used - to join via. - room_id (str): Room that we are trying to join - user (UserID): User who is trying to join - content (dict): A dict that should be used as the content of the - join event. - - Returns: - Deferred + requester + remote_room_hosts: List of servers that can be used to join via. + room_id: Room that we are trying to join + user: User who is trying to join + content: A dict that should be used as the content of the join event. """ raise NotImplementedError() @abc.abstractmethod - def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + async def _remote_reject_invite( + self, + requester: Requester, + remote_room_hosts: List[str], + room_id: str, + target: UserID, + content: dict, + ) -> Tuple[Optional[str], int]: """Attempt to reject an invite for a room this server is not in. If we fail to do so we locally mark the invite as rejected. Args: - requester (Requester) - remote_room_hosts (list[str]): List of servers to use to try and - reject invite - room_id (str) - target (UserID): The user rejecting the invite + requester + remote_room_hosts: List of servers to use to try and reject invite + room_id + target: The user rejecting the invite + content: The content for the rejection event Returns: - Deferred[dict]: A dictionary to be returned to the client, may + A dictionary to be returned to the client, may include event_id etc, or nothing if we locally rejected """ raise NotImplementedError() + async def locally_reject_invite(self, user_id: str, room_id: str) -> int: + """Mark the invite has having been rejected even though we failed to + create a leave event for it. + """ + if self._is_on_event_persistence_instance: + return await self.persist_event_storage.locally_reject_invite( + user_id, room_id + ) + else: + result = await self._locally_reject_client( + instance_name=self._event_stream_writer_instance, + user_id=user_id, + room_id=room_id, + ) + return result["stream_id"] + @abc.abstractmethod - def _user_joined_room(self, target, room_id): + async def _user_joined_room(self, target: UserID, room_id: str) -> None: """Notifies distributor on master process that the user has joined the room. Args: - target (UserID) - room_id (str) - - Returns: - Deferred|None + target + room_id """ raise NotImplementedError() @abc.abstractmethod - def _user_left_room(self, target, room_id): + async def _user_left_room(self, target: UserID, room_id: str) -> None: """Notifies distributor on master process that the user has left the room. Args: - target (UserID) - room_id (str) - - Returns: - Deferred|None + target + room_id """ raise NotImplementedError() - @defer.inlineCallbacks - def _local_membership_update( + async def _local_membership_update( self, - requester, - target, - room_id, - membership, - prev_events_and_hashes, - txn_id=None, - ratelimit=True, - content=None, - require_consent=True, - ): + requester: Requester, + target: UserID, + room_id: str, + membership: str, + prev_event_ids: Collection[str], + txn_id: Optional[str] = None, + ratelimit: bool = True, + content: Optional[dict] = None, + require_consent: bool = True, + ) -> Tuple[str, int]: user_id = target.to_string() if content is None: @@ -169,7 +189,7 @@ class RoomMemberHandler(object): if requester.is_guest: content["kind"] = "guest" - event, context = yield self.event_creation_handler.create_event( + event, context = await self.event_creation_handler.create_event( requester, { "type": EventTypes.Member, @@ -182,23 +202,24 @@ class RoomMemberHandler(object): }, token_id=requester.access_token_id, txn_id=txn_id, - prev_events_and_hashes=prev_events_and_hashes, + prev_event_ids=prev_event_ids, require_consent=require_consent, ) # Check if this event matches the previous membership event for the user. - duplicate = yield self.event_creation_handler.deduplicate_state_event( + duplicate = await self.event_creation_handler.deduplicate_state_event( event, context ) if duplicate is not None: # Discard the new event since this membership change is a no-op. - return duplicate + _, stream_id = await self.store.get_event_ordering(duplicate.event_id) + return duplicate.event_id, stream_id - yield self.event_creation_handler.handle_new_client_event( + stream_id = await self.event_creation_handler.handle_new_client_event( requester, event, context, extra_users=[target], ratelimit=ratelimit ) - prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_state_ids = await context.get_prev_state_ids() prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) @@ -208,48 +229,30 @@ class RoomMemberHandler(object): # info. newly_joined = True if prev_member_event_id: - prev_member_event = yield self.store.get_event(prev_member_event_id) + prev_member_event = await self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - yield self._user_joined_room(target, room_id) - - # Copy over direct message status and room tags if this is a join - # on an upgraded room - - # Check if this is an upgraded room - predecessor = yield self.store.get_room_predecessor(room_id) - - if predecessor: - # It is an upgraded room. Copy over old tags - self.copy_room_tags_and_direct_to_room( - predecessor["room_id"], room_id, user_id - ) - # Move over old push rules - self.store.move_push_rules_from_room_to_room_for_user( - predecessor["room_id"], room_id, user_id - ) + await self._user_joined_room(target, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: - prev_member_event = yield self.store.get_event(prev_member_event_id) + prev_member_event = await self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: - yield self._user_left_room(target, room_id) + await self._user_left_room(target, room_id) - return event + return event.event_id, stream_id - @defer.inlineCallbacks - def copy_room_tags_and_direct_to_room(self, old_room_id, new_room_id, user_id): + async def copy_room_tags_and_direct_to_room( + self, old_room_id, new_room_id, user_id + ) -> None: """Copies the tags and direct room state from one room to another. Args: - old_room_id (str) - new_room_id (str) - user_id (str) - - Returns: - Deferred[None] + old_room_id: The room ID of the old room. + new_room_id: The room ID of the new room. + user_id: The user's ID. """ # Retrieve user account data for predecessor room - user_account_data, _ = yield self.store.get_account_data_for_user(user_id) + user_account_data, _ = await self.store.get_account_data_for_user(user_id) # Copy direct message state if applicable direct_rooms = user_account_data.get("m.direct", {}) @@ -262,36 +265,35 @@ class RoomMemberHandler(object): direct_rooms[key].append(new_room_id) # Save back to user's m.direct account data - yield self.store.add_account_data_for_user( + await self.store.add_account_data_for_user( user_id, "m.direct", direct_rooms ) break # Copy room tags if applicable - room_tags = yield self.store.get_tags_for_room(user_id, old_room_id) + room_tags = await self.store.get_tags_for_room(user_id, old_room_id) # Copy each room tag to the new room for tag, tag_content in room_tags.items(): - yield self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content) + await self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content) - @defer.inlineCallbacks - def update_membership( + async def update_membership( self, - requester, - target, - room_id, - action, - txn_id=None, - remote_room_hosts=None, - third_party_signed=None, - ratelimit=True, - content=None, - require_consent=True, - ): + requester: Requester, + target: UserID, + room_id: str, + action: str, + txn_id: Optional[str] = None, + remote_room_hosts: Optional[List[str]] = None, + third_party_signed: Optional[dict] = None, + ratelimit: bool = True, + content: Optional[dict] = None, + require_consent: bool = True, + ) -> Tuple[Optional[str], int]: key = (room_id,) - with (yield self.member_linearizer.queue(key)): - result = yield self._update_membership( + with (await self.member_linearizer.queue(key)): + result = await self._update_membership( requester, target, room_id, @@ -306,20 +308,19 @@ class RoomMemberHandler(object): return result - @defer.inlineCallbacks - def _update_membership( + async def _update_membership( self, - requester, - target, - room_id, - action, - txn_id=None, - remote_room_hosts=None, - third_party_signed=None, - ratelimit=True, - content=None, - require_consent=True, - ): + requester: Requester, + target: UserID, + room_id: str, + action: str, + txn_id: Optional[str] = None, + remote_room_hosts: Optional[List[str]] = None, + third_party_signed: Optional[dict] = None, + ratelimit: bool = True, + content: Optional[dict] = None, + require_consent: bool = True, + ) -> Tuple[Optional[str], int]: content_specified = bool(content) if content is None: content = {} @@ -342,7 +343,7 @@ class RoomMemberHandler(object): # if this is a join with a 3pid signature, we may need to turn a 3pid # invite into a normal invite before we can handle the join. if third_party_signed is not None: - yield self.federation_handler.exchange_third_party_invite( + await self.federation_handler.exchange_third_party_invite( third_party_signed["sender"], target.to_string(), room_id, @@ -353,7 +354,7 @@ class RoomMemberHandler(object): remote_room_hosts = [] if effective_membership_state not in ("leave", "ban"): - is_blocked = yield self.store.is_room_blocked(room_id) + is_blocked = await self.store.is_room_blocked(room_id) if is_blocked: raise SynapseError(403, "This room has been blocked on this server") @@ -372,7 +373,7 @@ class RoomMemberHandler(object): is_requester_admin = True else: - is_requester_admin = yield self.auth.is_server_admin(requester.user) + is_requester_admin = await self.auth.is_server_admin(requester.user) if not is_requester_admin: if self.config.block_non_admin_invites: @@ -391,10 +392,9 @@ class RoomMemberHandler(object): if block_invite: raise SynapseError(403, "Invites have been disabled on this server") - prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id) - latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes) + latest_event_ids = await self.store.get_prev_events_for_room(room_id) - current_state_ids = yield self.state_handler.get_current_state_ids( + current_state_ids = await self.state_handler.get_current_state_ids( room_id, latest_event_ids=latest_event_ids ) @@ -403,7 +403,7 @@ class RoomMemberHandler(object): # transitions and generic otherwise old_state_id = current_state_ids.get((EventTypes.Member, target.to_string())) if old_state_id: - old_state = yield self.store.get_event(old_state_id, allow_none=True) + old_state = await self.store.get_event(old_state_id, allow_none=True) old_membership = old_state.content.get("membership") if old_state else None if action == "unban" and old_membership != "ban": raise SynapseError( @@ -424,7 +424,13 @@ class RoomMemberHandler(object): same_membership = old_membership == effective_membership_state same_sender = requester.user.to_string() == old_state.sender if same_sender and same_membership and same_content: - return old_state + _, stream_id = await self.store.get_event_ordering( + old_state.event_id + ) + return ( + old_state.event_id, + stream_id, + ) if old_membership in ["ban", "leave"] and action == "kick": raise AuthError(403, "The target user is not in the room") @@ -435,7 +441,7 @@ class RoomMemberHandler(object): old_membership == Membership.INVITE and effective_membership_state == Membership.LEAVE ): - is_blocked = yield self._is_server_notice_room(room_id) + is_blocked = await self._is_server_notice_room(room_id) if is_blocked: raise SynapseError( http_client.FORBIDDEN, @@ -446,18 +452,18 @@ class RoomMemberHandler(object): if action == "kick": raise AuthError(403, "The target user is not in the room") - is_host_in_room = yield self._is_host_in_room(current_state_ids) + is_host_in_room = await self._is_host_in_room(current_state_ids) if effective_membership_state == Membership.JOIN: if requester.is_guest: - guest_can_join = yield self._can_guest_join(current_state_ids) + guest_can_join = await self._can_guest_join(current_state_ids) if not guest_can_join: # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. raise AuthError(403, "Guest access not allowed") if not is_host_in_room: - inviter = yield self._get_inviter(target.to_string(), room_id) + inviter = await self._get_inviter(target.to_string(), room_id) if inviter and not self.hs.is_mine(inviter): remote_room_hosts.append(inviter.domain) @@ -465,21 +471,22 @@ class RoomMemberHandler(object): profile = self.profile_handler if not content_specified: - content["displayname"] = yield profile.get_displayname(target) - content["avatar_url"] = yield profile.get_avatar_url(target) + content["displayname"] = await profile.get_displayname(target) + content["avatar_url"] = await profile.get_avatar_url(target) if requester.is_guest: content["kind"] = "guest" - ret = yield self._remote_join( + remote_join_response = await self._remote_join( requester, remote_room_hosts, room_id, target, content ) - return ret + + return remote_join_response elif effective_membership_state == Membership.LEAVE: if not is_host_in_room: # perhaps we've been invited - inviter = yield self._get_inviter(target.to_string(), room_id) + inviter = await self._get_inviter(target.to_string(), room_id) if not inviter: raise SynapseError(404, "Not a known room") @@ -493,36 +500,116 @@ class RoomMemberHandler(object): else: # send the rejection to the inviter's HS. remote_room_hosts = remote_room_hosts + [inviter.domain] - res = yield self._remote_reject_invite( - requester, remote_room_hosts, room_id, target + return await self._remote_reject_invite( + requester, remote_room_hosts, room_id, target, content, ) - return res - res = yield self._local_membership_update( + return await self._local_membership_update( requester=requester, target=target, room_id=room_id, membership=effective_membership_state, txn_id=txn_id, ratelimit=ratelimit, - prev_events_and_hashes=prev_events_and_hashes, + prev_event_ids=latest_event_ids, content=content, require_consent=require_consent, ) - return res - @defer.inlineCallbacks - def send_membership_event(self, requester, event, context, ratelimit=True): + async def transfer_room_state_on_room_upgrade( + self, old_room_id: str, room_id: str + ) -> None: + """Upon our server becoming aware of an upgraded room, either by upgrading a room + ourselves or joining one, we can transfer over information from the previous room. + + Copies user state (tags/push rules) for every local user that was in the old room, as + well as migrating the room directory state. + + Args: + old_room_id: The ID of the old room + room_id: The ID of the new room + """ + logger.info("Transferring room state from %s to %s", old_room_id, room_id) + + # Find all local users that were in the old room and copy over each user's state + users = await self.store.get_users_in_room(old_room_id) + await self.copy_user_state_on_room_upgrade(old_room_id, room_id, users) + + # Add new room to the room directory if the old room was there + # Remove old room from the room directory + old_room = await self.store.get_room(old_room_id) + if old_room and old_room["is_public"]: + await self.store.set_room_is_public(old_room_id, False) + await self.store.set_room_is_public(room_id, True) + + # Transfer alias mappings in the room directory + await self.store.update_aliases_for_room(old_room_id, room_id) + + # Check if any groups we own contain the predecessor room + local_group_ids = await self.store.get_local_groups_for_room(old_room_id) + for group_id in local_group_ids: + # Add new the new room to those groups + await self.store.add_room_to_group(group_id, room_id, old_room["is_public"]) + + # Remove the old room from those groups + await self.store.remove_room_from_group(group_id, old_room_id) + + async def copy_user_state_on_room_upgrade( + self, old_room_id: str, new_room_id: str, user_ids: Iterable[str] + ) -> None: + """Copy user-specific information when they join a new room when that new room is the + result of a room upgrade + + Args: + old_room_id: The ID of upgraded room + new_room_id: The ID of the new room + user_ids: User IDs to copy state for + """ + + logger.debug( + "Copying over room tags and push rules from %s to %s for users %s", + old_room_id, + new_room_id, + user_ids, + ) + + for user_id in user_ids: + try: + # It is an upgraded room. Copy over old tags + await self.copy_room_tags_and_direct_to_room( + old_room_id, new_room_id, user_id + ) + # Copy over push rules + await self.store.copy_push_rules_from_room_to_room_for_user( + old_room_id, new_room_id, user_id + ) + except Exception: + logger.exception( + "Error copying tags and/or push rules from rooms %s to %s for user %s. " + "Skipping...", + old_room_id, + new_room_id, + user_id, + ) + continue + + async def send_membership_event( + self, + requester: Requester, + event: EventBase, + context: EventContext, + ratelimit: bool = True, + ): """ Change the membership status of a user in a room. Args: - requester (Requester): The local user who requested the membership + requester: The local user who requested the membership event. If None, certain checks, like whether this homeserver can act as the sender, will be skipped. - event (SynapseEvent): The membership event. + event: The membership event. context: The context of the event. - ratelimit (bool): Whether to rate limit this request. + ratelimit: Whether to rate limit this request. Raises: SynapseError if there was a problem changing the membership. """ @@ -538,27 +625,27 @@ class RoomMemberHandler(object): else: requester = types.create_requester(target_user) - prev_event = yield self.event_creation_handler.deduplicate_state_event( + prev_event = await self.event_creation_handler.deduplicate_state_event( event, context ) if prev_event is not None: return - prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_state_ids = await context.get_prev_state_ids() if event.membership == Membership.JOIN: if requester.is_guest: - guest_can_join = yield self._can_guest_join(prev_state_ids) + guest_can_join = await self._can_guest_join(prev_state_ids) if not guest_can_join: # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. raise AuthError(403, "Guest access not allowed") if event.membership not in (Membership.LEAVE, Membership.BAN): - is_blocked = yield self.store.is_room_blocked(room_id) + is_blocked = await self.store.is_room_blocked(room_id) if is_blocked: raise SynapseError(403, "This room has been blocked on this server") - yield self.event_creation_handler.handle_new_client_event( + await self.event_creation_handler.handle_new_client_event( requester, event, context, extra_users=[target_user], ratelimit=ratelimit ) @@ -572,18 +659,19 @@ class RoomMemberHandler(object): # info. newly_joined = True if prev_member_event_id: - prev_member_event = yield self.store.get_event(prev_member_event_id) + prev_member_event = await self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - yield self._user_joined_room(target_user, room_id) + await self._user_joined_room(target_user, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: - prev_member_event = yield self.store.get_event(prev_member_event_id) + prev_member_event = await self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: - yield self._user_left_room(target_user, room_id) + await self._user_left_room(target_user, room_id) - @defer.inlineCallbacks - def _can_guest_join(self, current_state_ids): + async def _can_guest_join( + self, current_state_ids: Dict[Tuple[str, str], str] + ) -> bool: """ Returns whether a guest can join a room based on its current state. """ @@ -591,7 +679,7 @@ class RoomMemberHandler(object): if not guest_access_id: return False - guest_access = yield self.store.get_event(guest_access_id) + guest_access = await self.store.get_event(guest_access_id) return ( guest_access @@ -600,13 +688,14 @@ class RoomMemberHandler(object): and guest_access.content["guest_access"] == "can_join" ) - @defer.inlineCallbacks - def lookup_room_alias(self, room_alias): + async def lookup_room_alias( + self, room_alias: RoomAlias + ) -> Tuple[RoomID, List[str]]: """ Get the room ID associated with a room alias. Args: - room_alias (RoomAlias): The alias to look up. + room_alias: The alias to look up. Returns: A tuple of: The room ID as a RoomID object. @@ -615,7 +704,7 @@ class RoomMemberHandler(object): SynapseError if room alias could not be found. """ directory_handler = self.directory_handler - mapping = yield directory_handler.get_association(room_alias) + mapping = await directory_handler.get_association(room_alias) if not mapping: raise SynapseError(404, "No such room alias") @@ -630,28 +719,27 @@ class RoomMemberHandler(object): return RoomID.from_string(room_id), servers - @defer.inlineCallbacks - def _get_inviter(self, user_id, room_id): - invite = yield self.store.get_invite_for_user_in_room( + async def _get_inviter(self, user_id: str, room_id: str) -> Optional[UserID]: + invite = await self.store.get_invite_for_local_user_in_room( user_id=user_id, room_id=room_id ) if invite: return UserID.from_string(invite.sender) + return None - @defer.inlineCallbacks - def do_3pid_invite( + async def do_3pid_invite( self, - room_id, - inviter, - medium, - address, - id_server, - requester, - txn_id, - id_access_token=None, - ): + room_id: str, + inviter: UserID, + medium: str, + address: str, + id_server: str, + requester: Requester, + txn_id: Optional[str], + id_access_token: Optional[str] = None, + ) -> int: if self.config.block_non_admin_invites: - is_requester_admin = yield self.auth.is_server_admin(requester.user) + is_requester_admin = await self.auth.is_server_admin(requester.user) if not is_requester_admin: raise SynapseError( 403, "Invites have been disabled on this server", Codes.FORBIDDEN @@ -659,9 +747,9 @@ class RoomMemberHandler(object): # We need to rate limit *before* we send out any 3PID invites, so we # can't just rely on the standard ratelimiting of events. - yield self.base_handler.ratelimit(requester) + await self.base_handler.ratelimit(requester) - can_invite = yield self.third_party_event_rules.check_threepid_can_be_invited( + can_invite = await self.third_party_event_rules.check_threepid_can_be_invited( medium, address, room_id ) if not can_invite: @@ -676,14 +764,16 @@ class RoomMemberHandler(object): 403, "Looking up third-party identifiers is denied from this server" ) - invitee = yield self._lookup_3pid(id_server, medium, address, id_access_token) + invitee = await self.identity_handler.lookup_3pid( + id_server, medium, address, id_access_token + ) if invitee: - yield self.update_membership( + _, stream_id = await self.update_membership( requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id ) else: - yield self._make_and_store_3pid_invite( + stream_id = await self._make_and_store_3pid_invite( requester, id_server, medium, @@ -694,215 +784,20 @@ class RoomMemberHandler(object): id_access_token=id_access_token, ) - @defer.inlineCallbacks - def _lookup_3pid(self, id_server, medium, address, id_access_token=None): - """Looks up a 3pid in the passed identity server. - - Args: - id_server (str): The server name (including port, if required) - of the identity server to use. - medium (str): The type of the third party identifier (e.g. "email"). - address (str): The third party identifier (e.g. "foo@example.com"). - id_access_token (str|None): The access token to authenticate to the identity - server with - - Returns: - str|None: the matrix ID of the 3pid, or None if it is not recognized. - """ - if id_access_token is not None: - try: - results = yield self._lookup_3pid_v2( - id_server, id_access_token, medium, address - ) - return results - - except Exception as e: - # Catch HttpResponseExcept for a non-200 response code - # Check if this identity server does not know about v2 lookups - if isinstance(e, HttpResponseException) and e.code == 404: - # This is an old identity server that does not yet support v2 lookups - logger.warning( - "Attempted v2 lookup on v1 identity server %s. Falling " - "back to v1", - id_server, - ) - else: - logger.warning("Error when looking up hashing details: %s", e) - return None - - return (yield self._lookup_3pid_v1(id_server, medium, address)) - - @defer.inlineCallbacks - def _lookup_3pid_v1(self, id_server, medium, address): - """Looks up a 3pid in the passed identity server using v1 lookup. - - Args: - id_server (str): The server name (including port, if required) - of the identity server to use. - medium (str): The type of the third party identifier (e.g. "email"). - address (str): The third party identifier (e.g. "foo@example.com"). - - Returns: - str: the matrix ID of the 3pid, or None if it is not recognized. - """ - try: - data = yield self.simple_http_client.get_json( - "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server), - {"medium": medium, "address": address}, - ) - - if "mxid" in data: - if "signatures" not in data: - raise AuthError(401, "No signatures on 3pid binding") - yield self._verify_any_signature(data, id_server) - return data["mxid"] - - except IOError as e: - logger.warning("Error from v1 identity server lookup: %s" % (e,)) - - return None - - @defer.inlineCallbacks - def _lookup_3pid_v2(self, id_server, id_access_token, medium, address): - """Looks up a 3pid in the passed identity server using v2 lookup. - - Args: - id_server (str): The server name (including port, if required) - of the identity server to use. - id_access_token (str): The access token to authenticate to the identity server with - medium (str): The type of the third party identifier (e.g. "email"). - address (str): The third party identifier (e.g. "foo@example.com"). - - Returns: - Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised. - """ - # Check what hashing details are supported by this identity server - hash_details = yield self.simple_http_client.get_json( - "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server), - {"access_token": id_access_token}, - ) - - if not isinstance(hash_details, dict): - logger.warning( - "Got non-dict object when checking hash details of %s%s: %s", - id_server_scheme, - id_server, - hash_details, - ) - raise SynapseError( - 400, - "Non-dict object from %s%s during v2 hash_details request: %s" - % (id_server_scheme, id_server, hash_details), - ) - - # Extract information from hash_details - supported_lookup_algorithms = hash_details.get("algorithms") - lookup_pepper = hash_details.get("lookup_pepper") - if ( - not supported_lookup_algorithms - or not isinstance(supported_lookup_algorithms, list) - or not lookup_pepper - or not isinstance(lookup_pepper, str) - ): - raise SynapseError( - 400, - "Invalid hash details received from identity server %s%s: %s" - % (id_server_scheme, id_server, hash_details), - ) - - # Check if any of the supported lookup algorithms are present - if LookupAlgorithm.SHA256 in supported_lookup_algorithms: - # Perform a hashed lookup - lookup_algorithm = LookupAlgorithm.SHA256 - - # Hash address, medium and the pepper with sha256 - to_hash = "%s %s %s" % (address, medium, lookup_pepper) - lookup_value = sha256_and_url_safe_base64(to_hash) - - elif LookupAlgorithm.NONE in supported_lookup_algorithms: - # Perform a non-hashed lookup - lookup_algorithm = LookupAlgorithm.NONE - - # Combine together plaintext address and medium - lookup_value = "%s %s" % (address, medium) - - else: - logger.warning( - "None of the provided lookup algorithms of %s are supported: %s", - id_server, - supported_lookup_algorithms, - ) - raise SynapseError( - 400, - "Provided identity server does not support any v2 lookup " - "algorithms that this homeserver supports.", - ) - - # Authenticate with identity server given the access token from the client - headers = {"Authorization": create_id_access_token_header(id_access_token)} - - try: - lookup_results = yield self.simple_http_client.post_json_get_json( - "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server), - { - "addresses": [lookup_value], - "algorithm": lookup_algorithm, - "pepper": lookup_pepper, - }, - headers=headers, - ) - except Exception as e: - logger.warning("Error when performing a v2 3pid lookup: %s", e) - raise SynapseError( - 500, "Unknown error occurred during identity server lookup" - ) + return stream_id - # Check for a mapping from what we looked up to an MXID - if "mappings" not in lookup_results or not isinstance( - lookup_results["mappings"], dict - ): - logger.warning("No results from 3pid lookup") - return None - - # Return the MXID if it's available, or None otherwise - mxid = lookup_results["mappings"].get(lookup_value) - return mxid - - @defer.inlineCallbacks - def _verify_any_signature(self, data, server_hostname): - if server_hostname not in data["signatures"]: - raise AuthError(401, "No signature from server %s" % (server_hostname,)) - for key_name, signature in data["signatures"][server_hostname].items(): - key_data = yield self.simple_http_client.get_json( - "%s%s/_matrix/identity/api/v1/pubkey/%s" - % (id_server_scheme, server_hostname, key_name) - ) - if "public_key" not in key_data: - raise AuthError( - 401, "No public key named %s from %s" % (key_name, server_hostname) - ) - verify_signed_json( - data, - server_hostname, - decode_verify_key_bytes( - key_name, decode_base64(key_data["public_key"]) - ), - ) - return - - @defer.inlineCallbacks - def _make_and_store_3pid_invite( + async def _make_and_store_3pid_invite( self, - requester, - id_server, - medium, - address, - room_id, - user, - txn_id, - id_access_token=None, - ): - room_state = yield self.state_handler.get_current_state(room_id) + requester: Requester, + id_server: str, + medium: str, + address: str, + room_id: str, + user: UserID, + txn_id: Optional[str], + id_access_token: Optional[str] = None, + ) -> int: + room_state = await self.state_handler.get_current_state(room_id) inviter_display_name = "" inviter_avatar_url = "" @@ -935,25 +830,31 @@ class RoomMemberHandler(object): if room_avatar_event: room_avatar_url = room_avatar_event.content.get("url", "") - token, public_keys, fallback_public_key, display_name = ( - yield self._ask_id_server_for_third_party_invite( - requester=requester, - id_server=id_server, - medium=medium, - address=address, - room_id=room_id, - inviter_user_id=user.to_string(), - room_alias=canonical_room_alias, - room_avatar_url=room_avatar_url, - room_join_rules=room_join_rules, - room_name=room_name, - inviter_display_name=inviter_display_name, - inviter_avatar_url=inviter_avatar_url, - id_access_token=id_access_token, - ) + ( + token, + public_keys, + fallback_public_key, + display_name, + ) = await self.identity_handler.ask_id_server_for_third_party_invite( + requester=requester, + id_server=id_server, + medium=medium, + address=address, + room_id=room_id, + inviter_user_id=user.to_string(), + room_alias=canonical_room_alias, + room_avatar_url=room_avatar_url, + room_join_rules=room_join_rules, + room_name=room_name, + inviter_display_name=inviter_display_name, + inviter_avatar_url=inviter_avatar_url, + id_access_token=id_access_token, ) - yield self.event_creation_handler.create_and_send_nonmember_event( + ( + event, + stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.ThirdPartyInvite, @@ -971,146 +872,11 @@ class RoomMemberHandler(object): ratelimit=False, txn_id=txn_id, ) + return stream_id - @defer.inlineCallbacks - def _ask_id_server_for_third_party_invite( - self, - requester, - id_server, - medium, - address, - room_id, - inviter_user_id, - room_alias, - room_avatar_url, - room_join_rules, - room_name, - inviter_display_name, - inviter_avatar_url, - id_access_token=None, - ): - """ - Asks an identity server for a third party invite. - - Args: - requester (Requester) - id_server (str): hostname + optional port for the identity server. - medium (str): The literal string "email". - address (str): The third party address being invited. - room_id (str): The ID of the room to which the user is invited. - inviter_user_id (str): The user ID of the inviter. - room_alias (str): An alias for the room, for cosmetic notifications. - room_avatar_url (str): The URL of the room's avatar, for cosmetic - notifications. - room_join_rules (str): The join rules of the email (e.g. "public"). - room_name (str): The m.room.name of the room. - inviter_display_name (str): The current display name of the - inviter. - inviter_avatar_url (str): The URL of the inviter's avatar. - id_access_token (str|None): The access token to authenticate to the identity - server with - - Returns: - A deferred tuple containing: - token (str): The token which must be signed to prove authenticity. - public_keys ([{"public_key": str, "key_validity_url": str}]): - public_key is a base64-encoded ed25519 public key. - fallback_public_key: One element from public_keys. - display_name (str): A user-friendly name to represent the invited - user. - """ - invite_config = { - "medium": medium, - "address": address, - "room_id": room_id, - "room_alias": room_alias, - "room_avatar_url": room_avatar_url, - "room_join_rules": room_join_rules, - "room_name": room_name, - "sender": inviter_user_id, - "sender_display_name": inviter_display_name, - "sender_avatar_url": inviter_avatar_url, - } - - # Add the identity service access token to the JSON body and use the v2 - # Identity Service endpoints if id_access_token is present - data = None - base_url = "%s%s/_matrix/identity" % (id_server_scheme, id_server) - - if id_access_token: - key_validity_url = "%s%s/_matrix/identity/v2/pubkey/isvalid" % ( - id_server_scheme, - id_server, - ) - - # Attempt a v2 lookup - url = base_url + "/v2/store-invite" - try: - data = yield self.simple_http_client.post_json_get_json( - url, - invite_config, - {"Authorization": create_id_access_token_header(id_access_token)}, - ) - except HttpResponseException as e: - if e.code != 404: - logger.info("Failed to POST %s with JSON: %s", url, e) - raise e - - if data is None: - key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % ( - id_server_scheme, - id_server, - ) - url = base_url + "/api/v1/store-invite" - - try: - data = yield self.simple_http_client.post_json_get_json( - url, invite_config - ) - except HttpResponseException as e: - logger.warning( - "Error trying to call /store-invite on %s%s: %s", - id_server_scheme, - id_server, - e, - ) - - if data is None: - # Some identity servers may only support application/x-www-form-urlencoded - # types. This is especially true with old instances of Sydent, see - # https://github.com/matrix-org/sydent/pull/170 - try: - data = yield self.simple_http_client.post_urlencoded_get_json( - url, invite_config - ) - except HttpResponseException as e: - logger.warning( - "Error calling /store-invite on %s%s with fallback " - "encoding: %s", - id_server_scheme, - id_server, - e, - ) - raise e - - # TODO: Check for success - token = data["token"] - public_keys = data.get("public_keys", []) - if "public_key" in data: - fallback_public_key = { - "public_key": data["public_key"], - "key_validity_url": key_validity_url, - } - else: - fallback_public_key = public_keys[0] - - if not public_keys: - public_keys.append(fallback_public_key) - display_name = data["display_name"] - return token, public_keys, fallback_public_key, display_name - - @defer.inlineCallbacks - def _is_host_in_room(self, current_state_ids): + async def _is_host_in_room( + self, current_state_ids: Dict[Tuple[str, str], str] + ) -> bool: # Have we just created the room, and is this about to be the very # first member event? create_event_id = current_state_ids.get(("m.room.create", "")) @@ -1123,7 +889,7 @@ class RoomMemberHandler(object): continue event_id = current_state_ids[(etype, state_key)] - event = yield self.store.get_event(event_id, allow_none=True) + event = await self.store.get_event(event_id, allow_none=True) if not event: continue @@ -1132,11 +898,10 @@ class RoomMemberHandler(object): return False - @defer.inlineCallbacks - def _is_server_notice_room(self, room_id): + async def _is_server_notice_room(self, room_id: str) -> bool: if self._server_notices_mxid is None: return False - user_ids = yield self.store.get_users_in_room(room_id) + user_ids = await self.store.get_users_in_room(room_id) return self._server_notices_mxid in user_ids @@ -1148,20 +913,21 @@ class RoomMemberMasterHandler(RoomMemberHandler): self.distributor.declare("user_joined_room") self.distributor.declare("user_left_room") - @defer.inlineCallbacks - def _is_remote_room_too_complex(self, room_id, remote_room_hosts): + async def _is_remote_room_too_complex( + self, room_id: str, remote_room_hosts: List[str] + ) -> Optional[bool]: """ Check if complexity of a remote room is too great. Args: - room_id (str) - remote_room_hosts (list[str]) + room_id + remote_room_hosts Returns: bool of whether the complexity is too great, or None if unable to be fetched """ max_complexity = self.hs.config.limit_remote_rooms.complexity - complexity = yield self.federation_handler.get_room_complexity( + complexity = await self.federation_handler.get_room_complexity( remote_room_hosts, room_id ) @@ -1169,23 +935,26 @@ class RoomMemberMasterHandler(RoomMemberHandler): return complexity["v1"] > max_complexity return None - @defer.inlineCallbacks - def _is_local_room_too_complex(self, room_id): + async def _is_local_room_too_complex(self, room_id: str) -> bool: """ Check if the complexity of a local room is too great. Args: - room_id (str) - - Returns: bool + room_id: The room ID to check for complexity. """ max_complexity = self.hs.config.limit_remote_rooms.complexity - complexity = yield self.store.get_room_complexity(room_id) + complexity = await self.store.get_room_complexity(room_id) return complexity["v1"] > max_complexity - @defer.inlineCallbacks - def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + async def _remote_join( + self, + requester: Requester, + remote_room_hosts: List[str], + room_id: str, + user: UserID, + content: dict, + ) -> Tuple[str, int]: """Implements RoomMemberHandler._remote_join """ # filter ourselves out of remote_room_hosts: do_invite_join ignores it @@ -1200,7 +969,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): if self.hs.config.limit_remote_rooms.enabled: # Fetch the room complexity - too_complex = yield self._is_remote_room_too_complex( + too_complex = await self._is_remote_room_too_complex( room_id, remote_room_hosts ) if too_complex is True: @@ -1214,28 +983,28 @@ class RoomMemberMasterHandler(RoomMemberHandler): # join dance for now, since we're kinda implicitly checking # that we are allowed to join when we decide whether or not we # need to do the invite/join dance. - yield self.federation_handler.do_invite_join( + event_id, stream_id = await self.federation_handler.do_invite_join( remote_room_hosts, room_id, user.to_string(), content ) - yield self._user_joined_room(user, room_id) + await self._user_joined_room(user, room_id) # Check the room we just joined wasn't too large, if we didn't fetch the # complexity of it before. if self.hs.config.limit_remote_rooms.enabled: if too_complex is False: # We checked, and we're under the limit. - return + return event_id, stream_id # Check again, but with the local state events - too_complex = yield self._is_local_room_too_complex(room_id) + too_complex = await self._is_local_room_too_complex(room_id) if too_complex is False: # We're under the limit. - return + return event_id, stream_id # The room is too large. Leave. requester = types.create_requester(user, None, False, None) - yield self.update_membership( + await self.update_membership( requester=requester, target=user, room_id=room_id, action="leave" ) raise SynapseError( @@ -1244,16 +1013,24 @@ class RoomMemberMasterHandler(RoomMemberHandler): errcode=Codes.RESOURCE_LIMIT_EXCEEDED, ) - @defer.inlineCallbacks - def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + return event_id, stream_id + + async def _remote_reject_invite( + self, + requester: Requester, + remote_room_hosts: List[str], + room_id: str, + target: UserID, + content: dict, + ) -> Tuple[Optional[str], int]: """Implements RoomMemberHandler._remote_reject_invite """ fed_handler = self.federation_handler try: - ret = yield fed_handler.do_remotely_reject_invite( - remote_room_hosts, room_id, target.to_string() + event, stream_id = await fed_handler.do_remotely_reject_invite( + remote_room_hosts, room_id, target.to_string(), content=content, ) - return ret + return event.event_id, stream_id except Exception as e: # if we were unable to reject the exception, just mark # it as rejected on our end and plough ahead. @@ -1263,24 +1040,23 @@ class RoomMemberMasterHandler(RoomMemberHandler): # logger.warning("Failed to reject invite: %s", e) - yield self.store.locally_reject_invite(target.to_string(), room_id) - return {} + stream_id = await self.locally_reject_invite(target.to_string(), room_id) + return None, stream_id - def _user_joined_room(self, target, room_id): + async def _user_joined_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_joined_room """ - return user_joined_room(self.distributor, target, room_id) + user_joined_room(self.distributor, target, room_id) - def _user_left_room(self, target, room_id): + async def _user_left_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_left_room """ - return user_left_room(self.distributor, target, room_id) + user_left_room(self.distributor, target, room_id) - @defer.inlineCallbacks - def forget(self, user, room_id): + async def forget(self, user: UserID, room_id: str) -> None: user_id = user.to_string() - member = yield self.state_handler.get_current_state( + member = await self.state_handler.get_current_state( room_id=room_id, event_type=EventTypes.Member, state_key=user_id ) membership = member.membership if member else None @@ -1292,4 +1068,4 @@ class RoomMemberMasterHandler(RoomMemberHandler): raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) if membership: - yield self.store.forget(user_id, room_id) + await self.store.forget(user_id, room_id) |