summary refs log tree commit diff
path: root/synapse/handlers/room_member.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/room_member.py')
-rw-r--r--synapse/handlers/room_member.py381
1 files changed, 287 insertions, 94 deletions
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0f7af982f0..804463b1c0 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,7 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2016-2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,24 +15,47 @@
 
 import abc
 import logging
-from typing import Dict, Iterable, List, Optional, Tuple
+import random
+from http import HTTPStatus
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union
 
-from six.moves import http_client
+from unpaddedbase64 import encode_base64
 
 from synapse import types
-from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
+from synapse.api.errors import (
+    AuthError,
+    Codes,
+    LimitExceededError,
+    ShadowBanError,
+    SynapseError,
+)
+from synapse.api.ratelimiting import Ratelimiter
+from synapse.api.room_versions import EventFormatVersions
+from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.events import EventBase
+from synapse.events.builder import create_local_event_from_event_dict
 from synapse.events.snapshot import EventContext
-from synapse.replication.http.membership import (
-    ReplicationLocallyRejectInviteRestServlet,
+from synapse.events.validator import EventValidator
+from synapse.storage.roommember import RoomsForUser
+from synapse.types import (
+    Collection,
+    JsonDict,
+    Requester,
+    RoomAlias,
+    RoomID,
+    StateMap,
+    UserID,
 )
-from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
 
 from ._base import BaseHandler
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+
 logger = logging.getLogger(__name__)
 
 
@@ -46,7 +67,7 @@ class RoomMemberHandler(object):
 
     __metaclass__ = abc.ABCMeta
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
@@ -75,10 +96,17 @@ class RoomMemberHandler(object):
         )
         if self._is_on_event_persistence_instance:
             self.persist_event_storage = hs.get_storage().persistence
-        else:
-            self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
-                hs
-            )
+
+        self._join_rate_limiter_local = Ratelimiter(
+            clock=self.clock,
+            rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
+            burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
+        )
+        self._join_rate_limiter_remote = Ratelimiter(
+            clock=self.clock,
+            rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
+            burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
+        )
 
         # This is only used to get at ratelimit function, and
         # maybe_kick_guest_users. It's fine there are multiple of these as
