diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0f7af982f0..32b7e323fa 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2016-2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,28 +15,43 @@
import abc
import logging
-from typing import Dict, Iterable, List, Optional, Tuple
+import random
+from http import HTTPStatus
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union
-from six.moves import http_client
+from unpaddedbase64 import encode_base64
from synapse import types
-from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
+from synapse.api.errors import (
+ AuthError,
+ Codes,
+ LimitExceededError,
+ ShadowBanError,
+ SynapseError,
+)
+from synapse.api.ratelimiting import Ratelimiter
+from synapse.api.room_versions import EventFormatVersions
+from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase
+from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext
-from synapse.replication.http.membership import (
- ReplicationLocallyRejectInviteRestServlet,
-)
-from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
+from synapse.events.validator import EventValidator
+from synapse.storage.roommember import RoomsForUser
+from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
from ._base import BaseHandler
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
logger = logging.getLogger(__name__)
-class RoomMemberHandler(object):
+class RoomMemberHandler:
# TODO(paul): This handler currently contains a messy conflation of
# low-level API that works on UserID objects and so on, and REST-level
# API that takes ID strings and returns pagination chunks. These concerns
@@ -46,7 +59,7 @@ class RoomMemberHandler(object):
__metaclass__ = abc.ABCMeta
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
@@ -75,10 +88,17 @@ class RoomMemberHandler(object):
)
if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence
- else:
- self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
- hs
- )
+
+ self._join_rate_limiter_local = Ratelimiter(
+ clock=self.clock,
+ rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
+ burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
+ )
+ self._join_rate_limiter_remote = Ratelimiter(
+ clock=self.clock,
+ rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
+ burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
+ )
# This is only used to get at ratelimit function, and
# maybe_kick_guest_users. It's fine there are multiple of these as
@@ -106,46 +126,28 @@ class RoomMemberHandler(object):
raise NotImplementedError()
@abc.abstractmethod
- async def _remote_reject_invite(
+ async def remote_reject_invite(
self,
+ invite_event_id: str,
+ txn_id: Optional[str],
requester: Requester,
- remote_room_hosts: List[str],
- room_id: str,
- target: UserID,
- content: dict,
- ) -> Tuple[Optional[str], int]:
- """Attempt to reject an invite for a room this server is not in. If we
- fail to do so we locally mark the invite as rejected.
+ content: JsonDict,
+ ) -> Tuple[str, int]:
+ """
+ Rejects an out-of-band invite we have received from a remote server
Args:
- requester
- remote_room_hosts: List of servers to use to try and reject invite
- room_id
- target: The user rejecting the invite
- content: The content for the rejection event
+ invite_event_id: ID of the invite to be rejected
+ txn_id: optional transaction ID supplied by the client
+ requester: user making the rejection request, according to the access token
+ content: additional content to include in the rejection event.
+ Normally an empty dict.
Returns:
- A dictionary to be returned to the client, may
- include event_id etc, or nothing if we locally rejected
+ event id, stream_id of the leave event
"""
raise NotImplementedError()
- async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
- """Mark the invite has having been rejected even though we failed to
- create a leave event for it.
- """
- if self._is_on_event_persistence_instance:
- return await self.persist_event_storage.locally_reject_invite(
- user_id, room_id
- )
- else:
- result = await self._locally_reject_client(
- instance_name=self._event_stream_writer_instance,
- user_id=user_id,
- room_id=room_id,
- )
- return result["stream_id"]
-
@abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
@@ -174,7 +176,7 @@ class RoomMemberHandler(object):
target: UserID,
room_id: str,
membership: str,
- prev_event_ids: Collection[str],
+ prev_event_ids: List[str],
txn_id: Optional[str] = None,
ratelimit: bool = True,
content: Optional[dict] = None,
@@ -215,24 +217,40 @@ class RoomMemberHandler(object):
_, stream_id = await self.store.get_event_ordering(duplicate.event_id)
return duplicate.event_id, stream_id
- stream_id = await self.event_creation_handler.handle_new_client_event(
- requester, event, context, extra_users=[target], ratelimit=ratelimit
- )
-
prev_state_ids = await context.get_prev_state_ids()
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
+ newly_joined = False
if event.membership == Membership.JOIN:
- # Only fire user_joined_room if the user has actually joined the
- # room. Don't bother if the user is just changing their profile
- # info.
newly_joined = True
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
+
+ # Only rate-limit if the user actually joined the room, otherwise we'll end
+ # up blocking profile updates.
if newly_joined:
- await self._user_joined_room(target, room_id)
+ time_now_s = self.clock.time()
+ (
+ allowed,
+ time_allowed,
+ ) = self._join_rate_limiter_local.can_requester_do_action(requester)
+
+ if not allowed:
+ raise LimitExceededError(
+ retry_after_ms=int(1000 * (time_allowed - time_now_s))
+ )
+
+ stream_id = await self.event_creation_handler.handle_new_client_event(
+ requester, event, context, extra_users=[target], ratelimit=ratelimit,
+ )
+
+ if event.membership == Membership.JOIN and newly_joined:
+ # Only fire user_joined_room if the user has actually joined the
+ # room. Don't bother if the user is just changing their profile
+ # info.
+ await self._user_joined_room(target, room_id)
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
@@ -289,7 +307,32 @@ class RoomMemberHandler(object):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
- ) -> Tuple[Optional[str], int]:
+ ) -> Tuple[str, int]:
+ """Update a user's membership in a room.
+
+ Params:
+ requester: The user who is performing the update.
+ target: The user whose membership is being updated.
+ room_id: The room ID whose membership is being updated.
+ action: The membership change, see synapse.api.constants.Membership.
+ txn_id: The transaction ID, if given.
+ remote_room_hosts: Remote servers to send the update to.
+ third_party_signed: Information from a 3PID invite.
+ ratelimit: Whether to rate limit the request.
+ content: The content of the created event.
+ require_consent: Whether consent is required.
+
+ Returns:
+ A tuple of the new event ID and stream ID.
+
+ Raises:
+ ShadowBanError if a shadow-banned requester attempts to send an invite.
+ """
+ if action == Membership.INVITE and requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
+
key = (room_id,)
with (await self.member_linearizer.queue(key)):
@@ -320,7 +363,7 @@ class RoomMemberHandler(object):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
- ) -> Tuple[Optional[str], int]:
+ ) -> Tuple[str, int]:
content_specified = bool(content)
if content is None:
content = {}
@@ -329,7 +372,7 @@ class RoomMemberHandler(object):
# later on.
content = dict(content)
- if not self.allow_per_room_profiles:
+ if not self.allow_per_room_profiles or requester.shadow_banned:
# Strip profile data, knowing that new profile data will be added to the
# event's content in event_creation_handler.create_event() using the target's
# global profile.
@@ -361,7 +404,7 @@ class RoomMemberHandler(object):
if effective_membership_state == Membership.INVITE:
# block any attempts to invite the server notices mxid
if target.to_string() == self._server_notices_mxid:
- raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+ raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
block_invite = False
@@ -444,7 +487,7 @@ class RoomMemberHandler(object):
is_blocked = await self._is_server_notice_room(room_id)
if is_blocked:
raise SynapseError(
- http_client.FORBIDDEN,
+ HTTPStatus.FORBIDDEN,
"You cannot reject this invite",
errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
)
@@ -463,6 +506,17 @@ class RoomMemberHandler(object):
raise AuthError(403, "Guest access not allowed")
if not is_host_in_room:
+ time_now_s = self.clock.time()
+ (
+ allowed,
+ time_allowed,
+ ) = self._join_rate_limiter_remote.can_requester_do_action(requester,)
+
+ if not allowed:
+ raise LimitExceededError(
+ retry_after_ms=int(1000 * (time_allowed - time_now_s))
+ )
+
inviter = await self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):
remote_room_hosts.append(inviter.domain)
@@ -486,24 +540,43 @@ class RoomMemberHandler(object):
elif effective_membership_state == Membership.LEAVE:
if not is_host_in_room:
# perhaps we've been invited
- inviter = await self._get_inviter(target.to_string(), room_id)
- if not inviter:
+ invite = await self.store.get_invite_for_local_user_in_room(
+ user_id=target.to_string(), room_id=room_id
+ ) # type: Optional[RoomsForUser]
+ if not invite:
+ logger.info(
+ "%s sent a leave request to %s, but that is not an active room "
+ "on this server, and there is no pending invite",
+ target,
+ room_id,
+ )
+
raise SynapseError(404, "Not a known room")
- if self.hs.is_mine(inviter):
- # the inviter was on our server, but has now left. Carry on
- # with the normal rejection codepath.
- #
- # This is a bit of a hack, because the room might still be
- # active on other servers.
- pass
- else:
- # send the rejection to the inviter's HS.
- remote_room_hosts = remote_room_hosts + [inviter.domain]
- return await self._remote_reject_invite(
- requester, remote_room_hosts, room_id, target, content,
+ logger.info(
+ "%s rejects invite to %s from %s", target, room_id, invite.sender
+ )
+
+ if not self.hs.is_mine_id(invite.sender):
+ # send the rejection to the inviter's HS (with fallback to
+ # local event)
+ return await self.remote_reject_invite(
+ invite.event_id, txn_id, requester, content,
)
+ # the inviter was on our server, but has now left. Carry on
+ # with the normal rejection codepath, which will also send the
+ # rejection out to any other servers we believe are still in the room.
+
+ # thanks to overzealous cleaning up of event_forward_extremities in
+ # `delete_old_current_state_events`, it's possible to end up with no
+ # forward extremities here. If that happens, let's just hang the
+ # rejection off the invite event.
+ #
+ # see: https://github.com/matrix-org/synapse/issues/7139
+ if len(latest_event_ids) == 0:
+ latest_event_ids = [invite.event_id]
+
return await self._local_membership_update(
requester=requester,
target=target,
@@ -669,9 +742,7 @@ class RoomMemberHandler(object):
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target_user, room_id)
- async def _can_guest_join(
- self, current_state_ids: Dict[Tuple[str, str], str]
- ) -> bool:
+ async def _can_guest_join(self, current_state_ids: StateMap[str]) -> bool:
"""
Returns whether a guest can join a room based on its current state.
"""
@@ -681,7 +752,7 @@ class RoomMemberHandler(object):
guest_access = await self.store.get_event(guest_access_id)
- return (
+ return bool(
guest_access
and guest_access.content
and "guest_access" in guest_access.content
@@ -738,6 +809,25 @@ class RoomMemberHandler(object):
txn_id: Optional[str],
id_access_token: Optional[str] = None,
) -> int:
+ """Invite a 3PID to a room.
+
+ Args:
+ room_id: The room to invite the 3PID to.
+ inviter: The user sending the invite.
+ medium: The 3PID's medium.
+ address: The 3PID's address.
+ id_server: The identity server to use.
+ requester: The user making the request.
+ txn_id: The transaction ID this is part of, or None if this is not
+ part of a transaction.
+ id_access_token: The optional identity server access token.
+
+ Returns:
+ The new stream ID.
+
+ Raises:
+ ShadowBanError if the requester has been shadow-banned.
+ """
if self.config.block_non_admin_invites:
is_requester_admin = await self.auth.is_server_admin(requester.user)
if not is_requester_admin:
@@ -745,6 +835,11 @@ class RoomMemberHandler(object):
403, "Invites have been disabled on this server", Codes.FORBIDDEN
)
+ if requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
+
# We need to rate limit *before* we send out any 3PID invites, so we
# can't just rely on the standard ratelimiting of events.
await self.base_handler.ratelimit(requester)
@@ -769,6 +864,8 @@ class RoomMemberHandler(object):
)
if invitee:
+ # Note that update_membership with an action of "invite" can raise
+ # a ShadowBanError, but this was done above already.
_, stream_id = await self.update_membership(
requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
)
@@ -874,9 +971,7 @@ class RoomMemberHandler(object):
)
return stream_id
- async def _is_host_in_room(
- self, current_state_ids: Dict[Tuple[str, str], str]
- ) -> bool:
+ async def _is_host_in_room(self, current_state_ids: StateMap[str]) -> bool:
# Have we just created the room, and is this about to be the very
# first member event?
create_event_id = current_state_ids.get(("m.room.create", ""))
@@ -967,7 +1062,11 @@ class RoomMemberMasterHandler(RoomMemberHandler):
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
- if self.hs.config.limit_remote_rooms.enabled:
+ check_complexity = self.hs.config.limit_remote_rooms.enabled
+ if check_complexity and self.hs.config.limit_remote_rooms.admins_can_join:
+ check_complexity = not await self.auth.is_server_admin(user)
+
+ if check_complexity:
# Fetch the room complexity
too_complex = await self._is_remote_room_too_complex(
room_id, remote_room_hosts
@@ -990,7 +1089,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
# Check the room we just joined wasn't too large, if we didn't fetch the
# complexity of it before.
- if self.hs.config.limit_remote_rooms.enabled:
+ if check_complexity:
if too_complex is False:
# We checked, and we're under the limit.
return event_id, stream_id
@@ -1003,7 +1102,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
return event_id, stream_id
# The room is too large. Leave.
- requester = types.create_requester(user, None, False, None)
+ requester = types.create_requester(user, None, False, False, None)
await self.update_membership(
requester=requester, target=user, room_id=room_id, action="leave"
)
@@ -1015,33 +1114,119 @@ class RoomMemberMasterHandler(RoomMemberHandler):
return event_id, stream_id
- async def _remote_reject_invite(
+ async def remote_reject_invite(
self,
+ invite_event_id: str,
+ txn_id: Optional[str],
requester: Requester,
- remote_room_hosts: List[str],
- room_id: str,
- target: UserID,
- content: dict,
- ) -> Tuple[Optional[str], int]:
- """Implements RoomMemberHandler._remote_reject_invite
+ content: JsonDict,
+ ) -> Tuple[str, int]:
"""
+ Rejects an out-of-band invite received from a remote user
+
+ Implements RoomMemberHandler.remote_reject_invite
+ """
+ invite_event = await self.store.get_event(invite_event_id)
+ room_id = invite_event.room_id
+ target_user = invite_event.state_key
+
+ # first of all, try doing a rejection via the inviting server
fed_handler = self.federation_handler
try:
+ inviter_id = UserID.from_string(invite_event.sender)
event, stream_id = await fed_handler.do_remotely_reject_invite(
- remote_room_hosts, room_id, target.to_string(), content=content,
+ [inviter_id.domain], room_id, target_user, content=content
)
return event.event_id, stream_id
except Exception as e:
- # if we were unable to reject the exception, just mark
- # it as rejected on our end and plough ahead.
+ # if we were unable to reject the invite, we will generate our own
+ # leave event.
#
# The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards
#
logger.warning("Failed to reject invite: %s", e)
- stream_id = await self.locally_reject_invite(target.to_string(), room_id)
- return None, stream_id
+ return await self._locally_reject_invite(
+ invite_event, txn_id, requester, content
+ )
+
+ async def _locally_reject_invite(
+ self,
+ invite_event: EventBase,
+ txn_id: Optional[str],
+ requester: Requester,
+ content: JsonDict,
+ ) -> Tuple[str, int]:
+ """Generate a local invite rejection
+
+ This is called after we fail to reject an invite via a remote server. It
+ generates an out-of-band membership event locally.
+
+ Args:
+ invite_event: the invite to be rejected
+ txn_id: optional transaction ID supplied by the client
+ requester: user making the rejection request, according to the access token
+ content: additional content to include in the rejection event.
+ Normally an empty dict.
+ """
+
+ room_id = invite_event.room_id
+ target_user = invite_event.state_key
+ room_version = await self.store.get_room_version(room_id)
+
+ content["membership"] = Membership.LEAVE
+
+ # the auth events for the new event are the same as that of the invite, plus
+ # the invite itself.
+ #
+ # the prev_events are just the invite.
+ invite_hash = invite_event.event_id # type: Union[str, Tuple]
+ if room_version.event_format == EventFormatVersions.V1:
+ alg, h = compute_event_reference_hash(invite_event)
+ invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
+
+ auth_events = tuple(invite_event.auth_events) + (invite_hash,)
+ prev_events = (invite_hash,)
+
+ # we cap depth of generated events, to ensure that they are not
+ # rejected by other servers (and so that they can be persisted in
+ # the db)
+ depth = min(invite_event.depth + 1, MAX_DEPTH)
+
+ event_dict = {
+ "depth": depth,
+ "auth_events": auth_events,
+ "prev_events": prev_events,
+ "type": EventTypes.Member,
+ "room_id": room_id,
+ "sender": target_user,
+ "content": content,
+ "state_key": target_user,
+ }
+
+ event = create_local_event_from_event_dict(
+ clock=self.clock,
+ hostname=self.hs.hostname,
+ signing_key=self.hs.signing_key,
+ room_version=room_version,
+ event_dict=event_dict,
+ )
+ event.internal_metadata.outlier = True
+ event.internal_metadata.out_of_band_membership = True
+ if txn_id is not None:
+ event.internal_metadata.txn_id = txn_id
+ if requester.access_token_id is not None:
+ event.internal_metadata.token_id = requester.access_token_id
+
+ EventValidator().validate_new(event, self.config)
+
+ context = await self.state_handler.compute_event_context(event)
+ context.app_service = requester.app_service
+ stream_id = await self.event_creation_handler.handle_new_client_event(
+ requester, event, context, extra_users=[UserID.from_string(target_user)],
+ )
+ return event.event_id, stream_id
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
|