diff options
Diffstat (limited to 'synapse/handlers/room_member.py')
-rw-r--r-- | synapse/handlers/room_member.py | 334 |
1 files changed, 221 insertions, 113 deletions
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index d1199a0644..5d4adf5bfd 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -26,18 +26,14 @@ from synapse.api.constants import ( GuestAccess, Membership, ) -from synapse.api.errors import ( - AuthError, - Codes, - LimitExceededError, - ShadowBanError, - SynapseError, -) +from synapse.api.errors import AuthError, Codes, ShadowBanError, SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.event_auth import get_named_level, get_power_level_event from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN +from synapse.logging import opentracing +from synapse.module_api import NOT_SPAM from synapse.storage.state import StateFilter from synapse.types import ( JsonDict, @@ -99,26 +95,57 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): rate_hz=hs.config.ratelimiting.rc_joins_local.per_second, burst_count=hs.config.ratelimiting.rc_joins_local.burst_count, ) + # Tracks joins from local users to rooms this server isn't a member of. + # I.e. joins this server makes by requesting /make_join /send_join from + # another server. self._join_rate_limiter_remote = Ratelimiter( store=self.store, clock=self.clock, rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second, burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count, ) + # TODO: find a better place to keep this Ratelimiter. + # It needs to be + # - written to by event persistence code + # - written to by something which can snoop on replication streams + # - read by the RoomMemberHandler to rate limit joins from local users + # - read by the FederationServer to rate limit make_joins and send_joins from + # other homeservers + # I wonder if a homeserver-wide collection of rate limiters might be cleaner? + self._join_rate_per_room_limiter = Ratelimiter( + store=self.store, + clock=self.clock, + rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second, + burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count, + ) + # Ratelimiter for invites, keyed by room (across all issuers, all + # recipients). self._invites_per_room_limiter = Ratelimiter( store=self.store, clock=self.clock, rate_hz=hs.config.ratelimiting.rc_invites_per_room.per_second, burst_count=hs.config.ratelimiting.rc_invites_per_room.burst_count, ) - self._invites_per_user_limiter = Ratelimiter( + + # Ratelimiter for invites, keyed by recipient (across all rooms, all + # issuers). + self._invites_per_recipient_limiter = Ratelimiter( store=self.store, clock=self.clock, rate_hz=hs.config.ratelimiting.rc_invites_per_user.per_second, burst_count=hs.config.ratelimiting.rc_invites_per_user.burst_count, ) + # Ratelimiter for invites, keyed by issuer (across all rooms, all + # recipients). + self._invites_per_issuer_limiter = Ratelimiter( + store=self.store, + clock=self.clock, + rate_hz=hs.config.ratelimiting.rc_invites_per_issuer.per_second, + burst_count=hs.config.ratelimiting.rc_invites_per_issuer.burst_count, + ) + self._third_party_invite_limiter = Ratelimiter( store=self.store, clock=self.clock, @@ -127,6 +154,18 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ) self.request_ratelimiter = hs.get_request_ratelimiter() + hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room) + + def _on_user_joined_room(self, event_id: str, room_id: str) -> None: + """Notify the rate limiter that a room join has occurred. + + Use this to inform the RoomMemberHandler about joins that have either + - taken place on another homeserver, or + - on another worker in this homeserver. + Joins actioned by this worker should use the usual `ratelimit` method, which + checks the limit and increments the counter in one go. + """ + self._join_rate_per_room_limiter.record_action(requester=None, key=room_id) @abc.abstractmethod async def _remote_join( @@ -140,7 +179,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): """Try and join a room that this server is not in Args: - requester + requester: The user making the request, according to the access token. remote_room_hosts: List of servers that can be used to join via. room_id: Room that we are trying to join user: User who is trying to join @@ -263,7 +302,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if room_id: await self._invites_per_room_limiter.ratelimit(requester, room_id) - await self._invites_per_user_limiter.ratelimit(requester, invitee_user_id) + await self._invites_per_recipient_limiter.ratelimit(requester, invitee_user_id) + if requester is not None: + await self._invites_per_issuer_limiter.ratelimit(requester) async def _local_membership_update( self, @@ -274,6 +315,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, state_event_ids: Optional[List[str]] = None, + depth: Optional[int] = None, txn_id: Optional[str] = None, ratelimit: bool = True, content: Optional[dict] = None, @@ -304,6 +346,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): prev_events are set so we need to set them ourself via this argument. This should normally be left as None, which will cause the auth_event_ids to be calculated based on the room state at the prev_events. + depth: Override the depth used to order the event in the DAG. + Should normally be set to None, which will cause the depth to be calculated + based on the prev_events. txn_id: ratelimit: @@ -359,6 +404,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, state_event_ids=state_event_ids, + depth=depth, require_consent=require_consent, outlier=outlier, historical=historical, @@ -379,24 +425,18 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # Only rate-limit if the user actually joined the room, otherwise we'll end # up blocking profile updates. if newly_joined and ratelimit: - time_now_s = self.clock.time() - ( - allowed, - time_allowed, - ) = await self._join_rate_limiter_local.can_do_action(requester) - - if not allowed: - raise LimitExceededError( - retry_after_ms=int(1000 * (time_allowed - time_now_s)) - ) - - result_event = await self.event_creation_handler.handle_new_client_event( - requester, - event, - context, - extra_users=[target], - ratelimit=ratelimit, - ) + await self._join_rate_limiter_local.ratelimit(requester) + await self._join_rate_per_room_limiter.ratelimit( + requester, key=room_id, update=False + ) + with opentracing.start_active_span("handle_new_client_event"): + result_event = await self.event_creation_handler.handle_new_client_event( + requester, + event, + context, + extra_users=[target], + ratelimit=ratelimit, + ) if event.membership == Membership.LEAVE: if prev_member_event_id: @@ -464,6 +504,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, state_event_ids: Optional[List[str]] = None, + depth: Optional[int] = None, ) -> Tuple[str, int]: """Update a user's membership in a room. @@ -499,6 +540,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): prev_events are set so we need to set them ourself via this argument. This should normally be left as None, which will cause the auth_event_ids to be calculated based on the room state at the prev_events. + depth: Override the depth used to order the event in the DAG. + Should normally be set to None, which will cause the depth to be calculated + based on the prev_events. Returns: A tuple of the new event ID and stream ID. @@ -521,24 +565,26 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # by application services), and then by room ID. async with self.member_as_limiter.queue(as_id): async with self.member_linearizer.queue(key): - result = await self.update_membership_locked( - requester, - target, - room_id, - action, - txn_id=txn_id, - remote_room_hosts=remote_room_hosts, - third_party_signed=third_party_signed, - ratelimit=ratelimit, - content=content, - new_room=new_room, - require_consent=require_consent, - outlier=outlier, - historical=historical, - allow_no_prev_events=allow_no_prev_events, - prev_event_ids=prev_event_ids, - state_event_ids=state_event_ids, - ) + with opentracing.start_active_span("update_membership_locked"): + result = await self.update_membership_locked( + requester, + target, + room_id, + action, + txn_id=txn_id, + remote_room_hosts=remote_room_hosts, + third_party_signed=third_party_signed, + ratelimit=ratelimit, + content=content, + new_room=new_room, + require_consent=require_consent, + outlier=outlier, + historical=historical, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + depth=depth, + ) return result @@ -560,6 +606,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, state_event_ids: Optional[List[str]] = None, + depth: Optional[int] = None, ) -> Tuple[str, int]: """Helper for update_membership. @@ -597,10 +644,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): prev_events are set so we need to set them ourself via this argument. This should normally be left as None, which will cause the auth_event_ids to be calculated based on the room state at the prev_events. + depth: Override the depth used to order the event in the DAG. + Should normally be set to None, which will cause the depth to be calculated + based on the prev_events. Returns: A tuple of the new event ID and stream ID. """ + content_specified = bool(content) if content is None: content = {} @@ -638,7 +689,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): errcode=Codes.BAD_JSON, ) - if "avatar_url" in content: + if "avatar_url" in content and content.get("avatar_url") is not None: if not await self.profile_handler.check_avatar_size_and_mime_type( content["avatar_url"], ): @@ -683,7 +734,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if target_id == self._server_notices_mxid: raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user") - block_invite = False + block_invite_result = None if ( self._server_notices_mxid is not None @@ -693,7 +744,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): is_requester_admin = True else: - is_requester_admin = await self.auth.is_server_admin(requester.user) + is_requester_admin = await self.auth.is_server_admin(requester) if not is_requester_admin: if self.config.server.block_non_admin_invites: @@ -701,16 +752,22 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): "Blocking invite: user is not admin and non-admin " "invites disabled" ) - block_invite = True + block_invite_result = (Codes.FORBIDDEN, {}) - if not await self.spam_checker.user_may_invite( + spam_check = await self.spam_checker.user_may_invite( requester.user.to_string(), target_id, room_id - ): + ) + if spam_check != NOT_SPAM: logger.info("Blocking invite due to spam checker") - block_invite = True + block_invite_result = spam_check - if block_invite: - raise SynapseError(403, "Invites have been disabled on this server") + if block_invite_result is not None: + raise SynapseError( + 403, + "Invites have been disabled on this server", + errcode=block_invite_result[0], + additional_fields=block_invite_result[1], + ) # An empty prev_events list is allowed as long as the auth_event_ids are present if prev_event_ids is not None: @@ -724,6 +781,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, state_event_ids=state_event_ids, + depth=depth, content=content, require_consent=require_consent, outlier=outlier, @@ -732,14 +790,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): latest_event_ids = await self.store.get_prev_events_for_room(room_id) - current_state_ids = await self.state_handler.get_current_state_ids( - room_id, latest_event_ids=latest_event_ids + state_before_join = await self.state_handler.compute_state_after_events( + room_id, latest_event_ids ) # TODO: Refactor into dictionary of explicitly allowed transitions # between old and new state, with specific error messages for some # transitions and generic otherwise - old_state_id = current_state_ids.get((EventTypes.Member, target.to_string())) + old_state_id = state_before_join.get((EventTypes.Member, target.to_string())) if old_state_id: old_state = await self.store.get_event(old_state_id, allow_none=True) old_membership = old_state.content.get("membership") if old_state else None @@ -790,11 +848,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if action == "kick": raise AuthError(403, "The target user is not in the room") - is_host_in_room = await self._is_host_in_room(current_state_ids) + is_host_in_room = await self._is_host_in_room(state_before_join) if effective_membership_state == Membership.JOIN: if requester.is_guest: - guest_can_join = await self._can_guest_join(current_state_ids) + guest_can_join = await self._can_guest_join(state_before_join) if not guest_can_join: # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. @@ -810,7 +868,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): bypass_spam_checker = True else: - bypass_spam_checker = await self.auth.is_server_admin(requester.user) + bypass_spam_checker = await self.auth.is_server_admin(requester) inviter = await self._get_inviter(target.to_string(), room_id) if ( @@ -818,30 +876,37 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # We assume that if the spam checker allowed the user to create # a room then they're allowed to join it. and not new_room - and not await self.spam_checker.user_may_join_room( + ): + spam_check = await self.spam_checker.user_may_join_room( target.to_string(), room_id, is_invited=inviter is not None ) - ): - raise SynapseError(403, "Not allowed to join this room") + if spam_check != NOT_SPAM: + raise SynapseError( + 403, + "Not allowed to join this room", + errcode=spam_check[0], + additional_fields=spam_check[1], + ) # Check if a remote join should be performed. remote_join, remote_room_hosts = await self._should_perform_remote_join( - target.to_string(), room_id, remote_room_hosts, content, is_host_in_room + target.to_string(), + room_id, + remote_room_hosts, + content, + is_host_in_room, + state_before_join, ) if remote_join: if ratelimit: - time_now_s = self.clock.time() - ( - allowed, - time_allowed, - ) = await self._join_rate_limiter_remote.can_do_action( + await self._join_rate_limiter_remote.ratelimit( requester, ) - - if not allowed: - raise LimitExceededError( - retry_after_ms=int(1000 * (time_allowed - time_now_s)) - ) + await self._join_rate_per_room_limiter.ratelimit( + requester, + key=room_id, + update=False, + ) inviter = await self._get_inviter(target.to_string(), room_id) if inviter and not self.hs.is_mine(inviter): @@ -849,10 +914,17 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): content["membership"] = Membership.JOIN - profile = self.profile_handler - if not content_specified: - content["displayname"] = await profile.get_displayname(target) - content["avatar_url"] = await profile.get_avatar_url(target) + try: + profile = self.profile_handler + if not content_specified: + content["displayname"] = await profile.get_displayname(target) + content["avatar_url"] = await profile.get_avatar_url(target) + except Exception as e: + logger.info( + "Failed to get profile information while processing remote join for %r: %s", + target, + e, + ) if requester.is_guest: content["kind"] = "guest" @@ -929,11 +1001,18 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): content["membership"] = Membership.KNOCK - profile = self.profile_handler - if "displayname" not in content: - content["displayname"] = await profile.get_displayname(target) - if "avatar_url" not in content: - content["avatar_url"] = await profile.get_avatar_url(target) + try: + profile = self.profile_handler + if "displayname" not in content: + content["displayname"] = await profile.get_displayname(target) + if "avatar_url" not in content: + content["avatar_url"] = await profile.get_avatar_url(target) + except Exception as e: + logger.info( + "Failed to get profile information while processing remote knock for %r: %s", + target, + e, + ) return await self.remote_knock( remote_room_hosts, room_id, target, content @@ -948,6 +1027,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ratelimit=ratelimit, prev_event_ids=latest_event_ids, state_event_ids=state_event_ids, + depth=depth, content=content, require_consent=require_consent, outlier=outlier, @@ -960,6 +1040,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): remote_room_hosts: List[str], content: JsonDict, is_host_in_room: bool, + state_before_join: StateMap[str], ) -> Tuple[bool, List[str]]: """ Check whether the server should do a remote join (as opposed to a local @@ -979,6 +1060,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): content: The content to use as the event body of the join. This may be modified. is_host_in_room: True if the host is in the room. + state_before_join: The state before the join event (i.e. the resolution of + the states after its parent events). Returns: A tuple of: @@ -995,20 +1078,17 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # If the host is in the room, but not one of the authorised hosts # for restricted join rules, a remote join must be used. room_version = await self.store.get_room_version(room_id) - current_state_ids = await self._storage_controllers.state.get_current_state_ids( - room_id - ) # If restricted join rules are not being used, a local join can always # be used. if not await self.event_auth_handler.has_restricted_join_rules( - current_state_ids, room_version + state_before_join, room_version ): return False, [] # If the user is invited to the room or already joined, the join # event can always be issued locally. - prev_member_event_id = current_state_ids.get((EventTypes.Member, user_id), None) + prev_member_event_id = state_before_join.get((EventTypes.Member, user_id), None) prev_member_event = None if prev_member_event_id: prev_member_event = await self.store.get_event(prev_member_event_id) @@ -1023,10 +1103,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # # If not, generate a new list of remote hosts based on which # can issue invites. - event_map = await self.store.get_events(current_state_ids.values()) + event_map = await self.store.get_events(state_before_join.values()) current_state = { state_key: event_map[event_id] - for state_key, event_id in current_state_ids.items() + for state_key, event_id in state_before_join.items() } allowed_servers = get_servers_from_users( get_users_which_can_issue_invite(current_state) @@ -1040,7 +1120,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # Ensure the member should be allowed access via membership in a room. await self.event_auth_handler.check_restricted_join_rules( - current_state_ids, room_version, user_id, prev_member_event + state_before_join, room_version, user_id, prev_member_event ) # If this is going to be a local join, additional information must @@ -1050,7 +1130,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): EventContentFields.AUTHORISING_USER ] = await self.event_auth_handler.get_user_which_could_invite( room_id, - current_state_ids, + state_before_join, ) return False, [] @@ -1302,8 +1382,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): id_server: str, requester: Requester, txn_id: Optional[str], - id_access_token: Optional[str] = None, - ) -> int: + id_access_token: str, + prev_event_ids: Optional[List[str]] = None, + depth: Optional[int] = None, + ) -> Tuple[str, int]: """Invite a 3PID to a room. Args: @@ -1315,16 +1397,20 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): requester: The user making the request. txn_id: The transaction ID this is part of, or None if this is not part of a transaction. - id_access_token: The optional identity server access token. + id_access_token: Identity server access token. + depth: Override the depth used to order the event in the DAG. + prev_event_ids: The event IDs to use as the prev events + Should normally be set to None, which will cause the depth to be calculated + based on the prev_events. Returns: - The new stream ID. + Tuple of event ID and stream ordering position Raises: ShadowBanError if the requester has been shadow-banned. """ if self.config.server.block_non_admin_invites: - is_requester_admin = await self.auth.is_server_admin(requester.user) + is_requester_admin = await self.auth.is_server_admin(requester) if not is_requester_admin: raise SynapseError( 403, "Invites have been disabled on this server", Codes.FORBIDDEN @@ -1364,20 +1450,26 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # We don't check the invite against the spamchecker(s) here (through # user_may_invite) because we'll do it further down the line anyway (in # update_membership_locked). - _, stream_id = await self.update_membership( + event_id, stream_id = await self.update_membership( requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id ) else: # Check if the spamchecker(s) allow this invite to go through. - if not await self.spam_checker.user_may_send_3pid_invite( + spam_check = await self.spam_checker.user_may_send_3pid_invite( inviter_userid=requester.user.to_string(), medium=medium, address=address, room_id=room_id, - ): - raise SynapseError(403, "Cannot send threepid invite") + ) + if spam_check != NOT_SPAM: + raise SynapseError( + 403, + "Cannot send threepid invite", + errcode=spam_check[0], + additional_fields=spam_check[1], + ) - stream_id = await self._make_and_store_3pid_invite( + event, stream_id = await self._make_and_store_3pid_invite( requester, id_server, medium, @@ -1386,9 +1478,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): inviter, txn_id=txn_id, id_access_token=id_access_token, + prev_event_ids=prev_event_ids, + depth=depth, ) + event_id = event.event_id - return stream_id + return event_id, stream_id async def _make_and_store_3pid_invite( self, @@ -1399,8 +1494,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): room_id: str, user: UserID, txn_id: Optional[str], - id_access_token: Optional[str] = None, - ) -> int: + id_access_token: str, + prev_event_ids: Optional[List[str]] = None, + depth: Optional[int] = None, + ) -> Tuple[EventBase, int]: room_state = await self._storage_controllers.state.get_current_state( room_id, StateFilter.from_types( @@ -1493,8 +1590,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): }, ratelimit=False, txn_id=txn_id, + prev_event_ids=prev_event_ids, + depth=depth, ) - return stream_id + return event, stream_id async def _is_host_in_room(self, current_state_ids: StateMap[str]) -> bool: # Have we just created the room, and is this about to be the very @@ -1521,8 +1620,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): async def _is_server_notice_room(self, room_id: str) -> bool: if self._server_notices_mxid is None: return False - user_ids = await self.store.get_users_in_room(room_id) - return self._server_notices_mxid in user_ids + is_server_notices_room = await self.store.check_local_user_in_room( + user_id=self._server_notices_mxid, room_id=room_id + ) + return is_server_notices_room class RoomMemberMasterHandler(RoomMemberHandler): @@ -1583,14 +1684,18 @@ class RoomMemberMasterHandler(RoomMemberHandler): ] if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") + raise SynapseError( + 404, + "Can't join remote room because no servers " + "that are in the room have been provided.", + ) check_complexity = self.hs.config.server.limit_remote_rooms.enabled if ( check_complexity and self.hs.config.server.limit_remote_rooms.admins_can_join ): - check_complexity = not await self.auth.is_server_admin(user) + check_complexity = not await self.store.is_server_admin(user) if check_complexity: # Fetch the room complexity @@ -1820,8 +1925,11 @@ class RoomMemberMasterHandler(RoomMemberHandler): ]: raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) - if membership: - await self.store.forget(user_id, room_id) + # In normal case this call is only required if `membership` is not `None`. + # But: After the last member had left the room, the background update + # `_background_remove_left_rooms` is deleting rows related to this room from + # the table `current_state_events` and `get_current_state_events` is `None`. + await self.store.forget(user_id, room_id) def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]: |