summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/7804.bugfix1
-rw-r--r--synapse/handlers/message.py23
-rw-r--r--synapse/handlers/room_member.py194
-rw-r--r--synapse/handlers/room_member_worker.py17
-rw-r--r--synapse/replication/http/membership.py92
-rw-r--r--synapse/storage/data_stores/main/events.py20
-rw-r--r--synapse/storage/persist_events.py6
7 files changed, 184 insertions, 169 deletions
diff --git a/changelog.d/7804.bugfix b/changelog.d/7804.bugfix
new file mode 100644
index 0000000000..2772eeb0db
--- /dev/null
+++ b/changelog.d/7804.bugfix
@@ -0,0 +1 @@
+Fix 'stuck invites' which happen when we are unable to reject a room invite received over federation.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 665ad19b5d..da206e1ec1 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Optional, Tuple
+from typing import TYPE_CHECKING, Optional, Tuple
 
 from canonicaljson import encode_canonical_json, json
 
@@ -55,6 +55,9 @@ from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -349,7 +352,7 @@ _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
 
 
 class EventCreationHandler(object):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
@@ -814,11 +817,17 @@ class EventCreationHandler(object):
                 403, "This event is not allowed in this context", Codes.FORBIDDEN
             )
 
-        try:
-            await self.auth.check_from_context(room_version, event, context)
-        except AuthError as err:
-            logger.warning("Denying new event %r because %s", event, err)
-            raise err
+        if event.internal_metadata.is_out_of_band_membership():
+            # the only sort of out-of-band-membership events we expect to see here
+            # are invite rejections we have generated ourselves.
+            assert event.type == EventTypes.Member
+            assert event.content["membership"] == Membership.LEAVE
+        else:
+            try:
+                await self.auth.check_from_context(room_version, event, context)
+            except AuthError as err:
+                logger.warning("Denying new event %r because %s", event, err)
+                raise err
 
         # Ensure that we can round trip before trying to persist in db
         try:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 27c479da9e..178d7db94e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,7 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2016-2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,17 +16,21 @@
 import abc
 import logging
 from http import HTTPStatus
-from typing import Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple, Union
+
+from unpaddedbase64 import encode_base64
 
 from synapse import types
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.room_versions import EventFormatVersions
+from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.events import EventBase
+from synapse.events.builder import create_local_event_from_event_dict
 from synapse.events.snapshot import EventContext
-from synapse.replication.http.membership import (
-    ReplicationLocallyRejectInviteRestServlet,
-)
-from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
+from synapse.events.validator import EventValidator
+from synapse.storage.roommember import RoomsForUser
+from synapse.types import Collection, JsonDict, Requester, RoomAlias, RoomID, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
 
@@ -74,10 +76,6 @@ class RoomMemberHandler(object):
         )
         if self._is_on_event_persistence_instance:
             self.persist_event_storage = hs.get_storage().persistence
-        else:
-            self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
-                hs
-            )
 
         # This is only used to get at ratelimit function, and
         # maybe_kick_guest_users. It's fine there are multiple of these as
