diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 904c96eeec..92d4c6e16c 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -48,8 +48,7 @@ class ApplicationServicesHandler(object):
self.current_max = 0
self.is_processing = False
- @defer.inlineCallbacks
- def notify_interested_services(self, current_id):
+ async def notify_interested_services(self, current_id):
"""Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any
@@ -74,7 +73,7 @@ class ApplicationServicesHandler(object):
(
upper_bound,
events,
- ) = yield self.store.get_new_events_for_appservice(
+ ) = await self.store.get_new_events_for_appservice(
self.current_max, limit
)
@@ -85,10 +84,9 @@ class ApplicationServicesHandler(object):
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
- @defer.inlineCallbacks
- def handle_event(event):
+ async def handle_event(event):
# Gather interested services
- services = yield self._get_services_for_event(event)
+ services = await self._get_services_for_event(event)
if len(services) == 0:
return # no services need notifying
@@ -96,9 +94,9 @@ class ApplicationServicesHandler(object):
# query API for all services which match that user regex.
# This needs to block as these user queries need to be
# made BEFORE pushing the event.
- yield self._check_user_exists(event.sender)
+ await self._check_user_exists(event.sender)
if event.type == EventTypes.Member:
- yield self._check_user_exists(event.state_key)
+ await self._check_user_exists(event.state_key)
if not self.started_scheduler:
@@ -115,17 +113,16 @@ class ApplicationServicesHandler(object):
self.scheduler.submit_event_for_as(service, event)
now = self.clock.time_msec()
- ts = yield self.store.get_received_ts(event.event_id)
+ ts = await self.store.get_received_ts(event.event_id)
synapse.metrics.event_processing_lag_by_event.labels(
"appservice_sender"
).observe((now - ts) / 1000)
- @defer.inlineCallbacks
- def handle_room_events(events):
+ async def handle_room_events(events):
for event in events:
- yield handle_event(event)
+ await handle_event(event)
- yield make_deferred_yieldable(
+ await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
@@ -135,10 +132,10 @@ class ApplicationServicesHandler(object):
)
)
- yield self.store.set_appservice_last_pos(upper_bound)
+ await self.store.set_appservice_last_pos(upper_bound)
now = self.clock.time_msec()
- ts = yield self.store.get_received_ts(events[-1].event_id)
+ ts = await self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_positions.labels(
"appservice_sender"
@@ -161,8 +158,7 @@ class ApplicationServicesHandler(object):
finally:
self.is_processing = False
- @defer.inlineCallbacks
- def query_user_exists(self, user_id):
+ async def query_user_exists(self, user_id):
"""Check if any application service knows this user_id exists.
Args:
@@ -170,15 +166,14 @@ class ApplicationServicesHandler(object):
Returns:
True if this user exists on at least one application service.
"""
- user_query_services = yield self._get_services_for_user(user_id=user_id)
+ user_query_services = self._get_services_for_user(user_id=user_id)
for user_service in user_query_services:
- is_known_user = yield self.appservice_api.query_user(user_service, user_id)
+ is_known_user = await self.appservice_api.query_user(user_service, user_id)
if is_known_user:
return True
return False
- @defer.inlineCallbacks
- def query_room_alias_exists(self, room_alias):
+ async def query_room_alias_exists(self, room_alias):
"""Check if an application service knows this room alias exists.
Args:
@@ -193,19 +188,18 @@ class ApplicationServicesHandler(object):
s for s in services if (s.is_interested_in_alias(room_alias_str))
]
for alias_service in alias_query_services:
- is_known_alias = yield self.appservice_api.query_alias(
+ is_known_alias = await self.appservice_api.query_alias(
alias_service, room_alias_str
)
if is_known_alias:
# the alias exists now so don't query more ASes.
- result = yield self.store.get_association_from_room_alias(room_alias)
+ result = await self.store.get_association_from_room_alias(room_alias)
return result
- @defer.inlineCallbacks
- def query_3pe(self, kind, protocol, fields):
- services = yield self._get_services_for_3pn(protocol)
+ async def query_3pe(self, kind, protocol, fields):
+ services = self._get_services_for_3pn(protocol)
- results = yield make_deferred_yieldable(
+ results = await make_deferred_yieldable(
defer.DeferredList(
[
run_in_background(
@@ -224,8 +218,7 @@ class ApplicationServicesHandler(object):
return ret
- @defer.inlineCallbacks
- def get_3pe_protocols(self, only_protocol=None):
+ async def get_3pe_protocols(self, only_protocol=None):
services = self.store.get_app_services()
protocols = {}
@@ -238,7 +231,7 @@ class ApplicationServicesHandler(object):
if p not in protocols:
protocols[p] = []
- info = yield self.appservice_api.get_3pe_protocol(s, p)
+ info = await self.appservice_api.get_3pe_protocol(s, p)
if info is not None:
protocols[p].append(info)
@@ -263,8 +256,7 @@ class ApplicationServicesHandler(object):
return protocols
- @defer.inlineCallbacks
- def _get_services_for_event(self, event):
+ async def _get_services_for_event(self, event):
"""Retrieve a list of application services interested in this event.
Args:
@@ -280,7 +272,7 @@ class ApplicationServicesHandler(object):
# inside of a list comprehension anymore.
interested_list = []
for s in services:
- if (yield s.is_interested(event, self.store)):
+ if await s.is_interested(event, self.store):
interested_list.append(s)
return interested_list
@@ -288,21 +280,20 @@ class ApplicationServicesHandler(object):
def _get_services_for_user(self, user_id):
services = self.store.get_app_services()
interested_list = [s for s in services if (s.is_interested_in_user(user_id))]
- return defer.succeed(interested_list)
+ return interested_list
def _get_services_for_3pn(self, protocol):
services = self.store.get_app_services()
interested_list = [s for s in services if s.is_interested_in_protocol(protocol)]
- return defer.succeed(interested_list)
+ return interested_list
- @defer.inlineCallbacks
- def _is_unknown_user(self, user_id):
+ async def _is_unknown_user(self, user_id):
if not self.is_mine_id(user_id):
# we don't know if they are unknown or not since it isn't one of our
# users. We can't poke ASes.
return False
- user_info = yield self.store.get_user_by_id(user_id)
+ user_info = await self.store.get_user_by_id(user_id)
if user_info:
return False
@@ -311,10 +302,9 @@ class ApplicationServicesHandler(object):
service_list = [s for s in services if s.sender == user_id]
return len(service_list) == 0
- @defer.inlineCallbacks
- def _check_user_exists(self, user_id):
- unknown_user = yield self._is_unknown_user(user_id)
+ async def _check_user_exists(self, user_id):
+ unknown_user = await self._is_unknown_user(user_id)
if unknown_user:
- exists = yield self.query_user_exists(user_id)
+ exists = await self.query_user_exists(user_id)
return exists
return True
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d713a06bf9..a162392e4c 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
import time
import unicodedata
@@ -24,7 +23,6 @@ import attr
import bcrypt # type: ignore[import]
import pymacaroons
-import synapse.util.stringutils as stringutils
from synapse.api.constants import LoginType
from synapse.api.errors import (
AuthError,
@@ -45,6 +43,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.module_api import ModuleApi
from synapse.push.mailer import load_jinja2_templates
from synapse.types import Requester, UserID
+from synapse.util import stringutils as stringutils
from synapse.util.threepids import canonicalise_email
from ._base import BaseHandler
diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py
index 76f213723a..d79ffefdb5 100644
--- a/synapse/handlers/cas_handler.py
+++ b/synapse/handlers/cas_handler.py
@@ -12,11 +12,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
import urllib
-import xml.etree.ElementTree as ET
from typing import Dict, Optional, Tuple
+from xml.etree import ElementTree as ET
from twisted.web.client import PartialDownloadError
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3326535410..a94e467aef 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1577,7 +1577,7 @@ class FederationHandler(BaseHandler):
room_version,
event.get_pdu_json(),
self.hs.hostname,
- self.hs.config.signing_key[0],
+ self.hs.signing_key,
)
)
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 7cb106e365..ecdb12a7bf 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -70,7 +70,7 @@ class GroupsLocalWorkerHandler(object):
self.clock = hs.get_clock()
self.keyring = hs.get_keyring()
self.is_mine_id = hs.is_mine_id
- self.signing_key = hs.config.signing_key[0]
+ self.signing_key = hs.signing_key
self.server_name = hs.hostname
self.notifier = hs.get_notifier()
self.attestations = hs.get_groups_attestation_signing()
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index a77088e295..88537a5be3 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -268,10 +268,10 @@ class IdentityHandler(BaseHandler):
url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii")
auth_headers = self.federation_http_client.build_auth_headers(
destination=None,
- method="POST",
+ method=b"POST",
url_bytes=url_bytes,
content=content,
- destination_is=id_server,
+ destination_is=id_server.encode("ascii"),
)
headers = {b"Authorization": auth_headers}
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 665ad19b5d..da206e1ec1 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Optional, Tuple
+from typing import TYPE_CHECKING, Optional, Tuple
from canonicaljson import encode_canonical_json, json
@@ -55,6 +55,9 @@ from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
@@ -349,7 +352,7 @@ _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
class EventCreationHandler(object):
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
@@ -814,11 +817,17 @@ class EventCreationHandler(object):
403, "This event is not allowed in this context", Codes.FORBIDDEN
)
- try:
- await self.auth.check_from_context(room_version, event, context)
- except AuthError as err:
- logger.warning("Denying new event %r because %s", event, err)
- raise err
+ if event.internal_metadata.is_out_of_band_membership():
+ # the only sort of out-of-band-membership events we expect to see here
+ # are invite rejections we have generated ourselves.
+ assert event.type == EventTypes.Member
+ assert event.content["membership"] == Membership.LEAVE
+ else:
+ try:
+ await self.auth.check_from_context(room_version, event, context)
+ except AuthError as err:
+ logger.warning("Denying new event %r because %s", event, err)
+ raise err
# Ensure that we can round trip before trying to persist in db
try:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index d585114f2d..c3fee116c2 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2016-2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,17 +16,21 @@
import abc
import logging
from http import HTTPStatus
-from typing import Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple, Union
+
+from unpaddedbase64 import encode_base64
from synapse import types
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.room_versions import EventFormatVersions
+from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase
+from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext
-from synapse.replication.http.membership import (
- ReplicationLocallyRejectInviteRestServlet,
-)
-from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
+from synapse.events.validator import EventValidator
+from synapse.storage.roommember import RoomsForUser
+from synapse.types import Collection, JsonDict, Requester, RoomAlias, RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -75,10 +77,6 @@ class RoomMemberHandler(object):
)
if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence
- else:
- self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
- hs
- )
# This is only used to get at ratelimit function, and
# maybe_kick_guest_users. It's fine there are multiple of these as
@@ -106,46 +104,28 @@ class RoomMemberHandler(object):
raise NotImplementedError()
@abc.abstractmethod
- async def _remote_reject_invite(
+ async def remote_reject_invite(
self,
+ invite_event_id: str,
+ txn_id: Optional[str],
requester: Requester,
- remote_room_hosts: List[str],
- room_id: str,
- target: UserID,
- content: dict,
- ) -> Tuple[Optional[str], int]:
- """Attempt to reject an invite for a room this server is not in. If we
- fail to do so we locally mark the invite as rejected.
+ content: JsonDict,
+ ) -> Tuple[str, int]:
+ """
+ Rejects an out-of-band invite we have received from a remote server
Args:
- requester
- remote_room_hosts: List of servers to use to try and reject invite
- room_id
- target: The user rejecting the invite
- content: The content for the rejection event
+ invite_event_id: ID of the invite to be rejected
+ txn_id: optional transaction ID supplied by the client
+ requester: user making the rejection request, according to the access token
+ content: additional content to include in the rejection event.
+ Normally an empty dict.
Returns:
- A dictionary to be returned to the client, may
- include event_id etc, or nothing if we locally rejected
+ event id, stream_id of the leave event
"""
raise NotImplementedError()
- async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
- """Mark the invite has having been rejected even though we failed to
- create a leave event for it.
- """
- if self._is_on_event_persistence_instance:
- return await self.persist_event_storage.locally_reject_invite(
- user_id, room_id
- )
- else:
- result = await self._locally_reject_client(
- instance_name=self._event_stream_writer_instance,
- user_id=user_id,
- room_id=room_id,
- )
- return result["stream_id"]
-
@abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
@@ -290,7 +270,7 @@ class RoomMemberHandler(object):
content: Optional[dict] = None,
new_room: bool = False,
require_consent: bool = True,
- ) -> Tuple[Optional[str], int]:
+ ) -> Tuple[str, int]:
"""Update a user's membership in a room"""
key = (room_id,)
@@ -324,7 +304,7 @@ class RoomMemberHandler(object):
content: Optional[dict] = None,
new_room: bool = False,
require_consent: bool = True,
- ) -> Tuple[Optional[str], int]:
+ ) -> Tuple[str, int]:
content_specified = bool(content)
if content is None:
content = {}
@@ -516,11 +496,17 @@ class RoomMemberHandler(object):
elif effective_membership_state == Membership.LEAVE:
if not is_host_in_room:
# perhaps we've been invited
- inviter = await self._get_inviter(target.to_string(), room_id)
- if not inviter:
+ invite = await self.store.get_invite_for_local_user_in_room(
+ user_id=target.to_string(), room_id=room_id
+ ) # type: Optional[RoomsForUser]
+ if not invite:
raise SynapseError(404, "Not a known room")
- if self.hs.is_mine(inviter):
+ logger.info(
+ "%s rejects invite to %s from %s", target, room_id, invite.sender
+ )
+
+ if self.hs.is_mine_id(invite.sender):
# the inviter was on our server, but has now left. Carry on
# with the normal rejection codepath.
#
@@ -528,10 +514,10 @@ class RoomMemberHandler(object):
# active on other servers.
pass
else:
- # send the rejection to the inviter's HS.
- remote_room_hosts = remote_room_hosts + [inviter.domain]
- return await self._remote_reject_invite(
- requester, remote_room_hosts, room_id, target, content,
+ # send the rejection to the inviter's HS (with fallback to
+ # local event)
+ return await self.remote_reject_invite(
+ invite.event_id, txn_id, requester, content,
)
return await self._local_membership_update(
@@ -1069,33 +1055,119 @@ class RoomMemberMasterHandler(RoomMemberHandler):
return event_id, stream_id
- async def _remote_reject_invite(
+ async def remote_reject_invite(
self,
+ invite_event_id: str,
+ txn_id: Optional[str],
requester: Requester,
- remote_room_hosts: List[str],
- room_id: str,
- target: UserID,
- content: dict,
- ) -> Tuple[Optional[str], int]:
- """Implements RoomMemberHandler._remote_reject_invite
+ content: JsonDict,
+ ) -> Tuple[str, int]:
+ """
+ Rejects an out-of-band invite received from a remote user
+
+ Implements RoomMemberHandler.remote_reject_invite
"""
+ invite_event = await self.store.get_event(invite_event_id)
+ room_id = invite_event.room_id
+ target_user = invite_event.state_key
+
+ # first of all, try doing a rejection via the inviting server
fed_handler = self.federation_handler
try:
+ inviter_id = UserID.from_string(invite_event.sender)
event, stream_id = await fed_handler.do_remotely_reject_invite(
- remote_room_hosts, room_id, target.to_string(), content=content,
+ [inviter_id.domain], room_id, target_user, content=content
)
return event.event_id, stream_id
except Exception as e:
- # if we were unable to reject the exception, just mark
- # it as rejected on our end and plough ahead.
+ # if we were unable to reject the invite, we will generate our own
+ # leave event.
#
# The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards
#
logger.warning("Failed to reject invite: %s", e)
- stream_id = await self.locally_reject_invite(target.to_string(), room_id)
- return None, stream_id
+ return await self._locally_reject_invite(
+ invite_event, txn_id, requester, content
+ )
+
+ async def _locally_reject_invite(
+ self,
+ invite_event: EventBase,
+ txn_id: Optional[str],
+ requester: Requester,
+ content: JsonDict,
+ ) -> Tuple[str, int]:
+ """Generate a local invite rejection
+
+ This is called after we fail to reject an invite via a remote server. It
+ generates an out-of-band membership event locally.
+
+ Args:
+ invite_event: the invite to be rejected
+ txn_id: optional transaction ID supplied by the client
+ requester: user making the rejection request, according to the access token
+ content: additional content to include in the rejection event.
+ Normally an empty dict.
+ """
+
+ room_id = invite_event.room_id
+ target_user = invite_event.state_key
+ room_version = await self.store.get_room_version(room_id)
+
+ content["membership"] = Membership.LEAVE
+
+ # the auth events for the new event are the same as that of the invite, plus
+ # the invite itself.
+ #
+ # the prev_events are just the invite.
+ invite_hash = invite_event.event_id # type: Union[str, Tuple]
+ if room_version.event_format == EventFormatVersions.V1:
+ alg, h = compute_event_reference_hash(invite_event)
+ invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
+
+ auth_events = tuple(invite_event.auth_events) + (invite_hash,)
+ prev_events = (invite_hash,)
+
+ # we cap depth of generated events, to ensure that they are not
+ # rejected by other servers (and so that they can be persisted in
+ # the db)
+ depth = min(invite_event.depth + 1, MAX_DEPTH)
+
+ event_dict = {
+ "depth": depth,
+ "auth_events": auth_events,
+ "prev_events": prev_events,
+ "type": EventTypes.Member,
+ "room_id": room_id,
+ "sender": target_user,
+ "content": content,
+ "state_key": target_user,
+ }
+
+ event = create_local_event_from_event_dict(
+ clock=self.clock,
+ hostname=self.hs.hostname,
+ signing_key=self.hs.signing_key,
+ room_version=room_version,
+ event_dict=event_dict,
+ )
+ event.internal_metadata.outlier = True
+ event.internal_metadata.out_of_band_membership = True
+ if txn_id is not None:
+ event.internal_metadata.txn_id = txn_id
+ if requester.access_token_id is not None:
+ event.internal_metadata.token_id = requester.access_token_id
+
+ EventValidator().validate_new(event, self.config)
+
+ context = await self.state_handler.compute_event_context(event)
+ context.app_service = requester.app_service
+ stream_id = await self.event_creation_handler.handle_new_client_event(
+ requester, event, context, extra_users=[UserID.from_string(target_user)],
+ )
+ return event.event_id, stream_id
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 02e0c4103d..897338fd54 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -61,21 +61,22 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
return ret["event_id"], ret["stream_id"]
- async def _remote_reject_invite(
+ async def remote_reject_invite(
self,
+ invite_event_id: str,
+ txn_id: Optional[str],
requester: Requester,
- remote_room_hosts: List[str],
- room_id: str,
- target: UserID,
content: dict,
- ) -> Tuple[Optional[str], int]:
- """Implements RoomMemberHandler._remote_reject_invite
+ ) -> Tuple[str, int]:
+ """
+ Rejects an out-of-band invite received from a remote user
+
+ Implements RoomMemberHandler.remote_reject_invite
"""
ret = await self._remote_reject_client(
+ invite_event_id=invite_event_id,
+ txn_id=txn_id,
requester=requester,
- remote_room_hosts=remote_room_hosts,
- room_id=room_id,
- user_id=target.to_string(),
content=content,
)
return ret["event_id"], ret["stream_id"]
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 6c7abaa578..879c4c07c6 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -294,6 +294,9 @@ class TypingHandler(object):
rows.sort()
limited = False
+ # We, unusually, use a strict limit here as we have all the rows in
+ # memory rather than pulling them out of the database with a `LIMIT ?`
+ # clause.
if len(rows) > limit:
rows = rows[:limit]
current_id = rows[-1][0]
|