diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/__init__.py | 17 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 19 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 380 | ||||
-rw-r--r-- | synapse/handlers/device.py | 181 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 139 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 134 | ||||
-rw-r--r-- | synapse/handlers/identity.py | 41 | ||||
-rw-r--r-- | synapse/handlers/message.py | 89 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 12 | ||||
-rw-r--r-- | synapse/handlers/register.py | 49 | ||||
-rw-r--r-- | synapse/handlers/room.py | 4 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 20 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 2 |
13 files changed, 858 insertions, 229 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index d28e07f0d9..1a50a2ec98 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -31,10 +31,21 @@ from .search import SearchHandler class Handlers(object): - """ A collection of all the event handlers. + """ Deprecated. A collection of handlers. - There's no need to lazily create these; we'll just make them all eagerly - at construction time. + At some point most of the classes whose name ended "Handler" were + accessed through this class. + + However this makes it painful to unit test the handlers and to run cut + down versions of synapse that only use specific handlers because using a + single handler required creating all of the handlers. So some of the + handlers have been lifted out of the Handlers object and are now accessed + directly through the homeserver object itself. + + Any new handlers should follow the new pattern of being accessed through + the homeserver object and should not be added to the Handlers object. + + The remaining handlers should be moved out of the handlers object. """ def __init__(self, hs): diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index c904c6c500..11081a0cd5 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -13,14 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer -from synapse.api.errors import LimitExceededError +import synapse.types from synapse.api.constants import Membership, EventTypes -from synapse.types import UserID, Requester - - -import logging +from synapse.api.errors import LimitExceededError +from synapse.types import UserID logger = logging.getLogger(__name__) @@ -31,11 +31,15 @@ class BaseHandler(object): Common base class for the event handlers. Attributes: - store (synapse.storage.events.StateStore): + store (synapse.storage.DataStore): state_handler (synapse.state.StateHandler): """ def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): + """ self.store = hs.get_datastore() self.auth = hs.get_auth() self.notifier = hs.get_notifier() @@ -120,7 +124,8 @@ class BaseHandler(object): # and having homeservers have their own users leave keeps more # of that decision-making and control local to the guest-having # homeserver. - requester = Requester(target_user, "", True) + requester = synapse.types.create_requester( + target_user, is_guest=True) handler = self.hs.get_handlers().room_member_handler yield handler.update_membership( requester, diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index b38f81e999..2e138f328f 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -20,6 +20,7 @@ from synapse.api.constants import LoginType from synapse.types import UserID from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError from synapse.util.async import run_on_reactor +from synapse.config.ldap import LDAPMode from twisted.web.client import PartialDownloadError @@ -28,6 +29,12 @@ import bcrypt import pymacaroons import simplejson +try: + import ldap3 +except ImportError: + ldap3 = None + pass + import synapse.util.stringutils as stringutils @@ -38,6 +45,10 @@ class AuthHandler(BaseHandler): SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000 def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): + """ super(AuthHandler, self).__init__(hs) self.checkers = { LoginType.PASSWORD: self._check_password_auth, @@ -50,19 +61,23 @@ class AuthHandler(BaseHandler): self.INVALID_TOKEN_HTTP_STATUS = 401 self.ldap_enabled = hs.config.ldap_enabled - self.ldap_server = hs.config.ldap_server - self.ldap_port = hs.config.ldap_port - self.ldap_tls = hs.config.ldap_tls - self.ldap_search_base = hs.config.ldap_search_base - self.ldap_search_property = hs.config.ldap_search_property - self.ldap_email_property = hs.config.ldap_email_property - self.ldap_full_name_property = hs.config.ldap_full_name_property - - if self.ldap_enabled is True: - import ldap - logger.info("Import ldap version: %s", ldap.__version__) + if self.ldap_enabled: + if not ldap3: + raise RuntimeError( + 'Missing ldap3 library. This is required for LDAP Authentication.' + ) + self.ldap_mode = hs.config.ldap_mode + self.ldap_uri = hs.config.ldap_uri + self.ldap_start_tls = hs.config.ldap_start_tls + self.ldap_base = hs.config.ldap_base + self.ldap_filter = hs.config.ldap_filter + self.ldap_attributes = hs.config.ldap_attributes + if self.ldap_mode == LDAPMode.SEARCH: + self.ldap_bind_dn = hs.config.ldap_bind_dn + self.ldap_bind_password = hs.config.ldap_bind_password self.hs = hs # FIXME better possibility to access registrationHandler later? + self.device_handler = hs.get_device_handler() @defer.inlineCallbacks def check_auth(self, flows, clientdict, clientip): @@ -220,7 +235,6 @@ class AuthHandler(BaseHandler): sess = self._get_session_info(session_id) return sess.setdefault('serverdict', {}).get(key, default) - @defer.inlineCallbacks def _check_password_auth(self, authdict, _): if "user" not in authdict or "password" not in authdict: raise LoginError(400, "", Codes.MISSING_PARAM) @@ -230,11 +244,7 @@ class AuthHandler(BaseHandler): if not user_id.startswith('@'): user_id = UserID.create(user_id, self.hs.hostname).to_string() - if not (yield self._check_password(user_id, password)): - logger.warn("Failed password login for user %s", user_id) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) - - defer.returnValue(user_id) + return self._check_password(user_id, password) @defer.inlineCallbacks def _check_recaptcha(self, authdict, clientip): @@ -270,8 +280,17 @@ class AuthHandler(BaseHandler): data = pde.response resp_body = simplejson.loads(data) - if 'success' in resp_body and resp_body['success']: - defer.returnValue(True) + if 'success' in resp_body: + # Note that we do NOT check the hostname here: we explicitly + # intend the CAPTCHA to be presented by whatever client the + # user is using, we just care that they have completed a CAPTCHA. + logger.info( + "%s reCAPTCHA from hostname %s", + "Successful" if resp_body['success'] else "Failed", + resp_body.get('hostname') + ) + if resp_body['success']: + defer.returnValue(True) raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) @defer.inlineCallbacks @@ -338,67 +357,84 @@ class AuthHandler(BaseHandler): return self.sessions[session_id] - @defer.inlineCallbacks - def login_with_password(self, user_id, password): + def validate_password_login(self, user_id, password): """ Authenticates the user with their username and password. Used only by the v1 login API. Args: - user_id (str): User ID + user_id (str): complete @user:id password (str): Password Returns: - A tuple of: - The user's ID. - The access token for the user's session. - The refresh token for the user's session. + defer.Deferred: (str) canonical user id Raises: - StoreError if there was a problem storing the token. + StoreError if there was a problem accessing the database LoginError if there was an authentication problem. """ - - if not (yield self._check_password(user_id, password)): - logger.warn("Failed password login for user %s", user_id) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) - - logger.info("Logging in user %s", user_id) - access_token = yield self.issue_access_token(user_id) - refresh_token = yield self.issue_refresh_token(user_id) - defer.returnValue((user_id, access_token, refresh_token)) + return self._check_password(user_id, password) @defer.inlineCallbacks - def get_login_tuple_for_user_id(self, user_id): + def get_login_tuple_for_user_id(self, user_id, device_id=None, + initial_display_name=None): """ Gets login tuple for the user with the given user ID. + + Creates a new access/refresh token for the user. + The user is assumed to have been authenticated by some other - machanism (e.g. CAS) + machanism (e.g. CAS), and the user_id converted to the canonical case. + + The device will be recorded in the table if it is not there already. Args: - user_id (str): User ID + user_id (str): canonical User ID + device_id (str|None): the device ID to associate with the tokens. + None to leave the tokens unassociated with a device (deprecated: + we should always have a device ID) + initial_display_name (str): display name to associate with the + device if it needs re-registering Returns: A tuple of: - The user's ID. The access token for the user's session. The refresh token for the user's session. Raises: StoreError if there was a problem storing the token. LoginError if there was an authentication problem. """ - user_id, ignored = yield self._find_user_id_and_pwd_hash(user_id) + logger.info("Logging in user %s on device %s", user_id, device_id) + access_token = yield self.issue_access_token(user_id, device_id) + refresh_token = yield self.issue_refresh_token(user_id, device_id) + + # the device *should* have been registered before we got here; however, + # it's possible we raced against a DELETE operation. The thing we + # really don't want is active access_tokens without a record of the + # device, so we double-check it here. + if device_id is not None: + yield self.device_handler.check_device_registered( + user_id, device_id, initial_display_name + ) - logger.info("Logging in user %s", user_id) - access_token = yield self.issue_access_token(user_id) - refresh_token = yield self.issue_refresh_token(user_id) - defer.returnValue((user_id, access_token, refresh_token)) + defer.returnValue((access_token, refresh_token)) @defer.inlineCallbacks - def does_user_exist(self, user_id): + def check_user_exists(self, user_id): + """ + Checks to see if a user with the given id exists. Will check case + insensitively, but return None if there are multiple inexact matches. + + Args: + (str) user_id: complete @user:id + + Returns: + defer.Deferred: (str) canonical_user_id, or None if zero or + multiple matches + """ try: - yield self._find_user_id_and_pwd_hash(user_id) - defer.returnValue(True) + res = yield self._find_user_id_and_pwd_hash(user_id) + defer.returnValue(res[0]) except LoginError: - defer.returnValue(False) + defer.returnValue(None) @defer.inlineCallbacks def _find_user_id_and_pwd_hash(self, user_id): @@ -428,84 +464,232 @@ class AuthHandler(BaseHandler): @defer.inlineCallbacks def _check_password(self, user_id, password): - """ + """Authenticate a user against the LDAP and local databases. + + user_id is checked case insensitively against the local database, but + will throw if there are multiple inexact matches. + + Args: + user_id (str): complete @user:id Returns: - True if the user_id successfully authenticated + (str) the canonical_user_id + Raises: + LoginError if the password was incorrect """ valid_ldap = yield self._check_ldap_password(user_id, password) if valid_ldap: - defer.returnValue(True) + defer.returnValue(user_id) - valid_local_password = yield self._check_local_password(user_id, password) - if valid_local_password: - defer.returnValue(True) - - defer.returnValue(False) + result = yield self._check_local_password(user_id, password) + defer.returnValue(result) @defer.inlineCallbacks def _check_local_password(self, user_id, password): - try: - user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) - defer.returnValue(self.validate_hash(password, password_hash)) - except LoginError: - defer.returnValue(False) + """Authenticate a user against the local password database. + + user_id is checked case insensitively, but will throw if there are + multiple inexact matches. + + Args: + user_id (str): complete @user:id + Returns: + (str) the canonical_user_id + Raises: + LoginError if the password was incorrect + """ + user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) + result = self.validate_hash(password, password_hash) + if not result: + logger.warn("Failed password login for user %s", user_id) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) + defer.returnValue(user_id) @defer.inlineCallbacks def _check_ldap_password(self, user_id, password): - if not self.ldap_enabled: - logger.debug("LDAP not configured") + """ Attempt to authenticate a user against an LDAP Server + and register an account if none exists. + + Returns: + True if authentication against LDAP was successful + """ + + if not ldap3 or not self.ldap_enabled: defer.returnValue(False) - import ldap + if self.ldap_mode not in LDAPMode.LIST: + raise RuntimeError( + 'Invalid ldap mode specified: {mode}'.format( + mode=self.ldap_mode + ) + ) - logger.info("Authenticating %s with LDAP" % user_id) try: - ldap_url = "%s:%s" % (self.ldap_server, self.ldap_port) - logger.debug("Connecting LDAP server at %s" % ldap_url) - l = ldap.initialize(ldap_url) - if self.ldap_tls: - logger.debug("Initiating TLS") - self._connection.start_tls_s() - - local_name = UserID.from_string(user_id).localpart - - dn = "%s=%s, %s" % ( - self.ldap_search_property, - local_name, - self.ldap_search_base) - logger.debug("DN for LDAP authentication: %s" % dn) - - l.simple_bind_s(dn.encode('utf-8'), password.encode('utf-8')) - - if not (yield self.does_user_exist(user_id)): - handler = self.hs.get_handlers().registration_handler - user_id, access_token = ( - yield handler.register(localpart=local_name) + server = ldap3.Server(self.ldap_uri) + logger.debug( + "Attempting ldap connection with %s", + self.ldap_uri + ) + + localpart = UserID.from_string(user_id).localpart + if self.ldap_mode == LDAPMode.SIMPLE: + # bind with the the local users ldap credentials + bind_dn = "{prop}={value},{base}".format( + prop=self.ldap_attributes['uid'], + value=localpart, + base=self.ldap_base + ) + conn = ldap3.Connection(server, bind_dn, password) + logger.debug( + "Established ldap connection in simple mode: %s", + conn ) + if self.ldap_start_tls: + conn.start_tls() + logger.debug( + "Upgraded ldap connection in simple mode through StartTLS: %s", + conn + ) + + conn.bind() + + elif self.ldap_mode == LDAPMode.SEARCH: + # connect with preconfigured credentials and search for local user + conn = ldap3.Connection( + server, + self.ldap_bind_dn, + self.ldap_bind_password + ) + logger.debug( + "Established ldap connection in search mode: %s", + conn + ) + + if self.ldap_start_tls: + conn.start_tls() + logger.debug( + "Upgraded ldap connection in search mode through StartTLS: %s", + conn + ) + + conn.bind() + + # find matching dn + query = "({prop}={value})".format( + prop=self.ldap_attributes['uid'], + value=localpart + ) + if self.ldap_filter: + query = "(&{query}{filter})".format( + query=query, + filter=self.ldap_filter + ) + logger.debug("ldap search filter: %s", query) + result = conn.search(self.ldap_base, query) + + if result and len(conn.response) == 1: + # found exactly one result + user_dn = conn.response[0]['dn'] + logger.debug('ldap search found dn: %s', user_dn) + + # unbind and reconnect, rebind with found dn + conn.unbind() + conn = ldap3.Connection( + server, + user_dn, + password, + auto_bind=True + ) + else: + # found 0 or > 1 results, abort! + logger.warn( + "ldap search returned unexpected (%d!=1) amount of results", + len(conn.response) + ) + defer.returnValue(False) + + logger.info( + "User authenticated against ldap server: %s", + conn + ) + + # check for existing account, if none exists, create one + if not (yield self.check_user_exists(user_id)): + # query user metadata for account creation + query = "({prop}={value})".format( + prop=self.ldap_attributes['uid'], + value=localpart + ) + + if self.ldap_mode == LDAPMode.SEARCH and self.ldap_filter: + query = "(&{filter}{user_filter})".format( + filter=query, + user_filter=self.ldap_filter + ) + logger.debug("ldap registration filter: %s", query) + + result = conn.search( + search_base=self.ldap_base, + search_filter=query, + attributes=[ + self.ldap_attributes['name'], + self.ldap_attributes['mail'] + ] + ) + + if len(conn.response) == 1: + attrs = conn.response[0]['attributes'] + mail = attrs[self.ldap_attributes['mail']][0] + name = attrs[self.ldap_attributes['name']][0] + + # create account + registration_handler = self.hs.get_handlers().registration_handler + user_id, access_token = ( + yield registration_handler.register(localpart=localpart) + ) + + # TODO: bind email, set displayname with data from ldap directory + + logger.info( + "ldap registration successful: %d: %s (%s, %)", + user_id, + localpart, + name, + mail + ) + else: + logger.warn( + "ldap registration failed: unexpected (%d!=1) amount of results", + len(result) + ) + defer.returnValue(False) + defer.returnValue(True) - except ldap.LDAPError, e: - logger.warn("LDAP error: %s", e) + except ldap3.core.exceptions.LDAPException as e: + logger.warn("Error during ldap authentication: %s", e) defer.returnValue(False) @defer.inlineCallbacks - def issue_access_token(self, user_id): + def issue_access_token(self, user_id, device_id=None): access_token = self.generate_access_token(user_id) - yield self.store.add_access_token_to_user(user_id, access_token) + yield self.store.add_access_token_to_user(user_id, access_token, + device_id) defer.returnValue(access_token) @defer.inlineCallbacks - def issue_refresh_token(self, user_id): + def issue_refresh_token(self, user_id, device_id=None): refresh_token = self.generate_refresh_token(user_id) - yield self.store.add_refresh_token_to_user(user_id, refresh_token) + yield self.store.add_refresh_token_to_user(user_id, refresh_token, + device_id) defer.returnValue(refresh_token) - def generate_access_token(self, user_id, extra_caveats=None): + def generate_access_token(self, user_id, extra_caveats=None, + duration_in_ms=(60 * 60 * 1000)): extra_caveats = extra_caveats or [] macaroon = self._generate_base_macaroon(user_id) macaroon.add_first_party_caveat("type = access") now = self.hs.get_clock().time_msec() - expiry = now + (60 * 60 * 1000) + expiry = now + duration_in_ms macaroon.add_first_party_caveat("time < %d" % (expiry,)) for caveat in extra_caveats: macaroon.add_first_party_caveat(caveat) @@ -613,7 +797,8 @@ class AuthHandler(BaseHandler): Returns: Hashed password (str). """ - return bcrypt.hashpw(password, bcrypt.gensalt(self.bcrypt_rounds)) + return bcrypt.hashpw(password + self.hs.config.password_pepper, + bcrypt.gensalt(self.bcrypt_rounds)) def validate_hash(self, password, stored_hash): """Validates that self.hash(password) == stored_hash. @@ -626,6 +811,7 @@ class AuthHandler(BaseHandler): Whether self.hash(password) == stored_hash (bool). """ if stored_hash: - return bcrypt.hashpw(password, stored_hash.encode('utf-8')) == stored_hash + return bcrypt.hashpw(password + self.hs.config.password_pepper, + stored_hash.encode('utf-8')) == stored_hash else: return False diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py new file mode 100644 index 0000000000..8d630c6b1a --- /dev/null +++ b/synapse/handlers/device.py @@ -0,0 +1,181 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.api import errors +from synapse.util import stringutils +from twisted.internet import defer +from ._base import BaseHandler + +import logging + +logger = logging.getLogger(__name__) + + +class DeviceHandler(BaseHandler): + def __init__(self, hs): + super(DeviceHandler, self).__init__(hs) + + @defer.inlineCallbacks + def check_device_registered(self, user_id, device_id, + initial_device_display_name=None): + """ + If the given device has not been registered, register it with the + supplied display name. + + If no device_id is supplied, we make one up. + + Args: + user_id (str): @user:id + device_id (str | None): device id supplied by client + initial_device_display_name (str | None): device display name from + client + Returns: + str: device id (generated if none was supplied) + """ + if device_id is not None: + yield self.store.store_device( + user_id=user_id, + device_id=device_id, + initial_device_display_name=initial_device_display_name, + ignore_if_known=True, + ) + defer.returnValue(device_id) + + # if the device id is not specified, we'll autogen one, but loop a few + # times in case of a clash. + attempts = 0 + while attempts < 5: + try: + device_id = stringutils.random_string_with_symbols(16) + yield self.store.store_device( + user_id=user_id, + device_id=device_id, + initial_device_display_name=initial_device_display_name, + ignore_if_known=False, + ) + defer.returnValue(device_id) + except errors.StoreError: + attempts += 1 + + raise errors.StoreError(500, "Couldn't generate a device ID.") + + @defer.inlineCallbacks + def get_devices_by_user(self, user_id): + """ + Retrieve the given user's devices + + Args: + user_id (str): + Returns: + defer.Deferred: list[dict[str, X]]: info on each device + """ + + device_map = yield self.store.get_devices_by_user(user_id) + + ips = yield self.store.get_last_client_ip_by_device( + devices=((user_id, device_id) for device_id in device_map.keys()) + ) + + devices = device_map.values() + for device in devices: + _update_device_from_client_ips(device, ips) + + defer.returnValue(devices) + + @defer.inlineCallbacks + def get_device(self, user_id, device_id): + """ Retrieve the given device + + Args: + user_id (str): + device_id (str): + + Returns: + defer.Deferred: dict[str, X]: info on the device + Raises: + errors.NotFoundError: if the device was not found + """ + try: + device = yield self.store.get_device(user_id, device_id) + except errors.StoreError: + raise errors.NotFoundError + ips = yield self.store.get_last_client_ip_by_device( + devices=((user_id, device_id),) + ) + _update_device_from_client_ips(device, ips) + defer.returnValue(device) + + @defer.inlineCallbacks + def delete_device(self, user_id, device_id): + """ Delete the given device + + Args: + user_id (str): + device_id (str): + + Returns: + defer.Deferred: + """ + + try: + yield self.store.delete_device(user_id, device_id) + except errors.StoreError, e: + if e.code == 404: + # no match + pass + else: + raise + + yield self.store.user_delete_access_tokens( + user_id, device_id=device_id, + delete_refresh_tokens=True, + ) + + yield self.store.delete_e2e_keys_by_device( + user_id=user_id, device_id=device_id + ) + + @defer.inlineCallbacks + def update_device(self, user_id, device_id, content): + """ Update the given device + + Args: + user_id (str): + device_id (str): + content (dict): body of update request + + Returns: + defer.Deferred: + """ + + try: + yield self.store.update_device( + user_id, + device_id, + new_display_name=content.get("display_name") + ) + except errors.StoreError, e: + if e.code == 404: + raise errors.NotFoundError() + else: + raise + + +def _update_device_from_client_ips(device, client_ips): + ip = client_ips.get((device["user_id"], device["device_id"]), {}) + device.update({ + "last_seen_ts": ip.get("last_seen"), + "last_seen_ip": ip.get("ip"), + }) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py new file mode 100644 index 0000000000..2c7bfd91ed --- /dev/null +++ b/synapse/handlers/e2e_keys.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import json +import logging + +from twisted.internet import defer + +from synapse.api import errors +import synapse.types + +logger = logging.getLogger(__name__) + + +class E2eKeysHandler(object): + def __init__(self, hs): + self.store = hs.get_datastore() + self.federation = hs.get_replication_layer() + self.is_mine_id = hs.is_mine_id + self.server_name = hs.hostname + + # doesn't really work as part of the generic query API, because the + # query request requires an object POST, but we abuse the + # "query handler" interface. + self.federation.register_query_handler( + "client_keys", self.on_federation_query_client_keys + ) + + @defer.inlineCallbacks + def query_devices(self, query_body): + """ Handle a device key query from a client + + { + "device_keys": { + "<user_id>": ["<device_id>"] + } + } + -> + { + "device_keys": { + "<user_id>": { + "<device_id>": { + ... + } + } + } + } + """ + device_keys_query = query_body.get("device_keys", {}) + + # separate users by domain. + # make a map from domain to user_id to device_ids + queries_by_domain = collections.defaultdict(dict) + for user_id, device_ids in device_keys_query.items(): + user = synapse.types.UserID.from_string(user_id) + queries_by_domain[user.domain][user_id] = device_ids + + # do the queries + # TODO: do these in parallel + results = {} + for destination, destination_query in queries_by_domain.items(): + if destination == self.server_name: + res = yield self.query_local_devices(destination_query) + else: + res = yield self.federation.query_client_keys( + destination, {"device_keys": destination_query} + ) + res = res["device_keys"] + for user_id, keys in res.items(): + if user_id in destination_query: + results[user_id] = keys + + defer.returnValue((200, {"device_keys": results})) + + @defer.inlineCallbacks + def query_local_devices(self, query): + """Get E2E device keys for local users + + Args: + query (dict[string, list[string]|None): map from user_id to a list + of devices to query (None for all devices) + + Returns: + defer.Deferred: (resolves to dict[string, dict[string, dict]]): + map from user_id -> device_id -> device details + """ + local_query = [] + + result_dict = {} + for user_id, device_ids in query.items(): + if not self.is_mine_id(user_id): + logger.warning("Request for keys for non-local user %s", + user_id) + raise errors.SynapseError(400, "Not a user here") + + if not device_ids: + local_query.append((user_id, None)) + else: + for device_id in device_ids: + local_query.append((user_id, device_id)) + + # make sure that each queried user appears in the result dict + result_dict[user_id] = {} + + results = yield self.store.get_e2e_device_keys(local_query) + + # Build the result structure, un-jsonify the results, and add the + # "unsigned" section + for user_id, device_keys in results.items(): + for device_id, device_info in device_keys.items(): + r = json.loads(device_info["key_json"]) + r["unsigned"] = {} + display_name = device_info["device_display_name"] + if display_name is not None: + r["unsigned"]["device_display_name"] = display_name + result_dict[user_id][device_id] = r + + defer.returnValue(result_dict) + + @defer.inlineCallbacks + def on_federation_query_client_keys(self, query_body): + """ Handle a device key query from a federated server + """ + device_keys_query = query_body.get("device_keys", {}) + res = yield self.query_local_devices(device_keys_query) + defer.returnValue({"device_keys": res}) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6c0bc7eafa..618cb53629 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -124,7 +124,7 @@ class FederationHandler(BaseHandler): try: event_stream_id, max_stream_id = yield self._persist_auth_tree( - auth_chain, state, event + origin, auth_chain, state, event ) except AuthError as e: raise FederationError( @@ -335,31 +335,58 @@ class FederationHandler(BaseHandler): state_events.update({s.event_id: s for s in state}) events_to_state[e_id] = state - seen_events = yield self.store.have_events( - set(auth_events.keys()) | set(state_events.keys()) - ) - - all_events = events + state_events.values() + auth_events.values() required_auth = set( - a_id for event in all_events for a_id, _ in event.auth_events + a_id + for event in events + state_events.values() + auth_events.values() + for a_id, _ in event.auth_events ) - + auth_events.update({ + e_id: event_map[e_id] for e_id in required_auth if e_id in event_map + }) missing_auth = required_auth - set(auth_events) - if missing_auth: + failed_to_fetch = set() + + # Try and fetch any missing auth events from both DB and remote servers. + # We repeatedly do this until we stop finding new auth events. + while missing_auth - failed_to_fetch: logger.info("Missing auth for backfill: %r", missing_auth) - results = yield defer.gatherResults( - [ - self.replication_layer.get_pdu( - [dest], - event_id, - outlier=True, - timeout=10000, - ) - for event_id in missing_auth - ], - consumeErrors=True - ).addErrback(unwrapFirstError) - auth_events.update({a.event_id: a for a in results}) + ret_events = yield self.store.get_events(missing_auth - failed_to_fetch) + auth_events.update(ret_events) + + required_auth.update( + a_id for event in ret_events.values() for a_id, _ in event.auth_events + ) + missing_auth = required_auth - set(auth_events) + + if missing_auth - failed_to_fetch: + logger.info( + "Fetching missing auth for backfill: %r", + missing_auth - failed_to_fetch + ) + + results = yield defer.gatherResults( + [ + self.replication_layer.get_pdu( + [dest], + event_id, + outlier=True, + timeout=10000, + ) + for event_id in missing_auth - failed_to_fetch + ], + consumeErrors=True + ).addErrback(unwrapFirstError) + auth_events.update({a.event_id: a for a in results}) + required_auth.update( + a_id for event in results for a_id, _ in event.auth_events + ) + missing_auth = required_auth - set(auth_events) + + failed_to_fetch = missing_auth - set(auth_events) + + seen_events = yield self.store.have_events( + set(auth_events.keys()) | set(state_events.keys()) + ) ev_infos = [] for a in auth_events.values(): @@ -372,6 +399,7 @@ class FederationHandler(BaseHandler): (auth_events[a_id].type, auth_events[a_id].state_key): auth_events[a_id] for a_id, _ in a.auth_events + if a_id in auth_events } }) @@ -383,6 +411,7 @@ class FederationHandler(BaseHandler): (auth_events[a_id].type, auth_events[a_id].state_key): auth_events[a_id] for a_id, _ in event_map[e_id].auth_events + if a_id in auth_events } }) @@ -637,7 +666,7 @@ class FederationHandler(BaseHandler): pass event_stream_id, max_stream_id = yield self._persist_auth_tree( - auth_chain, state, event + origin, auth_chain, state, event ) with PreserveLoggingContext(): @@ -688,7 +717,9 @@ class FederationHandler(BaseHandler): logger.warn("Failed to create join %r because %s", event, e) raise e - self.auth.check(event, auth_events=context.current_state) + # The remote hasn't signed it yet, obviously. We'll do the full checks + # when we get the event back in `on_send_join_request` + self.auth.check(event, auth_events=context.current_state, do_sig_check=False) defer.returnValue(event) @@ -918,7 +949,9 @@ class FederationHandler(BaseHandler): ) try: - self.auth.check(event, auth_events=context.current_state) + # The remote hasn't signed it yet, obviously. We'll do the full checks + # when we get the event back in `on_send_leave_request` + self.auth.check(event, auth_events=context.current_state, do_sig_check=False) except AuthError as e: logger.warn("Failed to create new leave %r because %s", event, e) raise e @@ -987,14 +1020,9 @@ class FederationHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True): + def get_state_for_pdu(self, room_id, event_id): yield run_on_reactor() - if do_auth: - in_room = yield self.auth.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") - state_groups = yield self.store.get_state_groups( room_id, [event_id] ) @@ -1114,11 +1142,12 @@ class FederationHandler(BaseHandler): backfilled=backfilled, ) - # this intentionally does not yield: we don't care about the result - # and don't need to wait for it. - preserve_fn(self.hs.get_pusherpool().on_new_notifications)( - event_stream_id, max_stream_id - ) + if not backfilled: + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id + ) defer.returnValue((context, event_stream_id, max_stream_id)) @@ -1150,11 +1179,19 @@ class FederationHandler(BaseHandler): ) @defer.inlineCallbacks - def _persist_auth_tree(self, auth_events, state, event): + def _persist_auth_tree(self, origin, auth_events, state, event): """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. Persists the event seperately. + Will attempt to fetch missing auth events. + + Args: + origin (str): Where the events came from + auth_events (list) + state (list) + event (Event) + Returns: 2-tuple of (event_stream_id, max_stream_id) from the persist_event call for `event` @@ -1167,7 +1204,7 @@ class FederationHandler(BaseHandler): event_map = { e.event_id: e - for e in auth_events + for e in itertools.chain(auth_events, state, [event]) } create_event = None @@ -1176,10 +1213,29 @@ class FederationHandler(BaseHandler): create_event = e break + missing_auth_events = set() + for e in itertools.chain(auth_events, state, [event]): + for e_id, _ in e.auth_events: + if e_id not in event_map: + missing_auth_events.add(e_id) + + for e_id in missing_auth_events: + m_ev = yield self.replication_layer.get_pdu( + [origin], + e_id, + outlier=True, + timeout=10000, + ) + if m_ev and m_ev.event_id == e_id: + event_map[e_id] = m_ev + else: + logger.info("Failed to find auth event %r", e_id) + for e in itertools.chain(auth_events, state, [event]): auth_for_e = { (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id] for e_id, _ in e.auth_events + if e_id in event_map } if create_event: auth_for_e[(EventTypes.Create, "")] = create_event @@ -1413,7 +1469,7 @@ class FederationHandler(BaseHandler): local_view = dict(auth_events) remote_view = dict(auth_events) remote_view.update({ - (d.type, d.state_key): d for d in different_events + (d.type, d.state_key): d for d in different_events if d }) new_state, prev_state = self.state_handler.resolve_events( diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 656ce124f9..559e5d5a71 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -21,7 +21,7 @@ from synapse.api.errors import ( ) from ._base import BaseHandler from synapse.util.async import run_on_reactor -from synapse.api.errors import SynapseError +from synapse.api.errors import SynapseError, Codes import json import logging @@ -41,6 +41,20 @@ class IdentityHandler(BaseHandler): hs.config.use_insecure_ssl_client_just_for_testing_do_not_use ) + def _should_trust_id_server(self, id_server): + if id_server not in self.trusted_id_servers: + if self.trust_any_id_server_just_for_testing_do_not_use: + logger.warn( + "Trusting untrustworthy ID server %r even though it isn't" + " in the trusted id list for testing because" + " 'use_insecure_ssl_client_just_for_testing_do_not_use'" + " is set in the config", + id_server, + ) + else: + return False + return True + @defer.inlineCallbacks def threepid_from_creds(self, creds): yield run_on_reactor() @@ -59,19 +73,12 @@ class IdentityHandler(BaseHandler): else: raise SynapseError(400, "No client_secret in creds") - if id_server not in self.trusted_id_servers: - if self.trust_any_id_server_just_for_testing_do_not_use: - logger.warn( - "Trusting untrustworthy ID server %r even though it isn't" - " in the trusted id list for testing because" - " 'use_insecure_ssl_client_just_for_testing_do_not_use'" - " is set in the config", - id_server, - ) - else: - logger.warn('%s is not a trusted ID server: rejecting 3pid ' + - 'credentials', id_server) - defer.returnValue(None) + if not self._should_trust_id_server(id_server): + logger.warn( + '%s is not a trusted ID server: rejecting 3pid ' + + 'credentials', id_server + ) + defer.returnValue(None) data = {} try: @@ -129,6 +136,12 @@ class IdentityHandler(BaseHandler): def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): yield run_on_reactor() + if not self._should_trust_id_server(id_server): + raise SynapseError( + 400, "Untrusted ID server '%s'" % id_server, + Codes.SERVER_NOT_TRUSTED + ) + params = { 'email': email, 'client_secret': client_secret, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 15caf1950a..dc76d34a52 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -26,7 +26,7 @@ from synapse.types import ( UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id ) from synapse.util import unwrapFirstError -from synapse.util.async import concurrently_execute, run_on_reactor +from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.util.logcontext import preserve_fn from synapse.visibility import filter_events_for_client @@ -50,9 +50,23 @@ class MessageHandler(BaseHandler): self.validator = EventValidator() self.snapshot_cache = SnapshotCache() + self.pagination_lock = ReadWriteLock() + + @defer.inlineCallbacks + def purge_history(self, room_id, event_id): + event = yield self.store.get_event(event_id) + + if event.room_id != room_id: + raise SynapseError(400, "Event is for wrong room.") + + depth = event.depth + + with (yield self.pagination_lock.write(room_id)): + yield self.store.delete_old_state(room_id, depth) + @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, - as_client_event=True): + as_client_event=True, event_filter=None): """Get messages in a room. Args: @@ -61,11 +75,11 @@ class MessageHandler(BaseHandler): pagin_config (synapse.api.streams.PaginationConfig): The pagination config rules to apply, if any. as_client_event (bool): True to get events in client-server format. + event_filter (Filter): Filter to apply to results or None Returns: dict: Pagination API results """ user_id = requester.user.to_string() - data_source = self.hs.get_event_sources().sources["room"] if pagin_config.from_token: room_token = pagin_config.from_token.room_key @@ -85,42 +99,48 @@ class MessageHandler(BaseHandler): source_config = pagin_config.get_source_config("room") - membership, member_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) + with (yield self.pagination_lock.read(room_id)): + membership, member_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) - if source_config.direction == 'b': - # if we're going backwards, we might need to backfill. This - # requires that we have a topo token. - if room_token.topological: - max_topo = room_token.topological - else: - max_topo = yield self.store.get_max_topological_token_for_stream_and_room( - room_id, room_token.stream - ) + if source_config.direction == 'b': + # if we're going backwards, we might need to backfill. This + # requires that we have a topo token. + if room_token.topological: + max_topo = room_token.topological + else: + max_topo = yield self.store.get_max_topological_token( + room_id, room_token.stream + ) + + if membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room, to save the effort of loading from the + # database. + leave_token = yield self.store.get_topological_token_for_event( + member_event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < max_topo: + source_config.from_key = str(leave_token) - if membership == Membership.LEAVE: - # If they have left the room then clamp the token to be before - # they left the room, to save the effort of loading from the - # database. - leave_token = yield self.store.get_topological_token_for_event( - member_event_id + yield self.hs.get_handlers().federation_handler.maybe_backfill( + room_id, max_topo ) - leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < max_topo: - source_config.from_key = str(leave_token) - yield self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, max_topo + events, next_key = yield self.store.paginate_room_events( + room_id=room_id, + from_key=source_config.from_key, + to_key=source_config.to_key, + direction=source_config.direction, + limit=source_config.limit, + event_filter=event_filter, ) - events, next_key = yield data_source.get_pagination_rows( - requester.user, source_config, room_id - ) - - next_token = pagin_config.from_token.copy_and_replace( - "room_key", next_key - ) + next_token = pagin_config.from_token.copy_and_replace( + "room_key", next_key + ) if not events: defer.returnValue({ @@ -129,6 +149,9 @@ class MessageHandler(BaseHandler): "end": next_token.to_string(), }) + if event_filter: + events = event_filter.filter(events) + events = yield filter_events_for_client( self.store, user_id, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 711a6a567f..d9ac09078d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -13,15 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer +import synapse.types from synapse.api.errors import SynapseError, AuthError, CodeMessageException -from synapse.types import UserID, Requester - +from synapse.types import UserID from ._base import BaseHandler -import logging - logger = logging.getLogger(__name__) @@ -165,7 +165,9 @@ class ProfileHandler(BaseHandler): try: # Assume the user isn't a guest because we don't let guests set # profile or avatar data. - requester = Requester(user, "", False) + # XXX why are we recreating `requester` here for each room? + # what was wrong with the `requester` we were passed? + requester = synapse.types.create_requester(user) yield handler.update_membership( requester, user, diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 0b7517221d..dd75c4fecf 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -14,18 +14,19 @@ # limitations under the License. """Contains functions for registering clients.""" +import logging +import urllib + from twisted.internet import defer -from synapse.types import UserID, Requester +import synapse.types from synapse.api.errors import ( AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError ) -from ._base import BaseHandler -from synapse.util.async import run_on_reactor from synapse.http.client import CaptchaServerHttpClient - -import logging -import urllib +from synapse.types import UserID +from synapse.util.async import run_on_reactor +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -52,6 +53,13 @@ class RegistrationHandler(BaseHandler): Codes.INVALID_USERNAME ) + if localpart[0] == '_': + raise SynapseError( + 400, + "User ID may not begin with _", + Codes.INVALID_USERNAME + ) + user = UserID(localpart, self.hs.hostname) user_id = user.to_string() @@ -90,7 +98,8 @@ class RegistrationHandler(BaseHandler): password=None, generate_token=True, guest_access_token=None, - make_guest=False + make_guest=False, + admin=False, ): """Registers a new client on the server. @@ -98,8 +107,13 @@ class RegistrationHandler(BaseHandler): localpart : The local part of the user ID to register. If None, one will be generated. password (str) : The password to assign to this user so they can - login again. This can be None which means they cannot login again - via a password (e.g. the user is an application service user). + login again. This can be None which means they cannot login again + via a password (e.g. the user is an application service user). + generate_token (bool): Whether a new access token should be + generated. Having this be True should be considered deprecated, + since it offers no means of associating a device_id with the + access_token. Instead you should call auth_handler.issue_access_token + after registration. Returns: A tuple of (user_id, access_token). Raises: @@ -141,6 +155,7 @@ class RegistrationHandler(BaseHandler): # If the user was a guest then they already have a profile None if was_guest else user.localpart ), + admin=admin, ) else: # autogen a sequential user ID @@ -194,15 +209,13 @@ class RegistrationHandler(BaseHandler): user_id, allowed_appservice=service ) - token = self.auth_handler().generate_access_token(user_id) yield self.store.register( user_id=user_id, - token=token, password_hash="", appservice_id=service_id, create_profile_with_localpart=user.localpart, ) - defer.returnValue((user_id, token)) + defer.returnValue(user_id) @defer.inlineCallbacks def check_recaptcha(self, ip, private_key, challenge, response): @@ -358,7 +371,8 @@ class RegistrationHandler(BaseHandler): defer.returnValue(data) @defer.inlineCallbacks - def get_or_create_user(self, localpart, displayname, duration_seconds): + def get_or_create_user(self, localpart, displayname, duration_in_ms, + password_hash=None): """Creates a new user if the user does not exist, else revokes all previous access tokens and generates a new one. @@ -387,14 +401,14 @@ class RegistrationHandler(BaseHandler): user = UserID(localpart, self.hs.hostname) user_id = user.to_string() - token = self.auth_handler().generate_short_term_login_token( - user_id, duration_seconds) + token = self.auth_handler().generate_access_token( + user_id, None, duration_in_ms) if need_register: yield self.store.register( user_id=user_id, token=token, - password_hash=None, + password_hash=password_hash, create_profile_with_localpart=user.localpart, ) else: @@ -404,8 +418,9 @@ class RegistrationHandler(BaseHandler): if displayname is not None: logger.info("setting user display name: %s -> %s", user_id, displayname) profile_handler = self.hs.get_handlers().profile_handler + requester = synapse.types.create_requester(user) yield profile_handler.set_displayname( - user, Requester(user, token, False), displayname + user, requester, displayname ) defer.returnValue((user_id, token)) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ae44c7a556..bf6b1c1535 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -345,8 +345,8 @@ class RoomCreationHandler(BaseHandler): class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache() - self.remote_list_request_cache = ResponseCache() + self.response_cache = ResponseCache(hs) + self.remote_list_request_cache = ResponseCache(hs) self.remote_list_cache = {} self.fetch_looping_call = hs.get_clock().looping_call( self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 7e616f44fd..8cec8fc4ed 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -14,24 +14,22 @@ # limitations under the License. -from twisted.internet import defer +import logging -from ._base import BaseHandler +from signedjson.key import decode_verify_key_bytes +from signedjson.sign import verify_signed_json +from twisted.internet import defer +from unpaddedbase64 import decode_base64 -from synapse.types import UserID, RoomID, Requester +import synapse.types from synapse.api.constants import ( EventTypes, Membership, ) from synapse.api.errors import AuthError, SynapseError, Codes +from synapse.types import UserID, RoomID from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room - -from signedjson.sign import verify_signed_json -from signedjson.key import decode_verify_key_bytes - -from unpaddedbase64 import decode_base64 - -import logging +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -315,7 +313,7 @@ class RoomMemberHandler(BaseHandler): ) assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,) else: - requester = Requester(target_user, None, False) + requester = synapse.types.create_requester(target_user) message_handler = self.hs.get_handlers().message_handler prev_event = message_handler.deduplicate_state_event(event, context) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index be26a491ff..0ee4ebe504 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -138,7 +138,7 @@ class SyncHandler(object): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache() + self.response_cache = ResponseCache(hs) def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): |