@@ -106,46 +134,28 @@ class RoomMemberHandler(object):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    async def _remote_reject_invite(
+    async def remote_reject_invite(
         self,
+        invite_event_id: str,
+        txn_id: Optional[str],
         requester: Requester,
-        remote_room_hosts: List[str],
-        room_id: str,
-        target: UserID,
-        content: dict,
-    ) -> Tuple[Optional[str], int]:
-        """Attempt to reject an invite for a room this server is not in. If we
-        fail to do so we locally mark the invite as rejected.
+        content: JsonDict,
+    ) -> Tuple[str, int]:
+        """
+        Rejects an out-of-band invite we have received from a remote server
 
         Args:
-            requester
-            remote_room_hosts: List of servers to use to try and reject invite
-            room_id
-            target: The user rejecting the invite
-            content: The content for the rejection event
+            invite_event_id: ID of the invite to be rejected
+            txn_id: optional transaction ID supplied by the client
+            requester: user making the rejection request, according to the access token
+            content: additional content to include in the rejection event.
+               Normally an empty dict.
 
         Returns:
-            A dictionary to be returned to the client, may
-            include event_id etc, or nothing if we locally rejected
+            event id, stream_id of the leave event
         """
         raise NotImplementedError()
 
-    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
-        """Mark the invite has having been rejected even though we failed to
-        create a leave event for it.
-        """
-        if self._is_on_event_persistence_instance:
-            return await self.persist_event_storage.locally_reject_invite(
-                user_id, room_id
-            )
-        else:
-            result = await self._locally_reject_client(
-                instance_name=self._event_stream_writer_instance,
-                user_id=user_id,
-                room_id=room_id,
-            )
-            return result["stream_id"]
-
     @abc.abstractmethod
     async def _user_joined_room(self, target: UserID, room_id: str) -> None:
         """Notifies distributor on master process that the user has joined the
@@ -215,24 +225,40 @@ class RoomMemberHandler(object):
             _, stream_id = await self.store.get_event_ordering(duplicate.event_id)
             return duplicate.event_id, stream_id
 
-        stream_id = await self.event_creation_handler.handle_new_client_event(
-            requester, event, context, extra_users=[target], ratelimit=ratelimit
-        )
-
         prev_state_ids = await context.get_prev_state_ids()
 
         prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
 
+        newly_joined = False
         if event.membership == Membership.JOIN:
-            # Only fire user_joined_room if the user has actually joined the
-            # room. Don't bother if the user is just changing their profile
-            # info.
             newly_joined = True
             if prev_member_event_id:
                 prev_member_event = await self.store.get_event(prev_member_event_id)
                 newly_joined = prev_member_event.membership != Membership.JOIN
+
+            # Only rate-limit if the user actually joined the room, otherwise we'll end
+            # up blocking profile updates.
             if newly_joined:
-                await self._user_joined_room(target, room_id)
+                time_now_s = self.clock.time()
+                (
+                    allowed,
+                    time_allowed,
+                ) = self._join_rate_limiter_local.can_requester_do_action(requester)
+
+                if not allowed:
+                    raise LimitExceededError(
+                        retry_after_ms=int(1000 * (time_allowed - time_now_s))
+                    )
+
+        stream_id = await self.event_creation_handler.handle_new_client_event(
+            requester, event, context, extra_users=[target], ratelimit=ratelimit,
+        )
+
+        if event.membership == Membership.JOIN and newly_joined:
+            # Only fire user_joined_room if the user has actually joined the
+            # room. Don't bother if the user is just changing their profile
+            # info.
+            await self._user_joined_room(target, room_id)
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
                 prev_member_event = await self.store.get_event(prev_member_event_id)
@@ -289,7 +315,32 @@ class RoomMemberHandler(object):
         ratelimit: bool = True,
         content: Optional[dict] = None,
         require_consent: bool = True,
-    ) -> Tuple[Optional[str], int]:
+    ) -> Tuple[str, int]:
+        """Update a user's membership in a room.
+
+        Params:
+            requester: The user who is performing the update.
+            target: The user whose membership is being updated.
+            room_id: The room ID whose membership is being updated.
+            action: The membership change, see synapse.api.constants.Membership.
+            txn_id: The transaction ID, if given.
+            remote_room_hosts: Remote servers to send the update to.
+            third_party_signed: Information from a 3PID invite.
+            ratelimit: Whether to rate limit the request.
+            content: The content of the created event.
+            require_consent: Whether consent is required.
+
+        Returns:
+            A tuple of the new event ID and stream ID.
+
+        Raises:
+            ShadowBanError if a shadow-banned requester attempts to send an invite.
+        """
+        if action == Membership.INVITE and requester.shadow_banned:
+            # We randomly sleep a bit just to annoy the requester.
+            await self.clock.sleep(random.randint(1, 10))
+            raise ShadowBanError()
+
         key = (room_id,)
 
         with (await self.member_linearizer.queue(key)):
@@ -320,7 +371,7 @@ class RoomMemberHandler(object):
         ratelimit: bool = True,
         content: Optional[dict] = None,
         require_consent: bool = True,
-    ) -> Tuple[Optional[str], int]:
+    ) -> Tuple[str, int]:
         content_specified = bool(content)
         if content is None:
             content = {}
@@ -361,7 +412,7 @@ class RoomMemberHandler(object):
         if effective_membership_state == Membership.INVITE:
             # block any attempts to invite the server notices mxid
             if target.to_string() == self._server_notices_mxid:
-                raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+                raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
 
             block_invite = False
 
