diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 413425fed1..2dd183018a 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,7 +17,6 @@ from .admin import AdminHandler
from .directory import DirectoryHandler
from .federation import FederationHandler
from .identity import IdentityHandler
-from .register import RegistrationHandler
from .search import SearchHandler
@@ -41,7 +40,6 @@ class Handlers(object):
"""
def __init__(self, hs):
- self.registration_handler = RegistrationHandler(hs)
self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 704181d2d3..594754cfd8 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -167,4 +167,4 @@ class BaseHandler(object):
ratelimit=False,
)
except Exception as e:
- logger.warn("Error kicking guest user: %s" % (e,))
+ logger.exception("Error kicking guest user: %s" % (e,))
diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py
new file mode 100644
index 0000000000..813777bf18
--- /dev/null
+++ b/synapse/handlers/acme.py
@@ -0,0 +1,151 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import attr
+from zope.interface import implementer
+
+import twisted
+import twisted.internet.error
+from twisted.internet import defer
+from twisted.python.filepath import FilePath
+from twisted.python.url import URL
+from twisted.web import server, static
+from twisted.web.resource import Resource
+
+from synapse.app import check_bind_error
+
+logger = logging.getLogger(__name__)
+
+try:
+ from txacme.interfaces import ICertificateStore
+
+ @attr.s
+ @implementer(ICertificateStore)
+ class ErsatzStore(object):
+ """
+ A store that only stores in memory.
+ """
+
+ certs = attr.ib(default=attr.Factory(dict))
+
+ def store(self, server_name, pem_objects):
+ self.certs[server_name] = [o.as_bytes() for o in pem_objects]
+ return defer.succeed(None)
+
+
+except ImportError:
+ # txacme is missing
+ pass
+
+
+class AcmeHandler(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.reactor = hs.get_reactor()
+ self._acme_domain = hs.config.acme_domain
+
+ @defer.inlineCallbacks
+ def start_listening(self):
+
+ # Configure logging for txacme, if you need to debug
+ # from eliot import add_destinations
+ # from eliot.twisted import TwistedDestination
+ #
+ # add_destinations(TwistedDestination())
+
+ from txacme.challenges import HTTP01Responder
+ from txacme.service import AcmeIssuingService
+ from txacme.endpoint import load_or_create_client_key
+ from txacme.client import Client
+ from josepy.jwa import RS256
+
+ self._store = ErsatzStore()
+ responder = HTTP01Responder()
+
+ self._issuer = AcmeIssuingService(
+ cert_store=self._store,
+ client_creator=(
+ lambda: Client.from_url(
+ reactor=self.reactor,
+ url=URL.from_text(self.hs.config.acme_url),
+ key=load_or_create_client_key(
+ FilePath(self.hs.config.config_dir_path)
+ ),
+ alg=RS256,
+ )
+ ),
+ clock=self.reactor,
+ responders=[responder],
+ )
+
+ well_known = Resource()
+ well_known.putChild(b'acme-challenge', responder.resource)
+ responder_resource = Resource()
+ responder_resource.putChild(b'.well-known', well_known)
+ responder_resource.putChild(b'check', static.Data(b'OK', b'text/plain'))
+
+ srv = server.Site(responder_resource)
+
+ bind_addresses = self.hs.config.acme_bind_addresses
+ for host in bind_addresses:
+ logger.info(
+ "Listening for ACME requests on %s:%i", host, self.hs.config.acme_port,
+ )
+ try:
+ self.reactor.listenTCP(
+ self.hs.config.acme_port,
+ srv,
+ interface=host,
+ )
+ except twisted.internet.error.CannotListenError as e:
+ check_bind_error(e, host, bind_addresses)
+
+ # Make sure we are registered to the ACME server. There's no public API
+ # for this, it is usually triggered by startService, but since we don't
+ # want it to control where we save the certificates, we have to reach in
+ # and trigger the registration machinery ourselves.
+ self._issuer._registered = False
+ yield self._issuer._ensure_registered()
+
+ @defer.inlineCallbacks
+ def provision_certificate(self):
+
+ logger.warning("Reprovisioning %s", self._acme_domain)
+
+ try:
+ yield self._issuer.issue_cert(self._acme_domain)
+ except Exception:
+ logger.exception("Fail!")
+ raise
+ logger.warning("Reprovisioned %s, saving.", self._acme_domain)
+ cert_chain = self._store.certs[self._acme_domain]
+
+ try:
+ with open(self.hs.config.tls_private_key_file, "wb") as private_key_file:
+ for x in cert_chain:
+ if x.startswith(b"-----BEGIN RSA PRIVATE KEY-----"):
+ private_key_file.write(x)
+
+ with open(self.hs.config.tls_certificate_file, "wb") as certificate_file:
+ for x in cert_chain:
+ if x.startswith(b"-----BEGIN CERTIFICATE-----"):
+ certificate_file.write(x)
+ except Exception:
+ logger.exception("Failed saving!")
+ raise
+
+ defer.returnValue(True)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 329e3c7d71..2abd9af94f 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -59,6 +59,7 @@ class AuthHandler(BaseHandler):
LoginType.EMAIL_IDENTITY: self._check_email_identity,
LoginType.MSISDN: self._check_msisdn,
LoginType.DUMMY: self._check_dummy_auth,
+ LoginType.TERMS: self._check_terms_auth,
}
self.bcrypt_rounds = hs.config.bcrypt_rounds
@@ -431,6 +432,9 @@ class AuthHandler(BaseHandler):
def _check_dummy_auth(self, authdict, _):
return defer.succeed(True)
+ def _check_terms_auth(self, authdict, _):
+ return defer.succeed(True)
+
@defer.inlineCallbacks
def _check_threepid(self, medium, authdict):
if 'threepid_creds' not in authdict:
@@ -462,6 +466,22 @@ class AuthHandler(BaseHandler):
def _get_params_recaptcha(self):
return {"public_key": self.hs.config.recaptcha_public_key}
+ def _get_params_terms(self):
+ return {
+ "policies": {
+ "privacy_policy": {
+ "version": self.hs.config.user_consent_version,
+ "en": {
+ "name": self.hs.config.user_consent_policy_name,
+ "url": "%s_matrix/consent?v=%s" % (
+ self.hs.config.public_baseurl,
+ self.hs.config.user_consent_version,
+ ),
+ },
+ },
+ },
+ }
+
def _auth_dict_for_flows(self, flows, session):
public_flows = []
for f in flows:
@@ -469,6 +489,7 @@ class AuthHandler(BaseHandler):
get_params = {
LoginType.RECAPTCHA: self._get_params_recaptcha,
+ LoginType.TERMS: self._get_params_terms,
}
params = {}
@@ -542,10 +563,10 @@ class AuthHandler(BaseHandler):
insensitively, but return None if there are multiple inexact matches.
Args:
- (str) user_id: complete @user:id
+ (unicode|bytes) user_id: complete @user:id
Returns:
- defer.Deferred: (str) canonical_user_id, or None if zero or
+ defer.Deferred: (unicode) canonical_user_id, or None if zero or
multiple matches
"""
res = yield self._find_user_id_and_pwd_hash(user_id)
@@ -933,6 +954,15 @@ class MacaroonGenerator(object):
return macaroon.serialize()
def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
+ """
+
+ Args:
+ user_id (unicode):
+ duration_in_ms (int):
+
+ Returns:
+ unicode
+ """
macaroon = self._generate_base_macaroon(user_id)
macaroon.add_first_party_caveat("type = login")
now = self.hs.get_clock().time_msec()
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9e017116a9..c708c35d4d 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -20,7 +20,11 @@ from twisted.internet import defer
from synapse.api import errors
from synapse.api.constants import EventTypes
-from synapse.api.errors import FederationDeniedError
+from synapse.api.errors import (
+ FederationDeniedError,
+ HttpResponseException,
+ RequestSendFailed,
+)
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
@@ -504,13 +508,13 @@ class DeviceListEduUpdater(object):
origin = get_domain_from_id(user_id)
try:
result = yield self.federation.query_user_devices(origin, user_id)
- except NotRetryingDestination:
+ except (
+ NotRetryingDestination, RequestSendFailed, HttpResponseException,
+ ):
# TODO: Remember that we are now out of sync and try again
# later
logger.warn(
- "Failed to handle device list update for %s,"
- " we're not retrying the remote",
- user_id,
+ "Failed to handle device list update for %s", user_id,
)
# We abort on exceptions rather than accepting the update
# as otherwise synapse will 'forget' that its device list
@@ -532,6 +536,25 @@ class DeviceListEduUpdater(object):
stream_id = result["stream_id"]
devices = result["devices"]
+
+ # If the remote server has more than ~1000 devices for this user
+ # we assume that something is going horribly wrong (e.g. a bot
+ # that logs in and creates a new device every time it tries to
+ # send a message). Maintaining lots of devices per user in the
+ # cache can cause serious performance issues as if this request
+ # takes more than 60s to complete, internal replication from the
+ # inbound federation worker to the synapse master may time out
+ # causing the inbound federation to fail and causing the remote
+ # server to retry, causing a DoS. So in this scenario we give
+ # up on storing the total list of devices and only handle the
+ # delta instead.
+ if len(devices) > 1000:
+ logger.warn(
+ "Ignoring device list snapshot for %s as it has >1K devs (%d)",
+ user_id, len(devices)
+ )
+ devices = []
+
yield self.store.update_remote_device_list_cache(
user_id, devices, stream_id,
)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 7d67bf803a..8b113307d2 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -57,8 +57,8 @@ class DirectoryHandler(BaseHandler):
# general association creation for both human users and app services
for wchar in string.whitespace:
- if wchar in room_alias.localpart:
- raise SynapseError(400, "Invalid characters in room alias")
+ if wchar in room_alias.localpart:
+ raise SynapseError(400, "Invalid characters in room alias")
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room alias must be local")
@@ -112,7 +112,9 @@ class DirectoryHandler(BaseHandler):
403, "This user is not permitted to create this alias",
)
- if not self.config.is_alias_creation_allowed(user_id, room_alias.to_string()):
+ if not self.config.is_alias_creation_allowed(
+ user_id, room_id, room_alias.to_string(),
+ ):
# Lets just return a generic message, as there may be all sorts of
# reasons why we said no. TODO: Allow configurable error messages
# per alias creation rule?
@@ -138,9 +140,30 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
- def delete_association(self, requester, room_alias):
- # association deletion for human users
+ def delete_association(self, requester, room_alias, send_event=True):
+ """Remove an alias from the directory
+ (this is only meant for human users; AS users should call
+ delete_appservice_association)
+
+ Args:
+ requester (Requester):
+ room_alias (RoomAlias):
+ send_event (bool): Whether to send an updated m.room.aliases event.
+ Note that, if we delete the canonical alias, we will always attempt
+ to send an m.room.canonical_alias event
+
+ Returns:
+ Deferred[unicode]: room id that the alias used to point to
+
+ Raises:
+ NotFoundError: if the alias doesn't exist
+
+ AuthError: if the user doesn't have perms to delete the alias (ie, the user
+ is neither the creator of the alias, nor a server admin.
+
+ SynapseError: if the alias belongs to an AS
+ """
user_id = requester.user.to_string()
try:
@@ -168,10 +191,11 @@ class DirectoryHandler(BaseHandler):
room_id = yield self._delete_association(room_alias)
try:
- yield self.send_room_alias_update_event(
- requester,
- room_id
- )
+ if send_event:
+ yield self.send_room_alias_update_event(
+ requester,
+ room_id
+ )
yield self._update_canonical_alias(
requester,
@@ -373,9 +397,9 @@ class DirectoryHandler(BaseHandler):
room_id (str)
visibility (str): "public" or "private"
"""
- if not self.spam_checker.user_may_publish_room(
- requester.user.to_string(), room_id
- ):
+ user_id = requester.user.to_string()
+
+ if not self.spam_checker.user_may_publish_room(user_id, room_id):
raise AuthError(
403,
"This user is not permitted to publish rooms to the room list"
@@ -393,7 +417,24 @@ class DirectoryHandler(BaseHandler):
yield self.auth.check_can_change_room_list(room_id, requester.user)
- yield self.store.set_room_is_public(room_id, visibility == "public")
+ making_public = visibility == "public"
+ if making_public:
+ room_aliases = yield self.store.get_aliases_for_room(room_id)
+ canonical_alias = yield self.store.get_canonical_alias_for_room(room_id)
+ if canonical_alias:
+ room_aliases.append(canonical_alias)
+
+ if not self.config.is_publishing_room_allowed(
+ user_id, room_id, room_aliases,
+ ):
+ # Lets just return a generic message, as there may be all sorts of
+ # reasons why we said no. TODO: Allow configurable error messages
+ # per alias creation rule?
+ raise SynapseError(
+ 403, "Not allowed to publish room",
+ )
+
+ yield self.store.set_room_is_public(room_id, making_public)
@defer.inlineCallbacks
def edit_published_appservice_room_list(self, appservice_id, network_id,
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 5edb3cfe04..7bc174070e 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -19,7 +19,13 @@ from six import iteritems
from twisted.internet import defer
-from synapse.api.errors import RoomKeysVersionError, StoreError, SynapseError
+from synapse.api.errors import (
+ Codes,
+ NotFoundError,
+ RoomKeysVersionError,
+ StoreError,
+ SynapseError,
+)
from synapse.util.async_helpers import Linearizer
logger = logging.getLogger(__name__)
@@ -55,6 +61,8 @@ class E2eRoomKeysHandler(object):
room_id(string): room ID to get keys for, for None to get keys for all rooms
session_id(string): session ID to get keys for, for None to get keys for all
sessions
+ Raises:
+ NotFoundError: if the backup version does not exist
Returns:
A deferred list of dicts giving the session_data and message metadata for
these room keys.
@@ -63,13 +71,19 @@ class E2eRoomKeysHandler(object):
# we deliberately take the lock to get keys so that changing the version
# works atomically
with (yield self._upload_linearizer.queue(user_id)):
+ # make sure the backup version exists
+ try:
+ yield self.store.get_e2e_room_keys_version_info(user_id, version)
+ except StoreError as e:
+ if e.code == 404:
+ raise NotFoundError("Unknown backup version")
+ else:
+ raise
+
results = yield self.store.get_e2e_room_keys(
user_id, version, room_id, session_id
)
- if results['rooms'] == {}:
- raise SynapseError(404, "No room_keys found")
-
defer.returnValue(results)
@defer.inlineCallbacks
@@ -120,7 +134,7 @@ class E2eRoomKeysHandler(object):
}
Raises:
- SynapseError: with code 404 if there are no versions defined
+ NotFoundError: if there are no versions defined
RoomKeysVersionError: if the uploaded version is not the current version
"""
@@ -134,7 +148,7 @@ class E2eRoomKeysHandler(object):
version_info = yield self.store.get_e2e_room_keys_version_info(user_id)
except StoreError as e:
if e.code == 404:
- raise SynapseError(404, "Version '%s' not found" % (version,))
+ raise NotFoundError("Version '%s' not found" % (version,))
else:
raise
@@ -148,7 +162,7 @@ class E2eRoomKeysHandler(object):
raise RoomKeysVersionError(current_version=version_info['version'])
except StoreError as e:
if e.code == 404:
- raise SynapseError(404, "Version '%s' not found" % (version,))
+ raise NotFoundError("Version '%s' not found" % (version,))
else:
raise
@@ -259,7 +273,7 @@ class E2eRoomKeysHandler(object):
version(str): Optional; if None gives the most recent version
otherwise a historical one.
Raises:
- StoreError: code 404 if the requested backup version doesn't exist
+ NotFoundError: if the requested backup version doesn't exist
Returns:
A deferred of a info dict that gives the info about the new version.
@@ -271,7 +285,13 @@ class E2eRoomKeysHandler(object):
"""
with (yield self._upload_linearizer.queue(user_id)):
- res = yield self.store.get_e2e_room_keys_version_info(user_id, version)
+ try:
+ res = yield self.store.get_e2e_room_keys_version_info(user_id, version)
+ except StoreError as e:
+ if e.code == 404:
+ raise NotFoundError("Unknown backup version")
+ else:
+ raise
defer.returnValue(res)
@defer.inlineCallbacks
@@ -282,8 +302,60 @@ class E2eRoomKeysHandler(object):
user_id(str): the user whose current backup version we're deleting
version(str): the version id of the backup being deleted
Raises:
- StoreError: code 404 if this backup version doesn't exist
+ NotFoundError: if this backup version doesn't exist
"""
with (yield self._upload_linearizer.queue(user_id)):
- yield self.store.delete_e2e_room_keys_version(user_id, version)
+ try:
+ yield self.store.delete_e2e_room_keys_version(user_id, version)
+ except StoreError as e:
+ if e.code == 404:
+ raise NotFoundError("Unknown backup version")
+ else:
+ raise
+
+ @defer.inlineCallbacks
+ def update_version(self, user_id, version, version_info):
+ """Update the info about a given version of the user's backup
+
+ Args:
+ user_id(str): the user whose current backup version we're updating
+ version(str): the backup version we're updating
+ version_info(dict): the new information about the backup
+ Raises:
+ NotFoundError: if the requested backup version doesn't exist
+ Returns:
+ A deferred of an empty dict.
+ """
+ if "version" not in version_info:
+ raise SynapseError(
+ 400,
+ "Missing version in body",
+ Codes.MISSING_PARAM
+ )
+ if version_info["version"] != version:
+ raise SynapseError(
+ 400,
+ "Version in body does not match",
+ Codes.INVALID_PARAM
+ )
+ with (yield self._upload_linearizer.queue(user_id)):
+ try:
+ old_info = yield self.store.get_e2e_room_keys_version_info(
+ user_id, version
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise NotFoundError("Unknown backup version")
+ else:
+ raise
+ if old_info["algorithm"] != version_info["algorithm"]:
+ raise SynapseError(
+ 400,
+ "Algorithm does not match",
+ Codes.INVALID_PARAM
+ )
+
+ yield self.store.update_e2e_room_keys_version(user_id, version, version_info)
+
+ defer.returnValue({})
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index cd5b9bbb19..f80486102a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -34,6 +34,7 @@ from synapse.api.constants import (
EventTypes,
Membership,
RejectedReason,
+ RoomVersions,
)
from synapse.api.errors import (
AuthError,
@@ -43,10 +44,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
-from synapse.crypto.event_signing import (
- add_hashes_and_signatures,
- compute_event_signature,
-)
+from synapse.crypto.event_signing import compute_event_signature
from synapse.events.validator import EventValidator
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
@@ -58,7 +56,6 @@ from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room
-from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server
@@ -105,7 +102,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
- self.store = hs.get_datastore() # type: synapse.storage.DataStore
+ self.store = hs.get_datastore()
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
@@ -202,27 +199,22 @@ class FederationHandler(BaseHandler):
self.room_queues[room_id].append((pdu, origin))
return
- # If we're no longer in the room just ditch the event entirely. This
- # is probably an old server that has come back and thinks we're still
- # in the room (or we've been rejoined to the room by a state reset).
+ # If we're not in the room just ditch the event entirely. This is
+ # probably an old server that has come back and thinks we're still in
+ # the room (or we've been rejoined to the room by a state reset).
#
- # If we were never in the room then maybe our database got vaped and
- # we should check if we *are* in fact in the room. If we are then we
- # can magically rejoin the room.
+ # Note that if we were never in the room then we would have already
+ # dropped the event, since we wouldn't know the room version.
is_in_room = yield self.auth.check_host_in_room(
room_id,
self.server_name
)
if not is_in_room:
- was_in_room = yield self.store.was_host_joined(
- pdu.room_id, self.server_name,
+ logger.info(
+ "[%s %s] Ignoring PDU from %s as we're not in the room",
+ room_id, event_id, origin,
)
- if was_in_room:
- logger.info(
- "[%s %s] Ignoring PDU from %s as we've left the room",
- room_id, event_id, origin,
- )
- defer.returnValue(None)
+ defer.returnValue(None)
state = None
auth_chain = []
@@ -239,7 +231,7 @@ class FederationHandler(BaseHandler):
room_id, event_id, min_depth,
)
- prevs = {e_id for e_id, _ in pdu.prev_events}
+ prevs = set(pdu.prev_event_ids())
seen = yield self.store.have_seen_events(prevs)
if min_depth and pdu.depth < min_depth:
@@ -347,6 +339,8 @@ class FederationHandler(BaseHandler):
room_id, event_id, p,
)
+ room_version = yield self.store.get_room_version(room_id)
+
with logcontext.nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
@@ -360,7 +354,7 @@ class FederationHandler(BaseHandler):
# we want the state *after* p; get_state_for_room returns the
# state *before* p.
remote_event = yield self.federation_client.get_pdu(
- [origin], p, outlier=True,
+ [origin], p, room_version, outlier=True,
)
if remote_event is None:
@@ -384,7 +378,6 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
- room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_store(
room_version, state_maps, event_map,
state_res_store=StateResolutionStore(self.store),
@@ -557,86 +550,54 @@ class FederationHandler(BaseHandler):
room_id, event_id, event,
)
- # FIXME (erikj): Awful hack to make the case where we are not currently
- # in the room work
- # If state and auth_chain are None, then we don't need to do this check
- # as we already know we have enough state in the DB to handle this
- # event.
- if state and auth_chain and not event.internal_metadata.is_outlier():
- is_in_room = yield self.auth.check_host_in_room(
- room_id,
- self.server_name
- )
- else:
- is_in_room = True
-
- if not is_in_room:
- logger.info(
- "[%s %s] Got event for room we're not in",
- room_id, event_id,
- )
+ event_ids = set()
+ if state:
+ event_ids |= {e.event_id for e in state}
+ if auth_chain:
+ event_ids |= {e.event_id for e in auth_chain}
- try:
- yield self._persist_auth_tree(
- origin, auth_chain, state, event
- )
- except AuthError as e:
- raise FederationError(
- "ERROR",
- e.code,
- e.msg,
- affected=event_id,
- )
-
- else:
- event_ids = set()
- if state:
- event_ids |= {e.event_id for e in state}
- if auth_chain:
- event_ids |= {e.event_id for e in auth_chain}
+ seen_ids = yield self.store.have_seen_events(event_ids)
- seen_ids = yield self.store.have_seen_events(event_ids)
+ if state and auth_chain is not None:
+ # If we have any state or auth_chain given to us by the replication
+ # layer, then we should handle them (if we haven't before.)
- if state and auth_chain is not None:
- # If we have any state or auth_chain given to us by the replication
- # layer, then we should handle them (if we haven't before.)
+ event_infos = []
- event_infos = []
-
- for e in itertools.chain(auth_chain, state):
- if e.event_id in seen_ids:
- continue
- e.internal_metadata.outlier = True
- auth_ids = [e_id for e_id, _ in e.auth_events]
- auth = {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids or e.type == EventTypes.Create
- }
- event_infos.append({
- "event": e,
- "auth_events": auth,
- })
- seen_ids.add(e.event_id)
+ for e in itertools.chain(auth_chain, state):
+ if e.event_id in seen_ids:
+ continue
+ e.internal_metadata.outlier = True
+ auth_ids = e.auth_event_ids()
+ auth = {
+ (e.type, e.state_key): e for e in auth_chain
+ if e.event_id in auth_ids or e.type == EventTypes.Create
+ }
+ event_infos.append({
+ "event": e,
+ "auth_events": auth,
+ })
+ seen_ids.add(e.event_id)
- logger.info(
- "[%s %s] persisting newly-received auth/state events %s",
- room_id, event_id, [e["event"].event_id for e in event_infos]
- )
- yield self._handle_new_events(origin, event_infos)
+ logger.info(
+ "[%s %s] persisting newly-received auth/state events %s",
+ room_id, event_id, [e["event"].event_id for e in event_infos]
+ )
+ yield self._handle_new_events(origin, event_infos)
- try:
- context = yield self._handle_new_event(
- origin,
- event,
- state=state,
- )
- except AuthError as e:
- raise FederationError(
- "ERROR",
- e.code,
- e.msg,
- affected=event.event_id,
- )
+ try:
+ context = yield self._handle_new_event(
+ origin,
+ event,
+ state=state,
+ )
+ except AuthError as e:
+ raise FederationError(
+ "ERROR",
+ e.code,
+ e.msg,
+ affected=event.event_id,
+ )
room = yield self.store.get_room(room_id)
@@ -692,6 +653,8 @@ class FederationHandler(BaseHandler):
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
+ room_version = yield self.store.get_room_version(room_id)
+
events = yield self.federation_client.backfill(
dest,
room_id,
@@ -726,7 +689,7 @@ class FederationHandler(BaseHandler):
edges = [
ev.event_id
for ev in events
- if set(e_id for e_id, _ in ev.prev_events) - event_ids
+ if set(ev.prev_event_ids()) - event_ids
]
logger.info(
@@ -753,7 +716,7 @@ class FederationHandler(BaseHandler):
required_auth = set(
a_id
for event in events + list(state_events.values()) + list(auth_events.values())
- for a_id, _ in event.auth_events
+ for a_id in event.auth_event_ids()
)
auth_events.update({
e_id: event_map[e_id] for e_id in required_auth if e_id in event_map
@@ -769,7 +732,7 @@ class FederationHandler(BaseHandler):
auth_events.update(ret_events)
required_auth.update(
- a_id for event in ret_events.values() for a_id, _ in event.auth_events
+ a_id for event in ret_events.values() for a_id in event.auth_event_ids()
)
missing_auth = required_auth - set(auth_events)
@@ -785,6 +748,7 @@ class FederationHandler(BaseHandler):
self.federation_client.get_pdu,
[dest],
event_id,
+ room_version=room_version,
outlier=True,
timeout=10000,
)
@@ -796,7 +760,7 @@ class FederationHandler(BaseHandler):
required_auth.update(
a_id
for event in results if event
- for a_id, _ in event.auth_events
+ for a_id in event.auth_event_ids()
)
missing_auth = required_auth - set(auth_events)
@@ -806,29 +770,52 @@ class FederationHandler(BaseHandler):
set(auth_events.keys()) | set(state_events.keys())
)
+ # We now have a chunk of events plus associated state and auth chain to
+ # persist. We do the persistence in two steps:
+ # 1. Auth events and state get persisted as outliers, plus the
+ # backward extremities get persisted (as non-outliers).
+ # 2. The rest of the events in the chunk get persisted one by one, as
+ # each one depends on the previous event for its state.
+ #
+ # The important thing is that events in the chunk get persisted as
+ # non-outliers, including when those events are also in the state or
+ # auth chain. Caution must therefore be taken to ensure that they are
+ # not accidentally marked as outliers.
+
+ # Step 1a: persist auth events that *don't* appear in the chunk
ev_infos = []
for a in auth_events.values():
- if a.event_id in seen_events:
+ # We only want to persist auth events as outliers that we haven't
+ # seen and aren't about to persist as part of the backfilled chunk.
+ if a.event_id in seen_events or a.event_id in event_map:
continue
+
a.internal_metadata.outlier = True
ev_infos.append({
"event": a,
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
- for a_id, _ in a.auth_events
+ for a_id in a.auth_event_ids()
if a_id in auth_events
}
})
+ # Step 1b: persist the events in the chunk we fetched state for (i.e.
+ # the backwards extremities) as non-outliers.
for e_id in events_to_state:
+ # For paranoia we ensure that these events are marked as
+ # non-outliers
+ ev = event_map[e_id]
+ assert(not ev.internal_metadata.is_outlier())
+
ev_infos.append({
- "event": event_map[e_id],
+ "event": ev,
"state": events_to_state[e_id],
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
- for a_id, _ in event_map[e_id].auth_events
+ for a_id in ev.auth_event_ids()
if a_id in auth_events
}
})
@@ -838,12 +825,17 @@ class FederationHandler(BaseHandler):
backfilled=True,
)
+ # Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
for event in events:
if event in events_to_state:
continue
+ # For paranoia we ensure that these events are marked as
+ # non-outliers
+ assert(not event.internal_metadata.is_outlier())
+
# We store these one at a time since each event depends on the
# previous to work out the state.
# TODO: We can probably do something more clever here.
@@ -1041,17 +1033,17 @@ class FederationHandler(BaseHandler):
Raises:
SynapseError if the event does not pass muster
"""
- if len(ev.prev_events) > 20:
+ if len(ev.prev_event_ids()) > 20:
logger.warn("Rejecting event %s which has %i prev_events",
- ev.event_id, len(ev.prev_events))
+ ev.event_id, len(ev.prev_event_ids()))
raise SynapseError(
http_client.BAD_REQUEST,
"Too many prev_events",
)
- if len(ev.auth_events) > 10:
+ if len(ev.auth_event_ids()) > 10:
logger.warn("Rejecting event %s which has %i auth_events",
- ev.event_id, len(ev.auth_events))
+ ev.event_id, len(ev.auth_event_ids()))
raise SynapseError(
http_client.BAD_REQUEST,
"Too many auth_events",
@@ -1076,7 +1068,7 @@ class FederationHandler(BaseHandler):
def on_event_auth(self, event_id):
event = yield self.store.get_event(event_id)
auth = yield self.store.get_auth_chain(
- [auth_id for auth_id, _ in event.auth_events],
+ [auth_id for auth_id in event.auth_event_ids()],
include_given=True
)
defer.returnValue([e for e in auth])
@@ -1097,7 +1089,7 @@ class FederationHandler(BaseHandler):
"""
logger.debug("Joining %s to %s", joinee, room_id)
- origin, event = yield self._make_and_verify_event(
+ origin, event, event_format_version = yield self._make_and_verify_event(
target_hosts,
room_id,
joinee,
@@ -1120,7 +1112,6 @@ class FederationHandler(BaseHandler):
handled_events = set()
try:
- event = self._sign_event(event)
# Try the host we successfully got a response to /make_join/
# request first.
try:
@@ -1128,7 +1119,9 @@ class FederationHandler(BaseHandler):
target_hosts.insert(0, origin)
except ValueError:
pass
- ret = yield self.federation_client.send_join(target_hosts, event)
+ ret = yield self.federation_client.send_join(
+ target_hosts, event, event_format_version,
+ )
origin = ret["origin"]
state = ret["state"]
@@ -1201,13 +1194,18 @@ class FederationHandler(BaseHandler):
"""
event_content = {"membership": Membership.JOIN}
- builder = self.event_builder_factory.new({
- "type": EventTypes.Member,
- "content": event_content,
- "room_id": room_id,
- "sender": user_id,
- "state_key": user_id,
- })
+ room_version = yield self.store.get_room_version(room_id)
+
+ builder = self.event_builder_factory.new(
+ room_version,
+ {
+ "type": EventTypes.Member,
+ "content": event_content,
+ "room_id": room_id,
+ "sender": user_id,
+ "state_key": user_id,
+ }
+ )
try:
event, context = yield self.event_creation_handler.create_new_client_event(
@@ -1219,7 +1217,9 @@ class FederationHandler(BaseHandler):
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_join_request`
- yield self.auth.check_from_context(event, context, do_sig_check=False)
+ yield self.auth.check_from_context(
+ room_version, event, context, do_sig_check=False,
+ )
defer.returnValue(event)
@@ -1324,11 +1324,11 @@ class FederationHandler(BaseHandler):
)
event.internal_metadata.outlier = True
- event.internal_metadata.invite_from_remote = True
+ event.internal_metadata.out_of_band_membership = True
event.signatures.update(
compute_event_signature(
- event,
+ event.get_pdu_json(),
self.hs.hostname,
self.hs.config.signing_key[0]
)
@@ -1341,7 +1341,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
- origin, event = yield self._make_and_verify_event(
+ origin, event, event_format_version = yield self._make_and_verify_event(
target_hosts,
room_id,
user_id,
@@ -1350,7 +1350,7 @@ class FederationHandler(BaseHandler):
# Mark as outlier as we don't have any state for this event; we're not
# even in the room.
event.internal_metadata.outlier = True
- event = self._sign_event(event)
+ event.internal_metadata.out_of_band_membership = True
# Try the host that we succesfully called /make_leave/ on first for
# the /send_leave/ request.
@@ -1373,7 +1373,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
content={}, params=None):
- origin, pdu = yield self.federation_client.make_membership_event(
+ origin, event, format_ver = yield self.federation_client.make_membership_event(
target_hosts,
room_id,
user_id,
@@ -1382,9 +1382,7 @@ class FederationHandler(BaseHandler):
params=params,
)
- logger.debug("Got response to make_%s: %s", membership, pdu)
-
- event = pdu
+ logger.debug("Got response to make_%s: %s", membership, event)
# We should assert some things.
# FIXME: Do this in a nicer way
@@ -1392,28 +1390,7 @@ class FederationHandler(BaseHandler):
assert(event.user_id == user_id)
assert(event.state_key == user_id)
assert(event.room_id == room_id)
- defer.returnValue((origin, event))
-
- def _sign_event(self, event):
- event.internal_metadata.outlier = False
-
- builder = self.event_builder_factory.new(
- unfreeze(event.get_pdu_json())
- )
-
- builder.event_id = self.event_builder_factory.create_event_id()
- builder.origin = self.hs.hostname
-
- if not hasattr(event, "signatures"):
- builder.signatures = {}
-
- add_hashes_and_signatures(
- builder,
- self.hs.hostname,
- self.hs.config.signing_key[0],
- )
-
- return builder.build()
+ defer.returnValue((origin, event, format_ver))
@defer.inlineCallbacks
@log_function
@@ -1422,13 +1399,17 @@ class FederationHandler(BaseHandler):
leave event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
"""
- builder = self.event_builder_factory.new({
- "type": EventTypes.Member,
- "content": {"membership": Membership.LEAVE},
- "room_id": room_id,
- "sender": user_id,
- "state_key": user_id,
- })
+ room_version = yield self.store.get_room_version(room_id)
+ builder = self.event_builder_factory.new(
+ room_version,
+ {
+ "type": EventTypes.Member,
+ "content": {"membership": Membership.LEAVE},
+ "room_id": room_id,
+ "sender": user_id,
+ "state_key": user_id,
+ }
+ )
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
@@ -1437,7 +1418,9 @@ class FederationHandler(BaseHandler):
try:
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_leave_request`
- yield self.auth.check_from_context(event, context, do_sig_check=False)
+ yield self.auth.check_from_context(
+ room_version, event, context, do_sig_check=False,
+ )
except AuthError as e:
logger.warn("Failed to create new leave %r because %s", event, e)
raise e
@@ -1696,9 +1679,16 @@ class FederationHandler(BaseHandler):
create_event = e
break
+ if create_event is None:
+ # If the state doesn't have a create event then the room is
+ # invalid, and it would fail auth checks anyway.
+ raise SynapseError(400, "No create event in state")
+
+ room_version = create_event.content.get("room_version", RoomVersions.V1)
+
missing_auth_events = set()
for e in itertools.chain(auth_events, state, [event]):
- for e_id, _ in e.auth_events:
+ for e_id in e.auth_event_ids():
if e_id not in event_map:
missing_auth_events.add(e_id)
@@ -1706,6 +1696,7 @@ class FederationHandler(BaseHandler):
m_ev = yield self.federation_client.get_pdu(
[origin],
e_id,
+ room_version=room_version,
outlier=True,
timeout=10000,
)
@@ -1717,14 +1708,14 @@ class FederationHandler(BaseHandler):
for e in itertools.chain(auth_events, state, [event]):
auth_for_e = {
(event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
- for e_id, _ in e.auth_events
+ for e_id in e.auth_event_ids()
if e_id in event_map
}
if create_event:
auth_for_e[(EventTypes.Create, "")] = create_event
try:
- self.auth.check(e, auth_events=auth_for_e)
+ self.auth.check(room_version, e, auth_events=auth_for_e)
except SynapseError as err:
# we may get SynapseErrors here as well as AuthErrors. For
# instance, there are a couple of (ancient) events in some
@@ -1785,10 +1776,10 @@ class FederationHandler(BaseHandler):
# This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events.
- if event.type == EventTypes.Member and not event.auth_events:
- if len(event.prev_events) == 1 and event.depth < 5:
+ if event.type == EventTypes.Member and not event.auth_event_ids():
+ if len(event.prev_event_ids()) == 1 and event.depth < 5:
c = yield self.store.get_event(
- event.prev_events[0][0],
+ event.prev_event_ids()[0],
allow_none=True,
)
if c and c.type == EventTypes.Create:
@@ -1835,7 +1826,7 @@ class FederationHandler(BaseHandler):
# Now get the current auth_chain for the event.
local_auth_chain = yield self.store.get_auth_chain(
- [auth_id for auth_id, _ in event.auth_events],
+ [auth_id for auth_id in event.auth_event_ids()],
include_given=True
)
@@ -1891,7 +1882,7 @@ class FederationHandler(BaseHandler):
"""
# Check if we have all the auth events.
current_state = set(e.event_id for e in auth_events.values())
- event_auth_events = set(e_id for e_id, _ in event.auth_events)
+ event_auth_events = set(event.auth_event_ids())
if event.is_state():
event_key = (event.type, event.state_key)
@@ -1935,7 +1926,7 @@ class FederationHandler(BaseHandler):
continue
try:
- auth_ids = [e_id for e_id, _ in e.auth_events]
+ auth_ids = e.auth_event_ids()
auth = {
(e.type, e.state_key): e for e in remote_auth_chain
if e.event_id in auth_ids or e.type == EventTypes.Create
@@ -1956,7 +1947,7 @@ class FederationHandler(BaseHandler):
pass
have_events = yield self.store.get_seen_events_with_rejections(
- [e_id for e_id, _ in event.auth_events]
+ event.auth_event_ids()
)
seen_events = set(have_events.keys())
except Exception:
@@ -1968,6 +1959,8 @@ class FederationHandler(BaseHandler):
current_state = set(e.event_id for e in auth_events.values())
different_auth = event_auth_events - current_state
+ room_version = yield self.store.get_room_version(event.room_id)
+
if different_auth and not event.internal_metadata.is_outlier():
# Do auth conflict res.
logger.info("Different auth: %s", different_auth)
@@ -1992,8 +1985,6 @@ class FederationHandler(BaseHandler):
(d.type, d.state_key): d for d in different_events if d
})
- room_version = yield self.store.get_room_version(event.room_id)
-
new_state = yield self.state_handler.resolve_events(
room_version,
[list(local_view.values()), list(remote_view.values())],
@@ -2058,7 +2049,7 @@ class FederationHandler(BaseHandler):
continue
try:
- auth_ids = [e_id for e_id, _ in ev.auth_events]
+ auth_ids = ev.auth_event_ids()
auth = {
(e.type, e.state_key): e
for e in result["auth_chain"]
@@ -2093,7 +2084,7 @@ class FederationHandler(BaseHandler):
)
try:
- self.auth.check(event, auth_events=auth_events)
+ self.auth.check(room_version, event, auth_events=auth_events)
except AuthError as e:
logger.warn("Failed auth resolution for %r because %s", event, e)
raise e
@@ -2250,7 +2241,7 @@ class FederationHandler(BaseHandler):
missing_remote_ids = [e.event_id for e in missing_remotes]
base_remote_rejected = list(missing_remotes)
for e in missing_remotes:
- for e_id, _ in e.auth_events:
+ for e_id in e.auth_event_ids():
if e_id in missing_remote_ids:
try:
base_remote_rejected.remove(e)
@@ -2316,18 +2307,26 @@ class FederationHandler(BaseHandler):
}
if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
- builder = self.event_builder_factory.new(event_dict)
- EventValidator().validate_new(builder)
+ room_version = yield self.store.get_room_version(room_id)
+ builder = self.event_builder_factory.new(room_version, event_dict)
+
+ EventValidator().validate_builder(builder)
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder
)
event, context = yield self.add_display_name_to_third_party_invite(
- event_dict, event, context
+ room_version, event_dict, event, context
)
+ EventValidator().validate_new(event)
+
+ # We need to tell the transaction queue to send this out, even
+ # though the sender isn't a local user.
+ event.internal_metadata.send_on_behalf_of = self.hs.hostname
+
try:
- yield self.auth.check_from_context(event, context)
+ yield self.auth.check_from_context(room_version, event, context)
except AuthError as e:
logger.warn("Denying new third party invite %r because %s", event, e)
raise e
@@ -2354,23 +2353,31 @@ class FederationHandler(BaseHandler):
Returns:
Deferred: resolves (to None)
"""
- builder = self.event_builder_factory.new(event_dict)
+ room_version = yield self.store.get_room_version(room_id)
+
+ # NB: event_dict has a particular specced format we might need to fudge
+ # if we change event formats too much.
+ builder = self.event_builder_factory.new(room_version, event_dict)
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
event, context = yield self.add_display_name_to_third_party_invite(
- event_dict, event, context
+ room_version, event_dict, event, context
)
try:
- self.auth.check_from_context(event, context)
+ self.auth.check_from_context(room_version, event, context)
except AuthError as e:
logger.warn("Denying third party invite %r because %s", event, e)
raise e
yield self._check_signature(event, context)
+ # We need to tell the transaction queue to send this out, even
+ # though the sender isn't a local user.
+ event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
+
# XXX we send the invite here, but send_membership_event also sends it,
# so we end up making two requests. I think this is redundant.
returned_invite = yield self.send_invite(origin, event)
@@ -2381,7 +2388,8 @@ class FederationHandler(BaseHandler):
yield member_handler.send_membership_event(None, event, context)
@defer.inlineCallbacks
- def add_display_name_to_third_party_invite(self, event_dict, event, context):
+ def add_display_name_to_third_party_invite(self, room_version, event_dict,
+ event, context):
key = (
EventTypes.ThirdPartyInvite,
event.content["third_party_invite"]["signed"]["token"]
@@ -2405,11 +2413,12 @@ class FederationHandler(BaseHandler):
# auth checks. If we need the invite and don't have it then the
# auth check code will explode appropriately.
- builder = self.event_builder_factory.new(event_dict)
- EventValidator().validate_new(builder)
+ builder = self.event_builder_factory.new(room_version, event_dict)
+ EventValidator().validate_builder(builder)
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
+ EventValidator().validate_new(event)
defer.returnValue((event, context))
@defer.inlineCallbacks
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 173315af6c..02c508acec 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
from twisted.internet import defer
-from synapse.api.errors import HttpResponseException, SynapseError
+from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
@@ -46,13 +46,19 @@ def _create_rerouter(func_name):
# when the remote end responds with things like 403 Not
# In Group, we can communicate that to the client instead
# of a 500.
- def h(failure):
+ def http_response_errback(failure):
failure.trap(HttpResponseException)
e = failure.value
if e.code == 403:
raise e.to_synapse_error()
return failure
- d.addErrback(h)
+
+ def request_failed_errback(failure):
+ failure.trap(RequestSendFailed)
+ raise SynapseError(502, "Failed to contact group server")
+
+ d.addErrback(http_response_errback)
+ d.addErrback(request_failed_errback)
return d
return f
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 5feb3f22a6..39184f0e22 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -167,18 +167,21 @@ class IdentityHandler(BaseHandler):
"mxid": mxid,
"threepid": threepid,
}
- headers = {}
+
# we abuse the federation http client to sign the request, but we have to send it
# using the normal http client since we don't want the SRV lookup and want normal
# 'browser-like' HTTPS.
- self.federation_http_client.sign_request(
+ auth_headers = self.federation_http_client.build_auth_headers(
destination=None,
method='POST',
url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'),
- headers_dict=headers,
content=content,
destination_is=id_server,
)
+ headers = {
+ b"Authorization": auth_headers,
+ }
+
try:
yield self.http_client.post_json_get_json(
url,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 969e588e73..3981fe69ce 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
-from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership, RoomVersions
from synapse.api.errors import (
AuthError,
Codes,
@@ -31,7 +31,6 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.api.urls import ConsentURIBuilder
-from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
@@ -278,9 +277,17 @@ class EventCreationHandler(object):
"""
yield self.auth.check_auth_blocking(requester.user.to_string())
- builder = self.event_builder_factory.new(event_dict)
+ if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "":
+ room_version = event_dict["content"]["room_version"]
+ else:
+ try:
+ room_version = yield self.store.get_room_version(event_dict["room_id"])
+ except NotFoundError:
+ raise AuthError(403, "Unknown room")
+
+ builder = self.event_builder_factory.new(room_version, event_dict)
- self.validator.validate_new(builder)
+ self.validator.validate_builder(builder)
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
@@ -318,6 +325,8 @@ class EventCreationHandler(object):
prev_events_and_hashes=prev_events_and_hashes,
)
+ self.validator.validate_new(event)
+
defer.returnValue((event, context))
def _is_exempt_from_privacy_policy(self, builder, requester):
@@ -427,6 +436,9 @@ class EventCreationHandler(object):
if event.is_state():
prev_state = yield self.deduplicate_state_event(event, context)
+ logger.info(
+ "Not bothering to persist duplicate state event %s", event.event_id,
+ )
if prev_state is not None:
defer.returnValue(prev_state)
@@ -532,40 +544,19 @@ class EventCreationHandler(object):
prev_events_and_hashes = \
yield self.store.get_prev_events_for_room(builder.room_id)
- if prev_events_and_hashes:
- depth = max([d for _, _, d in prev_events_and_hashes]) + 1
- # we cap depth of generated events, to ensure that they are not
- # rejected by other servers (and so that they can be persisted in
- # the db)
- depth = min(depth, MAX_DEPTH)
- else:
- depth = 1
-
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in prev_events_and_hashes
]
- builder.prev_events = prev_events
- builder.depth = depth
-
- context = yield self.state.compute_event_context(builder)
+ event = yield builder.build(
+ prev_event_ids=[p for p, _ in prev_events],
+ )
+ context = yield self.state.compute_event_context(event)
if requester:
context.app_service = requester.app_service
- if builder.is_state():
- builder.prev_state = yield self.store.add_event_hashes(
- context.prev_state_events
- )
-
- yield self.auth.add_auth_events(builder, context)
-
- signing_key = self.hs.config.signing_key[0]
- add_hashes_and_signatures(
- builder, self.server_name, signing_key
- )
-
- event = builder.build()
+ self.validator.validate_new(event)
logger.debug(
"Created event %s",
@@ -600,8 +591,13 @@ class EventCreationHandler(object):
extra_users (list(UserID)): Any extra users to notify about event
"""
+ if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""):
+ room_version = event.content.get("room_version", RoomVersions.V1)
+ else:
+ room_version = yield self.store.get_room_version(event.room_id)
+
try:
- yield self.auth.check_from_context(event, context)
+ yield self.auth.check_from_context(room_version, event, context)
except AuthError as err:
logger.warn("Denying new event %r because %s", event, err)
raise err
@@ -749,7 +745,8 @@ class EventCreationHandler(object):
auth_events = {
(e.type, e.state_key): e for e in auth_events.values()
}
- if self.auth.check_redaction(event, auth_events=auth_events):
+ room_version = yield self.store.get_room_version(event.room_id)
+ if self.auth.check_redaction(room_version, event, auth_events=auth_events):
original_event = yield self.store.get_event(
event.redacts,
check_redacted=False,
@@ -763,6 +760,9 @@ class EventCreationHandler(object):
"You don't have permission to redact events"
)
+ # We've already checked.
+ event.internal_metadata.recheck_redaction = False
+
if event.type == EventTypes.Create:
prev_state_ids = yield context.get_prev_state_ids(self.store)
if prev_state_ids:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 43f81bd607..e4fdae9266 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -136,7 +136,11 @@ class PaginationHandler(object):
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
- logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+ f = Failure()
+ logger.error(
+ "[purge] failed",
+ exc_info=(f.type, f.value, f.getTracebackObject()),
+ )
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
finally:
self._purges_in_progress_by_room.discard(room_id)
@@ -235,6 +239,17 @@ class PaginationHandler(object):
"room_key", next_key
)
+ if events:
+ if event_filter:
+ events = event_filter.filter(events)
+
+ events = yield filter_events_for_client(
+ self.store,
+ user_id,
+ events,
+ is_peeking=(member_event_id is None),
+ )
+
if not events:
defer.returnValue({
"chunk": [],
@@ -242,18 +257,8 @@ class PaginationHandler(object):
"end": next_token.to_string(),
})
- if event_filter:
- events = event_filter.filter(events)
-
- events = yield filter_events_for_client(
- self.store,
- user_id,
- events,
- is_peeking=(member_event_id is None),
- )
-
state = None
- if event_filter and event_filter.lazy_load_members():
+ if event_filter and event_filter.lazy_load_members() and len(events) > 0:
# TODO: remove redundant members
# FIXME: we also care about invite targets etc.
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 4c2690ba26..696469732c 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,8 +16,8 @@ import logging
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
-from synapse.util import logcontext
from ._base import BaseHandler
@@ -59,7 +59,9 @@ class ReceiptsHandler(BaseHandler):
if is_new:
# fire off a process in the background to send the receipt to
# remote servers
- self._push_remotes([receipt])
+ run_as_background_process(
+ 'push_receipts_to_remotes', self._push_remotes, receipt
+ )
@defer.inlineCallbacks
def _received_remote_receipt(self, origin, content):
@@ -125,44 +127,42 @@ class ReceiptsHandler(BaseHandler):
defer.returnValue(True)
- @logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks
- def _push_remotes(self, receipts):
- """Given a list of receipts, works out which remote servers should be
+ def _push_remotes(self, receipt):
+ """Given a receipt, works out which remote servers should be
poked and pokes them.
"""
try:
- # TODO: Some of this stuff should be coallesced.
- for receipt in receipts:
- room_id = receipt["room_id"]
- receipt_type = receipt["receipt_type"]
- user_id = receipt["user_id"]
- event_ids = receipt["event_ids"]
- data = receipt["data"]
-
- users = yield self.state.get_current_user_in_room(room_id)
- remotedomains = set(get_domain_from_id(u) for u in users)
- remotedomains = remotedomains.copy()
- remotedomains.discard(self.server_name)
-
- logger.debug("Sending receipt to: %r", remotedomains)
-
- for domain in remotedomains:
- self.federation.send_edu(
- destination=domain,
- edu_type="m.receipt",
- content={
- room_id: {
- receipt_type: {
- user_id: {
- "event_ids": event_ids,
- "data": data,
- }
+ # TODO: optimise this to move some of the work to the workers.
+ room_id = receipt["room_id"]
+ receipt_type = receipt["receipt_type"]
+ user_id = receipt["user_id"]
+ event_ids = receipt["event_ids"]
+ data = receipt["data"]
+
+ users = yield self.state.get_current_user_in_room(room_id)
+ remotedomains = set(get_domain_from_id(u) for u in users)
+ remotedomains = remotedomains.copy()
+ remotedomains.discard(self.server_name)
+
+ logger.debug("Sending receipt to: %r", remotedomains)
+
+ for domain in remotedomains:
+ self.federation.send_edu(
+ destination=domain,
+ edu_type="m.receipt",
+ content={
+ room_id: {
+ receipt_type: {
+ user_id: {
+ "event_ids": event_ids,
+ "data": data,
}
- },
+ }
},
- key=(room_id, receipt_type, user_id),
- )
+ },
+ key=(room_id, receipt_type, user_id),
+ )
except Exception:
logger.exception("Error pushing receipts to remote servers")
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index e9d7b25a36..c0e06929bd 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -19,6 +19,7 @@ import logging
from twisted.internet import defer
from synapse import types
+from synapse.api.constants import LoginType
from synapse.api.errors import (
AuthError,
Codes,
@@ -26,7 +27,14 @@ from synapse.api.errors import (
RegistrationError,
SynapseError,
)
+from synapse.config.server import is_threepid_reserved
from synapse.http.client import CaptchaServerHttpClient
+from synapse.http.servlet import assert_params_in_dict
+from synapse.replication.http.login import RegisterDeviceReplicationServlet
+from synapse.replication.http.register import (
+ ReplicationPostRegisterActionsServlet,
+ ReplicationRegisterServlet,
+)
from synapse.types import RoomAlias, RoomID, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from synapse.util.threepids import check_3pid_allowed
@@ -50,8 +58,8 @@ class RegistrationHandler(BaseHandler):
self._auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
self.user_directory_handler = hs.get_user_directory_handler()
- self.room_creation_handler = self.hs.get_room_creation_handler()
self.captcha_client = CaptchaServerHttpClient(hs)
+ self.identity_handler = self.hs.get_handlers().identity_handler
self._next_generated_user_id = None
@@ -62,6 +70,18 @@ class RegistrationHandler(BaseHandler):
)
self._server_notices_mxid = hs.config.server_notices_mxid
+ if hs.config.worker_app:
+ self._register_client = ReplicationRegisterServlet.make_client(hs)
+ self._register_device_client = (
+ RegisterDeviceReplicationServlet.make_client(hs)
+ )
+ self._post_registration_client = (
+ ReplicationPostRegisterActionsServlet.make_client(hs)
+ )
+ else:
+ self.device_handler = hs.get_device_handler()
+ self.pusher_pool = hs.get_pusherpool()
+
@defer.inlineCallbacks
def check_username(self, localpart, guest_access_token=None,
assigned_user_id=None):
@@ -127,6 +147,8 @@ class RegistrationHandler(BaseHandler):
make_guest=False,
admin=False,
threepid=None,
+ user_type=None,
+ default_display_name=None,
):
"""Registers a new client on the server.
@@ -141,6 +163,10 @@ class RegistrationHandler(BaseHandler):
since it offers no means of associating a device_id with the
access_token. Instead you should call auth_handler.issue_access_token
after registration.
+ user_type (str|None): type of user. One of the values from
+ api.constants.UserTypes, or None for a normal user.
+ default_display_name (unicode|None): if set, the new user's displayname
+ will be set to this. Defaults to 'localpart'.
Returns:
A tuple of (user_id, access_token).
Raises:
@@ -150,7 +176,7 @@ class RegistrationHandler(BaseHandler):
yield self.auth.check_auth_blocking(threepid=threepid)
password_hash = None
if password:
- password_hash = yield self.auth_handler().hash(password)
+ password_hash = yield self._auth_handler.hash(password)
if localpart:
yield self.check_username(localpart, guest_access_token=guest_access_token)
@@ -170,20 +196,25 @@ class RegistrationHandler(BaseHandler):
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
+ if was_guest:
+ # If the user was a guest then they already have a profile
+ default_display_name = None
+
+ elif default_display_name is None:
+ default_display_name = localpart
+
token = None
if generate_token:
token = self.macaroon_gen.generate_access_token(user_id)
- yield self.store.register(
+ yield self._register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
was_guest=was_guest,
make_guest=make_guest,
- create_profile_with_localpart=(
- # If the user was a guest then they already have a profile
- None if was_guest else user.localpart
- ),
+ create_profile_with_displayname=default_display_name,
admin=admin,
+ user_type=user_type,
)
if self.hs.config.user_directory_search_all_users:
@@ -204,13 +235,15 @@ class RegistrationHandler(BaseHandler):
yield self.check_user_id_not_appservice_exclusive(user_id)
if generate_token:
token = self.macaroon_gen.generate_access_token(user_id)
+ if default_display_name is None:
+ default_display_name = localpart
try:
- yield self.store.register(
+ yield self._register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
make_guest=make_guest,
- create_profile_with_localpart=user.localpart,
+ create_profile_with_displayname=default_display_name,
)
except SynapseError:
# if user id is taken, just generate another
@@ -218,16 +251,34 @@ class RegistrationHandler(BaseHandler):
user_id = None
token = None
attempts += 1
+ if not self.hs.config.user_consent_at_registration:
+ yield self._auto_join_rooms(user_id)
+
+ defer.returnValue((user_id, token))
+
+ @defer.inlineCallbacks
+ def _auto_join_rooms(self, user_id):
+ """Automatically joins users to auto join rooms - creating the room in the first place
+ if the user is the first to be created.
+ Args:
+ user_id(str): The user to join
+ """
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = create_requester(user_id)
- # try to create the room if we're the first user on the server
+ # try to create the room if we're the first real user on the server. Note
+ # that an auto-generated support user is not a real user and will never be
+ # the user to create the room
should_auto_create_rooms = False
- if self.hs.config.autocreate_auto_join_rooms:
+ is_support = yield self.store.is_support_user(user_id)
+ # There is an edge case where the first user is the support user, then
+ # the room is never created, though this seems unlikely and
+ # recoverable from given the support user being involved in the first
+ # place.
+ if self.hs.config.autocreate_auto_join_rooms and not is_support:
count = yield self.store.count_all_users()
should_auto_create_rooms = count == 1
-
for r in self.hs.config.auto_join_rooms:
try:
if should_auto_create_rooms:
@@ -241,7 +292,10 @@ class RegistrationHandler(BaseHandler):
else:
# create room expects the localpart of the room alias
room_alias_localpart = room_alias.localpart
- yield self.room_creation_handler.create_room(
+
+ # getting the RoomCreationHandler during init gives a dependency
+ # loop
+ yield self.hs.get_room_creation_handler().create_room(
fake_requester,
config={
"preset": "public_chat",
@@ -254,10 +308,15 @@ class RegistrationHandler(BaseHandler):
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
- # We used to generate default identicons here, but nowadays
- # we want clients to generate their own as part of their branding
- # rather than there being consistent matrix-wide ones, so we don't.
- defer.returnValue((user_id, token))
+ @defer.inlineCallbacks
+ def post_consent_actions(self, user_id):
+ """A series of registration actions that can only be carried out once consent
+ has been granted
+
+ Args:
+ user_id (str): The user to join
+ """
+ yield self._auto_join_rooms(user_id)
@defer.inlineCallbacks
def appservice_register(self, user_localpart, as_token):
@@ -278,11 +337,11 @@ class RegistrationHandler(BaseHandler):
user_id, allowed_appservice=service
)
- yield self.store.register(
+ yield self._register_with_store(
user_id=user_id,
password_hash="",
appservice_id=service_id,
- create_profile_with_localpart=user.localpart,
+ create_profile_with_displayname=user.localpart,
)
defer.returnValue(user_id)
@@ -310,35 +369,6 @@ class RegistrationHandler(BaseHandler):
logger.info("Valid captcha entered from %s", ip)
@defer.inlineCallbacks
- def register_saml2(self, localpart):
- """
- Registers email_id as SAML2 Based Auth.
- """
- if types.contains_invalid_mxid_characters(localpart):
- raise SynapseError(
- 400,
- "User ID can only contain characters a-z, 0-9, or '=_-./'",
- )
- yield self.auth.check_auth_blocking()
- user = UserID(localpart, self.hs.hostname)
- user_id = user.to_string()
-
- yield self.check_user_id_not_appservice_exclusive(user_id)
- token = self.macaroon_gen.generate_access_token(user_id)
- try:
- yield self.store.register(
- user_id=user_id,
- token=token,
- password_hash=None,
- create_profile_with_localpart=user.localpart,
- )
- except Exception as e:
- yield self.store.add_access_token_to_user(user_id, token)
- # Ignore Registration errors
- logger.exception(e)
- defer.returnValue((user_id, token))
-
- @defer.inlineCallbacks
def register_email(self, threepidCreds):
"""
Registers emails with an identity server.
@@ -350,8 +380,7 @@ class RegistrationHandler(BaseHandler):
logger.info("validating threepidcred sid %s on id server %s",
c['sid'], c['idServer'])
try:
- identity_handler = self.hs.get_handlers().identity_handler
- threepid = yield identity_handler.threepid_from_creds(c)
+ threepid = yield self.identity_handler.threepid_from_creds(c)
except Exception:
logger.exception("Couldn't validate 3pid")
raise RegistrationError(400, "Couldn't validate 3pid")
@@ -375,9 +404,8 @@ class RegistrationHandler(BaseHandler):
# Now we have a matrix ID, bind it to the threepids we were given
for c in threepidCreds:
- identity_handler = self.hs.get_handlers().identity_handler
# XXX: This should be a deferred list, shouldn't it?
- yield identity_handler.bind_threepid(c, user_id)
+ yield self.identity_handler.bind_threepid(c, user_id)
def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
# don't allow people to register the server notices mxid
@@ -432,7 +460,7 @@ class RegistrationHandler(BaseHandler):
lines = response.split('\n')
json = {
"valid": lines[0] == 'true',
- "error_url": "http://www.google.com/recaptcha/api/challenge?" +
+ "error_url": "http://www.recaptcha.net/recaptcha/api/challenge?" +
"error=%s" % lines[1]
}
defer.returnValue(json)
@@ -443,7 +471,7 @@ class RegistrationHandler(BaseHandler):
Used only by c/s api v1
"""
data = yield self.captcha_client.post_urlencoded_get_raw(
- "http://www.google.com:80/recaptcha/api/verify",
+ "http://www.recaptcha.net:80/recaptcha/api/verify",
args={
'privatekey': private_key,
'remoteip': ip_addr,
@@ -485,11 +513,11 @@ class RegistrationHandler(BaseHandler):
token = self.macaroon_gen.generate_access_token(user_id)
if need_register:
- yield self.store.register(
+ yield self._register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
- create_profile_with_localpart=user.localpart,
+ create_profile_with_displayname=user.localpart,
)
else:
yield self._auth_handler.delete_access_tokens_for_user(user_id)
@@ -503,9 +531,6 @@ class RegistrationHandler(BaseHandler):
defer.returnValue((user_id, token))
- def auth_handler(self):
- return self.hs.get_auth_handler()
-
@defer.inlineCallbacks
def get_or_register_3pid_guest(self, medium, address, inviter_user_id):
"""Get a guest access token for a 3PID, creating a guest account if
@@ -564,3 +589,275 @@ class RegistrationHandler(BaseHandler):
action="join",
ratelimit=False,
)
+
+ def _register_with_store(self, user_id, token=None, password_hash=None,
+ was_guest=False, make_guest=False, appservice_id=None,
+ create_profile_with_displayname=None, admin=False,
+ user_type=None):
+ """Register user in the datastore.
+
+ Args:
+ user_id (str): The desired user ID to register.
+ token (str): The desired access token to use for this user. If this
+ is not None, the given access token is associated with the user
+ id.
+ password_hash (str|None): Optional. The password hash for this user.
+ was_guest (bool): Optional. Whether this is a guest account being
+ upgraded to a non-guest account.
+ make_guest (boolean): True if the the new user should be guest,
+ false to add a regular user account.
+ appservice_id (str|None): The ID of the appservice registering the user.
+ create_profile_with_displayname (unicode|None): Optionally create a
+ profile for the user, setting their displayname to the given value
+ admin (boolean): is an admin user?
+ user_type (str|None): type of user. One of the values from
+ api.constants.UserTypes, or None for a normal user.
+
+ Returns:
+ Deferred
+ """
+ if self.hs.config.worker_app:
+ return self._register_client(
+ user_id=user_id,
+ token=token,
+ password_hash=password_hash,
+ was_guest=was_guest,
+ make_guest=make_guest,
+ appservice_id=appservice_id,
+ create_profile_with_displayname=create_profile_with_displayname,
+ admin=admin,
+ user_type=user_type,
+ )
+ else:
+ return self.store.register(
+ user_id=user_id,
+ token=token,
+ password_hash=password_hash,
+ was_guest=was_guest,
+ make_guest=make_guest,
+ appservice_id=appservice_id,
+ create_profile_with_displayname=create_profile_with_displayname,
+ admin=admin,
+ user_type=user_type,
+ )
+
+ @defer.inlineCallbacks
+ def register_device(self, user_id, device_id, initial_display_name,
+ is_guest=False):
+ """Register a device for a user and generate an access token.
+
+ Args:
+ user_id (str): full canonical @user:id
+ device_id (str|None): The device ID to check, or None to generate
+ a new one.
+ initial_display_name (str|None): An optional display name for the
+ device.
+ is_guest (bool): Whether this is a guest account
+
+ Returns:
+ defer.Deferred[tuple[str, str]]: Tuple of device ID and access token
+ """
+
+ if self.hs.config.worker_app:
+ r = yield self._register_device_client(
+ user_id=user_id,
+ device_id=device_id,
+ initial_display_name=initial_display_name,
+ is_guest=is_guest,
+ )
+ defer.returnValue((r["device_id"], r["access_token"]))
+ else:
+ device_id = yield self.device_handler.check_device_registered(
+ user_id, device_id, initial_display_name
+ )
+ if is_guest:
+ access_token = self.macaroon_gen.generate_access_token(
+ user_id, ["guest = true"]
+ )
+ else:
+ access_token = yield self._auth_handler.get_access_token_for_user_id(
+ user_id, device_id=device_id,
+ )
+
+ defer.returnValue((device_id, access_token))
+
+ @defer.inlineCallbacks
+ def post_registration_actions(self, user_id, auth_result, access_token,
+ bind_email, bind_msisdn):
+ """A user has completed registration
+
+ Args:
+ user_id (str): The user ID that consented
+ auth_result (dict): The authenticated credentials of the newly
+ registered user.
+ access_token (str|None): The access token of the newly logged in
+ device, or None if `inhibit_login` enabled.
+ bind_email (bool): Whether to bind the email with the identity
+ server
+ bind_msisdn (bool): Whether to bind the msisdn with the identity
+ server
+ """
+ if self.hs.config.worker_app:
+ yield self._post_registration_client(
+ user_id=user_id,
+ auth_result=auth_result,
+ access_token=access_token,
+ bind_email=bind_email,
+ bind_msisdn=bind_msisdn,
+ )
+ return
+
+ if auth_result and LoginType.EMAIL_IDENTITY in auth_result:
+ threepid = auth_result[LoginType.EMAIL_IDENTITY]
+ # Necessary due to auth checks prior to the threepid being
+ # written to the db
+ if is_threepid_reserved(
+ self.hs.config.mau_limits_reserved_threepids, threepid
+ ):
+ yield self.store.upsert_monthly_active_user(user_id)
+
+ yield self._register_email_threepid(
+ user_id, threepid, access_token,
+ bind_email,
+ )
+
+ if auth_result and LoginType.MSISDN in auth_result:
+ threepid = auth_result[LoginType.MSISDN]
+ yield self._register_msisdn_threepid(
+ user_id, threepid, bind_msisdn,
+ )
+
+ if auth_result and LoginType.TERMS in auth_result:
+ yield self._on_user_consented(
+ user_id, self.hs.config.user_consent_version,
+ )
+
+ @defer.inlineCallbacks
+ def _on_user_consented(self, user_id, consent_version):
+ """A user consented to the terms on registration
+
+ Args:
+ user_id (str): The user ID that consented
+ consent_version (str): version of the policy the user has
+ consented to.
+ """
+ logger.info("%s has consented to the privacy policy", user_id)
+ yield self.store.user_set_consent_version(
+ user_id, consent_version,
+ )
+ yield self.post_consent_actions(user_id)
+
+ @defer.inlineCallbacks
+ def _register_email_threepid(self, user_id, threepid, token, bind_email):
+ """Add an email address as a 3pid identifier
+
+ Also adds an email pusher for the email address, if configured in the
+ HS config
+
+ Also optionally binds emails to the given user_id on the identity server
+
+ Must be called on master.
+
+ Args:
+ user_id (str): id of user
+ threepid (object): m.login.email.identity auth response
+ token (str|None): access_token for the user, or None if not logged
+ in.
+ bind_email (bool): true if the client requested the email to be
+ bound at the identity server
+ Returns:
+ defer.Deferred:
+ """
+ reqd = ('medium', 'address', 'validated_at')
+ if any(x not in threepid for x in reqd):
+ # This will only happen if the ID server returns a malformed response
+ logger.info("Can't add incomplete 3pid")
+ return
+
+ yield self._auth_handler.add_threepid(
+ user_id,
+ threepid['medium'],
+ threepid['address'],
+ threepid['validated_at'],
+ )
+
+ # And we add an email pusher for them by default, but only
+ # if email notifications are enabled (so people don't start
+ # getting mail spam where they weren't before if email
+ # notifs are set up on a home server)
+ if (self.hs.config.email_enable_notifs and
+ self.hs.config.email_notif_for_new_users
+ and token):
+ # Pull the ID of the access token back out of the db
+ # It would really make more sense for this to be passed
+ # up when the access token is saved, but that's quite an
+ # invasive change I'd rather do separately.
+ user_tuple = yield self.store.get_user_by_access_token(
+ token
+ )
+ token_id = user_tuple["token_id"]
+
+ yield self.pusher_pool.add_pusher(
+ user_id=user_id,
+ access_token=token_id,
+ kind="email",
+ app_id="m.email",
+ app_display_name="Email Notifications",
+ device_display_name=threepid["address"],
+ pushkey=threepid["address"],
+ lang=None, # We don't know a user's language here
+ data={},
+ )
+
+ if bind_email:
+ logger.info("bind_email specified: binding")
+ logger.debug("Binding emails %s to %s" % (
+ threepid, user_id
+ ))
+ yield self.identity_handler.bind_threepid(
+ threepid['threepid_creds'], user_id
+ )
+ else:
+ logger.info("bind_email not specified: not binding email")
+
+ @defer.inlineCallbacks
+ def _register_msisdn_threepid(self, user_id, threepid, bind_msisdn):
+ """Add a phone number as a 3pid identifier
+
+ Also optionally binds msisdn to the given user_id on the identity server
+
+ Must be called on master.
+
+ Args:
+ user_id (str): id of user
+ threepid (object): m.login.msisdn auth response
+ token (str): access_token for the user
+ bind_email (bool): true if the client requested the email to be
+ bound at the identity server
+ Returns:
+ defer.Deferred:
+ """
+ try:
+ assert_params_in_dict(threepid, ['medium', 'address', 'validated_at'])
+ except SynapseError as ex:
+ if ex.errcode == Codes.MISSING_PARAM:
+ # This will only happen if the ID server returns a malformed response
+ logger.info("Can't add incomplete 3pid")
+ defer.returnValue(None)
+ raise
+
+ yield self._auth_handler.add_threepid(
+ user_id,
+ threepid['medium'],
+ threepid['address'],
+ threepid['validated_at'],
+ )
+
+ if bind_msisdn:
+ logger.info("bind_msisdn specified: binding")
+ logger.debug("Binding msisdn %s to %s", threepid, user_id)
+ yield self.identity_handler.bind_threepid(
+ threepid['threepid_creds'], user_id
+ )
+ else:
+ logger.info("bind_msisdn not specified: not binding msisdn")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3ba92bdb4c..67b15697fd 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -21,7 +21,7 @@ import math
import string
from collections import OrderedDict
-from six import string_types
+from six import iteritems, string_types
from twisted.internet import defer
@@ -32,10 +32,11 @@ from synapse.api.constants import (
JoinRules,
RoomCreationPreset,
)
-from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
+from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
+from synapse.util.async_helpers import Linearizer
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -73,6 +74,372 @@ class RoomCreationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
+ self.room_member_handler = hs.get_room_member_handler()
+
+ # linearizer to stop two upgrades happening at once
+ self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
+
+ @defer.inlineCallbacks
+ def upgrade_room(self, requester, old_room_id, new_version):
+ """Replace a room with a new room with a different version
+
+ Args:
+ requester (synapse.types.Requester): the user requesting the upgrade
+ old_room_id (unicode): the id of the room to be replaced
+ new_version (unicode): the new room version to use
+
+ Returns:
+ Deferred[unicode]: the new room id
+ """
+ yield self.ratelimit(requester)
+
+ user_id = requester.user.to_string()
+
+ with (yield self._upgrade_linearizer.queue(old_room_id)):
+ # start by allocating a new room id
+ r = yield self.store.get_room(old_room_id)
+ if r is None:
+ raise NotFoundError("Unknown room id %s" % (old_room_id,))
+ new_room_id = yield self._generate_room_id(
+ creator_id=user_id, is_public=r["is_public"],
+ )
+
+ logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
+
+ # we create and auth the tombstone event before properly creating the new
+ # room, to check our user has perms in the old room.
+ tombstone_event, tombstone_context = (
+ yield self.event_creation_handler.create_event(
+ requester, {
+ "type": EventTypes.Tombstone,
+ "state_key": "",
+ "room_id": old_room_id,
+ "sender": user_id,
+ "content": {
+ "body": "This room has been replaced",
+ "replacement_room": new_room_id,
+ }
+ },
+ token_id=requester.access_token_id,
+ )
+ )
+ old_room_version = yield self.store.get_room_version(old_room_id)
+ yield self.auth.check_from_context(
+ old_room_version, tombstone_event, tombstone_context,
+ )
+
+ yield self.clone_existing_room(
+ requester,
+ old_room_id=old_room_id,
+ new_room_id=new_room_id,
+ new_room_version=new_version,
+ tombstone_event_id=tombstone_event.event_id,
+ )
+
+ # now send the tombstone
+ yield self.event_creation_handler.send_nonmember_event(
+ requester, tombstone_event, tombstone_context,
+ )
+
+ old_room_state = yield tombstone_context.get_current_state_ids(self.store)
+
+ # update any aliases
+ yield self._move_aliases_to_new_room(
+ requester, old_room_id, new_room_id, old_room_state,
+ )
+
+ # and finally, shut down the PLs in the old room, and update them in the new
+ # room.
+ yield self._update_upgraded_room_pls(
+ requester, old_room_id, new_room_id, old_room_state,
+ )
+
+ defer.returnValue(new_room_id)
+
+ @defer.inlineCallbacks
+ def _update_upgraded_room_pls(
+ self, requester, old_room_id, new_room_id, old_room_state,
+ ):
+ """Send updated power levels in both rooms after an upgrade
+
+ Args:
+ requester (synapse.types.Requester): the user requesting the upgrade
+ old_room_id (unicode): the id of the room to be replaced
+ new_room_id (unicode): the id of the replacement room
+ old_room_state (dict[tuple[str, str], str]): the state map for the old room
+
+ Returns:
+ Deferred
+ """
+ old_room_pl_event_id = old_room_state.get((EventTypes.PowerLevels, ""))
+
+ if old_room_pl_event_id is None:
+ logger.warning(
+ "Not supported: upgrading a room with no PL event. Not setting PLs "
+ "in old room.",
+ )
+ return
+
+ old_room_pl_state = yield self.store.get_event(old_room_pl_event_id)
+
+ # we try to stop regular users from speaking by setting the PL required
+ # to send regular events and invites to 'Moderator' level. That's normally
+ # 50, but if the default PL in a room is 50 or more, then we set the
+ # required PL above that.
+
+ pl_content = dict(old_room_pl_state.content)
+ users_default = int(pl_content.get("users_default", 0))
+ restricted_level = max(users_default + 1, 50)
+
+ updated = False
+ for v in ("invite", "events_default"):
+ current = int(pl_content.get(v, 0))
+ if current < restricted_level:
+ logger.info(
+ "Setting level for %s in %s to %i (was %i)",
+ v, old_room_id, restricted_level, current,
+ )
+ pl_content[v] = restricted_level
+ updated = True
+ else:
+ logger.info(
+ "Not setting level for %s (already %i)",
+ v, current,
+ )
+
+ if updated:
+ try:
+ yield self.event_creation_handler.create_and_send_nonmember_event(
+ requester, {
+ "type": EventTypes.PowerLevels,
+ "state_key": '',
+ "room_id": old_room_id,
+ "sender": requester.user.to_string(),
+ "content": pl_content,
+ }, ratelimit=False,
+ )
+ except AuthError as e:
+ logger.warning("Unable to update PLs in old room: %s", e)
+
+ logger.info("Setting correct PLs in new room")
+ yield self.event_creation_handler.create_and_send_nonmember_event(
+ requester, {
+ "type": EventTypes.PowerLevels,
+ "state_key": '',
+ "room_id": new_room_id,
+ "sender": requester.user.to_string(),
+ "content": old_room_pl_state.content,
+ }, ratelimit=False,
+ )
+
+ @defer.inlineCallbacks
+ def clone_existing_room(
+ self, requester, old_room_id, new_room_id, new_room_version,
+ tombstone_event_id,
+ ):
+ """Populate a new room based on an old room
+
+ Args:
+ requester (synapse.types.Requester): the user requesting the upgrade
+ old_room_id (unicode): the id of the room to be replaced
+ new_room_id (unicode): the id to give the new room (should already have been
+ created with _gemerate_room_id())
+ new_room_version (unicode): the new room version to use
+ tombstone_event_id (unicode|str): the ID of the tombstone event in the old
+ room.
+ Returns:
+ Deferred[None]
+ """
+ user_id = requester.user.to_string()
+
+ if not self.spam_checker.user_may_create_room(user_id):
+ raise SynapseError(403, "You are not permitted to create rooms")
+
+ creation_content = {
+ "room_version": new_room_version,
+ "predecessor": {
+ "room_id": old_room_id,
+ "event_id": tombstone_event_id,
+ }
+ }
+
+ # Check if old room was non-federatable
+
+ # Get old room's create event
+ old_room_create_event = yield self.store.get_create_event_for_room(old_room_id)
+
+ # Check if the create event specified a non-federatable room
+ if not old_room_create_event.content.get("m.federate", True):
+ # If so, mark the new room as non-federatable as well
+ creation_content["m.federate"] = False
+
+ initial_state = dict()
+
+ # Replicate relevant room events
+ types_to_copy = (
+ (EventTypes.JoinRules, ""),
+ (EventTypes.Name, ""),
+ (EventTypes.Topic, ""),
+ (EventTypes.RoomHistoryVisibility, ""),
+ (EventTypes.GuestAccess, ""),
+ (EventTypes.RoomAvatar, ""),
+ (EventTypes.Encryption, ""),
+ (EventTypes.ServerACL, ""),
+ )
+
+ old_room_state_ids = yield self.store.get_filtered_current_state_ids(
+ old_room_id, StateFilter.from_types(types_to_copy),
+ )
+ # map from event_id to BaseEvent
+ old_room_state_events = yield self.store.get_events(old_room_state_ids.values())
+
+ for k, old_event_id in iteritems(old_room_state_ids):
+ old_event = old_room_state_events.get(old_event_id)
+ if old_event:
+ initial_state[k] = old_event.content
+
+ yield self._send_events_for_new_room(
+ requester,
+ new_room_id,
+
+ # we expect to override all the presets with initial_state, so this is
+ # somewhat arbitrary.
+ preset_config=RoomCreationPreset.PRIVATE_CHAT,
+
+ invite_list=[],
+ initial_state=initial_state,
+ creation_content=creation_content,
+ )
+
+ # Transfer membership events
+ old_room_member_state_ids = yield self.store.get_filtered_current_state_ids(
+ old_room_id, StateFilter.from_types([(EventTypes.Member, None)]),
+ )
+
+ # map from event_id to BaseEvent
+ old_room_member_state_events = yield self.store.get_events(
+ old_room_member_state_ids.values(),
+ )
+ for k, old_event in iteritems(old_room_member_state_events):
+ # Only transfer ban events
+ if ("membership" in old_event.content and
+ old_event.content["membership"] == "ban"):
+ yield self.room_member_handler.update_membership(
+ requester,
+ UserID.from_string(old_event['state_key']),
+ new_room_id,
+ "ban",
+ ratelimit=False,
+ content=old_event.content,
+ )
+
+ # XXX invites/joins
+ # XXX 3pid invites
+
+ @defer.inlineCallbacks
+ def _move_aliases_to_new_room(
+ self, requester, old_room_id, new_room_id, old_room_state,
+ ):
+ directory_handler = self.hs.get_handlers().directory_handler
+
+ aliases = yield self.store.get_aliases_for_room(old_room_id)
+
+ # check to see if we have a canonical alias.
+ canonical_alias = None
+ canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, ""))
+ if canonical_alias_event_id:
+ canonical_alias_event = yield self.store.get_event(canonical_alias_event_id)
+ if canonical_alias_event:
+ canonical_alias = canonical_alias_event.content.get("alias", "")
+
+ # first we try to remove the aliases from the old room (we suppress sending
+ # the room_aliases event until the end).
+ #
+ # Note that we'll only be able to remove aliases that (a) aren't owned by an AS,
+ # and (b) unless the user is a server admin, which the user created.
+ #
+ # This is probably correct - given we don't allow such aliases to be deleted
+ # normally, it would be odd to allow it in the case of doing a room upgrade -
+ # but it makes the upgrade less effective, and you have to wonder why a room
+ # admin can't remove aliases that point to that room anyway.
+ # (cf https://github.com/matrix-org/synapse/issues/2360)
+ #
+ removed_aliases = []
+ for alias_str in aliases:
+ alias = RoomAlias.from_string(alias_str)
+ try:
+ yield directory_handler.delete_association(
+ requester, alias, send_event=False,
+ )
+ removed_aliases.append(alias_str)
+ except SynapseError as e:
+ logger.warning(
+ "Unable to remove alias %s from old room: %s",
+ alias, e,
+ )
+
+ # if we didn't find any aliases, or couldn't remove anyway, we can skip the rest
+ # of this.
+ if not removed_aliases:
+ return
+
+ try:
+ # this can fail if, for some reason, our user doesn't have perms to send
+ # m.room.aliases events in the old room (note that we've already checked that
+ # they have perms to send a tombstone event, so that's not terribly likely).
+ #
+ # If that happens, it's regrettable, but we should carry on: it's the same
+ # as when you remove an alias from the directory normally - it just means that
+ # the aliases event gets out of sync with the directory
+ # (cf https://github.com/vector-im/riot-web/issues/2369)
+ yield directory_handler.send_room_alias_update_event(
+ requester, old_room_id,
+ )
+ except AuthError as e:
+ logger.warning(
+ "Failed to send updated alias event on old room: %s", e,
+ )
+
+ # we can now add any aliases we successfully removed to the new room.
+ for alias in removed_aliases:
+ try:
+ yield directory_handler.create_association(
+ requester, RoomAlias.from_string(alias),
+ new_room_id, servers=(self.hs.hostname, ),
+ send_event=False,
+ )
+ logger.info("Moved alias %s to new room", alias)
+ except SynapseError as e:
+ # I'm not really expecting this to happen, but it could if the spam
+ # checking module decides it shouldn't, or similar.
+ logger.error(
+ "Error adding alias %s to new room: %s",
+ alias, e,
+ )
+
+ try:
+ if canonical_alias and (canonical_alias in removed_aliases):
+ yield self.event_creation_handler.create_and_send_nonmember_event(
+ requester,
+ {
+ "type": EventTypes.CanonicalAlias,
+ "state_key": "",
+ "room_id": new_room_id,
+ "sender": requester.user.to_string(),
+ "content": {"alias": canonical_alias, },
+ },
+ ratelimit=False
+ )
+
+ yield directory_handler.send_room_alias_update_event(
+ requester, new_room_id,
+ )
+ except SynapseError as e:
+ # again I'm not really expecting this to fail, but if it does, I'd rather
+ # we returned the new room to the client at this point.
+ logger.error(
+ "Unable to send updated alias events in new room: %s", e,
+ )
@defer.inlineCallbacks
def create_room(self, requester, config, ratelimit=True,
@@ -104,7 +471,7 @@ class RoomCreationHandler(BaseHandler):
"""
user_id = requester.user.to_string()
- self.auth.check_auth_blocking(user_id)
+ yield self.auth.check_auth_blocking(user_id)
if not self.spam_checker.user_may_create_room(user_id):
raise SynapseError(403, "You are not permitted to create rooms")
@@ -165,28 +532,7 @@ class RoomCreationHandler(BaseHandler):
visibility = config.get("visibility", None)
is_public = visibility == "public"
- # autogen room IDs and try to create it. We may clash, so just
- # try a few times till one goes through, giving up eventually.
- attempts = 0
- room_id = None
- while attempts < 5:
- try:
- random_string = stringutils.random_string(18)
- gen_room_id = RoomID(
- random_string,
- self.hs.hostname,
- )
- yield self.store.store_room(
- room_id=gen_room_id.to_string(),
- room_creator_user_id=user_id,
- is_public=is_public
- )
- room_id = gen_room_id.to_string()
- break
- except StoreError:
- attempts += 1
- if not room_id:
- raise StoreError(500, "Couldn't generate a room ID.")
+ room_id = yield self._generate_room_id(creator_id=user_id, is_public=is_public)
if room_alias:
directory_handler = self.hs.get_handlers().directory_handler
@@ -216,18 +562,15 @@ class RoomCreationHandler(BaseHandler):
# override any attempt to set room versions via the creation_content
creation_content["room_version"] = room_version
- room_member_handler = self.hs.get_room_member_handler()
-
yield self._send_events_for_new_room(
requester,
room_id,
- room_member_handler,
preset_config=preset_config,
invite_list=invite_list,
initial_state=initial_state,
creation_content=creation_content,
room_alias=room_alias,
- power_level_content_override=config.get("power_level_content_override", {}),
+ power_level_content_override=config.get("power_level_content_override"),
creator_join_profile=creator_join_profile,
)
@@ -263,7 +606,7 @@ class RoomCreationHandler(BaseHandler):
if is_direct:
content["is_direct"] = is_direct
- yield room_member_handler.update_membership(
+ yield self.room_member_handler.update_membership(
requester,
UserID.from_string(invitee),
room_id,
@@ -301,14 +644,13 @@ class RoomCreationHandler(BaseHandler):
self,
creator, # A Requester object.
room_id,
- room_member_handler,
preset_config,
invite_list,
initial_state,
creation_content,
- room_alias,
- power_level_content_override,
- creator_join_profile,
+ room_alias=None,
+ power_level_content_override=None,
+ creator_join_profile=None,
):
def create(etype, content, **kwargs):
e = {
@@ -324,6 +666,7 @@ class RoomCreationHandler(BaseHandler):
@defer.inlineCallbacks
def send(etype, content, **kwargs):
event = create(etype, content, **kwargs)
+ logger.info("Sending %s in new room", etype)
yield self.event_creation_handler.create_and_send_nonmember_event(
creator,
event,
@@ -346,7 +689,8 @@ class RoomCreationHandler(BaseHandler):
content=creation_content,
)
- yield room_member_handler.update_membership(
+ logger.info("Sending %s in new room", EventTypes.Member)
+ yield self.room_member_handler.update_membership(
creator,
creator.user,
room_id,
@@ -388,7 +732,8 @@ class RoomCreationHandler(BaseHandler):
for invitee in invite_list:
power_level_content["users"][invitee] = 100
- power_level_content.update(power_level_content_override)
+ if power_level_content_override:
+ power_level_content.update(power_level_content_override)
yield send(
etype=EventTypes.PowerLevels,
@@ -427,6 +772,30 @@ class RoomCreationHandler(BaseHandler):
content=content,
)
+ @defer.inlineCallbacks
+ def _generate_room_id(self, creator_id, is_public):
+ # autogen room IDs and try to create it. We may clash, so just
+ # try a few times till one goes through, giving up eventually.
+ attempts = 0
+ while attempts < 5:
+ try:
+ random_string = stringutils.random_string(18)
+ gen_room_id = RoomID(
+ random_string,
+ self.hs.hostname,
+ ).to_string()
+ if isinstance(gen_room_id, bytes):
+ gen_room_id = gen_room_id.decode('utf-8')
+ yield self.store.store_room(
+ room_id=gen_room_id,
+ room_creator_user_id=creator_id,
+ is_public=is_public,
+ )
+ defer.returnValue(gen_room_id)
+ except StoreError:
+ attempts += 1
+ raise StoreError(500, "Couldn't generate a room ID.")
+
class RoomContextHandler(object):
def __init__(self, hs):
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 4f51a464e7..afa508d729 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -74,8 +74,14 @@ class RoomListHandler(BaseHandler):
# We explicitly don't bother caching searches or requests for
# appservice specific lists.
logger.info("Bypassing cache as search request.")
+
+ # XXX: Quick hack to stop room directory queries taking too long.
+ # Timeout request after 60s. Probably want a more fundamental
+ # solution at some point
+ timeout = self.clock.time() + 60
return self._get_public_room_list(
- limit, since_token, search_filter, network_tuple=network_tuple,
+ limit, since_token, search_filter,
+ network_tuple=network_tuple, timeout=timeout,
)
key = (limit, since_token, network_tuple)
@@ -90,7 +96,22 @@ class RoomListHandler(BaseHandler):
def _get_public_room_list(self, limit=None, since_token=None,
search_filter=None,
network_tuple=EMPTY_THIRD_PARTY_ID,
- from_federation=False,):
+ from_federation=False,
+ timeout=None,):
+ """Generate a public room list.
+ Args:
+ limit (int|None): Maximum amount of rooms to return.
+ since_token (str|None)
+ search_filter (dict|None): Dictionary to filter rooms by.
+ network_tuple (ThirdPartyInstanceID): Which public list to use.
+ This can be (None, None) to indicate the main list, or a particular
+ appservice and network id to use an appservice specific one.
+ Setting to None returns all public rooms across all lists.
+ from_federation (bool): Whether this request originated from a
+ federating server or a client. Used for room filtering.
+ timeout (int|None): Amount of seconds to wait for a response before
+ timing out.
+ """
if since_token and since_token != "END":
since_token = RoomListNextBatch.from_token(since_token)
else:
@@ -205,6 +226,9 @@ class RoomListHandler(BaseHandler):
chunk = []
for i in range(0, len(rooms_to_scan), step):
+ if timeout and self.clock.time() > timeout:
+ raise Exception("Timed out searching room directory")
+
batch = rooms_to_scan[i:i + step]
logger.info("Processing %i rooms for result", len(batch))
yield concurrently_execute(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 07fd3e82fc..190ea2c7b1 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -61,9 +61,9 @@ class RoomMemberHandler(object):
self.federation_handler = hs.get_handlers().federation_handler
self.directory_handler = hs.get_handlers().directory_handler
- self.registration_handler = hs.get_handlers().registration_handler
+ self.registration_handler = hs.get_registration_handler()
self.profile_handler = hs.get_profile_handler()
- self.event_creation_hander = hs.get_event_creation_handler()
+ self.event_creation_handler = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
@@ -161,6 +161,8 @@ class RoomMemberHandler(object):
ratelimit=True,
content=None,
):
+ user_id = target.to_string()
+
if content is None:
content = {}
@@ -168,14 +170,14 @@ class RoomMemberHandler(object):
if requester.is_guest:
content["kind"] = "guest"
- event, context = yield self.event_creation_hander.create_event(
+ event, context = yield self.event_creation_handler.create_event(
requester,
{
"type": EventTypes.Member,
"content": content,
"room_id": room_id,
"sender": requester.user.to_string(),
- "state_key": target.to_string(),
+ "state_key": user_id,
# For backwards compatibility:
"membership": membership,
@@ -186,14 +188,14 @@ class RoomMemberHandler(object):
)
# Check if this event matches the previous membership event for the user.
- duplicate = yield self.event_creation_hander.deduplicate_state_event(
+ duplicate = yield self.event_creation_handler.deduplicate_state_event(
event, context,
)
if duplicate is not None:
# Discard the new event since this membership change is a no-op.
defer.returnValue(duplicate)
- yield self.event_creation_hander.handle_new_client_event(
+ yield self.event_creation_handler.handle_new_client_event(
requester,
event,
context,
@@ -204,12 +206,12 @@ class RoomMemberHandler(object):
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_member_event_id = prev_state_ids.get(
- (EventTypes.Member, target.to_string()),
+ (EventTypes.Member, user_id),
None
)
if event.membership == Membership.JOIN:
- # Only fire user_joined_room if the user has acutally joined the
+ # Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
newly_joined = True
@@ -218,6 +220,18 @@ class RoomMemberHandler(object):
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
yield self._user_joined_room(target, room_id)
+
+ # Copy over direct message status and room tags if this is a join
+ # on an upgraded room
+
+ # Check if this is an upgraded room
+ predecessor = yield self.store.get_room_predecessor(room_id)
+
+ if predecessor:
+ # It is an upgraded room. Copy over old tags
+ self.copy_room_tags_and_direct_to_room(
+ predecessor["room_id"], room_id, user_id,
+ )
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -227,6 +241,55 @@ class RoomMemberHandler(object):
defer.returnValue(event)
@defer.inlineCallbacks
+ def copy_room_tags_and_direct_to_room(
+ self,
+ old_room_id,
+ new_room_id,
+ user_id,
+ ):
+ """Copies the tags and direct room state from one room to another.
+
+ Args:
+ old_room_id (str)
+ new_room_id (str)
+ user_id (str)
+
+ Returns:
+ Deferred[None]
+ """
+ # Retrieve user account data for predecessor room
+ user_account_data, _ = yield self.store.get_account_data_for_user(
+ user_id,
+ )
+
+ # Copy direct message state if applicable
+ direct_rooms = user_account_data.get("m.direct", {})
+
+ # Check which key this room is under
+ if isinstance(direct_rooms, dict):
+ for key, room_id_list in direct_rooms.items():
+ if old_room_id in room_id_list and new_room_id not in room_id_list:
+ # Add new room_id to this key
+ direct_rooms[key].append(new_room_id)
+
+ # Save back to user's m.direct account data
+ yield self.store.add_account_data_for_user(
+ user_id, "m.direct", direct_rooms,
+ )
+ break
+
+ # Copy room tags if applicable
+ room_tags = yield self.store.get_tags_for_room(
+ user_id, old_room_id,
+ )
+
+ # Copy each room tag to the new room
+ for tag, tag_content in room_tags.items():
+ yield self.store.add_tag_to_room(
+ user_id, new_room_id, tag, tag_content
+ )
+
+ @defer.inlineCallbacks
def update_membership(
self,
requester,
@@ -493,7 +556,7 @@ class RoomMemberHandler(object):
else:
requester = synapse.types.create_requester(target_user)
- prev_event = yield self.event_creation_hander.deduplicate_state_event(
+ prev_event = yield self.event_creation_handler.deduplicate_state_event(
event, context,
)
if prev_event is not None:
@@ -513,7 +576,7 @@ class RoomMemberHandler(object):
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
- yield self.event_creation_hander.handle_new_client_event(
+ yield self.event_creation_handler.handle_new_client_event(
requester,
event,
context,
@@ -527,7 +590,7 @@ class RoomMemberHandler(object):
)
if event.membership == Membership.JOIN:
- # Only fire user_joined_room if the user has acutally joined the
+ # Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
newly_joined = True
@@ -755,7 +818,7 @@ class RoomMemberHandler(object):
)
)
- yield self.event_creation_hander.create_and_send_nonmember_event(
+ yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.ThirdPartyInvite,
@@ -877,7 +940,8 @@ class RoomMemberHandler(object):
# first member event?
create_event_id = current_state_ids.get(("m.room.create", ""))
if len(current_state_ids) == 1 and create_event_id:
- defer.returnValue(self.hs.is_mine_id(create_event_id))
+ # We can only get here if we're in the process of creating the room
+ defer.returnValue(True)
for etype, state_key in current_state_ids:
if etype != EventTypes.Member or not self.hs.is_mine_id(state_key):
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 80e7b15de8..49c439313e 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -38,6 +38,41 @@ class SearchHandler(BaseHandler):
super(SearchHandler, self).__init__(hs)
@defer.inlineCallbacks
+ def get_old_rooms_from_upgraded_room(self, room_id):
+ """Retrieves room IDs of old rooms in the history of an upgraded room.
+
+ We do so by checking the m.room.create event of the room for a
+ `predecessor` key. If it exists, we add the room ID to our return
+ list and then check that room for a m.room.create event and so on
+ until we can no longer find any more previous rooms.
+
+ The full list of all found rooms in then returned.
+
+ Args:
+ room_id (str): id of the room to search through.
+
+ Returns:
+ Deferred[iterable[unicode]]: predecessor room ids
+ """
+
+ historical_room_ids = []
+
+ while True:
+ predecessor = yield self.store.get_room_predecessor(room_id)
+
+ # If no predecessor, assume we've hit a dead end
+ if not predecessor:
+ break
+
+ # Add predecessor's room ID
+ historical_room_ids.append(predecessor["room_id"])
+
+ # Scan through the old room for further predecessors
+ room_id = predecessor["room_id"]
+
+ defer.returnValue(historical_room_ids)
+
+ @defer.inlineCallbacks
def search(self, user, content, batch=None):
"""Performs a full text search for a user.
@@ -50,6 +85,9 @@ class SearchHandler(BaseHandler):
dict to be returned to the client with results of search
"""
+ if not self.hs.config.enable_search:
+ raise SynapseError(400, "Search is disabled on this homeserver")
+
batch_group = None
batch_group_key = None
batch_token = None
@@ -134,6 +172,18 @@ class SearchHandler(BaseHandler):
)
room_ids = set(r.room_id for r in rooms)
+ # If doing a subset of all rooms seearch, check if any of the rooms
+ # are from an upgraded room, and search their contents as well
+ if search_filter.rooms:
+ historical_room_ids = []
+ for room_id in search_filter.rooms:
+ # Add any previous rooms to the search if they exist
+ ids = yield self.get_old_rooms_from_upgraded_room(room_id)
+ historical_room_ids += ids
+
+ # Prevent any historical events from being filtered
+ search_filter = search_filter.with_room_ids(historical_room_ids)
+
room_ids = search_filter.filter_rooms(room_ids)
if batch_group == "room_id":
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 09739f2862..bd97241ab4 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -895,14 +895,17 @@ class SyncHandler(object):
Returns:
Deferred(SyncResult)
"""
- logger.info("Calculating sync response for %r", sync_config.user)
-
# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = yield self.event_sources.get_current_token()
+ logger.info(
+ "Calculating sync response for %r between %s and %s",
+ sync_config.user, since_token, now_token,
+ )
+
user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
@@ -1390,6 +1393,12 @@ class SyncHandler(object):
room_entries = []
invited = []
for room_id, events in iteritems(mem_change_events_by_room_id):
+ logger.info(
+ "Membership changes in %s: [%s]",
+ room_id,
+ ", ".join(("%s (%s)" % (e.event_id, e.membership) for e in events)),
+ )
+
non_joins = [e for e in events if e.membership != Membership.JOIN]
has_join = len(non_joins) != len(events)
@@ -1473,10 +1482,22 @@ class SyncHandler(object):
if since_token and since_token.is_after(leave_token):
continue
+ # If this is an out of band message, like a remote invite
+ # rejection, we include it in the recents batch. Otherwise, we
+ # let _load_filtered_recents handle fetching the correct
+ # batches.
+ #
+ # This is all screaming out for a refactor, as the logic here is
+ # subtle and the moving parts numerous.
+ if leave_event.internal_metadata.is_out_of_band_membership():
+ batch_events = [leave_event]
+ else:
+ batch_events = None
+
room_entries.append(RoomSyncResultBuilder(
room_id=room_id,
rtype="archived",
- events=None,
+ events=batch_events,
newly_joined=room_id in newly_joined_rooms,
full_state=False,
since_token=since_token,
@@ -1668,13 +1689,17 @@ class SyncHandler(object):
"content": content,
})
- account_data = sync_config.filter_collection.filter_room_account_data(
+ account_data_events = sync_config.filter_collection.filter_room_account_data(
account_data_events
)
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
- if not (always_include or batch or account_data or ephemeral or full_state):
+ if not (always_include
+ or batch
+ or account_data_events
+ or ephemeral
+ or full_state):
return
state = yield self.compute_state_delta(
@@ -1745,7 +1770,7 @@ class SyncHandler(object):
room_id=room_id,
timeline=batch,
state=state,
- account_data=account_data,
+ account_data=account_data_events,
)
if room_sync or always_include:
sync_result_builder.archived.append(room_sync)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c610933dd4..a61bbf9392 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -63,11 +63,8 @@ class TypingHandler(object):
self._member_typing_until = {} # clock time we expect to stop
self._member_last_federation_poke = {}
- # map room IDs to serial numbers
- self._room_serials = {}
self._latest_room_serial = 0
- # map room IDs to sets of users currently typing
- self._room_typing = {}
+ self._reset()
# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
@@ -79,6 +76,15 @@ class TypingHandler(object):
5000,
)
+ def _reset(self):
+ """
+ Reset the typing handler's data caches.
+ """
+ # map room IDs to serial numbers
+ self._room_serials = {}
+ # map room IDs to sets of users currently typing
+ self._room_typing = {}
+
def _handle_timeouts(self):
logger.info("Checking for typing timeouts")
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index f11b430126..283c6c1b81 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -19,6 +19,7 @@ from six import iteritems
from twisted.internet import defer
+import synapse.metrics
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
@@ -125,9 +126,12 @@ class UserDirectoryHandler(object):
"""
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
- yield self.store.update_profile_in_user_dir(
- user_id, profile.display_name, profile.avatar_url, None,
- )
+ is_support = yield self.store.is_support_user(user_id)
+ # Support users are for diagnostics and should not appear in the user directory.
+ if not is_support:
+ yield self.store.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url, None
+ )
@defer.inlineCallbacks
def handle_user_deactivated(self, user_id):
@@ -160,6 +164,12 @@ class UserDirectoryHandler(object):
yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
+
+ # Expose current event processing position to prometheus
+ synapse.metrics.event_processing_positions.labels("user_dir").set(
+ self.pos
+ )
+
yield self.store.update_user_directory_stream_pos(self.pos)
@defer.inlineCallbacks
@@ -182,21 +192,25 @@ class UserDirectoryHandler(object):
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
logger.info("Processed all rooms.")
if self.search_all_users:
num_processed_users = 0
user_ids = yield self.store.get_all_local_users()
- logger.info("Doing initial update of user directory. %d users", len(user_ids))
+ logger.info(
+ "Doing initial update of user directory. %d users", len(user_ids)
+ )
for user_id in user_ids:
# We add profiles for all users even if they don't match the
# include pattern, just in case we want to change it in future
- logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
+ logger.info(
+ "Handling user %d/%d", num_processed_users + 1, len(user_ids)
+ )
yield self._handle_local_user(user_id)
num_processed_users += 1
- yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
logger.info("Processed all users")
@@ -215,24 +229,24 @@ class UserDirectoryHandler(object):
if not is_in_room:
return
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
+ is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+ room_id
+ )
users_with_profile = yield self.state.get_current_user_in_room(room_id)
user_ids = set(users_with_profile)
unhandled_users = user_ids - self.initially_handled_users
yield self.store.add_profiles_to_user_dir(
- room_id, {
- user_id: users_with_profile[user_id] for user_id in unhandled_users
- }
+ room_id,
+ {user_id: users_with_profile[user_id] for user_id in unhandled_users},
)
self.initially_handled_users |= unhandled_users
if is_public:
yield self.store.add_users_to_public_room(
- room_id,
- user_ids=user_ids - self.initially_handled_users_in_public
+ room_id, user_ids=user_ids - self.initially_handled_users_in_public
)
self.initially_handled_users_in_public |= user_ids
@@ -244,7 +258,7 @@ class UserDirectoryHandler(object):
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
if not self.is_mine_id(user_id):
count += 1
@@ -259,7 +273,7 @@ class UserDirectoryHandler(object):
continue
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
count += 1
user_set = (user_id, other_user_id)
@@ -281,25 +295,23 @@ class UserDirectoryHandler(object):
if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.add_users_who_share_room(
- room_id, not is_public, to_insert,
+ room_id, not is_public, to_insert
)
to_insert.clear()
if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update,
+ room_id, not is_public, to_update
)
to_update.clear()
if to_insert:
- yield self.store.add_users_who_share_room(
- room_id, not is_public, to_insert,
- )
+ yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
to_insert.clear()
if to_update:
yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update,
+ room_id, not is_public, to_update
)
to_update.clear()
@@ -320,50 +332,55 @@ class UserDirectoryHandler(object):
# may have become public or not and add/remove the users in said room
if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
yield self._handle_room_publicity_change(
- room_id, prev_event_id, event_id, typ,
+ room_id, prev_event_id, event_id, typ
)
elif typ == EventTypes.Member:
change = yield self._get_key_change(
- prev_event_id, event_id,
+ prev_event_id,
+ event_id,
key_name="membership",
public_value=Membership.JOIN,
)
- if change is None:
- # Handle any profile changes
- yield self._handle_profile_change(
- state_key, room_id, prev_event_id, event_id,
- )
- continue
-
- if not change:
+ if change is False:
# Need to check if the server left the room entirely, if so
# we might need to remove all the users in that room
is_in_room = yield self.store.is_host_joined(
- room_id, self.server_name,
+ room_id, self.server_name
)
if not is_in_room:
logger.info("Server left room: %r", room_id)
# Fetch all the users that we marked as being in user
# directory due to being in the room and then check if
# need to remove those users or not
- user_ids = yield self.store.get_users_in_dir_due_to_room(room_id)
+ user_ids = yield self.store.get_users_in_dir_due_to_room(
+ room_id
+ )
for user_id in user_ids:
yield self._handle_remove_user(room_id, user_id)
return
else:
logger.debug("Server is still in room: %r", room_id)
- if change: # The user joined
- event = yield self.store.get_event(event_id, allow_none=True)
- profile = ProfileInfo(
- avatar_url=event.content.get("avatar_url"),
- display_name=event.content.get("displayname"),
- )
+ is_support = yield self.store.is_support_user(state_key)
+ if not is_support:
+ if change is None:
+ # Handle any profile changes
+ yield self._handle_profile_change(
+ state_key, room_id, prev_event_id, event_id
+ )
+ continue
+
+ if change: # The user joined
+ event = yield self.store.get_event(event_id, allow_none=True)
+ profile = ProfileInfo(
+ avatar_url=event.content.get("avatar_url"),
+ display_name=event.content.get("displayname"),
+ )
- yield self._handle_new_user(room_id, state_key, profile)
- else: # The user left
- yield self._handle_remove_user(room_id, state_key)
+ yield self._handle_new_user(room_id, state_key, profile)
+ else: # The user left
+ yield self._handle_remove_user(room_id, state_key)
else:
logger.debug("Ignoring irrelevant type: %r", typ)
@@ -382,13 +399,15 @@ class UserDirectoryHandler(object):
if typ == EventTypes.RoomHistoryVisibility:
change = yield self._get_key_change(
- prev_event_id, event_id,
+ prev_event_id,
+ event_id,
key_name="history_visibility",
public_value="world_readable",
)
elif typ == EventTypes.JoinRules:
change = yield self._get_key_change(
- prev_event_id, event_id,
+ prev_event_id,
+ event_id,
key_name="join_rule",
public_value=JoinRules.PUBLIC,
)
@@ -513,7 +532,7 @@ class UserDirectoryHandler(object):
)
if self.is_mine_id(other_user_id) and not is_appservice:
shared_is_private = yield self.store.get_if_users_share_a_room(
- other_user_id, user_id,
+ other_user_id, user_id
)
if shared_is_private is True:
# We've already marked in the database they share a private room
@@ -528,13 +547,11 @@ class UserDirectoryHandler(object):
to_insert.add((other_user_id, user_id))
if to_insert:
- yield self.store.add_users_who_share_room(
- room_id, not is_public, to_insert,
- )
+ yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
if to_update:
yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update,
+ room_id, not is_public, to_update
)
@defer.inlineCallbacks
@@ -553,15 +570,15 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_public_room(user_id)
update_user_in_public = row and row["room_id"] == room_id
- if (update_user_in_public or update_user_dir):
+ if update_user_in_public or update_user_dir:
# XXX: Make this faster?
rooms = yield self.store.get_rooms_for_user(user_id)
for j_room_id in rooms:
- if (not update_user_in_public and not update_user_dir):
+ if not update_user_in_public and not update_user_dir:
break
is_in_room = yield self.store.is_host_joined(
- j_room_id, self.server_name,
+ j_room_id, self.server_name
)
if not is_in_room:
@@ -589,19 +606,19 @@ class UserDirectoryHandler(object):
# Get a list of user tuples that were in the DB due to this room and
# users (this includes tuples where the other user matches `user_id`)
user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
- user_id, room_id,
+ user_id, room_id
)
for user_id, other_user_id in user_tuples:
# For each user tuple get a list of rooms that they still share,
# trying to find a private room, and update the entry in the DB
- rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id)
+ rooms = yield self.store.get_rooms_in_common_for_users(
+ user_id, other_user_id
+ )
# If they dont share a room anymore, remove the mapping
if not rooms:
- yield self.store.remove_user_who_share_room(
- user_id, other_user_id,
- )
+ yield self.store.remove_user_who_share_room(user_id, other_user_id)
continue
found_public_share = None
@@ -615,13 +632,13 @@ class UserDirectoryHandler(object):
else:
found_public_share = None
yield self.store.update_users_who_share_room(
- room_id, not is_public, [(user_id, other_user_id)],
+ room_id, not is_public, [(user_id, other_user_id)]
)
break
if found_public_share:
yield self.store.update_users_who_share_room(
- room_id, not is_public, [(user_id, other_user_id)],
+ room_id, not is_public, [(user_id, other_user_id)]
)
@defer.inlineCallbacks
@@ -649,7 +666,7 @@ class UserDirectoryHandler(object):
if prev_name != new_name or prev_avatar != new_avatar:
yield self.store.update_profile_in_user_dir(
- user_id, new_name, new_avatar, room_id,
+ user_id, new_name, new_avatar, room_id
)
@defer.inlineCallbacks
|