diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index c3fee116c2..5a8120db57 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -15,14 +15,22 @@
import abc
import logging
+import random
from http import HTTPStatus
-from typing import Dict, Iterable, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union
from unpaddedbase64 import encode_base64
from synapse import types
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ Codes,
+ LimitExceededError,
+ ShadowBanError,
+ SynapseError,
+)
+from synapse.api.ratelimiting import Ratelimiter
from synapse.api.room_versions import EventFormatVersions
from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase
@@ -30,24 +38,26 @@ from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.storage.roommember import RoomsForUser
-from synapse.types import Collection, JsonDict, Requester, RoomAlias, RoomID, UserID
+from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
-from synapse.util.distributor import user_joined_room, user_left_room
+from synapse.util.distributor import user_left_room
from ._base import BaseHandler
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
logger = logging.getLogger(__name__)
-class RoomMemberHandler(object):
+class RoomMemberHandler(metaclass=abc.ABCMeta):
# TODO(paul): This handler currently contains a messy conflation of
# low-level API that works on UserID objects and so on, and REST-level
# API that takes ID strings and returns pagination chunks. These concerns
# ought to be separated out a lot better.
- __metaclass__ = abc.ABCMeta
-
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
@@ -71,12 +81,16 @@ class RoomMemberHandler(object):
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles
- self._event_stream_writer_instance = hs.config.worker.writers.events
- self._is_on_event_persistence_instance = (
- self._event_stream_writer_instance == hs.get_instance_name()
+ self._join_rate_limiter_local = Ratelimiter(
+ clock=self.clock,
+ rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
+ burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
+ )
+ self._join_rate_limiter_remote = Ratelimiter(
+ clock=self.clock,
+ rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
+ burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
)
- if self._is_on_event_persistence_instance:
- self.persist_event_storage = hs.get_storage().persistence
# This is only used to get at ratelimit function, and
# maybe_kick_guest_users. It's fine there are multiple of these as
@@ -127,17 +141,6 @@ class RoomMemberHandler(object):
raise NotImplementedError()
@abc.abstractmethod
- async def _user_joined_room(self, target: UserID, room_id: str) -> None:
- """Notifies distributor on master process that the user has joined the
- room.
-
- Args:
- target
- room_id
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has left the
room.
@@ -154,7 +157,7 @@ class RoomMemberHandler(object):
target: UserID,
room_id: str,
membership: str,
- prev_event_ids: Collection[str],
+ prev_event_ids: List[str],
txn_id: Optional[str] = None,
ratelimit: bool = True,
content: Optional[dict] = None,
@@ -195,25 +198,35 @@ class RoomMemberHandler(object):
_, stream_id = await self.store.get_event_ordering(duplicate.event_id)
return duplicate.event_id, stream_id
- stream_id = await self.event_creation_handler.handle_new_client_event(
- requester, event, context, extra_users=[target], ratelimit=ratelimit
- )
-
prev_state_ids = await context.get_prev_state_ids()
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
if event.membership == Membership.JOIN:
- # Only fire user_joined_room if the user has actually joined the
- # room. Don't bother if the user is just changing their profile
- # info.
newly_joined = True
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
+
+ # Only rate-limit if the user actually joined the room, otherwise we'll end
+ # up blocking profile updates.
if newly_joined:
- await self._user_joined_room(target, room_id)
- elif event.membership == Membership.LEAVE:
+ time_now_s = self.clock.time()
+ (
+ allowed,
+ time_allowed,
+ ) = self._join_rate_limiter_local.can_requester_do_action(requester)
+
+ if not allowed:
+ raise LimitExceededError(
+ retry_after_ms=int(1000 * (time_allowed - time_now_s))
+ )
+
+ stream_id = await self.event_creation_handler.handle_new_client_event(
+ requester, event, context, extra_users=[target], ratelimit=ratelimit,
+ )
+
+ if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
@@ -271,7 +284,31 @@ class RoomMemberHandler(object):
new_room: bool = False,
require_consent: bool = True,
) -> Tuple[str, int]:
- """Update a user's membership in a room"""
+ """Update a user's membership in a room.
+
+ Params:
+ requester: The user who is performing the update.
+ target: The user whose membership is being updated.
+ room_id: The room ID whose membership is being updated.
+ action: The membership change, see synapse.api.constants.Membership.
+ txn_id: The transaction ID, if given.
+ remote_room_hosts: Remote servers to send the update to.
+ third_party_signed: Information from a 3PID invite.
+ ratelimit: Whether to rate limit the request.
+ content: The content of the created event.
+ require_consent: Whether consent is required.
+
+ Returns:
+ A tuple of the new event ID and stream ID.
+
+ Raises:
+ ShadowBanError if a shadow-banned requester attempts to send an invite.
+ """
+ if action == Membership.INVITE and requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
+
key = (room_id,)
with (await self.member_linearizer.queue(key)):
@@ -313,7 +350,7 @@ class RoomMemberHandler(object):
# later on.
content = dict(content)
- if not self.allow_per_room_profiles:
+ if not self.allow_per_room_profiles or requester.shadow_banned:
# Strip profile data, knowing that new profile data will be added to the
# event's content in event_creation_handler.create_event() using the target's
# global profile.
@@ -473,6 +510,17 @@ class RoomMemberHandler(object):
raise SynapseError(403, "Not allowed to join this room")
if not is_host_in_room:
+ time_now_s = self.clock.time()
+ (
+ allowed,
+ time_allowed,
+ ) = self._join_rate_limiter_remote.can_requester_do_action(requester,)
+
+ if not allowed:
+ raise LimitExceededError(
+ retry_after_ms=int(1000 * (time_allowed - time_now_s))
+ )
+
inviter = await self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):
remote_room_hosts.append(inviter.domain)
@@ -500,26 +548,39 @@ class RoomMemberHandler(object):
user_id=target.to_string(), room_id=room_id
) # type: Optional[RoomsForUser]
if not invite:
+ logger.info(
+ "%s sent a leave request to %s, but that is not an active room "
+ "on this server, and there is no pending invite",
+ target,
+ room_id,
+ )
+
raise SynapseError(404, "Not a known room")
logger.info(
"%s rejects invite to %s from %s", target, room_id, invite.sender
)
- if self.hs.is_mine_id(invite.sender):
- # the inviter was on our server, but has now left. Carry on
- # with the normal rejection codepath.
- #
- # This is a bit of a hack, because the room might still be
- # active on other servers.
- pass
- else:
+ if not self.hs.is_mine_id(invite.sender):
# send the rejection to the inviter's HS (with fallback to
# local event)
return await self.remote_reject_invite(
invite.event_id, txn_id, requester, content,
)
+ # the inviter was on our server, but has now left. Carry on
+ # with the normal rejection codepath, which will also send the
+ # rejection out to any other servers we believe are still in the room.
+
+ # thanks to overzealous cleaning up of event_forward_extremities in
+ # `delete_old_current_state_events`, it's possible to end up with no
+ # forward extremities here. If that happens, let's just hang the
+ # rejection off the invite event.
+ #
+ # see: https://github.com/matrix-org/synapse/issues/7139
+ if len(latest_event_ids) == 0:
+ latest_event_ids = [invite.event_id]
+
return await self._local_membership_update(
requester=requester,
target=target,
@@ -669,25 +730,13 @@ class RoomMemberHandler(object):
(EventTypes.Member, event.state_key), None
)
- if event.membership == Membership.JOIN:
- # Only fire user_joined_room if the user has actually joined the
- # room. Don't bother if the user is just changing their profile
- # info.
- newly_joined = True
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(prev_member_event_id)
- newly_joined = prev_member_event.membership != Membership.JOIN
- if newly_joined:
- await self._user_joined_room(target_user, room_id)
- elif event.membership == Membership.LEAVE:
+ if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target_user, room_id)
- async def _can_guest_join(
- self, current_state_ids: Dict[Tuple[str, str], str]
- ) -> bool:
+ async def _can_guest_join(self, current_state_ids: StateMap[str]) -> bool:
"""
Returns whether a guest can join a room based on its current state.
"""
@@ -697,7 +746,7 @@ class RoomMemberHandler(object):
guest_access = await self.store.get_event(guest_access_id)
- return (
+ return bool(
guest_access
and guest_access.content
and "guest_access" in guest_access.content
@@ -755,6 +804,25 @@ class RoomMemberHandler(object):
new_room: bool = False,
id_access_token: Optional[str] = None,
) -> int:
+ """Invite a 3PID to a room.
+
+ Args:
+ room_id: The room to invite the 3PID to.
+ inviter: The user sending the invite.
+ medium: The 3PID's medium.
+ address: The 3PID's address.
+ id_server: The identity server to use.
+ requester: The user making the request.
+ txn_id: The transaction ID this is part of, or None if this is not
+ part of a transaction.
+ id_access_token: The optional identity server access token.
+
+ Returns:
+ The new stream ID.
+
+ Raises:
+ ShadowBanError if the requester has been shadow-banned.
+ """
if self.config.block_non_admin_invites:
is_requester_admin = await self.auth.is_server_admin(requester.user)
if not is_requester_admin:
@@ -762,6 +830,11 @@ class RoomMemberHandler(object):
403, "Invites have been disabled on this server", Codes.FORBIDDEN
)
+ if requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
+
# We need to rate limit *before* we send out any 3PID invites, so we
# can't just rely on the standard ratelimiting of events.
await self.base_handler.ratelimit(requester)
@@ -809,6 +882,8 @@ class RoomMemberHandler(object):
raise SynapseError(403, "Invites have been disabled on this server")
if invitee:
+ # Note that update_membership with an action of "invite" can raise
+ # a ShadowBanError, but this was done above already.
_, stream_id = await self.update_membership(
requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
)
@@ -914,9 +989,7 @@ class RoomMemberHandler(object):
)
return stream_id
- async def _is_host_in_room(
- self, current_state_ids: Dict[Tuple[str, str], str]
- ) -> bool:
+ async def _is_host_in_room(self, current_state_ids: StateMap[str]) -> bool:
# Have we just created the room, and is this about to be the very
# first member event?
create_event_id = current_state_ids.get(("m.room.create", ""))
@@ -947,10 +1020,9 @@ class RoomMemberHandler(object):
class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs):
- super(RoomMemberMasterHandler, self).__init__(hs)
+ super().__init__(hs)
self.distributor = hs.get_distributor()
- self.distributor.declare("user_joined_room")
self.distributor.declare("user_left_room")
async def _is_remote_room_too_complex(
@@ -1007,7 +1079,11 @@ class RoomMemberMasterHandler(RoomMemberHandler):
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
- if self.hs.config.limit_remote_rooms.enabled:
+ check_complexity = self.hs.config.limit_remote_rooms.enabled
+ if check_complexity and self.hs.config.limit_remote_rooms.admins_can_join:
+ check_complexity = not await self.auth.is_server_admin(user)
+
+ if check_complexity:
# Fetch the room complexity
too_complex = await self._is_remote_room_too_complex(
room_id, remote_room_hosts
@@ -1026,11 +1102,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
event_id, stream_id = await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user.to_string(), content
)
- await self._user_joined_room(user, room_id)
# Check the room we just joined wasn't too large, if we didn't fetch the
# complexity of it before.
- if self.hs.config.limit_remote_rooms.enabled:
+ if check_complexity:
if too_complex is False:
# We checked, and we're under the limit.
return event_id, stream_id
@@ -1043,7 +1118,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
return event_id, stream_id
# The room is too large. Leave.
- requester = types.create_requester(user, None, False, None)
+ requester = types.create_requester(user, None, False, False, None)
await self.update_membership(
requester=requester, target=user, room_id=room_id, action="leave"
)
@@ -1169,11 +1244,6 @@ class RoomMemberMasterHandler(RoomMemberHandler):
)
return event.event_id, stream_id
- async def _user_joined_room(self, target: UserID, room_id: str) -> None:
- """Implements RoomMemberHandler._user_joined_room
- """
- user_joined_room(self.distributor, target, room_id)
-
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
|