@@ -444,7 +495,7 @@ class RoomMemberHandler(object):
                 is_blocked = await self._is_server_notice_room(room_id)
                 if is_blocked:
                     raise SynapseError(
-                        http_client.FORBIDDEN,
+                        HTTPStatus.FORBIDDEN,
                         "You cannot reject this invite",
                         errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
                     )
@@ -463,6 +514,17 @@ class RoomMemberHandler(object):
                     raise AuthError(403, "Guest access not allowed")
 
             if not is_host_in_room:
+                time_now_s = self.clock.time()
+                (
+                    allowed,
+                    time_allowed,
+                ) = self._join_rate_limiter_remote.can_requester_do_action(requester,)
+
+                if not allowed:
+                    raise LimitExceededError(
+                        retry_after_ms=int(1000 * (time_allowed - time_now_s))
+                    )
+
                 inviter = await self._get_inviter(target.to_string(), room_id)
                 if inviter and not self.hs.is_mine(inviter):
                     remote_room_hosts.append(inviter.domain)
@@ -486,24 +548,43 @@ class RoomMemberHandler(object):
         elif effective_membership_state == Membership.LEAVE:
             if not is_host_in_room:
                 # perhaps we've been invited
-                inviter = await self._get_inviter(target.to_string(), room_id)
-                if not inviter:
+                invite = await self.store.get_invite_for_local_user_in_room(
+                    user_id=target.to_string(), room_id=room_id
+                )  # type: Optional[RoomsForUser]
+                if not invite:
+                    logger.info(
+                        "%s sent a leave request to %s, but that is not an active room "
+                        "on this server, and there is no pending invite",
+                        target,
+                        room_id,
+                    )
+
                     raise SynapseError(404, "Not a known room")
 
-                if self.hs.is_mine(inviter):
-                    # the inviter was on our server, but has now left. Carry on
-                    # with the normal rejection codepath.
-                    #
-                    # This is a bit of a hack, because the room might still be
-                    # active on other servers.
-                    pass
-                else:
-                    # send the rejection to the inviter's HS.
-                    remote_room_hosts = remote_room_hosts + [inviter.domain]
-                    return await self._remote_reject_invite(
-                        requester, remote_room_hosts, room_id, target, content,
+                logger.info(
+                    "%s rejects invite to %s from %s", target, room_id, invite.sender
+                )
+
+                if not self.hs.is_mine_id(invite.sender):
+                    # send the rejection to the inviter's HS (with fallback to
+                    # local event)
+                    return await self.remote_reject_invite(
+                        invite.event_id, txn_id, requester, content,
                     )
 
+                # the inviter was on our server, but has now left. Carry on
+                # with the normal rejection codepath, which will also send the
+                # rejection out to any other servers we believe are still in the room.
+
+                # thanks to overzealous cleaning up of event_forward_extremities in
+                # `delete_old_current_state_events`, it's possible to end up with no
+                # forward extremities here. If that happens, let's just hang the
+                # rejection off the invite event.
+                #
+                # see: https://github.com/matrix-org/synapse/issues/7139
+                if len(latest_event_ids) == 0:
+                    latest_event_ids = [invite.event_id]
+
         return await self._local_membership_update(
             requester=requester,
             target=target,
@@ -669,9 +750,7 @@ class RoomMemberHandler(object):
                 if prev_member_event.membership == Membership.JOIN:
                     await self._user_left_room(target_user, room_id)
 
-    async def _can_guest_join(
-        self, current_state_ids: Dict[Tuple[str, str], str]
-    ) -> bool:
+    async def _can_guest_join(self, current_state_ids: StateMap[str]) -> bool:
         """
         Returns whether a guest can join a room based on its current state.
         """
@@ -681,7 +760,7 @@ class RoomMemberHandler(object):
 
         guest_access = await self.store.get_event(guest_access_id)
 
-        return (
+        return bool(
             guest_access
             and guest_access.content
             and "guest_access" in guest_access.content
@@ -738,6 +817,25 @@ class RoomMemberHandler(object):
         txn_id: Optional[str],
         id_access_token: Optional[str] = None,
     ) -> int:
+        """Invite a 3PID to a room.
+
+        Args:
+            room_id: The room to invite the 3PID to.
+            inviter: The user sending the invite.
+            medium: The 3PID's medium.
+            address: The 3PID's address.
+            id_server: The identity server to use.
+            requester: The user making the request.
+            txn_id: The transaction ID this is part of, or None if this is not
+                part of a transaction.
+            id_access_token: The optional identity server access token.
+
+        Returns:
+             The new stream ID.
+
+        Raises:
+            ShadowBanError if the requester has been shadow-banned.
+        """
         if self.config.block_non_admin_invites:
             is_requester_admin = await self.auth.is_server_admin(requester.user)
             if not is_requester_admin:
@@ -745,6 +843,11 @@ class RoomMemberHandler(object):
                     403, "Invites have been disabled on this server", Codes.FORBIDDEN
                 )
 
+        if requester.shadow_banned:
+            # We randomly sleep a bit just to annoy the requester.
+            await self.clock.sleep(random.randint(1, 10))
+            raise ShadowBanError()
+
         # We need to rate limit *before* we send out any 3PID invites, so we
         # can't just rely on the standard ratelimiting of events.
         await self.base_handler.ratelimit(requester)
@@ -769,6 +872,8 @@ class RoomMemberHandler(object):
         )
 
         if invitee:
+            # Note that update_membership with an action of "invite" can raise
+            # a ShadowBanError, but this was done above already.
             _, stream_id = await self.update_membership(
                 requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
             )
@@ -874,9 +979,7 @@ class RoomMemberHandler(object):
         )
         return stream_id
 