@@ -105,46 +103,28 @@ class RoomMemberHandler(object):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    async def _remote_reject_invite(
+    async def remote_reject_invite(
         self,
+        invite_event_id: str,
+        txn_id: Optional[str],
         requester: Requester,
-        remote_room_hosts: List[str],
-        room_id: str,
-        target: UserID,
-        content: dict,
+        content: JsonDict,
     ) -> Tuple[Optional[str], int]:
-        """Attempt to reject an invite for a room this server is not in. If we
-        fail to do so we locally mark the invite as rejected.
+        """
+        Rejects an out-of-band invite we have received from a remote server
 
         Args:
-            requester
-            remote_room_hosts: List of servers to use to try and reject invite
-            room_id
-            target: The user rejecting the invite
-            content: The content for the rejection event
+            invite_event_id: ID of the invite to be rejected
+            txn_id: optional transaction ID supplied by the client
+            requester: user making the rejection request, according to the access token
+            content: additional content to include in the rejection event.
+               Normally an empty dict.
 
         Returns:
-            A dictionary to be returned to the client, may
-            include event_id etc, or nothing if we locally rejected
+            event id, stream_id of the leave event
         """
         raise NotImplementedError()
 
-    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
-        """Mark the invite has having been rejected even though we failed to
-        create a leave event for it.
-        """
-        if self._is_on_event_persistence_instance:
-            return await self.persist_event_storage.locally_reject_invite(
-                user_id, room_id
-            )
-        else:
-            result = await self._locally_reject_client(
-                instance_name=self._event_stream_writer_instance,
-                user_id=user_id,
-                room_id=room_id,
-            )
-            return result["stream_id"]
-
     @abc.abstractmethod
     async def _user_joined_room(self, target: UserID, room_id: str) -> None:
         """Notifies distributor on master process that the user has joined the
@@ -485,11 +465,17 @@ class RoomMemberHandler(object):
         elif effective_membership_state == Membership.LEAVE:
             if not is_host_in_room:
                 # perhaps we've been invited
-                inviter = await self._get_inviter(target.to_string(), room_id)
-                if not inviter:
+                invite = await self.store.get_invite_for_local_user_in_room(
+                    user_id=target.to_string(), room_id=room_id
+                )  # type: Optional[RoomsForUser]
+                if not invite:
                     raise SynapseError(404, "Not a known room")
 
-                if self.hs.is_mine(inviter):
+                logger.info(
+                    "%s rejects invite to %s from %s", target, room_id, invite.sender
+                )
+
+                if self.hs.is_mine_id(invite.sender):
                     # the inviter was on our server, but has now left. Carry on
                     # with the normal rejection codepath.
                     #
@@ -497,10 +483,10 @@ class RoomMemberHandler(object):
                     # active on other servers.
                     pass
                 else:
-                    # send the rejection to the inviter's HS.
-                    remote_room_hosts = remote_room_hosts + [inviter.domain]
-                    return await self._remote_reject_invite(
-                        requester, remote_room_hosts, room_id, target, content,
+                    # send the rejection to the inviter's HS (with fallback to
+                    # local event)
+                    return await self.remote_reject_invite(
+                        invite.event_id, txn_id, requester, content,
                     )
 
         return await self._local_membership_update(
@@ -1014,33 +1000,119 @@ class RoomMemberMasterHandler(RoomMemberHandler):
 
         return event_id, stream_id
 
-    async def _remote_reject_invite(
+    async def remote_reject_invite(
         self,
+        invite_event_id: str,
+        txn_id: Optional[str],
         requester: Requester,
-        remote_room_hosts: List[str],
-        room_id: str,
-        target: UserID,
-        content: dict,
+        content: JsonDict,
     ) -> Tuple[Optional[str], int]:
-        """Implements RoomMemberHandler._remote_reject_invite
         """
+        Rejects an out-of-band invite received from a remote user
+
+        Implements RoomMemberHandler.remote_reject_invite
+        """
+        invite_event = await self.store.get_event(invite_event_id)
+        room_id = invite_event.room_id
+        target_user = invite_event.state_key
+
+        # first of all, try doing a rejection via the inviting server
         fed_handler = self.federation_handler
         try:
+            inviter_id = UserID.from_string(invite_event.sender)
             event, stream_id = await fed_handler.do_remotely_reject_invite(
-                remote_room_hosts, room_id, target.to_string(), content=content,
+                [inviter_id.domain], room_id, target_user, content=content
             )
             return event.event_id, stream_id
         except Exception as e:
-            # if we were unable to reject the exception, just mark
-            # it as rejected on our end and plough ahead.
+            # if we were unable to reject the invite, we will generate our own
+            # leave event.
             #
             # The 'except' clause is very broad, but we need to
             # capture everything from DNS failures upwards
             #
             logger.warning("Failed to reject invite: %s", e)
 
-            stream_id = await self.locally_reject_invite(target.to_string(), room_id)
-            return None, stream_id
+            return await self._locally_reject_invite(
+                invite_event, txn_id, requester, content
+            )
+
+    async def _locally_reject_invite(
+        self,
+        invite_event: EventBase,
+        txn_id: Optional[str],
+        requester: Requester,
+        content: JsonDict,
+    ) -> Tuple[str, int]:
+        """Generate a local invite rejection
+
+        This is called after we fail to reject an invite via a remote server. It
+        generates an out-of-band membership event locally.
+
+        Args:
+            invite_event: the invite to be rejected
+            txn_id: optional transaction ID supplied by the client
+            requester:  user making the rejection request, according to the access token
+            content: additional content to include in the rejection event.
+               Normally an empty dict.
+        """
+
+        room_id = invite_event.room_id
+        target_user = invite_event.state_key
+        room_version = await self.store.get_room_version(room_id)
+
+        content["membership"] = Membership.LEAVE
+
+        # the auth events for the new event are the same as that of the invite, plus
+        # the invite itself.
+        #
+        # the prev_events are just the invite.
+        invite_hash = invite_event.event_id  # type: Union[str, Tuple]
+        if room_version.event_format == EventFormatVersions.V1:
+            alg, h = compute_event_reference_hash(invite_event)
+            invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
+
+        auth_events = invite_event.auth_events + (invite_hash,)
+        prev_events = (invite_hash,)
+
+        # we cap depth of generated events, to ensure that they are not
+        # rejected by other servers (and so that they can be persisted in
+        # the db)
+        depth = min(invite_event.depth + 1, MAX_DEPTH)
+
+        event_dict = {
+            "depth": depth,
+            "auth_events": auth_events,
+            "prev_events": prev_events,
+            "type": EventTypes.Member,
+            "room_id": room_id,
+            "sender": target_user,
+            "content": content,
+            "state_key": target_user,
+        }
+
+        event = create_local_event_from_event_dict(
+            clock=self.clock,
+            hostname=self.hs.hostname,
+            signing_key=self.hs.signing_key,
+            room_version=room_version,
+            event_dict=event_dict,
+        )
+        event.internal_metadata.outlier = True
+        event.internal_metadata.out_of_band_membership = True
+        if txn_id is not None:
+            event.internal_metadata.txn_id = txn_id
+        if requester.access_token_id is not None:
+            event.internal_metadata.token_id = requester.access_token_id
+
+        EventValidator().validate_new(event, self.config)
+
+        context = await self.state_handler.compute_event_context(event)
+        context.app_service = requester.app_service
+        stream_id = await self.event_creation_handler.handle_new_client_event(
+            requester, event, context, extra_users=[UserID.from_string(target_user)],
+        )
+        return event.event_id, stream_id
 
     async def _user_joined_room(self, target: UserID, room_id: str) -> None:
         """Implements RoomMemberHandler._user_joined_room
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 02e0c4103d..ac03f15166 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -61,21 +61,22 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
 
         return ret["event_id"], ret["stream_id"]
 
-    async def _remote_reject_invite(
+    async def remote_reject_invite(
         self,
+        invite_event_id: str,
+        txn_id: Optional[str],
         requester: Requester,
-        remote_room_hosts: List[str],
-        room_id: str,
-        target: UserID,
         content: dict,
     ) -> Tuple[Optional[str], int]:
-        """Implements RoomMemberHandler._remote_reject_invite
+        """
+        Rejects an out-of-band invite received from a remote user
+
+        Implements RoomMemberHandler.remote_reject_invite
         """
         ret = await self._remote_reject_client(
+            invite_event_id=invite_event_id,
+            txn_id=txn_id,
             requester=requester,
-            remote_room_hosts=remote_room_hosts,
-            room_id=room_id,
-            user_id=target.to_string(),
             content=content,
         )
         return ret["event_id"], ret["stream_id"]
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index a7174c4a8f..63ef6eb7be 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -14,11 +14,11 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
 
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import Requester, UserID
+from synapse.types import JsonDict, Requester, UserID
 from synapse.util.distributor import user_joined_room, user_left_room
 
 if TYPE_CHECKING:
@@ -88,49 +88,54 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
 
 
 class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
-    """Rejects the invite for the user and room.
+    """Rejects an out-of-band invite we have received from a remote server
 
     Request format:
 
-        POST /_synapse/replication/remote_reject_invite/:room_id/:user_id
+        POST /_synapse/replication/remote_reject_invite/:event_id
 
         {
+            "txn_id": ...,
             "requester": ...,
-            "remote_room_hosts": [...],
             "content": { ... }
         }
     """
 
     NAME = "remote_reject_invite"
-    PATH_ARGS = ("room_id", "user_id")
+    PATH_ARGS = ("invite_event_id",)
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
 
-        self.federation_handler = hs.get_handlers().federation_handler
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
         self.member_handler = hs.get_room_member_handler()
 
     @staticmethod
-    def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
+    def _serialize_payload(  # type: ignore
+        invite_event_id: str,
+        txn_id: Optional[str],
+        requester: Requester,
+        content: JsonDict,
+    ):
         """
         Args:
-            requester(Requester)
-            room_id (str)
-            user_id (str)
-            remote_room_hosts (list[str]): Servers to try and reject via
+            invite_event_id: ID of the invite to be rejected
+            txn_id: optional transaction ID supplied by the client
+            requester: user making the rejection request, according to the access token
+            content: additional content to include in the rejection event.
+               Normally an empty dict.
         """
         return {
+            "txn_id": txn_id,
             "requester": requester.serialize(),
-            "remote_room_hosts": remote_room_hosts,
             "content": content,
         }
 
-    async def _handle_request(self, request, room_id, user_id):
+    async def _handle_request(self, request, invite_event_id):
         content = parse_json_object_from_request(request)
 
-        remote_room_hosts = content["remote_room_hosts"]
+        txn_id = content["txn_id"]
         event_content = content["content"]
 
         requester = Requester.deserialize(self.store, content["requester"])
@@ -138,60 +143,14 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
         if requester.user:
             request.authenticated_entity = requester.user.to_string()
 
-        logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id)
-
-        try:
-            event, stream_id = await self.federation_handler.do_remotely_reject_invite(
-                remote_room_hosts, room_id, user_id, event_content,
-            )
-            event_id = event.event_id
-        except Exception as e:
-            # if we were unable to reject the exception, just mark
-            # it as rejected on our end and plough ahead.
-            #
-            # The 'except' clause is very broad, but we need to
-            # capture everything from DNS failures upwards
-            #
-            logger.warning("Failed to reject invite: %s", e)
-
-            stream_id = await self.member_handler.locally_reject_invite(
-                user_id, room_id
-            )
-            event_id = None
+        # hopefully we're now on the master, so this won't recurse!
+        event_id, stream_id = await self.member_handler.remote_reject_invite(
+            invite_event_id, txn_id, requester, event_content,
+        )
 
         return 200, {"event_id": event_id, "stream_id": stream_id}
 
 
-class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint):
-    """Rejects the invite for the user and room locally.
-
-    Request format:
-
-        POST /_synapse/replication/locally_reject_invite/:room_id/:user_id
-
-        {}
-    """
-
-    NAME = "locally_reject_invite"
-    PATH_ARGS = ("room_id", "user_id")
-
-    def __init__(self, hs: "HomeServer"):
-        super().__init__(hs)
-
-        self.member_handler = hs.get_room_member_handler()
-
-    @staticmethod
-    def _serialize_payload(room_id, user_id):
-        return {}
-
-    async def _handle_request(self, request, room_id, user_id):
-        logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id)
-
-        stream_id = await self.member_handler.locally_reject_invite(user_id, room_id)
-
-        return 200, {"stream_id": stream_id}
-
-
 class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
     """Notifies that a user has joined or left the room
 
@@ -245,4 +204,3 @@ def register_servlets(hs, http_server):
     ReplicationRemoteJoinRestServlet(hs).register(http_server)
     ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
     ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
-    ReplicationLocallyRejectInviteRestServlet(hs).register(http_server)
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index a18317366c..230fb5cd7f 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -1541,23 +1541,3 @@ class PersistEventsStore:
                 if not ev.internal_metadata.is_outlier()
             ],
         )
-
-    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
-        """Mark the invite has having been rejected even though we failed to
-        create a leave event for it.
-        """
-
-        def f(txn, stream_ordering):
-            # Clear this entry from `local_current_membership`.
-            # Ideally we'd point to a leave event, but we don't have one, so
-            # nevermind.
-            self.db.simple_delete_txn(
-                txn,
-                table="local_current_membership",
-                keyvalues={"room_id": room_id, "user_id": user_id},
-            )
-
-        with self._stream_id_gen.get_next() as stream_ordering:
-            await self.db.runInteraction("locally_reject_invite", f, stream_ordering)
-
-        return stream_ordering
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index ec894a91cb..fa46041676 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -783,9 +783,3 @@ class EventsPersistenceStorage(object):
 
         for user_id in left_users:
             await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)
-
-    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
-        """Mark the invite has having been rejected even though we failed to
-        create a leave event for it.
-        """
-        return await self.persist_events_store.locally_reject_invite(user_id, room_id)