diff options
author | Sorunome <mail@sorunome.de> | 2021-06-09 20:39:51 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-09 19:39:51 +0100 |
commit | d936371b698ea3085472ee83ae9a88ea7832280e (patch) | |
tree | 7392154f4697974cd1d1d5f2c3f974c507e51a74 /synapse/handlers | |
parent | Limit the number of in-flight /keys/query requests from a single device. (#10... (diff) | |
download | synapse-d936371b698ea3085472ee83ae9a88ea7832280e.tar.xz |
Implement knock feature (#6739)
This PR aims to implement the knock feature as proposed in https://github.com/matrix-org/matrix-doc/pull/2403 Signed-off-by: Sorunome mail@sorunome.de Signed-off-by: Andrew Morgan andrewm@element.io
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/federation.py | 186 | ||||
-rw-r--r-- | synapse/handlers/message.py | 30 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 197 | ||||
-rw-r--r-- | synapse/handlers/room_member_worker.py | 55 | ||||
-rw-r--r-- | synapse/handlers/stats.py | 7 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 87 |
6 files changed, 495 insertions, 67 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index abbb71424d..6e40e2c216 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1,6 +1,5 @@ -# Copyright 2014-2016 OpenMarket Ltd -# Copyright 2017-2018 New Vector Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2014-2021 The Matrix.org Foundation C.I.C. +# Copyright 2020 Sorunome # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -1550,6 +1549,77 @@ class FederationHandler(BaseHandler): run_in_background(self._handle_queued_pdus, room_queue) + @log_function + async def do_knock( + self, + target_hosts: List[str], + room_id: str, + knockee: str, + content: JsonDict, + ) -> Tuple[str, int]: + """Sends the knock to the remote server. + + This first triggers a make_knock request that returns a partial + event that we can fill out and sign. This is then sent to the + remote server via send_knock. + + Knock events must be signed by the knockee's server before distributing. + + Args: + target_hosts: A list of hosts that we want to try knocking through. + room_id: The ID of the room to knock on. + knockee: The ID of the user who is knocking. + content: The content of the knock event. + + Returns: + A tuple of (event ID, stream ID). + + Raises: + SynapseError: If the chosen remote server returns a 3xx/4xx code. + RuntimeError: If no servers were reachable. + """ + logger.debug("Knocking on room %s on behalf of user %s", room_id, knockee) + + # Inform the remote server of the room versions we support + supported_room_versions = list(KNOWN_ROOM_VERSIONS.keys()) + + # Ask the remote server to create a valid knock event for us. Once received, + # we sign the event + params = {"ver": supported_room_versions} # type: Dict[str, Iterable[str]] + origin, event, event_format_version = await self._make_and_verify_event( + target_hosts, room_id, knockee, Membership.KNOCK, content, params=params + ) + + # Record the room ID and its version so that we have a record of the room + await self._maybe_store_room_on_outlier_membership( + room_id=event.room_id, room_version=event_format_version + ) + + # Initially try the host that we successfully called /make_knock on + try: + target_hosts.remove(origin) + target_hosts.insert(0, origin) + except ValueError: + pass + + # Send the signed event back to the room, and potentially receive some + # further information about the room in the form of partial state events + stripped_room_state = await self.federation_client.send_knock( + target_hosts, event + ) + + # Store any stripped room state events in the "unsigned" key of the event. + # This is a bit of a hack and is cribbing off of invites. Basically we + # store the room state here and retrieve it again when this event appears + # in the invitee's sync stream. It is stripped out for all other local users. + event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"] + + context = await self.state_handler.compute_event_context(event) + stream_id = await self.persist_events_and_notify( + event.room_id, [(event, context)] + ) + return event.event_id, stream_id + async def _handle_queued_pdus( self, room_queue: List[Tuple[EventBase, str]] ) -> None: @@ -1915,6 +1985,116 @@ class FederationHandler(BaseHandler): return None + @log_function + async def on_make_knock_request( + self, origin: str, room_id: str, user_id: str + ) -> EventBase: + """We've received a make_knock request, so we create a partial + knock event for the room and return that. We do *not* persist or + process it until the other server has signed it and sent it back. + + Args: + origin: The (verified) server name of the requesting server. + room_id: The room to create the knock event in. + user_id: The user to create the knock for. + + Returns: + The partial knock event. + """ + if get_domain_from_id(user_id) != origin: + logger.info( + "Get /xyz.amorgan.knock/make_knock request for user %r" + "from different origin %s, ignoring", + user_id, + origin, + ) + raise SynapseError(403, "User not from origin", Codes.FORBIDDEN) + + room_version = await self.store.get_room_version_id(room_id) + + builder = self.event_builder_factory.new( + room_version, + { + "type": EventTypes.Member, + "content": {"membership": Membership.KNOCK}, + "room_id": room_id, + "sender": user_id, + "state_key": user_id, + }, + ) + + event, context = await self.event_creation_handler.create_new_client_event( + builder=builder + ) + + event_allowed = await self.third_party_event_rules.check_event_allowed( + event, context + ) + if not event_allowed: + logger.warning("Creation of knock %s forbidden by third-party rules", event) + raise SynapseError( + 403, "This event is not allowed in this context", Codes.FORBIDDEN + ) + + try: + # The remote hasn't signed it yet, obviously. We'll do the full checks + # when we get the event back in `on_send_knock_request` + await self.auth.check_from_context( + room_version, event, context, do_sig_check=False + ) + except AuthError as e: + logger.warning("Failed to create new knock %r because %s", event, e) + raise e + + return event + + @log_function + async def on_send_knock_request( + self, origin: str, event: EventBase + ) -> EventContext: + """ + We have received a knock event for a room. Verify that event and send it into the room + on the knocking homeserver's behalf. + + Args: + origin: The remote homeserver of the knocking user. + event: The knocking member event that has been signed by the remote homeserver. + + Returns: + The context of the event after inserting it into the room graph. + """ + logger.debug( + "on_send_knock_request: Got event: %s, signatures: %s", + event.event_id, + event.signatures, + ) + + if get_domain_from_id(event.sender) != origin: + logger.info( + "Got /xyz.amorgan.knock/send_knock request for user %r " + "from different origin %s", + event.sender, + origin, + ) + raise SynapseError(403, "User not from origin", Codes.FORBIDDEN) + + event.internal_metadata.outlier = False + + context = await self.state_handler.compute_event_context(event) + + await self._auth_and_persist_event(origin, event, context) + + event_allowed = await self.third_party_event_rules.check_event_allowed( + event, context + ) + if not event_allowed: + logger.info("Sending of knock %s forbidden by third-party rules", event) + raise SynapseError( + 403, "This event is not allowed in this context", Codes.FORBIDDEN + ) + + return context + async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]: """Returns the state at the event. i.e. not including said event.""" diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 9f365eb5ad..4d2255bdf1 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1,6 +1,7 @@ # Copyright 2014-2016 OpenMarket Ltd # Copyright 2017-2018 New Vector Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2020 The Matrix.org Foundation C.I.C. +# Copyrignt 2020 Sorunome # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -398,13 +399,14 @@ class EventCreationHandler: self._events_shard_config = self.config.worker.events_shard_config self._instance_name = hs.get_instance_name() - self.room_invite_state_types = self.hs.config.api.room_prejoin_state + self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state - self.membership_types_to_include_profile_data_in = ( - {Membership.JOIN, Membership.INVITE} - if self.hs.config.include_profile_data_on_invite - else {Membership.JOIN} - ) + self.membership_types_to_include_profile_data_in = { + Membership.JOIN, + Membership.KNOCK, + } + if self.hs.config.include_profile_data_on_invite: + self.membership_types_to_include_profile_data_in.add(Membership.INVITE) self.send_event = ReplicationSendEventRestServlet.make_client(hs) @@ -961,8 +963,8 @@ class EventCreationHandler: room_version = await self.store.get_room_version_id(event.room_id) if event.internal_metadata.is_out_of_band_membership(): - # the only sort of out-of-band-membership events we expect to see here - # are invite rejections we have generated ourselves. + # the only sort of out-of-band-membership events we expect to see here are + # invite rejections and rescinded knocks that we have generated ourselves. assert event.type == EventTypes.Member assert event.content["membership"] == Membership.LEAVE else: @@ -1239,7 +1241,7 @@ class EventCreationHandler: "invite_room_state" ] = await self.store.get_stripped_room_state_from_event_context( context, - self.room_invite_state_types, + self.room_prejoin_state_types, membership_user_id=event.sender, ) @@ -1257,6 +1259,14 @@ class EventCreationHandler: # TODO: Make sure the signatures actually are correct. event.signatures.update(returned_invite.signatures) + if event.content["membership"] == Membership.KNOCK: + event.unsigned[ + "knock_room_state" + ] = await self.store.get_stripped_room_state_from_event_context( + context, + self.room_prejoin_state_types, + ) + if event.type == EventTypes.Redaction: original_event = await self.store.get_event( event.redacts, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index d6fc43e798..c26963b1e1 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -1,4 +1,5 @@ # Copyright 2016-2020 The Matrix.org Foundation C.I.C. +# Copyright 2020 Sorunome # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import abc import logging import random @@ -30,7 +30,15 @@ from synapse.api.errors import ( from synapse.api.ratelimiting import Ratelimiter from synapse.events import EventBase from synapse.events.snapshot import EventContext -from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID +from synapse.types import ( + JsonDict, + Requester, + RoomAlias, + RoomID, + StateMap, + UserID, + get_domain_from_id, +) from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_left_room @@ -126,6 +134,24 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod + async def remote_knock( + self, + remote_room_hosts: List[str], + room_id: str, + user: UserID, + content: dict, + ) -> Tuple[str, int]: + """Try and knock on a room that this server is not in + + Args: + remote_room_hosts: List of servers that can be used to knock via. + room_id: Room that we are trying to knock on. + user: User who is trying to knock. + content: A dict that should be used as the content of the knock event. + """ + raise NotImplementedError() + + @abc.abstractmethod async def remote_reject_invite( self, invite_event_id: str, @@ -149,6 +175,27 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod + async def remote_rescind_knock( + self, + knock_event_id: str, + txn_id: Optional[str], + requester: Requester, + content: JsonDict, + ) -> Tuple[str, int]: + """Rescind a local knock made on a remote room. + + Args: + knock_event_id: The ID of the knock event to rescind. + txn_id: An optional transaction ID supplied by the client. + requester: The user making the request, according to the access token. + content: The content of the generated leave event. + + Returns: + A tuple containing (event_id, stream_id of the leave event). + """ + raise NotImplementedError() + + @abc.abstractmethod async def _user_left_room(self, target: UserID, room_id: str) -> None: """Notifies distributor on master process that the user has left the room. @@ -603,53 +650,82 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): elif effective_membership_state == Membership.LEAVE: if not is_host_in_room: - # perhaps we've been invited + # Figure out the user's current membership state for the room ( current_membership_type, current_membership_event_id, ) = await self.store.get_local_current_membership_for_user_in_room( target.to_string(), room_id ) - if ( - current_membership_type != Membership.INVITE - or not current_membership_event_id - ): + if not current_membership_type or not current_membership_event_id: logger.info( "%s sent a leave request to %s, but that is not an active room " - "on this server, and there is no pending invite", + "on this server, or there is no pending invite or knock", target, room_id, ) raise SynapseError(404, "Not a known room") - invite = await self.store.get_event(current_membership_event_id) - logger.info( - "%s rejects invite to %s from %s", target, room_id, invite.sender - ) + # perhaps we've been invited + if current_membership_type == Membership.INVITE: + invite = await self.store.get_event(current_membership_event_id) + logger.info( + "%s rejects invite to %s from %s", + target, + room_id, + invite.sender, + ) - if not self.hs.is_mine_id(invite.sender): - # send the rejection to the inviter's HS (with fallback to - # local event) - return await self.remote_reject_invite( - invite.event_id, - txn_id, - requester, - content, + if not self.hs.is_mine_id(invite.sender): + # send the rejection to the inviter's HS (with fallback to + # local event) + return await self.remote_reject_invite( + invite.event_id, + txn_id, + requester, + content, + ) + + # the inviter was on our server, but has now left. Carry on + # with the normal rejection codepath, which will also send the + # rejection out to any other servers we believe are still in the room. + + # thanks to overzealous cleaning up of event_forward_extremities in + # `delete_old_current_state_events`, it's possible to end up with no + # forward extremities here. If that happens, let's just hang the + # rejection off the invite event. + # + # see: https://github.com/matrix-org/synapse/issues/7139 + if len(latest_event_ids) == 0: + latest_event_ids = [invite.event_id] + + # or perhaps this is a remote room that a local user has knocked on + elif current_membership_type == Membership.KNOCK: + knock = await self.store.get_event(current_membership_event_id) + return await self.remote_rescind_knock( + knock.event_id, txn_id, requester, content ) - # the inviter was on our server, but has now left. Carry on - # with the normal rejection codepath, which will also send the - # rejection out to any other servers we believe are still in the room. + elif ( + self.config.experimental.msc2403_enabled + and effective_membership_state == Membership.KNOCK + ): + if not is_host_in_room: + # The knock needs to be sent over federation instead + remote_room_hosts.append(get_domain_from_id(room_id)) - # thanks to overzealous cleaning up of event_forward_extremities in - # `delete_old_current_state_events`, it's possible to end up with no - # forward extremities here. If that happens, let's just hang the - # rejection off the invite event. - # - # see: https://github.com/matrix-org/synapse/issues/7139 - if len(latest_event_ids) == 0: - latest_event_ids = [invite.event_id] + content["membership"] = Membership.KNOCK + + profile = self.profile_handler + if "displayname" not in content: + content["displayname"] = await profile.get_displayname(target) + if "avatar_url" not in content: + content["avatar_url"] = await profile.get_avatar_url(target) + + return await self.remote_knock( + remote_room_hosts, room_id, target, content + ) return await self._local_membership_update( requester=requester, @@ -1209,6 +1285,35 @@ class RoomMemberMasterHandler(RoomMemberHandler): invite_event, txn_id, requester, content ) + async def remote_rescind_knock( + self, + knock_event_id: str, + txn_id: Optional[str], + requester: Requester, + content: JsonDict, + ) -> Tuple[str, int]: + """ + Rescinds a local knock made on a remote room + + Args: + knock_event_id: The ID of the knock event to rescind. + txn_id: The transaction ID to use. + requester: The originator of the request. + content: The content of the leave event. + + Implements RoomMemberHandler.remote_rescind_knock + """ + # TODO: We don't yet support rescinding knocks over federation + # as we don't know which homeserver to send it to. An obvious + # candidate is the remote homeserver we originally knocked through, + # however we don't currently store that information. + + # Just rescind the knock locally + knock_event = await self.store.get_event(knock_event_id) + return await self._generate_local_out_of_band_leave( + knock_event, txn_id, requester, content + ) + async def _generate_local_out_of_band_leave( self, previous_membership_event: EventBase, @@ -1272,6 +1377,36 @@ class RoomMemberMasterHandler(RoomMemberHandler): return result_event.event_id, result_event.internal_metadata.stream_ordering + async def remote_knock( + self, + remote_room_hosts: List[str], + room_id: str, + user: UserID, + content: dict, + ) -> Tuple[str, int]: + """Sends a knock to a room. Attempts to do so via one remote out of a given list. + + Args: + remote_room_hosts: A list of homeservers to try knocking through. + room_id: The ID of the room to knock on. + user: The user to knock on behalf of. + content: The content of the knock event. + + Returns: + A tuple of (event ID, stream ID). + """ + # filter ourselves out of remote_room_hosts + remote_room_hosts = [ + host for host in remote_room_hosts if host != self.hs.hostname + ] + + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + return await self.federation_handler.do_knock( + remote_room_hosts, room_id, user.to_string(), content=content + ) + async def _user_left_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_left_room""" user_left_room(self.distributor, target, room_id) diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 3e89dd2315..221552a2a6 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -1,4 +1,4 @@ -# Copyright 2018 New Vector Ltd +# Copyright 2018-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,10 +19,12 @@ from synapse.api.errors import SynapseError from synapse.handlers.room_member import RoomMemberHandler from synapse.replication.http.membership import ( ReplicationRemoteJoinRestServlet as ReplRemoteJoin, + ReplicationRemoteKnockRestServlet as ReplRemoteKnock, ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite, + ReplicationRemoteRescindKnockRestServlet as ReplRescindKnock, ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft, ) -from synapse.types import Requester, UserID +from synapse.types import JsonDict, Requester, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -35,7 +37,9 @@ class RoomMemberWorkerHandler(RoomMemberHandler): super().__init__(hs) self._remote_join_client = ReplRemoteJoin.make_client(hs) + self._remote_knock_client = ReplRemoteKnock.make_client(hs) self._remote_reject_client = ReplRejectInvite.make_client(hs) + self._remote_rescind_client = ReplRescindKnock.make_client(hs) self._notify_change_client = ReplJoinedLeft.make_client(hs) async def _remote_join( @@ -80,6 +84,53 @@ class RoomMemberWorkerHandler(RoomMemberHandler): ) return ret["event_id"], ret["stream_id"] + async def remote_rescind_knock( + self, + knock_event_id: str, + txn_id: Optional[str], + requester: Requester, + content: JsonDict, + ) -> Tuple[str, int]: + """ + Rescinds a local knock made on a remote room + + Args: + knock_event_id: the knock event + txn_id: optional transaction ID supplied by the client + requester: user making the request, according to the access token + content: additional content to include in the leave event. + Normally an empty dict. + + Returns: + A tuple containing (event_id, stream_id of the leave event) + """ + ret = await self._remote_rescind_client( + knock_event_id=knock_event_id, + txn_id=txn_id, + requester=requester, + content=content, + ) + return ret["event_id"], ret["stream_id"] + + async def remote_knock( + self, + remote_room_hosts: List[str], + room_id: str, + user: UserID, + content: dict, + ) -> Tuple[str, int]: + """Sends a knock to a room. + + Implements RoomMemberHandler.remote_knock + """ + ret = await self._remote_knock_client( + remote_room_hosts=remote_room_hosts, + room_id=room_id, + user=user, + content=content, + ) + return ret["event_id"], ret["stream_id"] + async def _user_left_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_left_room""" await self._notify_change_client( diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 383e34026e..4e45d1da57 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -1,4 +1,5 @@ -# Copyright 2018 New Vector Ltd +# Copyright 2018-2021 The Matrix.org Foundation C.I.C. +# Copyright 2020 Sorunome # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -230,6 +231,8 @@ class StatsHandler: room_stats_delta["left_members"] -= 1 elif prev_membership == Membership.BAN: room_stats_delta["banned_members"] -= 1 + elif prev_membership == Membership.KNOCK: + room_stats_delta["knocked_members"] -= 1 else: raise ValueError( "%r is not a valid prev_membership" % (prev_membership,) @@ -251,6 +254,8 @@ class StatsHandler: room_stats_delta["left_members"] += 1 elif membership == Membership.BAN: room_stats_delta["banned_members"] += 1 + elif membership == Membership.KNOCK: + room_stats_delta["knocked_members"] += 1 else: raise ValueError("%r is not a valid membership" % (membership,)) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b1c58ffdc8..7f2138d804 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -160,6 +160,16 @@ class InvitedSyncResult: @attr.s(slots=True, frozen=True) +class KnockedSyncResult: + room_id = attr.ib(type=str) + knock = attr.ib(type=EventBase) + + def __bool__(self) -> bool: + """Knocked rooms should always be reported to the client""" + return True + + +@attr.s(slots=True, frozen=True) class GroupsSyncResult: join = attr.ib(type=JsonDict) invite = attr.ib(type=JsonDict) @@ -192,6 +202,7 @@ class _RoomChanges: room_entries = attr.ib(type=List["RoomSyncResultBuilder"]) invited = attr.ib(type=List[InvitedSyncResult]) + knocked = attr.ib(type=List[KnockedSyncResult]) newly_joined_rooms = attr.ib(type=List[str]) newly_left_rooms = attr.ib(type=List[str]) @@ -205,6 +216,7 @@ class SyncResult: account_data: List of account_data events for the user. joined: JoinedSyncResult for each joined room. invited: InvitedSyncResult for each invited room. + knocked: KnockedSyncResult for each knocked on room. archived: ArchivedSyncResult for each archived room. to_device: List of direct messages for the device. device_lists: List of user_ids whose devices have changed @@ -220,6 +232,7 @@ class SyncResult: account_data = attr.ib(type=List[JsonDict]) joined = attr.ib(type=List[JoinedSyncResult]) invited = attr.ib(type=List[InvitedSyncResult]) + knocked = attr.ib(type=List[KnockedSyncResult]) archived = attr.ib(type=List[ArchivedSyncResult]) to_device = attr.ib(type=List[JsonDict]) device_lists = attr.ib(type=DeviceLists) @@ -236,6 +249,7 @@ class SyncResult: self.presence or self.joined or self.invited + or self.knocked or self.archived or self.account_data or self.to_device @@ -1031,7 +1045,7 @@ class SyncHandler: res = await self._generate_sync_entry_for_rooms( sync_result_builder, account_data_by_room ) - newly_joined_rooms, newly_joined_or_invited_users, _, _ = res + newly_joined_rooms, newly_joined_or_invited_or_knocked_users, _, _ = res _, _, newly_left_rooms, newly_left_users = res block_all_presence_data = ( @@ -1040,7 +1054,9 @@ class SyncHandler: if self.hs_config.use_presence and not block_all_presence_data: logger.debug("Fetching presence data") await self._generate_sync_entry_for_presence( - sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users + sync_result_builder, + newly_joined_rooms, + newly_joined_or_invited_or_knocked_users, ) logger.debug("Fetching to-device data") @@ -1049,7 +1065,7 @@ class SyncHandler: device_lists = await self._generate_sync_entry_for_device_list( sync_result_builder, newly_joined_rooms=newly_joined_rooms, - newly_joined_or_invited_users=newly_joined_or_invited_users, + newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users, newly_left_rooms=newly_left_rooms, newly_left_users=newly_left_users, ) @@ -1083,6 +1099,7 @@ class SyncHandler: account_data=sync_result_builder.account_data, joined=sync_result_builder.joined, invited=sync_result_builder.invited, + knocked=sync_result_builder.knocked, archived=sync_result_builder.archived, to_device=sync_result_builder.to_device, device_lists=device_lists, @@ -1142,7 +1159,7 @@ class SyncHandler: self, sync_result_builder: "SyncResultBuilder", newly_joined_rooms: Set[str], - newly_joined_or_invited_users: Set[str], + newly_joined_or_invited_or_knocked_users: Set[str], newly_left_rooms: Set[str], newly_left_users: Set[str], ) -> DeviceLists: @@ -1151,8 +1168,9 @@ class SyncHandler: Args: sync_result_builder newly_joined_rooms: Set of rooms user has joined since previous sync - newly_joined_or_invited_users: Set of users that have joined or - been invited to a room since previous sync. + newly_joined_or_invited_or_knocked_users: Set of users that have joined, + been invited to a room or are knocking on a room since + previous sync. newly_left_rooms: Set of rooms user has left since previous sync newly_left_users: Set of users that have left a room we're in since previous sync @@ -1163,7 +1181,9 @@ class SyncHandler: # We're going to mutate these fields, so lets copy them rather than # assume they won't get used later. - newly_joined_or_invited_users = set(newly_joined_or_invited_users) + newly_joined_or_invited_or_knocked_users = set( + newly_joined_or_invited_or_knocked_users + ) newly_left_users = set(newly_left_users) if since_token and since_token.device_list_key: @@ -1202,11 +1222,11 @@ class SyncHandler: # Step 1b, check for newly joined rooms for room_id in newly_joined_rooms: joined_users = await self.store.get_users_in_room(room_id) - newly_joined_or_invited_users.update(joined_users) + newly_joined_or_invited_or_knocked_users.update(joined_users) # TODO: Check that these users are actually new, i.e. either they # weren't in the previous sync *or* they left and rejoined. - users_that_have_changed.update(newly_joined_or_invited_users) + users_that_have_changed.update(newly_joined_or_invited_or_knocked_users) user_signatures_changed = ( await self.store.get_users_whose_signatures_changed( @@ -1452,6 +1472,7 @@ class SyncHandler: room_entries = room_changes.room_entries invited = room_changes.invited + knocked = room_changes.knocked newly_joined_rooms = room_changes.newly_joined_rooms newly_left_rooms = room_changes.newly_left_rooms @@ -1472,9 +1493,10 @@ class SyncHandler: await concurrently_execute(handle_room_entries, room_entries, 10) sync_result_builder.invited.extend(invited) + sync_result_builder.knocked.extend(knocked) - # Now we want to get any newly joined or invited users - newly_joined_or_invited_users = set() + # Now we want to get any newly joined, invited or knocking users + newly_joined_or_invited_or_knocked_users = set() newly_left_users = set() if since_token: for joined_sync in sync_result_builder.joined: @@ -1486,19 +1508,22 @@ class SyncHandler: if ( event.membership == Membership.JOIN or event.membership == Membership.INVITE + or event.membership == Membership.KNOCK ): - newly_joined_or_invited_users.add(event.state_key) + newly_joined_or_invited_or_knocked_users.add( + event.state_key + ) else: prev_content = event.unsigned.get("prev_content", {}) prev_membership = prev_content.get("membership", None) if prev_membership == Membership.JOIN: newly_left_users.add(event.state_key) - newly_left_users -= newly_joined_or_invited_users + newly_left_users -= newly_joined_or_invited_or_knocked_users return ( set(newly_joined_rooms), - newly_joined_or_invited_users, + newly_joined_or_invited_or_knocked_users, set(newly_left_rooms), newly_left_users, ) @@ -1553,6 +1578,7 @@ class SyncHandler: newly_left_rooms = [] room_entries = [] invited = [] + knocked = [] for room_id, events in mem_change_events_by_room_id.items(): logger.debug( "Membership changes in %s: [%s]", @@ -1632,9 +1658,17 @@ class SyncHandler: should_invite = non_joins[-1].membership == Membership.INVITE if should_invite: if event.sender not in ignored_users: - room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) - if room_sync: - invited.append(room_sync) + invite_room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) + if invite_room_sync: + invited.append(invite_room_sync) + + # Only bother if our latest membership in the room is knock (and we haven't + # been accepted/rejected in the meantime). + should_knock = non_joins[-1].membership == Membership.KNOCK + if should_knock: + knock_room_sync = KnockedSyncResult(room_id, knock=non_joins[-1]) + if knock_room_sync: + knocked.append(knock_room_sync) # Always include leave/ban events. Just take the last one. # TODO: How do we handle ban -> leave in same batch? @@ -1738,7 +1772,13 @@ class SyncHandler: ) room_entries.append(entry) - return _RoomChanges(room_entries, invited, newly_joined_rooms, newly_left_rooms) + return _RoomChanges( + room_entries, + invited, + knocked, + newly_joined_rooms, + newly_left_rooms, + ) async def _get_all_rooms( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str] @@ -1758,6 +1798,7 @@ class SyncHandler: membership_list = ( Membership.INVITE, + Membership.KNOCK, Membership.JOIN, Membership.LEAVE, Membership.BAN, @@ -1769,6 +1810,7 @@ class SyncHandler: room_entries = [] invited = [] + knocked = [] for event in room_list: if event.membership == Membership.JOIN: @@ -1788,8 +1830,11 @@ class SyncHandler: continue invite = await self.store.get_event(event.event_id) invited.append(InvitedSyncResult(room_id=event.room_id, invite=invite)) + elif event.membership == Membership.KNOCK: + knock = await self.store.get_event(event.event_id) + knocked.append(KnockedSyncResult(room_id=event.room_id, knock=knock)) elif event.membership in (Membership.LEAVE, Membership.BAN): - # Always send down rooms we were banned or kicked from. + # Always send down rooms we were banned from or kicked from. if not sync_config.filter_collection.include_leave: if event.membership == Membership.LEAVE: if user_id == event.sender: @@ -1810,7 +1855,7 @@ class SyncHandler: ) ) - return _RoomChanges(room_entries, invited, [], []) + return _RoomChanges(room_entries, invited, knocked, [], []) async def _generate_room_entry( self, @@ -2101,6 +2146,7 @@ class SyncResultBuilder: account_data (list) joined (list[JoinedSyncResult]) invited (list[InvitedSyncResult]) + knocked (list[KnockedSyncResult]) archived (list[ArchivedSyncResult]) groups (GroupsSyncResult|None) to_device (list) @@ -2116,6 +2162,7 @@ class SyncResultBuilder: account_data = attr.ib(type=List[JsonDict], default=attr.Factory(list)) joined = attr.ib(type=List[JoinedSyncResult], default=attr.Factory(list)) invited = attr.ib(type=List[InvitedSyncResult], default=attr.Factory(list)) + knocked = attr.ib(type=List[KnockedSyncResult], default=attr.Factory(list)) archived = attr.ib(type=List[ArchivedSyncResult], default=attr.Factory(list)) groups = attr.ib(type=Optional[GroupsSyncResult], default=None) to_device = attr.ib(type=List[JsonDict], default=attr.Factory(list)) |