-    async def _is_host_in_room(
-        self, current_state_ids: Dict[Tuple[str, str], str]
-    ) -> bool:
+    async def _is_host_in_room(self, current_state_ids: StateMap[str]) -> bool:
         # Have we just created the room, and is this about to be the very
         # first member event?
         create_event_id = current_state_ids.get(("m.room.create", ""))
@@ -967,7 +1070,11 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         if len(remote_room_hosts) == 0:
             raise SynapseError(404, "No known servers")
 
-        if self.hs.config.limit_remote_rooms.enabled:
+        check_complexity = self.hs.config.limit_remote_rooms.enabled
+        if check_complexity and self.hs.config.limit_remote_rooms.admins_can_join:
+            check_complexity = not await self.auth.is_server_admin(user)
+
+        if check_complexity:
             # Fetch the room complexity
             too_complex = await self._is_remote_room_too_complex(
                 room_id, remote_room_hosts
@@ -990,7 +1097,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
 
         # Check the room we just joined wasn't too large, if we didn't fetch the
         # complexity of it before.
-        if self.hs.config.limit_remote_rooms.enabled:
+        if check_complexity:
             if too_complex is False:
                 # We checked, and we're under the limit.
                 return event_id, stream_id
@@ -1003,7 +1110,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
                 return event_id, stream_id
 
             # The room is too large. Leave.
-            requester = types.create_requester(user, None, False, None)
+            requester = types.create_requester(user, None, False, False, None)
             await self.update_membership(
                 requester=requester, target=user, room_id=room_id, action="leave"
             )
@@ -1015,33 +1122,119 @@ class RoomMemberMasterHandler(RoomMemberHandler):
 
         return event_id, stream_id
 
-    async def _remote_reject_invite(
+    async def remote_reject_invite(
         self,
+        invite_event_id: str,
+        txn_id: Optional[str],
         requester: Requester,
-        remote_room_hosts: List[str],
-        room_id: str,
-        target: UserID,
-        content: dict,
-    ) -> Tuple[Optional[str], int]:
-        """Implements RoomMemberHandler._remote_reject_invite
+        content: JsonDict,
+    ) -> Tuple[str, int]:
         """
+        Rejects an out-of-band invite received from a remote user
+
+        Implements RoomMemberHandler.remote_reject_invite
+        """
+        invite_event = await self.store.get_event(invite_event_id)
+        room_id = invite_event.room_id
+        target_user = invite_event.state_key
+
+        # first of all, try doing a rejection via the inviting server
         fed_handler = self.federation_handler
         try:
+            inviter_id = UserID.from_string(invite_event.sender)
             event, stream_id = await fed_handler.do_remotely_reject_invite(
-                remote_room_hosts, room_id, target.to_string(), content=content,
+                [inviter_id.domain], room_id, target_user, content=content
             )
             return event.event_id, stream_id
         except Exception as e:
-            # if we were unable to reject the exception, just mark
-            # it as rejected on our end and plough ahead.
+            # if we were unable to reject the invite, we will generate our own
+            # leave event.
             #
             # The 'except' clause is very broad, but we need to
             # capture everything from DNS failures upwards
             #
             logger.warning("Failed to reject invite: %s", e)
 
-            stream_id = await self.locally_reject_invite(target.to_string(), room_id)
-            return None, stream_id
+            return await self._locally_reject_invite(
+                invite_event, txn_id, requester, content
+            )
+
+    async def _locally_reject_invite(
+        self,
+        invite_event: EventBase,
+        txn_id: Optional[str],
+        requester: Requester,
+        content: JsonDict,
+    ) -> Tuple[str, int]:
+        """Generate a local invite rejection
+
+        This is called after we fail to reject an invite via a remote server. It
+        generates an out-of-band membership event locally.
+
+        Args:
+            invite_event: the invite to be rejected
+            txn_id: optional transaction ID supplied by the client
+            requester:  user making the rejection request, according to the access token
+            content: additional content to include in the rejection event.
+               Normally an empty dict.
+        """
+
+        room_id = invite_event.room_id
+        target_user = invite_event.state_key
+        room_version = await self.store.get_room_version(room_id)
+
+        content["membership"] = Membership.LEAVE
+
+        # the auth events for the new event are the same as that of the invite, plus
+        # the invite itself.
+        #
+        # the prev_events are just the invite.
+        invite_hash = invite_event.event_id  # type: Union[str, Tuple]
+        if room_version.event_format == EventFormatVersions.V1:
+            alg, h = compute_event_reference_hash(invite_event)
+            invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
+
+        auth_events = tuple(invite_event.auth_events) + (invite_hash,)
+        prev_events = (invite_hash,)
+
+        # we cap depth of generated events, to ensure that they are not
+        # rejected by other servers (and so that they can be persisted in
+        # the db)
+        depth = min(invite_event.depth + 1, MAX_DEPTH)
+
+        event_dict = {
+            "depth": depth,
+            "auth_events": auth_events,
+            "prev_events": prev_events,
+            "type": EventTypes.Member,
+            "room_id": room_id,
+            "sender": target_user,
+            "content": content,
+            "state_key": target_user,
+        }
+
+        event = create_local_event_from_event_dict(
+            clock=self.clock,
+            hostname=self.hs.hostname,
+            signing_key=self.hs.signing_key,
+            room_version=room_version,
+            event_dict=event_dict,
+        )
+        event.internal_metadata.outlier = True
+        event.internal_metadata.out_of_band_membership = True
+        if txn_id is not None:
+            event.internal_metadata.txn_id = txn_id
+        if requester.access_token_id is not None:
+            event.internal_metadata.token_id = requester.access_token_id
+
+        EventValidator().validate_new(event, self.config)
+
+        context = await self.state_handler.compute_event_context(event)
+        context.app_service = requester.app_service
+        stream_id = await self.event_creation_handler.handle_new_client_event(
+            requester, event, context, extra_users=[UserID.from_string(target_user)],
+        )
+        return event.event_id, stream_id
 
     async def _user_joined_room(self, target: UserID, room_id: str) -> None:
         """Implements RoomMemberHandler._user_joined_room