diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 8feba8c90a..c002886324 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,12 +17,10 @@ import abc
import logging
import random
from http import HTTPStatus
-from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union
-
-from unpaddedbase64 import encode_base64
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from synapse import types
-from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -31,13 +29,8 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.api.ratelimiting import Ratelimiter
-from synapse.api.room_versions import EventFormatVersions
-from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase
-from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext
-from synapse.events.validator import EventValidator
-from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_left_room
@@ -64,9 +57,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.state_handler = hs.get_state_handler()
self.config = hs.config
- self.federation_handler = hs.get_handlers().federation_handler
- self.directory_handler = hs.get_handlers().directory_handler
- self.identity_handler = hs.get_handlers().identity_handler
+ self.federation_handler = hs.get_federation_handler()
+ self.directory_handler = hs.get_directory_handler()
+ self.identity_handler = hs.get_identity_handler()
self.registration_handler = hs.get_registration_handler()
self.profile_handler = hs.get_profile_handler()
self.event_creation_handler = hs.get_event_creation_handler()
@@ -171,6 +164,17 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if requester.is_guest:
content["kind"] = "guest"
+ # Check if we already have an event with a matching transaction ID. (We
+ # do this check just before we persist an event as well, but may as well
+ # do it up front for efficiency.)
+ if txn_id and requester.access_token_id:
+ existing_event_id = await self.store.get_event_id_from_transaction_id(
+ room_id, requester.user.to_string(), requester.access_token_id, txn_id,
+ )
+ if existing_event_id:
+ event_pos = await self.store.get_position_for_event(existing_event_id)
+ return existing_event_id, event_pos.stream
+
event, context = await self.event_creation_handler.create_event(
requester,
{
@@ -182,21 +186,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# For backwards compatibility:
"membership": membership,
},
- token_id=requester.access_token_id,
txn_id=txn_id,
prev_event_ids=prev_event_ids,
require_consent=require_consent,
)
- # Check if this event matches the previous membership event for the user.
- duplicate = await self.event_creation_handler.deduplicate_state_event(
- event, context
- )
- if duplicate is not None:
- # Discard the new event since this membership change is a no-op.
- _, stream_id = await self.store.get_event_ordering(duplicate.event_id)
- return duplicate.event_id, stream_id
-
prev_state_ids = await context.get_prev_state_ids()
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
@@ -221,7 +215,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
retry_after_ms=int(1000 * (time_allowed - time_now_s))
)
- stream_id = await self.event_creation_handler.handle_new_client_event(
+ result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)
@@ -231,7 +225,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id)
- return event.event_id, stream_id
+ # we know it was persisted, so should have a stream ordering
+ assert result_event.internal_metadata.stream_ordering
+ return result_event.event_id, result_event.internal_metadata.stream_ordering
async def copy_room_tags_and_direct_to_room(
self, old_room_id, new_room_id, user_id
@@ -247,7 +243,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
user_account_data, _ = await self.store.get_account_data_for_user(user_id)
# Copy direct message state if applicable
- direct_rooms = user_account_data.get("m.direct", {})
+ direct_rooms = user_account_data.get(AccountDataTypes.DIRECT, {})
# Check which key this room is under
if isinstance(direct_rooms, dict):
@@ -258,7 +254,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# Save back to user's m.direct account data
await self.store.add_account_data_for_user(
- user_id, "m.direct", direct_rooms
+ user_id, AccountDataTypes.DIRECT, direct_rooms
)
break
@@ -310,7 +306,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
key = (room_id,)
with (await self.member_linearizer.queue(key)):
- result = await self._update_membership(
+ result = await self.update_membership_locked(
requester,
target,
room_id,
@@ -325,7 +321,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
return result
- async def _update_membership(
+ async def update_membership_locked(
self,
requester: Requester,
target: UserID,
@@ -338,6 +334,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
content: Optional[dict] = None,
require_consent: bool = True,
) -> Tuple[str, int]:
+ """Helper for update_membership.
+
+ Assumes that the membership linearizer is already held for the room.
+ """
content_specified = bool(content)
if content is None:
content = {}
@@ -346,7 +346,15 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# later on.
content = dict(content)
- if not self.allow_per_room_profiles or requester.shadow_banned:
+ # allow the server notices mxid to set room-level profile
+ is_requester_server_notices_user = (
+ self._server_notices_mxid is not None
+ and requester.user.to_string() == self._server_notices_mxid
+ )
+
+ if (
+ not self.allow_per_room_profiles and not is_requester_server_notices_user
+ ) or requester.shadow_banned:
# Strip profile data, knowing that new profile data will be added to the
# event's content in event_creation_handler.create_event() using the target's
# global profile.
@@ -441,12 +449,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
same_membership = old_membership == effective_membership_state
same_sender = requester.user.to_string() == old_state.sender
if same_sender and same_membership and same_content:
- _, stream_id = await self.store.get_event_ordering(
- old_state.event_id
- )
+ # duplicate event.
+ # we know it was persisted, so must have a stream ordering.
+ assert old_state.internal_metadata.stream_ordering
return (
old_state.event_id,
- stream_id,
+ old_state.internal_metadata.stream_ordering,
)
if old_membership in ["ban", "leave"] and action == "kick":
@@ -514,10 +522,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
elif effective_membership_state == Membership.LEAVE:
if not is_host_in_room:
# perhaps we've been invited
- invite = await self.store.get_invite_for_local_user_in_room(
- user_id=target.to_string(), room_id=room_id
- ) # type: Optional[RoomsForUser]
- if not invite:
+ (
+ current_membership_type,
+ current_membership_event_id,
+ ) = await self.store.get_local_current_membership_for_user_in_room(
+ target.to_string(), room_id
+ )
+ if (
+ current_membership_type != Membership.INVITE
+ or not current_membership_event_id
+ ):
logger.info(
"%s sent a leave request to %s, but that is not an active room "
"on this server, and there is no pending invite",
@@ -527,6 +541,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
raise SynapseError(404, "Not a known room")
+ invite = await self.store.get_event(current_membership_event_id)
logger.info(
"%s rejects invite to %s from %s", target, room_id, invite.sender
)
@@ -642,7 +657,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
async def send_membership_event(
self,
- requester: Requester,
+ requester: Optional[Requester],
event: EventBase,
context: EventContext,
ratelimit: bool = True,
@@ -672,12 +687,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
else:
requester = types.create_requester(target_user)
- prev_event = await self.event_creation_handler.deduplicate_state_event(
- event, context
- )
- if prev_event is not None:
- return
-
prev_state_ids = await context.get_prev_state_ids()
if event.membership == Membership.JOIN:
if requester.is_guest:
@@ -692,7 +701,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
- await self.event_creation_handler.handle_new_client_event(
+ event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target_user], ratelimit=ratelimit
)
@@ -970,6 +979,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
self.distributor = hs.get_distributor()
self.distributor.declare("user_left_room")
+ self._server_name = hs.hostname
async def _is_remote_room_too_complex(
self, room_id: str, remote_room_hosts: List[str]
@@ -1064,7 +1074,9 @@ class RoomMemberMasterHandler(RoomMemberHandler):
return event_id, stream_id
# The room is too large. Leave.
- requester = types.create_requester(user, None, False, False, None)
+ requester = types.create_requester(
+ user, authenticated_entity=self._server_name
+ )
await self.update_membership(
requester=requester, target=user, room_id=room_id, action="leave"
)
@@ -1109,57 +1121,38 @@ class RoomMemberMasterHandler(RoomMemberHandler):
#
logger.warning("Failed to reject invite: %s", e)
- return await self._locally_reject_invite(
+ return await self._generate_local_out_of_band_leave(
invite_event, txn_id, requester, content
)
- async def _locally_reject_invite(
+ async def _generate_local_out_of_band_leave(
self,
- invite_event: EventBase,
+ previous_membership_event: EventBase,
txn_id: Optional[str],
requester: Requester,
content: JsonDict,
) -> Tuple[str, int]:
- """Generate a local invite rejection
+ """Generate a local leave event for a room
- This is called after we fail to reject an invite via a remote server. It
- generates an out-of-band membership event locally.
+ This can be called after we e.g fail to reject an invite via a remote server.
+ It generates an out-of-band membership event locally.
Args:
- invite_event: the invite to be rejected
+ previous_membership_event: the previous membership event for this user
txn_id: optional transaction ID supplied by the client
- requester: user making the rejection request, according to the access token
- content: additional content to include in the rejection event.
+ requester: user making the request, according to the access token
+ content: additional content to include in the leave event.
Normally an empty dict.
- """
- room_id = invite_event.room_id
- target_user = invite_event.state_key
- room_version = await self.store.get_room_version(room_id)
+ Returns:
+ A tuple containing (event_id, stream_id of the leave event)
+ """
+ room_id = previous_membership_event.room_id
+ target_user = previous_membership_event.state_key
content["membership"] = Membership.LEAVE
- # the auth events for the new event are the same as that of the invite, plus
- # the invite itself.
- #
- # the prev_events are just the invite.
- invite_hash = invite_event.event_id # type: Union[str, Tuple]
- if room_version.event_format == EventFormatVersions.V1:
- alg, h = compute_event_reference_hash(invite_event)
- invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
-
- auth_events = tuple(invite_event.auth_events) + (invite_hash,)
- prev_events = (invite_hash,)
-
- # we cap depth of generated events, to ensure that they are not
- # rejected by other servers (and so that they can be persisted in
- # the db)
- depth = min(invite_event.depth + 1, MAX_DEPTH)
-
event_dict = {
- "depth": depth,
- "auth_events": auth_events,
- "prev_events": prev_events,
"type": EventTypes.Member,
"room_id": room_id,
"sender": target_user,
@@ -1167,28 +1160,30 @@ class RoomMemberMasterHandler(RoomMemberHandler):
"state_key": target_user,
}
- event = create_local_event_from_event_dict(
- clock=self.clock,
- hostname=self.hs.hostname,
- signing_key=self.hs.signing_key,
- room_version=room_version,
- event_dict=event_dict,
+ # the auth events for the new event are the same as that of the previous event, plus
+ # the event itself.
+ #
+ # the prev_events consist solely of the previous membership event.
+ prev_event_ids = [previous_membership_event.event_id]
+ auth_event_ids = previous_membership_event.auth_event_ids() + prev_event_ids
+
+ event, context = await self.event_creation_handler.create_event(
+ requester,
+ event_dict,
+ txn_id=txn_id,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
)
event.internal_metadata.outlier = True
event.internal_metadata.out_of_band_membership = True
- if txn_id is not None:
- event.internal_metadata.txn_id = txn_id
- if requester.access_token_id is not None:
- event.internal_metadata.token_id = requester.access_token_id
- EventValidator().validate_new(event, self.config)
-
- context = await self.state_handler.compute_event_context(event)
- context.app_service = requester.app_service
- stream_id = await self.event_creation_handler.handle_new_client_event(
+ result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
- return event.event_id, stream_id
+ # we know it was persisted, so must have a stream ordering
+ assert result_event.internal_metadata.stream_ordering
+
+ return result_event.event_id, result_event.internal_metadata.stream_ordering
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
|