diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 665ad19b5d..da206e1ec1 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Optional, Tuple
+from typing import TYPE_CHECKING, Optional, Tuple
from canonicaljson import encode_canonical_json, json
@@ -55,6 +55,9 @@ from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
@@ -349,7 +352,7 @@ _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
class EventCreationHandler(object):
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
@@ -814,11 +817,17 @@ class EventCreationHandler(object):
403, "This event is not allowed in this context", Codes.FORBIDDEN
)
- try:
- await self.auth.check_from_context(room_version, event, context)
- except AuthError as err:
- logger.warning("Denying new event %r because %s", event, err)
- raise err
+ if event.internal_metadata.is_out_of_band_membership():
+ # the only sort of out-of-band-membership events we expect to see here
+ # are invite rejections we have generated ourselves.
+ assert event.type == EventTypes.Member
+ assert event.content["membership"] == Membership.LEAVE
+ else:
+ try:
+ await self.auth.check_from_context(room_version, event, context)
+ except AuthError as err:
+ logger.warning("Denying new event %r because %s", event, err)
+ raise err
# Ensure that we can round trip before trying to persist in db
try:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 27c479da9e..178d7db94e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2016-2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,17 +16,21 @@
import abc
import logging
from http import HTTPStatus
-from typing import Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple, Union
+
+from unpaddedbase64 import encode_base64
from synapse import types
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.room_versions import EventFormatVersions
+from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase
+from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext
-from synapse.replication.http.membership import (
- ReplicationLocallyRejectInviteRestServlet,
-)
-from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
+from synapse.events.validator import EventValidator
+from synapse.storage.roommember import RoomsForUser
+from synapse.types import Collection, JsonDict, Requester, RoomAlias, RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -74,10 +76,6 @@ class RoomMemberHandler(object):
)
if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence
- else:
- self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
- hs
- )
# This is only used to get at ratelimit function, and
# maybe_kick_guest_users. It's fine there are multiple of these as
@@ -105,46 +103,28 @@ class RoomMemberHandler(object):
raise NotImplementedError()
@abc.abstractmethod
- async def _remote_reject_invite(
+ async def remote_reject_invite(
self,
+ invite_event_id: str,
+ txn_id: Optional[str],
requester: Requester,
- remote_room_hosts: List[str],
- room_id: str,
- target: UserID,
- content: dict,
+ content: JsonDict,
) -> Tuple[Optional[str], int]:
- """Attempt to reject an invite for a room this server is not in. If we
- fail to do so we locally mark the invite as rejected.
+ """
+ Rejects an out-of-band invite we have received from a remote server
Args:
- requester
- remote_room_hosts: List of servers to use to try and reject invite
- room_id
- target: The user rejecting the invite
- content: The content for the rejection event
+ invite_event_id: ID of the invite to be rejected
+ txn_id: optional transaction ID supplied by the client
+ requester: user making the rejection request, according to the access token
+ content: additional content to include in the rejection event.
+ Normally an empty dict.
Returns:
- A dictionary to be returned to the client, may
- include event_id etc, or nothing if we locally rejected
+ event id, stream_id of the leave event
"""
raise NotImplementedError()
- async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
- """Mark the invite has having been rejected even though we failed to
- create a leave event for it.
- """
- if self._is_on_event_persistence_instance:
- return await self.persist_event_storage.locally_reject_invite(
- user_id, room_id
- )
- else:
- result = await self._locally_reject_client(
- instance_name=self._event_stream_writer_instance,
- user_id=user_id,
- room_id=room_id,
- )
- return result["stream_id"]
-
@abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
@@ -485,11 +465,17 @@ class RoomMemberHandler(object):
elif effective_membership_state == Membership.LEAVE:
if not is_host_in_room:
# perhaps we've been invited
- inviter = await self._get_inviter(target.to_string(), room_id)
- if not inviter:
+ invite = await self.store.get_invite_for_local_user_in_room(
+ user_id=target.to_string(), room_id=room_id
+ ) # type: Optional[RoomsForUser]
+ if not invite:
raise SynapseError(404, "Not a known room")
- if self.hs.is_mine(inviter):
+ logger.info(
+ "%s rejects invite to %s from %s", target, room_id, invite.sender
+ )
+
+ if self.hs.is_mine_id(invite.sender):
# the inviter was on our server, but has now left. Carry on
# with the normal rejection codepath.
#
@@ -497,10 +483,10 @@ class RoomMemberHandler(object):
# active on other servers.
pass
else:
- # send the rejection to the inviter's HS.
- remote_room_hosts = remote_room_hosts + [inviter.domain]
- return await self._remote_reject_invite(
- requester, remote_room_hosts, room_id, target, content,
+ # send the rejection to the inviter's HS (with fallback to
+ # local event)
+ return await self.remote_reject_invite(
+ invite.event_id, txn_id, requester, content,
)
return await self._local_membership_update(
@@ -1014,33 +1000,119 @@ class RoomMemberMasterHandler(RoomMemberHandler):
return event_id, stream_id
- async def _remote_reject_invite(
+ async def remote_reject_invite(
self,
+ invite_event_id: str,
+ txn_id: Optional[str],
requester: Requester,
- remote_room_hosts: List[str],
- room_id: str,
- target: UserID,
- content: dict,
+ content: JsonDict,
) -> Tuple[Optional[str], int]:
- """Implements RoomMemberHandler._remote_reject_invite
"""
+ Rejects an out-of-band invite received from a remote user
+
+ Implements RoomMemberHandler.remote_reject_invite
+ """
+ invite_event = await self.store.get_event(invite_event_id)
+ room_id = invite_event.room_id
+ target_user = invite_event.state_key
+
+ # first of all, try doing a rejection via the inviting server
fed_handler = self.federation_handler
try:
+ inviter_id = UserID.from_string(invite_event.sender)
event, stream_id = await fed_handler.do_remotely_reject_invite(
- remote_room_hosts, room_id, target.to_string(), content=content,
+ [inviter_id.domain], room_id, target_user, content=content
)
return event.event_id, stream_id
except Exception as e:
- # if we were unable to reject the exception, just mark
- # it as rejected on our end and plough ahead.
+ # if we were unable to reject the invite, we will generate our own
+ # leave event.
#
# The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards
#
logger.warning("Failed to reject invite: %s", e)
- stream_id = await self.locally_reject_invite(target.to_string(), room_id)
- return None, stream_id
+ return await self._locally_reject_invite(
+ invite_event, txn_id, requester, content
+ )
+
+ async def _locally_reject_invite(
+ self,
+ invite_event: EventBase,
+ txn_id: Optional[str],
+ requester: Requester,
+ content: JsonDict,
+ ) -> Tuple[str, int]:
+ """Generate a local invite rejection
+
+ This is called after we fail to reject an invite via a remote server. It
+ generates an out-of-band membership event locally.
+
+ Args:
+ invite_event: the invite to be rejected
+ txn_id: optional transaction ID supplied by the client
+ requester: user making the rejection request, according to the access token
+ content: additional content to include in the rejection event.
+ Normally an empty dict.
+ """
+
+ room_id = invite_event.room_id
+ target_user = invite_event.state_key
+ room_version = await self.store.get_room_version(room_id)
+
+ content["membership"] = Membership.LEAVE
+
+ # the auth events for the new event are the same as that of the invite, plus
+ # the invite itself.
+ #
+ # the prev_events are just the invite.
+ invite_hash = invite_event.event_id # type: Union[str, Tuple]
+ if room_version.event_format == EventFormatVersions.V1:
+ alg, h = compute_event_reference_hash(invite_event)
+ invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
+
+ auth_events = invite_event.auth_events + (invite_hash,)
+ prev_events = (invite_hash,)
+
+ # we cap depth of generated events, to ensure that they are not
+ # rejected by other servers (and so that they can be persisted in
+ # the db)
+ depth = min(invite_event.depth + 1, MAX_DEPTH)
+
+ event_dict = {
+ "depth": depth,
+ "auth_events": auth_events,
+ "prev_events": prev_events,
+ "type": EventTypes.Member,
+ "room_id": room_id,
+ "sender": target_user,
+ "content": content,
+ "state_key": target_user,
+ }
+
+ event = create_local_event_from_event_dict(
+ clock=self.clock,
+ hostname=self.hs.hostname,
+ signing_key=self.hs.signing_key,
+ room_version=room_version,
+ event_dict=event_dict,
+ )
+ event.internal_metadata.outlier = True
+ event.internal_metadata.out_of_band_membership = True
+ if txn_id is not None:
+ event.internal_metadata.txn_id = txn_id
+ if requester.access_token_id is not None:
+ event.internal_metadata.token_id = requester.access_token_id
+
+ EventValidator().validate_new(event, self.config)
+
+ context = await self.state_handler.compute_event_context(event)
+ context.app_service = requester.app_service
+ stream_id = await self.event_creation_handler.handle_new_client_event(
+ requester, event, context, extra_users=[UserID.from_string(target_user)],
+ )
+ return event.event_id, stream_id
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 02e0c4103d..ac03f15166 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -61,21 +61,22 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
return ret["event_id"], ret["stream_id"]
- async def _remote_reject_invite(
+ async def remote_reject_invite(
self,
+ invite_event_id: str,
+ txn_id: Optional[str],
requester: Requester,
- remote_room_hosts: List[str],
- room_id: str,
- target: UserID,
content: dict,
) -> Tuple[Optional[str], int]:
- """Implements RoomMemberHandler._remote_reject_invite
+ """
+ Rejects an out-of-band invite received from a remote user
+
+ Implements RoomMemberHandler.remote_reject_invite
"""
ret = await self._remote_reject_client(
+ invite_event_id=invite_event_id,
+ txn_id=txn_id,
requester=requester,
- remote_room_hosts=remote_room_hosts,
- room_id=room_id,
- user_id=target.to_string(),
content=content,
)
return ret["event_id"], ret["stream_id"]
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index a7174c4a8f..63ef6eb7be 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -14,11 +14,11 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import Requester, UserID
+from synapse.types import JsonDict, Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room
if TYPE_CHECKING:
@@ -88,49 +88,54 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
- """Rejects the invite for the user and room.
+ """Rejects an out-of-band invite we have received from a remote server
Request format:
- POST /_synapse/replication/remote_reject_invite/:room_id/:user_id
+ POST /_synapse/replication/remote_reject_invite/:event_id
{
+ "txn_id": ...,
"requester": ...,
- "remote_room_hosts": [...],
"content": { ... }
}
"""
NAME = "remote_reject_invite"
- PATH_ARGS = ("room_id", "user_id")
+ PATH_ARGS = ("invite_event_id",)
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
- self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.member_handler = hs.get_room_member_handler()
@staticmethod
- def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
+ def _serialize_payload( # type: ignore
+ invite_event_id: str,
+ txn_id: Optional[str],
+ requester: Requester,
+ content: JsonDict,
+ ):
"""
Args:
- requester(Requester)
- room_id (str)
- user_id (str)
- remote_room_hosts (list[str]): Servers to try and reject via
+ invite_event_id: ID of the invite to be rejected
+ txn_id: optional transaction ID supplied by the client
+ requester: user making the rejection request, according to the access token
+ content: additional content to include in the rejection event.
+ Normally an empty dict.
"""
return {
+ "txn_id": txn_id,
"requester": requester.serialize(),
- "remote_room_hosts": remote_room_hosts,
"content": content,
}
- async def _handle_request(self, request, room_id, user_id):
+ async def _handle_request(self, request, invite_event_id):
content = parse_json_object_from_request(request)
- remote_room_hosts = content["remote_room_hosts"]
+ txn_id = content["txn_id"]
event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"])
@@ -138,60 +143,14 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
if requester.user:
request.authenticated_entity = requester.user.to_string()
- logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id)
-
- try:
- event, stream_id = await self.federation_handler.do_remotely_reject_invite(
- remote_room_hosts, room_id, user_id, event_content,
- )
- event_id = event.event_id
- except Exception as e:
- # if we were unable to reject the exception, just mark
- # it as rejected on our end and plough ahead.
- #
- # The 'except' clause is very broad, but we need to
- # capture everything from DNS failures upwards
- #
- logger.warning("Failed to reject invite: %s", e)
-
- stream_id = await self.member_handler.locally_reject_invite(
- user_id, room_id
- )
- event_id = None
+ # hopefully we're now on the master, so this won't recurse!
+ event_id, stream_id = await self.member_handler.remote_reject_invite(
+ invite_event_id, txn_id, requester, event_content,
+ )
return 200, {"event_id": event_id, "stream_id": stream_id}
-class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint):
- """Rejects the invite for the user and room locally.
-
- Request format:
-
- POST /_synapse/replication/locally_reject_invite/:room_id/:user_id
-
- {}
- """
-
- NAME = "locally_reject_invite"
- PATH_ARGS = ("room_id", "user_id")
-
- def __init__(self, hs: "HomeServer"):
- super().__init__(hs)
-
- self.member_handler = hs.get_room_member_handler()
-
- @staticmethod
- def _serialize_payload(room_id, user_id):
- return {}
-
- async def _handle_request(self, request, room_id, user_id):
- logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id)
-
- stream_id = await self.member_handler.locally_reject_invite(user_id, room_id)
-
- return 200, {"stream_id": stream_id}
-
-
class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
"""Notifies that a user has joined or left the room
@@ -245,4 +204,3 @@ def register_servlets(hs, http_server):
ReplicationRemoteJoinRestServlet(hs).register(http_server)
ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
- ReplicationLocallyRejectInviteRestServlet(hs).register(http_server)
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index a18317366c..230fb5cd7f 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -1541,23 +1541,3 @@ class PersistEventsStore:
if not ev.internal_metadata.is_outlier()
],
)
-
- async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
- """Mark the invite has having been rejected even though we failed to
- create a leave event for it.
- """
-
- def f(txn, stream_ordering):
- # Clear this entry from `local_current_membership`.
- # Ideally we'd point to a leave event, but we don't have one, so
- # nevermind.
- self.db.simple_delete_txn(
- txn,
- table="local_current_membership",
- keyvalues={"room_id": room_id, "user_id": user_id},
- )
-
- with self._stream_id_gen.get_next() as stream_ordering:
- await self.db.runInteraction("locally_reject_invite", f, stream_ordering)
-
- return stream_ordering
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index ec894a91cb..fa46041676 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -783,9 +783,3 @@ class EventsPersistenceStorage(object):
for user_id in left_users:
await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)
-
- async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
- """Mark the invite has having been rejected even though we failed to
- create a leave event for it.
- """
- return await self.persist_events_store.locally_reject_invite(user_id, room_id)
|