diff options
69 files changed, 2120 insertions, 398 deletions
diff --git a/CHANGES.rst b/CHANGES.rst index e1d5e876dc..03668370a9 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,67 @@ +Changes in synapse v0.17.0-rc1 (2016-07-28) +=========================================== + +This release changes the LDAP configuration format in a backwards incompatible +way, see PR #843 for details. + +This release contains significant security bug fixes regarding authenticating +events received over federation. Please upgrade. + + +Features: + +* Add purge_media_cache admin API (PR #902) +* Add deactivate account admin API (PR #903) +* Add optional pepper to password hashing (PR #907, #910 by KentShikama) +* Add an admin option to shared secret registration (breaks backwards compat) + (PR #909) +* Add purge local room history API (PR #911, #923, #924) +* Add requestToken endpoints (PR #915) +* Add an /account/deactivate endpoint (PR #921) +* Add filter param to /messages. Add 'contains_url' to filter. (PR #922) +* Add device_id support to /login (PR #929) +* Add device_id support to /v2/register flow. (PR #937, #942) +* Add GET /devices endpoint (PR #939, #944) +* Add GET /device/{deviceId} (PR #943) +* Add update and delete APIs for devices (PR #949) + + +Changes: + +* Rewrite LDAP Authentication against ldap3 (PR #843 by mweinelt) +* Linearize some federation endpoints based on (origin, room_id) (PR #879) +* Remove the legacy v0 content upload API. (PR #888) +* Use similar naming we use in email notifs for push (PR #894) +* Optionally include password hash in createUser endpoint (PR #905 by + KentShikama) +* Use a query that postgresql optimises better for get_events_around (PR #906) +* Fall back to 'username' if 'user' is not given for appservice registration. + (PR #927 by Half-Shot) +* Add metrics for psutil derived memory usage (PR #936) +* Record device_id in client_ips (PR #938) +* Send the correct host header when fetching keys (PR #941) +* Log the hostname the reCAPTCHA was completed on (PR #946) +* Make the device id on e2e key upload optional (PR #956) +* Add r0.2.0 to the "supported versions" list (PR #960) +* Don't include name of room for invites in push (PR #961) + + +Bug fixes: + +* Fix substitution failure in mail template (PR #887) +* Put most recent 20 messages in email notif (PR #892) +* Ensure that the guest user is in the database when upgrading accounts + (PR #914) +* Fix various edge cases in auth handling (PR #919) +* Fix 500 ISE when sending alias event without a state_key (PR #925) +* Fix bug where we stored rejections in the state_group, persist all + rejections (PR #948) +* Fix lack of check of if the user is banned when handling 3pid invites + (PR #952) +* Fix a couple of bugs in the transaction and keyring code (PR #954, #955) + + + Changes in synapse v0.16.1-r1 (2016-07-08) ========================================== diff --git a/README.rst b/README.rst index ebcb15a977..89458badc9 100644 --- a/README.rst +++ b/README.rst @@ -445,7 +445,7 @@ You have two choices here, which will influence the form of your Matrix user IDs: 1) Use the machine's own hostname as available on public DNS in the form of - its A or AAAA records. This is easier to set up initially, perhaps for + its A records. This is easier to set up initially, perhaps for testing, but lacks the flexibility of SRV. 2) Set up a SRV record for your domain name. This requires you create a SRV diff --git a/UPGRADE.rst b/UPGRADE.rst index 699f04c2c2..9f044719a0 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -27,7 +27,7 @@ running: # Pull the latest version of the master branch. git pull # Update the versions of synapse's python dependencies. - python synapse/python_dependencies.py | xargs -n1 pip install + python synapse/python_dependencies.py | xargs -n1 pip install --upgrade Upgrading to v0.15.0 diff --git a/docs/admin_api/README.rst b/docs/admin_api/README.rst new file mode 100644 index 0000000000..d4f564cfae --- /dev/null +++ b/docs/admin_api/README.rst @@ -0,0 +1,12 @@ +Admin APIs +========== + +This directory includes documentation for the various synapse specific admin +APIs available. + +Only users that are server admins can use these APIs. A user can be marked as a +server admin by updating the database directly, e.g.: + +``UPDATE users SET admin = 1 WHERE name = '@foo:bar.com'`` + +Restarting may be required for the changes to register. diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst new file mode 100644 index 0000000000..986efe40f9 --- /dev/null +++ b/docs/admin_api/purge_history_api.rst @@ -0,0 +1,15 @@ +Purge History API +================= + +The purge history API allows server admins to purge historic events from their +database, reclaiming disk space. + +Depending on the amount of history being purged a call to the API may take +several minutes or longer. During this period users will not be able to +paginate further back in the room from the point being purged from. + +The API is simply: + +``POST /_matrix/client/r0/admin/purge_history/<room_id>/<event_id>`` + +including an ``access_token`` of a server admin. diff --git a/docs/admin_api/purge_remote_media.rst b/docs/admin_api/purge_remote_media.rst new file mode 100644 index 0000000000..b26c6a9e7b --- /dev/null +++ b/docs/admin_api/purge_remote_media.rst @@ -0,0 +1,19 @@ +Purge Remote Media API +====================== + +The purge remote media API allows server admins to purge old cached remote +media. + +The API is:: + + POST /_matrix/client/r0/admin/purge_media_cache + + { + "before_ts": <unix_timestamp_in_ms> + } + +Which will remove all cached media that was last accessed before +``<unix_timestamp_in_ms>``. + +If the user re-requests purged remote media, synapse will re-request the media +from the originating server. diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh index 50268e0982..f715cd559a 100755 --- a/jenkins-dendron-postgres.sh +++ b/jenkins-dendron-postgres.sh @@ -69,8 +69,8 @@ cd sytest git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) -: ${PORT_BASE:=8000} -: ${PORT_COUNT=20} +: ${PORT_BASE:=20000} +: ${PORT_COUNT=100} ./jenkins/prep_sytest_for_postgres.sh @@ -82,6 +82,7 @@ echo >&2 "Running sytest with PostgreSQL"; --dendron $WORKSPACE/dendron/bin/dendron \ --pusher \ --synchrotron \ + --federation-reader \ --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) cd .. diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh index 2f0768fcb7..7a43df0d58 100755 --- a/jenkins-postgres.sh +++ b/jenkins-postgres.sh @@ -43,8 +43,8 @@ cd sytest git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) -: ${PORT_BASE:=8000} -: ${PORT_COUNT=20} +: ${PORT_BASE:=20000} +: ${PORT_COUNT=100} ./jenkins/prep_sytest_for_postgres.sh diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh index da603c5af8..27e61af6ee 100755 --- a/jenkins-sqlite.sh +++ b/jenkins-sqlite.sh @@ -41,8 +41,9 @@ cd sytest git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) -: ${PORT_COUNT=20} -: ${PORT_BASE:=8000} +: ${PORT_BASE:=20000} +: ${PORT_COUNT=100} + ./jenkins/install_and_run.sh --coverage \ --python $TOX_BIN/python \ --synapse-directory $WORKSPACE \ diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py index ea62dceb36..caa3cee4e7 100644 --- a/scripts-dev/federation_client.py +++ b/scripts-dev/federation_client.py @@ -116,11 +116,12 @@ def get_json(origin_name, origin_key, destination, path): authorization_headers = [] for key, sig in signed_json["signatures"][origin_name].items(): - authorization_headers.append(bytes( - "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % ( - origin_name, key, sig, - ) - )) + header = "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % ( + origin_name, key, sig, + ) + authorization_headers.append(bytes(header)) + sys.stderr.write(header) + sys.stderr.write("\n") result = requests.get( lookup(destination, path), diff --git a/setup.cfg b/setup.cfg index 5ebce1c56b..da8eafbb39 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,7 +16,5 @@ ignore = [flake8] max-line-length = 90 -ignore = W503 ; W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it. - -[pep8] -max-line-length = 90 +# W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it. +ignore = W503 diff --git a/synapse/__init__.py b/synapse/__init__.py index 2750ad3f7a..8f0176e182 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.16.1-r1" +__version__ = "0.17.0-rc1" diff --git a/synapse/api/auth.py b/synapse/api/auth.py index eca8513905..59db76debc 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -13,22 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + +import pymacaroons from canonicaljson import encode_canonical_json from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json, SignatureVerifyException - from twisted.internet import defer +from unpaddedbase64 import decode_base64 +import synapse.types from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, Codes, SynapseError, EventSizeError -from synapse.types import Requester, UserID, get_domain_from_id -from synapse.util.logutils import log_function +from synapse.types import UserID, get_domain_from_id from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.logutils import log_function from synapse.util.metrics import Measure -from unpaddedbase64 import decode_base64 - -import logging -import pymacaroons logger = logging.getLogger(__name__) @@ -376,6 +376,10 @@ class Auth(object): if Membership.INVITE == membership and "third_party_invite" in event.content: if not self._verify_third_party_invite(event, auth_events): raise AuthError(403, "You are not invited to this room.") + if target_banned: + raise AuthError( + 403, "%s is banned from the room" % (target_user_id,) + ) return True if Membership.JOIN != membership: @@ -566,8 +570,7 @@ class Auth(object): Args: request - An HTTP request with an access_token query parameter. Returns: - defer.Deferred: resolves to a namedtuple including "user" (UserID) - "access_token_id" (int), "is_guest" (bool) + defer.Deferred: resolves to a ``synapse.types.Requester`` object Raises: AuthError if no user by that token exists or the token is invalid. """ @@ -576,9 +579,7 @@ class Auth(object): user_id = yield self._get_appservice_user_id(request.args) if user_id: request.authenticated_entity = user_id - defer.returnValue( - Requester(UserID.from_string(user_id), "", False) - ) + defer.returnValue(synapse.types.create_requester(user_id)) access_token = request.args["access_token"][0] user_info = yield self.get_user_by_access_token(access_token, rights) @@ -612,7 +613,8 @@ class Auth(object): request.authenticated_entity = user.to_string() - defer.returnValue(Requester(user, token_id, is_guest)) + defer.returnValue(synapse.types.create_requester( + user, token_id, is_guest, device_id)) except KeyError: raise AuthError( self.TOKEN_NOT_FOUND_HTTP_STATUS, "Missing access token.", diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py index 1bc4279807..9c2b627590 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py @@ -16,13 +16,11 @@ import sys sys.dont_write_bytecode = True -from synapse.python_dependencies import ( - check_requirements, MissingRequirementError -) # NOQA +from synapse import python_dependencies # noqa: E402 try: - check_requirements() -except MissingRequirementError as e: + python_dependencies.check_requirements() +except python_dependencies.MissingRequirementError as e: message = "\n".join([ "Missing Requirement: %s" % (e.message,), "To install run:", diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py new file mode 100644 index 0000000000..58d425f9ac --- /dev/null +++ b/synapse/app/federation_reader.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.directory import DirectoryStore +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.api.urls import FEDERATION_PREFIX +from synapse.federation.transport.server import TransportLayerServer +from synapse.crypto import context_factory + + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + +logger = logging.getLogger("synapse.app.federation_reader") + + +class FederationReaderSlavedStore( + SlavedEventStore, + SlavedKeyStore, + RoomStore, + DirectoryStore, + TransactionStore, + BaseSlavedStore, +): + pass + + +class FederationReaderServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "federation": + resources.update({ + FEDERATION_PREFIX: TransportLayerServer(self), + }) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse federation reader now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.worker_replication_url + + while True: + try: + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + yield store.process_replication(result) + except: + logger.exception("Error replicating from %r", replication_url) + yield sleep(5) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse federation reader", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.federation_reader" + + setup_logging(config.worker_log_config, config.worker_log_file) + + database_engine = create_engine(config.database_config) + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ss = FederationReaderServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + ) + + ss.setup() + ss.get_handlers() + ss.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ss.get_datastore().start_profiling() + ss.replicate() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-federation-reader", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index 54b83da9d8..c2bd64d6c2 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -77,10 +77,12 @@ class SynapseKeyClientProtocol(HTTPClient): def __init__(self): self.remote_key = defer.Deferred() self.host = None + self._peer = None def connectionMade(self): - self.host = self.transport.getHost() - logger.debug("Connected to %s", self.host) + self._peer = self.transport.getPeer() + logger.debug("Connected to %s", self._peer) + self.sendCommand(b"GET", self.path) if self.host: self.sendHeader(b"Host", self.host) @@ -124,7 +126,10 @@ class SynapseKeyClientProtocol(HTTPClient): self.timer.cancel() def on_timeout(self): - logger.debug("Timeout waiting for response from %s", self.host) + logger.debug( + "Timeout waiting for response from %s: %s", + self.host, self._peer, + ) self.errback(IOError("Timeout waiting for response")) self.transport.abortConnection() @@ -133,4 +138,5 @@ class SynapseKeyClientFactory(Factory): def protocol(self): protocol = SynapseKeyClientProtocol() protocol.path = self.path + protocol.host = self.host return protocol diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d08ee0aa91..5012c10ee8 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -44,7 +44,21 @@ import logging logger = logging.getLogger(__name__) -KeyGroup = namedtuple("KeyGroup", ("server_name", "group_id", "key_ids")) +VerifyKeyRequest = namedtuple("VerifyRequest", ( + "server_name", "key_ids", "json_object", "deferred" +)) +""" +A request for a verify key to verify a JSON object. + +Attributes: + server_name(str): The name of the server to verify against. + key_ids(set(str)): The set of key_ids to that could be used to verify the + JSON object + json_object(dict): The JSON object to verify. + deferred(twisted.internet.defer.Deferred): + A deferred (server_name, key_id, verify_key) tuple that resolves when + a verify key has been fetched +""" class Keyring(object): @@ -74,39 +88,32 @@ class Keyring(object): list of deferreds indicating success or failure to verify each json object's signature for the given server_name. """ - group_id_to_json = {} - group_id_to_group = {} - group_ids = [] - - next_group_id = 0 - deferreds = {} + verify_requests = [] for server_name, json_object in server_and_json: logger.debug("Verifying for %s", server_name) - group_id = next_group_id - next_group_id += 1 - group_ids.append(group_id) key_ids = signature_ids(json_object, server_name) if not key_ids: - deferreds[group_id] = defer.fail(SynapseError( + deferred = defer.fail(SynapseError( 400, "Not signed with a supported algorithm", Codes.UNAUTHORIZED, )) else: - deferreds[group_id] = defer.Deferred() + deferred = defer.Deferred() - group = KeyGroup(server_name, group_id, key_ids) + verify_request = VerifyKeyRequest( + server_name, key_ids, json_object, deferred + ) - group_id_to_group[group_id] = group - group_id_to_json[group_id] = json_object + verify_requests.append(verify_request) @defer.inlineCallbacks - def handle_key_deferred(group, deferred): - server_name = group.server_name + def handle_key_deferred(verify_request): + server_name = verify_request.server_name try: - _, _, key_id, verify_key = yield deferred + _, key_id, verify_key = yield verify_request.deferred except IOError as e: logger.warn( "Got IOError when downloading keys for %s: %s %s", @@ -128,7 +135,7 @@ class Keyring(object): Codes.UNAUTHORIZED, ) - json_object = group_id_to_json[group.group_id] + json_object = verify_request.json_object try: verify_signed_json(json_object, server_name, verify_key) @@ -157,36 +164,34 @@ class Keyring(object): # Actually start fetching keys. wait_on_deferred.addBoth( - lambda _: self.get_server_verify_keys(group_id_to_group, deferreds) + lambda _: self.get_server_verify_keys(verify_requests) ) # When we've finished fetching all the keys for a given server_name, # resolve the deferred passed to `wait_for_previous_lookups` so that # any lookups waiting will proceed. - server_to_gids = {} + server_to_request_ids = {} - def remove_deferreds(res, server_name, group_id): - server_to_gids[server_name].discard(group_id) - if not server_to_gids[server_name]: + def remove_deferreds(res, server_name, verify_request): + request_id = id(verify_request) + server_to_request_ids[server_name].discard(request_id) + if not server_to_request_ids[server_name]: d = server_to_deferred.pop(server_name, None) if d: d.callback(None) return res - for g_id, deferred in deferreds.items(): - server_name = group_id_to_group[g_id].server_name - server_to_gids.setdefault(server_name, set()).add(g_id) - deferred.addBoth(remove_deferreds, server_name, g_id) + for verify_request in verify_requests: + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids.setdefault(server_name, set()).add(request_id) + deferred.addBoth(remove_deferreds, server_name, verify_request) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified return [ - preserve_context_over_fn( - handle_key_deferred, - group_id_to_group[g_id], - deferreds[g_id], - ) - for g_id in group_ids + preserve_context_over_fn(handle_key_deferred, verify_request) + for verify_request in verify_requests ] @defer.inlineCallbacks @@ -220,7 +225,7 @@ class Keyring(object): d.addBoth(rm, server_name) - def get_server_verify_keys(self, group_id_to_group, group_id_to_deferred): + def get_server_verify_keys(self, verify_requests): """Takes a dict of KeyGroups and tries to find at least one key for each group. """ @@ -237,62 +242,64 @@ class Keyring(object): merged_results = {} missing_keys = {} - for group in group_id_to_group.values(): - missing_keys.setdefault(group.server_name, set()).update( - group.key_ids + for verify_request in verify_requests: + missing_keys.setdefault(verify_request.server_name, set()).update( + verify_request.key_ids ) for fn in key_fetch_fns: results = yield fn(missing_keys.items()) merged_results.update(results) - # We now need to figure out which groups we have keys for - # and which we don't - missing_groups = {} - for group in group_id_to_group.values(): - for key_id in group.key_ids: - if key_id in merged_results[group.server_name]: + # We now need to figure out which verify requests we have keys + # for and which we don't + missing_keys = {} + requests_missing_keys = [] + for verify_request in verify_requests: + server_name = verify_request.server_name + result_keys = merged_results[server_name] + + if verify_request.deferred.called: + # We've already called this deferred, which probably + # means that we've already found a key for it. + continue + + for key_id in verify_request.key_ids: + if key_id in result_keys: with PreserveLoggingContext(): - group_id_to_deferred[group.group_id].callback(( - group.group_id, - group.server_name, + verify_request.deferred.callback(( + server_name, key_id, - merged_results[group.server_name][key_id], + result_keys[key_id], )) break else: - missing_groups.setdefault( - group.server_name, [] - ).append(group) - - if not missing_groups: + # The else block is only reached if the loop above + # doesn't break. + missing_keys.setdefault(server_name, set()).update( + verify_request.key_ids + ) + requests_missing_keys.append(verify_request) + + if not missing_keys: break - missing_keys = { - server_name: set( - key_id for group in groups for key_id in group.key_ids - ) - for server_name, groups in missing_groups.items() - } - - for group in missing_groups.values(): - group_id_to_deferred[group.group_id].errback(SynapseError( + for verify_request in requests_missing_keys.values(): + verify_request.deferred.errback(SynapseError( 401, "No key for %s with id %s" % ( - group.server_name, group.key_ids, + verify_request.server_name, verify_request.key_ids, ), Codes.UNAUTHORIZED, )) def on_err(err): - for deferred in group_id_to_deferred.values(): - if not deferred.called: - deferred.errback(err) + for verify_request in verify_requests: + if not verify_request.deferred.called: + verify_request.deferred.errback(err) do_iterations().addErrback(on_err) - return group_id_to_deferred - @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): res = yield defer.gatherResults( @@ -447,7 +454,7 @@ class Keyring(object): ) processed_response = yield self.process_v2_response( - perspective_name, response + perspective_name, response, only_from_server=False ) for server_name, response_keys in processed_response.items(): @@ -527,7 +534,7 @@ class Keyring(object): @defer.inlineCallbacks def process_v2_response(self, from_server, response_json, - requested_ids=[]): + requested_ids=[], only_from_server=True): time_now_ms = self.clock.time_msec() response_keys = {} verify_keys = {} @@ -551,6 +558,13 @@ class Keyring(object): results = {} server_name = response_json["server_name"] + if only_from_server: + if server_name != from_server: + raise ValueError( + "Expected a response for server %r not %r" % ( + from_server, server_name + ) + ) for key_id in response_json["signatures"].get(server_name, {}): if key_id not in response_json["verify_keys"]: raise ValueError( diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 6264aa0d9a..11081a0cd5 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -13,14 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer -from synapse.api.errors import LimitExceededError +import synapse.types from synapse.api.constants import Membership, EventTypes -from synapse.types import UserID, Requester - - -import logging +from synapse.api.errors import LimitExceededError +from synapse.types import UserID logger = logging.getLogger(__name__) @@ -124,7 +124,8 @@ class BaseHandler(object): # and having homeservers have their own users leave keeps more # of that decision-making and control local to the guest-having # homeserver. - requester = Requester(target_user, "", True) + requester = synapse.types.create_requester( + target_user, is_guest=True) handler = self.hs.get_handlers().room_member_handler yield handler.update_membership( requester, diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 8f83923ddb..2e138f328f 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -77,6 +77,7 @@ class AuthHandler(BaseHandler): self.ldap_bind_password = hs.config.ldap_bind_password self.hs = hs # FIXME better possibility to access registrationHandler later? + self.device_handler = hs.get_device_handler() @defer.inlineCallbacks def check_auth(self, flows, clientdict, clientip): @@ -279,8 +280,17 @@ class AuthHandler(BaseHandler): data = pde.response resp_body = simplejson.loads(data) - if 'success' in resp_body and resp_body['success']: - defer.returnValue(True) + if 'success' in resp_body: + # Note that we do NOT check the hostname here: we explicitly + # intend the CAPTCHA to be presented by whatever client the + # user is using, we just care that they have completed a CAPTCHA. + logger.info( + "%s reCAPTCHA from hostname %s", + "Successful" if resp_body['success'] else "Failed", + resp_body.get('hostname') + ) + if resp_body['success']: + defer.returnValue(True) raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) @defer.inlineCallbacks @@ -365,7 +375,8 @@ class AuthHandler(BaseHandler): return self._check_password(user_id, password) @defer.inlineCallbacks - def get_login_tuple_for_user_id(self, user_id, device_id=None): + def get_login_tuple_for_user_id(self, user_id, device_id=None, + initial_display_name=None): """ Gets login tuple for the user with the given user ID. @@ -374,9 +385,15 @@ class AuthHandler(BaseHandler): The user is assumed to have been authenticated by some other machanism (e.g. CAS), and the user_id converted to the canonical case. + The device will be recorded in the table if it is not there already. + Args: user_id (str): canonical User ID - device_id (str): the device ID to associate with the access token + device_id (str|None): the device ID to associate with the tokens. + None to leave the tokens unassociated with a device (deprecated: + we should always have a device ID) + initial_display_name (str): display name to associate with the + device if it needs re-registering Returns: A tuple of: The access token for the user's session. @@ -388,6 +405,16 @@ class AuthHandler(BaseHandler): logger.info("Logging in user %s on device %s", user_id, device_id) access_token = yield self.issue_access_token(user_id, device_id) refresh_token = yield self.issue_refresh_token(user_id, device_id) + + # the device *should* have been registered before we got here; however, + # it's possible we raced against a DELETE operation. The thing we + # really don't want is active access_tokens without a record of the + # device, so we double-check it here. + if device_id is not None: + yield self.device_handler.check_device_registered( + user_id, device_id, initial_display_name + ) + defer.returnValue((access_token, refresh_token)) @defer.inlineCallbacks diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 8d7d9874f8..f4bf159bb5 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -12,7 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.errors import StoreError + +from synapse.api import errors from synapse.util import stringutils from twisted.internet import defer from ._base import BaseHandler @@ -65,7 +66,116 @@ class DeviceHandler(BaseHandler): ignore_if_known=False, ) defer.returnValue(device_id) - except StoreError: + except errors.StoreError: attempts += 1 - raise StoreError(500, "Couldn't generate a device ID.") + raise errors.StoreError(500, "Couldn't generate a device ID.") + + @defer.inlineCallbacks + def get_devices_by_user(self, user_id): + """ + Retrieve the given user's devices + + Args: + user_id (str): + Returns: + defer.Deferred: list[dict[str, X]]: info on each device + """ + + device_map = yield self.store.get_devices_by_user(user_id) + + ips = yield self.store.get_last_client_ip_by_device( + devices=((user_id, device_id) for device_id in device_map.keys()) + ) + + devices = device_map.values() + for device in devices: + _update_device_from_client_ips(device, ips) + + defer.returnValue(devices) + + @defer.inlineCallbacks + def get_device(self, user_id, device_id): + """ Retrieve the given device + + Args: + user_id (str): + device_id (str): + + Returns: + defer.Deferred: dict[str, X]: info on the device + Raises: + errors.NotFoundError: if the device was not found + """ + try: + device = yield self.store.get_device(user_id, device_id) + except errors.StoreError: + raise errors.NotFoundError + ips = yield self.store.get_last_client_ip_by_device( + devices=((user_id, device_id),) + ) + _update_device_from_client_ips(device, ips) + defer.returnValue(device) + + @defer.inlineCallbacks + def delete_device(self, user_id, device_id): + """ Delete the given device + + Args: + user_id (str): + device_id (str): + + Returns: + defer.Deferred: + """ + + try: + yield self.store.delete_device(user_id, device_id) + except errors.StoreError, e: + if e.code == 404: + # no match + pass + else: + raise + + yield self.store.user_delete_access_tokens( + user_id, device_id=device_id, + delete_refresh_tokens=True, + ) + + yield self.store.delete_e2e_keys_by_device( + user_id=user_id, device_id=device_id + ) + + @defer.inlineCallbacks + def update_device(self, user_id, device_id, content): + """ Update the given device + + Args: + user_id (str): + device_id (str): + content (dict): body of update request + + Returns: + defer.Deferred: + """ + + try: + yield self.store.update_device( + user_id, + device_id, + new_display_name=content.get("display_name") + ) + except errors.StoreError, e: + if e.code == 404: + raise errors.NotFoundError() + else: + raise + + +def _update_device_from_client_ips(device, client_ips): + ip = client_ips.get((device["user_id"], device["device_id"]), {}) + device.update({ + "last_seen_ts": ip.get("last_seen"), + "last_seen_ip": ip.get("ip"), + }) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fcad41d7b6..187bfc4315 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -124,7 +124,7 @@ class FederationHandler(BaseHandler): try: event_stream_id, max_stream_id = yield self._persist_auth_tree( - auth_chain, state, event + origin, auth_chain, state, event ) except AuthError as e: raise FederationError( @@ -637,7 +637,7 @@ class FederationHandler(BaseHandler): pass event_stream_id, max_stream_id = yield self._persist_auth_tree( - auth_chain, state, event + origin, auth_chain, state, event ) with PreserveLoggingContext(): @@ -1150,11 +1150,19 @@ class FederationHandler(BaseHandler): ) @defer.inlineCallbacks - def _persist_auth_tree(self, auth_events, state, event): + def _persist_auth_tree(self, origin, auth_events, state, event): """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. Persists the event seperately. + Will attempt to fetch missing auth events. + + Args: + origin (str): Where the events came from + auth_events (list) + state (list) + event (Event) + Returns: 2-tuple of (event_stream_id, max_stream_id) from the persist_event call for `event` @@ -1167,7 +1175,7 @@ class FederationHandler(BaseHandler): event_map = { e.event_id: e - for e in auth_events + for e in itertools.chain(auth_events, state, [event]) } create_event = None @@ -1176,10 +1184,29 @@ class FederationHandler(BaseHandler): create_event = e break + missing_auth_events = set() + for e in itertools.chain(auth_events, state, [event]): + for e_id, _ in e.auth_events: + if e_id not in event_map: + missing_auth_events.add(e_id) + + for e_id in missing_auth_events: + m_ev = yield self.replication_layer.get_pdu( + [origin], + e_id, + outlier=True, + timeout=10000, + ) + if m_ev and m_ev.event_id == e_id: + event_map[e_id] = m_ev + else: + logger.info("Failed to find auth event %r", e_id) + for e in itertools.chain(auth_events, state, [event]): auth_for_e = { (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id] for e_id, _ in e.auth_events + if e_id in event_map } if create_event: auth_for_e[(EventTypes.Create, "")] = create_event diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 711a6a567f..d9ac09078d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -13,15 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer +import synapse.types from synapse.api.errors import SynapseError, AuthError, CodeMessageException -from synapse.types import UserID, Requester - +from synapse.types import UserID from ._base import BaseHandler -import logging - logger = logging.getLogger(__name__) @@ -165,7 +165,9 @@ class ProfileHandler(BaseHandler): try: # Assume the user isn't a guest because we don't let guests set # profile or avatar data. - requester = Requester(user, "", False) + # XXX why are we recreating `requester` here for each room? + # what was wrong with the `requester` we were passed? + requester = synapse.types.create_requester(user) yield handler.update_membership( requester, user, diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 94b19d0cb0..dd75c4fecf 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -14,18 +14,19 @@ # limitations under the License. """Contains functions for registering clients.""" +import logging +import urllib + from twisted.internet import defer -from synapse.types import UserID, Requester +import synapse.types from synapse.api.errors import ( AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError ) -from ._base import BaseHandler -from synapse.util.async import run_on_reactor from synapse.http.client import CaptchaServerHttpClient - -import logging -import urllib +from synapse.types import UserID +from synapse.util.async import run_on_reactor +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -52,6 +53,13 @@ class RegistrationHandler(BaseHandler): Codes.INVALID_USERNAME ) + if localpart[0] == '_': + raise SynapseError( + 400, + "User ID may not begin with _", + Codes.INVALID_USERNAME + ) + user = UserID(localpart, self.hs.hostname) user_id = user.to_string() @@ -410,8 +418,9 @@ class RegistrationHandler(BaseHandler): if displayname is not None: logger.info("setting user display name: %s -> %s", user_id, displayname) profile_handler = self.hs.get_handlers().profile_handler + requester = synapse.types.create_requester(user) yield profile_handler.set_displayname( - user, Requester(user, token, False), displayname + user, requester, displayname ) defer.returnValue((user_id, token)) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 7e616f44fd..8cec8fc4ed 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -14,24 +14,22 @@ # limitations under the License. -from twisted.internet import defer +import logging -from ._base import BaseHandler +from signedjson.key import decode_verify_key_bytes +from signedjson.sign import verify_signed_json +from twisted.internet import defer +from unpaddedbase64 import decode_base64 -from synapse.types import UserID, RoomID, Requester +import synapse.types from synapse.api.constants import ( EventTypes, Membership, ) from synapse.api.errors import AuthError, SynapseError, Codes +from synapse.types import UserID, RoomID from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room - -from signedjson.sign import verify_signed_json -from signedjson.key import decode_verify_key_bytes - -from unpaddedbase64 import decode_base64 - -import logging +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -315,7 +313,7 @@ class RoomMemberHandler(BaseHandler): ) assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,) else: - requester = Requester(target_user, None, False) + requester = synapse.types.create_requester(target_user) message_handler = self.hs.get_handlers().message_handler prev_event = message_handler.deduplicate_state_event(event, context) diff --git a/synapse/http/server.py b/synapse/http/server.py index f705abab94..2b3c05a740 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -205,6 +205,7 @@ class JsonResource(HttpServer, resource.Resource): def register_paths(self, method, path_patterns, callback): for path_pattern in path_patterns: + logger.debug("Registering for %s %s", method, path_pattern.pattern) self.path_regexs.setdefault(method, []).append( self._PathEntry(path_pattern, callback) ) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 12a3ec7fd8..6600c9cd55 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -14,6 +14,7 @@ # limitations under the License. from twisted.internet import defer, reactor +from twisted.internet.error import AlreadyCalled, AlreadyCancelled import logging @@ -92,7 +93,11 @@ class EmailPusher(object): def on_stop(self): if self.timed_call: - self.timed_call.cancel() + try: + self.timed_call.cancel() + except (AlreadyCalled, AlreadyCancelled): + pass + self.timed_call = None @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): @@ -140,9 +145,8 @@ class EmailPusher(object): being run. """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering - unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( - self.user_id, start, self.max_stream_ordering - ) + fn = self.store.get_unread_push_actions_for_user_in_range_for_email + unprocessed = yield fn(self.user_id, start, self.max_stream_ordering) soonest_due_at = None @@ -190,7 +194,10 @@ class EmailPusher(object): soonest_due_at = should_notify_at if self.timed_call is not None: - self.timed_call.cancel() + try: + self.timed_call.cancel() + except (AlreadyCalled, AlreadyCancelled): + pass self.timed_call = None if soonest_due_at is not None: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 2acc6cc214..feedb075e2 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -16,6 +16,7 @@ from synapse.push import PusherConfigException from twisted.internet import defer, reactor +from twisted.internet.error import AlreadyCalled, AlreadyCancelled import logging import push_rule_evaluator @@ -109,7 +110,11 @@ class HttpPusher(object): def on_stop(self): if self.timed_call: - self.timed_call.cancel() + try: + self.timed_call.cancel() + except (AlreadyCalled, AlreadyCancelled): + pass + self.timed_call = None @defer.inlineCallbacks def _process(self): @@ -141,7 +146,8 @@ class HttpPusher(object): run once per pusher. """ - unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( + fn = self.store.get_unread_push_actions_for_user_in_range_for_http + unprocessed = yield fn( self.user_id, self.last_stream_ordering, self.max_stream_ordering ) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 6f2d1ad57d..d555a33e9a 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -54,7 +54,7 @@ def get_context_for_event(state_handler, ev, user_id): room_state = yield state_handler.get_current_state(ev.room_id) # we no longer bother setting room_alias, and make room_name the - # human-readable name instead, be that m.room.namer, an alias or + # human-readable name instead, be that m.room.name, an alias or # a list of people in the room name = calculate_room_name( room_state, user_id, fallback_to_single_member=False diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py new file mode 100644 index 0000000000..5fbe3a303a --- /dev/null +++ b/synapse/replication/slave/storage/directory.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage.directory import DirectoryStore + + +class DirectoryStore(BaseSlavedStore): + get_aliases_for_room = DirectoryStore.__dict__[ + "get_aliases_for_room" + ].orig diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 369d839464..f4f31f2d27 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -93,8 +93,11 @@ class SlavedEventStore(BaseSlavedStore): StreamStore.__dict__["get_recent_event_ids_for_room"] ) - get_unread_push_actions_for_user_in_range = ( - DataStore.get_unread_push_actions_for_user_in_range.__func__ + get_unread_push_actions_for_user_in_range_for_http = ( + DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__ + ) + get_unread_push_actions_for_user_in_range_for_email = ( + DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__ ) get_push_action_users_in_range = ( DataStore.get_push_action_users_in_range.__func__ @@ -142,6 +145,15 @@ class SlavedEventStore(BaseSlavedStore): _get_events_around_txn = DataStore._get_events_around_txn.__func__ _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ + get_backfill_events = DataStore.get_backfill_events.__func__ + _get_backfill_events = DataStore._get_backfill_events.__func__ + get_missing_events = DataStore.get_missing_events.__func__ + _get_missing_events = DataStore._get_missing_events.__func__ + + get_auth_chain = DataStore.get_auth_chain.__func__ + get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__ + _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py new file mode 100644 index 0000000000..dd2ae49e48 --- /dev/null +++ b/synapse/replication/slave/storage/keys.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.keys import KeyStore + + +class SlavedKeyStore(BaseSlavedStore): + _get_server_verify_key = KeyStore.__dict__[ + "_get_server_verify_key" + ] + + get_server_verify_keys = DataStore.get_server_verify_keys.__func__ + store_server_verify_key = DataStore.store_server_verify_key.__func__ + + get_server_certificate = DataStore.get_server_certificate.__func__ + store_server_certificate = DataStore.store_server_certificate.__func__ + + get_server_keys_json = DataStore.get_server_keys_json.__func__ + store_server_keys_json = DataStore.store_server_keys_json.__func__ diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py new file mode 100644 index 0000000000..d5bb0f98ea --- /dev/null +++ b/synapse/replication/slave/storage/room.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore + + +class RoomStore(BaseSlavedStore): + get_public_room_ids = DataStore.get_public_room_ids.__func__ diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py new file mode 100644 index 0000000000..6f2ba98af5 --- /dev/null +++ b/synapse/replication/slave/storage/transactions.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.transactions import TransactionStore + + +class TransactionStore(BaseSlavedStore): + get_destination_retry_timings = TransactionStore.__dict__[ + "get_destination_retry_timings" + ].orig + _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ + + # For now, don't record the destination rety timings + def set_destination_retry_timings(*args, **kwargs): + return defer.succeed(None) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 8b223e032b..14227f1cdb 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -46,6 +46,7 @@ from synapse.rest.client.v2_alpha import ( account_data, report_event, openid, + devices, ) from synapse.http.server import JsonResource @@ -90,3 +91,4 @@ class ClientRestResource(JsonResource): account_data.register_servlets(hs, client_resource) report_event.register_servlets(hs, client_resource) openid.register_servlets(hs, client_resource) + devices.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index e8b791519c..92fcae674a 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -152,7 +152,10 @@ class LoginRestServlet(ClientV1RestServlet): ) device_id = yield self._register_device(user_id, login_submission) access_token, refresh_token = ( - yield auth_handler.get_login_tuple_for_user_id(user_id, device_id) + yield auth_handler.get_login_tuple_for_user_id( + user_id, device_id, + login_submission.get("initial_device_display_name") + ) ) result = { "user_id": user_id, # may have changed @@ -173,7 +176,10 @@ class LoginRestServlet(ClientV1RestServlet): ) device_id = yield self._register_device(user_id, login_submission) access_token, refresh_token = ( - yield auth_handler.get_login_tuple_for_user_id(user_id, device_id) + yield auth_handler.get_login_tuple_for_user_id( + user_id, device_id, + login_submission.get("initial_device_display_name") + ) ) result = { "user_id": user_id, # may have changed @@ -262,7 +268,8 @@ class LoginRestServlet(ClientV1RestServlet): ) access_token, refresh_token = ( yield auth_handler.get_login_tuple_for_user_id( - registered_user_id, device_id + registered_user_id, device_id, + login_submission.get("initial_device_display_name") ) ) result = { diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py index b6faa2b0e6..20e765f48f 100644 --- a/synapse/rest/client/v2_alpha/_base.py +++ b/synapse/rest/client/v2_alpha/_base.py @@ -25,7 +25,9 @@ import logging logger = logging.getLogger(__name__) -def client_v2_patterns(path_regex, releases=(0,)): +def client_v2_patterns(path_regex, releases=(0,), + v2_alpha=True, + unstable=True): """Creates a regex compiled client path with the correct client path prefix. @@ -35,9 +37,12 @@ def client_v2_patterns(path_regex, releases=(0,)): Returns: SRE_Pattern """ - patterns = [re.compile("^" + CLIENT_V2_ALPHA_PREFIX + path_regex)] - unstable_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/unstable") - patterns.append(re.compile("^" + unstable_prefix + path_regex)) + patterns = [] + if v2_alpha: + patterns.append(re.compile("^" + CLIENT_V2_ALPHA_PREFIX + path_regex)) + if unstable: + unstable_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/unstable") + patterns.append(re.compile("^" + unstable_prefix + path_regex)) for release in releases: new_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/r%d" % release) patterns.append(re.compile("^" + new_prefix + path_regex)) diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py new file mode 100644 index 0000000000..8fbd3d3dfc --- /dev/null +++ b/synapse/rest/client/v2_alpha/devices.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.http import servlet +from ._base import client_v2_patterns + +logger = logging.getLogger(__name__) + + +class DevicesRestServlet(servlet.RestServlet): + PATTERNS = client_v2_patterns("/devices$", releases=[], v2_alpha=False) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(DevicesRestServlet, self).__init__() + self.hs = hs + self.auth = hs.get_auth() + self.device_handler = hs.get_device_handler() + + @defer.inlineCallbacks + def on_GET(self, request): + requester = yield self.auth.get_user_by_req(request) + devices = yield self.device_handler.get_devices_by_user( + requester.user.to_string() + ) + defer.returnValue((200, {"devices": devices})) + + +class DeviceRestServlet(servlet.RestServlet): + PATTERNS = client_v2_patterns("/devices/(?P<device_id>[^/]*)$", + releases=[], v2_alpha=False) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(DeviceRestServlet, self).__init__() + self.hs = hs + self.auth = hs.get_auth() + self.device_handler = hs.get_device_handler() + + @defer.inlineCallbacks + def on_GET(self, request, device_id): + requester = yield self.auth.get_user_by_req(request) + device = yield self.device_handler.get_device( + requester.user.to_string(), + device_id, + ) + defer.returnValue((200, device)) + + @defer.inlineCallbacks + def on_DELETE(self, request, device_id): + # XXX: it's not completely obvious we want to expose this endpoint. + # It allows the client to delete access tokens, which feels like a + # thing which merits extra auth. But if we want to do the interactive- + # auth dance, we should really make it possible to delete more than one + # device at a time. + requester = yield self.auth.get_user_by_req(request) + yield self.device_handler.delete_device( + requester.user.to_string(), + device_id, + ) + defer.returnValue((200, {})) + + @defer.inlineCallbacks + def on_PUT(self, request, device_id): + requester = yield self.auth.get_user_by_req(request) + + body = servlet.parse_json_object_from_request(request) + yield self.device_handler.update_device( + requester.user.to_string(), + device_id, + body + ) + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + DevicesRestServlet(hs).register(http_server) + DeviceRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 89ab39491c..dc1d4d8fc6 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -13,24 +13,25 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + +import simplejson as json +from canonicaljson import encode_canonical_json from twisted.internet import defer +import synapse.api.errors +import synapse.server +import synapse.types from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import UserID - -from canonicaljson import encode_canonical_json - from ._base import client_v2_patterns -import logging -import simplejson as json - logger = logging.getLogger(__name__) class KeyUploadServlet(RestServlet): """ - POST /keys/upload/<device_id> HTTP/1.1 + POST /keys/upload HTTP/1.1 Content-Type: application/json { @@ -53,23 +54,45 @@ class KeyUploadServlet(RestServlet): }, } """ - PATTERNS = client_v2_patterns("/keys/upload/(?P<device_id>[^/]*)", releases=()) + PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$", + releases=()) def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ super(KeyUploadServlet, self).__init__() self.store = hs.get_datastore() self.clock = hs.get_clock() self.auth = hs.get_auth() + self.device_handler = hs.get_device_handler() @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() - # TODO: Check that the device_id matches that in the authentication - # or derive the device_id from the authentication instead. body = parse_json_object_from_request(request) + if device_id is not None: + # passing the device_id here is deprecated; however, we allow it + # for now for compatibility with older clients. + if (requester.device_id is not None and + device_id != requester.device_id): + logger.warning("Client uploading keys for a different device " + "(logged in as %s, uploading for %s)", + requester.device_id, device_id) + else: + device_id = requester.device_id + + if device_id is None: + raise synapse.api.errors.SynapseError( + 400, + "To upload keys, you must pass device_id when authenticating" + ) + time_now = self.clock.time_msec() # TODO: Validate the JSON to make sure it has the right keys. @@ -102,13 +125,14 @@ class KeyUploadServlet(RestServlet): user_id, device_id, time_now, key_list ) - result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - defer.returnValue((200, {"one_time_key_counts": result})) - - @defer.inlineCallbacks - def on_GET(self, request, device_id): - requester = yield self.auth.get_user_by_req(request) - user_id = requester.user.to_string() + # the device should have been registered already, but it may have been + # deleted due to a race with a DELETE request. Or we may be using an + # old access_token without an associated device_id. Either way, we + # need to double-check the device is registered to avoid ending up with + # keys without a corresponding device. + self.device_handler.check_device_registered( + user_id, device_id, "unknown device" + ) result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue((200, {"one_time_key_counts": result})) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index d401722224..943f5676a3 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -196,12 +196,12 @@ class RegisterRestServlet(RestServlet): [LoginType.EMAIL_IDENTITY] ] - authed, result, params, session_id = yield self.auth_handler.check_auth( + authed, auth_result, params, session_id = yield self.auth_handler.check_auth( flows, body, self.hs.get_ip_from_request(request) ) if not authed: - defer.returnValue((401, result)) + defer.returnValue((401, auth_result)) return if registered_user_id is not None: @@ -236,18 +236,18 @@ class RegisterRestServlet(RestServlet): add_email = True - result = yield self._create_registration_details( - registered_user_id, body + return_dict = yield self._create_registration_details( + registered_user_id, params ) - if add_email and result and LoginType.EMAIL_IDENTITY in result: - threepid = result[LoginType.EMAIL_IDENTITY] + if add_email and auth_result and LoginType.EMAIL_IDENTITY in auth_result: + threepid = auth_result[LoginType.EMAIL_IDENTITY] yield self._register_email_threepid( - registered_user_id, threepid, result["access_token"], + registered_user_id, threepid, return_dict["access_token"], params.get("bind_email") ) - defer.returnValue((200, result)) + defer.returnValue((200, return_dict)) def on_OPTIONS(self, _): return 200, {} @@ -356,10 +356,8 @@ class RegisterRestServlet(RestServlet): else: logger.info("bind_email not specified: not binding email") - defer.returnValue() - @defer.inlineCallbacks - def _create_registration_details(self, user_id, body): + def _create_registration_details(self, user_id, params): """Complete registration of newly-registered user Allocates device_id if one was not given; also creates access_token @@ -367,21 +365,20 @@ class RegisterRestServlet(RestServlet): Args: (str) user_id: full canonical @user:id - (object) body: dictionary supplied to /register call, from - which we pull device_id and initial_device_name - + (object) params: registration parameters, from which we pull + device_id and initial_device_name Returns: defer.Deferred: (object) dictionary for response from /register """ - device_id = yield self._register_device(user_id, body) + device_id = yield self._register_device(user_id, params) - access_token = yield self.auth_handler.issue_access_token( - user_id, device_id=device_id + access_token, refresh_token = ( + yield self.auth_handler.get_login_tuple_for_user_id( + user_id, device_id=device_id, + initial_display_name=params.get("initial_device_display_name") + ) ) - refresh_token = yield self.auth_handler.issue_refresh_token( - user_id, device_id=device_id - ) defer.returnValue({ "user_id": user_id, "access_token": access_token, @@ -390,7 +387,7 @@ class RegisterRestServlet(RestServlet): "device_id": device_id, }) - def _register_device(self, user_id, body): + def _register_device(self, user_id, params): """Register a device for a user. This is called after the user's credentials have been validated, but @@ -398,14 +395,14 @@ class RegisterRestServlet(RestServlet): Args: (str) user_id: full canonical @user:id - (object) body: dictionary supplied to /register call, from - which we pull device_id and initial_device_name + (object) params: registration parameters, from which we pull + device_id and initial_device_name Returns: defer.Deferred: (str) device_id """ # register the user's device - device_id = body.get("device_id") - initial_display_name = body.get("initial_device_display_name") + device_id = params.get("device_id") + initial_display_name = params.get("initial_device_display_name") device_id = self.device_handler.check_device_registered( user_id, device_id, initial_display_name ) diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index ca5468c402..e984ea47db 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -26,7 +26,11 @@ class VersionsRestServlet(RestServlet): def on_GET(self, request): return (200, { - "versions": ["r0.0.1"] + "versions": [ + "r0.0.1", + "r0.1.0", + "r0.2.0", + ] }) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 66a995157d..30d0e4c5dc 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from . import engines from twisted.internet import defer @@ -87,10 +88,12 @@ class BackgroundUpdateStore(SQLBaseStore): @defer.inlineCallbacks def start_doing_background_updates(self): - while True: - if self._background_update_timer is not None: - return + assert self._background_update_timer is None, \ + "background updates already running" + + logger.info("Starting background schema updates") + while True: sleep = defer.Deferred() self._background_update_timer = self._clock.call_later( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None @@ -101,22 +104,23 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_timer = None try: - result = yield self.do_background_update( + result = yield self.do_next_background_update( self.BACKGROUND_UPDATE_DURATION_MS ) except: logger.exception("Error doing update") - - if result is None: - logger.info( - "No more background updates to do." - " Unscheduling background update task." - ) - return + else: + if result is None: + logger.info( + "No more background updates to do." + " Unscheduling background update task." + ) + defer.returnValue(None) @defer.inlineCallbacks - def do_background_update(self, desired_duration_ms): - """Does some amount of work on a background update + def do_next_background_update(self, desired_duration_ms): + """Does some amount of work on the next queued background update + Args: desired_duration_ms(float): How long we want to spend updating. @@ -135,11 +139,21 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_queue.append(update['update_name']) if not self._background_update_queue: + # no work left to do defer.returnValue(None) + # pop from the front, and add back to the back update_name = self._background_update_queue.pop(0) self._background_update_queue.append(update_name) + res = yield self._do_background_update(update_name, desired_duration_ms) + defer.returnValue(res) + + @defer.inlineCallbacks + def _do_background_update(self, update_name, desired_duration_ms): + logger.info("Starting update batch on background update '%s'", + update_name) + update_handler = self._background_update_handlers[update_name] performance = self._background_update_performance.get(update_name) @@ -202,6 +216,64 @@ class BackgroundUpdateStore(SQLBaseStore): """ self._background_update_handlers[update_name] = update_handler + def register_background_index_update(self, update_name, index_name, + table, columns): + """Helper for store classes to do a background index addition + + To use: + + 1. use a schema delta file to add a background update. Example: + INSERT INTO background_updates (update_name, progress_json) VALUES + ('my_new_index', '{}'); + + 2. In the Store constructor, call this method + + Args: + update_name (str): update_name to register for + index_name (str): name of index to add + table (str): table to add index to + columns (list[str]): columns/expressions to include in index + """ + + # if this is postgres, we add the indexes concurrently. Otherwise + # we fall back to doing it inline + if isinstance(self.database_engine, engines.PostgresEngine): + conc = True + else: + conc = False + + sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \ + % { + "conc": "CONCURRENTLY" if conc else "", + "name": index_name, + "table": table, + "columns": ", ".join(columns), + } + + def create_index_concurrently(conn): + conn.rollback() + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + c = conn.cursor() + c.execute(sql) + conn.set_session(autocommit=False) + + def create_index(conn): + c = conn.cursor() + c.execute(sql) + + @defer.inlineCallbacks + def updater(progress, batch_size): + logger.info("Adding index %s to %s", index_name, table) + if conc: + yield self.runWithConnection(create_index_concurrently) + else: + yield self.runWithConnection(create_index) + yield self._end_background_update(update_name) + defer.returnValue(1) + + self.register_background_update_handler(update_name, updater) + def start_background_update(self, update_name, progress): """Starts a background update running. diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 74330a8ddf..71e5ea112f 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -13,10 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, Cache +import logging from twisted.internet import defer +from ._base import Cache +from . import background_updates + +logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller # times give more inserts into the database even for readonly API hits @@ -24,8 +28,7 @@ from twisted.internet import defer LAST_SEEN_GRANULARITY = 120 * 1000 -class ClientIpStore(SQLBaseStore): - +class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): self.client_ip_last_seen = Cache( name="client_ip_last_seen", @@ -34,6 +37,13 @@ class ClientIpStore(SQLBaseStore): super(ClientIpStore, self).__init__(hs) + self.register_background_index_update( + "user_ips_device_index", + index_name="user_ips_device_id", + table="user_ips", + columns=["user_id", "device_id", "last_seen"], + ) + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) @@ -67,3 +77,69 @@ class ClientIpStore(SQLBaseStore): desc="insert_client_ip", lock=False, ) + + @defer.inlineCallbacks + def get_last_client_ip_by_device(self, devices): + """For each device_id listed, give the user_ip it was last seen on + + Args: + devices (iterable[(str, str)]): list of (user_id, device_id) pairs + + Returns: + defer.Deferred: resolves to a dict, where the keys + are (user_id, device_id) tuples. The values are also dicts, with + keys giving the column names + """ + + res = yield self.runInteraction( + "get_last_client_ip_by_device", + self._get_last_client_ip_by_device_txn, + retcols=( + "user_id", + "access_token", + "ip", + "user_agent", + "device_id", + "last_seen", + ), + devices=devices + ) + + ret = {(d["user_id"], d["device_id"]): d for d in res} + defer.returnValue(ret) + + @classmethod + def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols): + where_clauses = [] + bindings = [] + for (user_id, device_id) in devices: + if device_id is None: + where_clauses.append("(user_id = ? AND device_id IS NULL)") + bindings.extend((user_id, )) + else: + where_clauses.append("(user_id = ? AND device_id = ?)") + bindings.extend((user_id, device_id)) + + inner_select = ( + "SELECT MAX(last_seen) mls, user_id, device_id FROM user_ips " + "WHERE %(where)s " + "GROUP BY user_id, device_id" + ) % { + "where": " OR ".join(where_clauses), + } + + sql = ( + "SELECT %(retcols)s FROM user_ips " + "JOIN (%(inner_select)s) ips ON" + " user_ips.last_seen = ips.mls AND" + " user_ips.user_id = ips.user_id AND" + " (user_ips.device_id = ips.device_id OR" + " (user_ips.device_id IS NULL AND ips.device_id IS NULL)" + " )" + ) % { + "retcols": ",".join("user_ips." + c for c in retcols), + "inner_select": inner_select, + } + + txn.execute(sql, bindings) + return cls.cursor_to_dict(txn) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 9065e96d28..afd6530cab 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -65,7 +65,7 @@ class DeviceStore(SQLBaseStore): user_id (str): The ID of the user which owns the device device_id (str): The ID of the device to retrieve Returns: - defer.Deferred for a namedtuple containing the device information + defer.Deferred for a dict containing the device information Raises: StoreError: if the device is not found """ @@ -75,3 +75,63 @@ class DeviceStore(SQLBaseStore): retcols=("user_id", "device_id", "display_name"), desc="get_device", ) + + def delete_device(self, user_id, device_id): + """Delete a device. + + Args: + user_id (str): The ID of the user which owns the device + device_id (str): The ID of the device to delete + Returns: + defer.Deferred + """ + return self._simple_delete_one( + table="devices", + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="delete_device", + ) + + def update_device(self, user_id, device_id, new_display_name=None): + """Update a device. + + Args: + user_id (str): The ID of the user which owns the device + device_id (str): The ID of the device to update + new_display_name (str|None): new displayname for device; None + to leave unchanged + Raises: + StoreError: if the device is not found + Returns: + defer.Deferred + """ + updates = {} + if new_display_name is not None: + updates["display_name"] = new_display_name + if not updates: + return defer.succeed(None) + return self._simple_update_one( + table="devices", + keyvalues={"user_id": user_id, "device_id": device_id}, + updatevalues=updates, + desc="update_device", + ) + + @defer.inlineCallbacks + def get_devices_by_user(self, user_id): + """Retrieve all of a user's registered devices. + + Args: + user_id (str): + Returns: + defer.Deferred: resolves to a dict from device_id to a dict + containing "device_id", "user_id" and "display_name" for each + device. + """ + devices = yield self._simple_select_list( + table="devices", + keyvalues={"user_id": user_id}, + retcols=("user_id", "device_id", "display_name"), + desc="get_devices_by_user" + ) + + defer.returnValue({d["device_id"]: d for d in devices}) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 2e89066515..62b7790e91 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import twisted.internet.defer + from ._base import SQLBaseStore @@ -123,3 +125,16 @@ class EndToEndKeyStore(SQLBaseStore): return self.runInteraction( "claim_e2e_one_time_keys", _claim_e2e_one_time_keys ) + + @twisted.internet.defer.inlineCallbacks + def delete_e2e_keys_by_device(self, user_id, device_id): + yield self._simple_delete( + table="e2e_device_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="delete_e2e_device_keys_by_device" + ) + yield self._simple_delete( + table="e2e_one_time_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="delete_e2e_one_time_keys_by_device" + ) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3d93285f84..df4000d0da 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -117,24 +117,42 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue(ret) @defer.inlineCallbacks - def get_unread_push_actions_for_user_in_range(self, user_id, - min_stream_ordering, - max_stream_ordering=None, - limit=20): + def get_unread_push_actions_for_user_in_range_for_http( + self, user_id, min_stream_ordering, max_stream_ordering, limit=20 + ): + """Get a list of the most recent unread push actions for a given user, + within the given stream ordering range. Called by the httppusher. + + Args: + user_id (str): The user to fetch push actions for. + min_stream_ordering(int): The exclusive lower bound on the + stream ordering of event push actions to fetch. + max_stream_ordering(int): The inclusive upper bound on the + stream ordering of event push actions to fetch. + limit (int): The maximum number of rows to return. + Returns: + A promise which resolves to a list of dicts with the keys "event_id", + "room_id", "stream_ordering", "actions". + The list will be ordered by ascending stream_ordering. + The list will have between 0~limit entries. + """ + # find rooms that have a read receipt in them and return the next + # push actions def get_after_receipt(txn): + # find rooms that have a read receipt in them and return the next + # push actions sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " - "e.received_ts " - "FROM (" - " SELECT room_id, user_id, " - " max(topological_ordering) as topological_ordering, " - " max(stream_ordering) as stream_ordering " - " FROM events" - " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" - " GROUP BY room_id, user_id" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions" + " FROM (" + " SELECT room_id," + " MAX(topological_ordering) as topological_ordering," + " MAX(stream_ordering) as stream_ordering" + " FROM events" + " INNER JOIN receipts_linearized USING (room_id, event_id)" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" ") AS rl," " event_push_actions AS ep" - " INNER JOIN events AS e USING (room_id, event_id)" " WHERE" " ep.room_id = rl.room_id" " AND (" @@ -144,44 +162,159 @@ class EventPushActionsStore(SQLBaseStore): " AND ep.stream_ordering > rl.stream_ordering" " )" " )" - " AND ep.stream_ordering > ?" " AND ep.user_id = ?" - " AND ep.user_id = rl.user_id" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering ASC LIMIT ?" ) - args = [min_stream_ordering, user_id] - if max_stream_ordering is not None: - sql += " AND ep.stream_ordering <= ?" - args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering DESC LIMIT ?" - args.append(limit) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] txn.execute(sql, args) return txn.fetchall() after_read_receipt = yield self.runInteraction( - "get_unread_push_actions_for_user_in_range", get_after_receipt + "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt ) + # There are rooms with push actions in them but you don't have a read receipt in + # them e.g. rooms you've been invited to, so get push actions for rooms which do + # not have read receipts in them too. def get_no_receipt(txn): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," " e.received_ts" " FROM event_push_actions AS ep" - " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" - " WHERE ep.room_id not in (" - " SELECT room_id FROM events NATURAL JOIN receipts_linearized" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id NOT IN (" + " SELECT room_id FROM receipts_linearized" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering ASC LIMIT ?" + ) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] + txn.execute(sql, args) + return txn.fetchall() + no_read_receipt = yield self.runInteraction( + "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt + ) + + notifs = [ + { + "event_id": row[0], + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + } for row in after_read_receipt + no_read_receipt + ] + + # Now sort it so it's ordered correctly, since currently it will + # contain results from the first query, correctly ordered, followed + # by results from the second query, but we want them all ordered + # by stream_ordering, oldest first. + notifs.sort(key=lambda r: r['stream_ordering']) + + # Take only up to the limit. We have to stop at the limit because + # one of the subqueries may have hit the limit. + defer.returnValue(notifs[:limit]) + + @defer.inlineCallbacks + def get_unread_push_actions_for_user_in_range_for_email( + self, user_id, min_stream_ordering, max_stream_ordering, limit=20 + ): + """Get a list of the most recent unread push actions for a given user, + within the given stream ordering range. Called by the emailpusher + + Args: + user_id (str): The user to fetch push actions for. + min_stream_ordering(int): The exclusive lower bound on the + stream ordering of event push actions to fetch. + max_stream_ordering(int): The inclusive upper bound on the + stream ordering of event push actions to fetch. + limit (int): The maximum number of rows to return. + Returns: + A promise which resolves to a list of dicts with the keys "event_id", + "room_id", "stream_ordering", "actions", "received_ts". + The list will be ordered by descending received_ts. + The list will have between 0~limit entries. + """ + # find rooms that have a read receipt in them and return the most recent + # push actions + def get_after_receipt(txn): + sql = ( + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM (" + " SELECT room_id," + " MAX(topological_ordering) as topological_ordering," + " MAX(stream_ordering) as stream_ordering" + " FROM events" + " INNER JOIN receipts_linearized USING (room_id, event_id)" " WHERE receipt_type = 'm.read' AND user_id = ?" " GROUP BY room_id" - ") AND ep.user_id = ? AND ep.stream_ordering > ?" + ") AS rl," + " event_push_actions AS ep" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id = rl.room_id" + " AND (" + " ep.topological_ordering > rl.topological_ordering" + " OR (" + " ep.topological_ordering = rl.topological_ordering" + " AND ep.stream_ordering > rl.stream_ordering" + " )" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering DESC LIMIT ?" + ) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] + txn.execute(sql, args) + return txn.fetchall() + after_read_receipt = yield self.runInteraction( + "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt + ) + + # There are rooms with push actions in them but you don't have a read receipt in + # them e.g. rooms you've been invited to, so get push actions for rooms which do + # not have read receipts in them too. + def get_no_receipt(txn): + sql = ( + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM event_push_actions AS ep" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id NOT IN (" + " SELECT room_id FROM receipts_linearized" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering DESC LIMIT ?" ) - args = [user_id, user_id, min_stream_ordering] - if max_stream_ordering is not None: - sql += " AND ep.stream_ordering <= ?" - args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering DESC LIMIT ?" - args.append(limit) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] txn.execute(sql, args) return txn.fetchall() no_read_receipt = yield self.runInteraction( - "get_unread_push_actions_for_user_in_range", get_no_receipt + "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt ) # Make a list of dicts from the two sets of results. @@ -198,7 +331,7 @@ class EventPushActionsStore(SQLBaseStore): # Now sort it so it's ordered correctly, since currently it will # contain results from the first query, correctly ordered, followed # by results from the second query, but we want them all ordered - # by received_ts + # by received_ts (most recent first) notifs.sort(key=lambda r: -(r['received_ts'] or 0)) # Now return the first `limit` diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6610549281..c63ca36df6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -397,6 +397,12 @@ class EventsStore(SQLBaseStore): @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled): + """Insert some number of room events into the necessary database tables. + + Rejected events are only inserted into the events table, the events_json table, + and the rejections table. Things reading from those table will need to check + whether the event was rejected. + """ depth_updates = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids @@ -407,21 +413,11 @@ class EventsStore(SQLBaseStore): event.room_id, event.internal_metadata.stream_ordering, ) - if not event.internal_metadata.is_outlier(): + if not event.internal_metadata.is_outlier() and not context.rejected: depth_updates[event.room_id] = max( event.depth, depth_updates.get(event.room_id, event.depth) ) - if context.push_actions: - self._set_push_actions_for_event_and_users_txn( - txn, event, context.push_actions - ) - - if event.type == EventTypes.Redaction and event.redacts is not None: - self._remove_push_actions_for_event_id_txn( - txn, event.room_id, event.redacts - ) - for room_id, depth in depth_updates.items(): self._update_min_depth_for_room_txn(txn, room_id, depth) @@ -431,14 +427,24 @@ class EventsStore(SQLBaseStore): ), [event.event_id for event, _ in events_and_contexts] ) + have_persisted = { event_id: outlier for event_id, outlier in txn.fetchall() } + # Remove the events that we've seen before. event_map = {} to_remove = set() for event, context in events_and_contexts: + if context.rejected: + # If the event is rejected then we don't care if the event + # was an outlier or not. + if event.event_id in have_persisted: + # If we have already seen the event then ignore it. + to_remove.add(event) + continue + # Handle the case of the list including the same event multiple # times. The tricky thing here is when they differ by whether # they are an outlier. @@ -463,6 +469,12 @@ class EventsStore(SQLBaseStore): outlier_persisted = have_persisted[event.event_id] if not event.internal_metadata.is_outlier() and outlier_persisted: + # We received a copy of an event that we had already stored as + # an outlier in the database. We now have some state at that + # so we need to update the state_groups table with that state. + + # insert into the state_group, state_groups_state and + # event_to_state_groups tables. self._store_mult_state_groups_txn(txn, ((event, context),)) metadata_json = encode_json( @@ -478,6 +490,8 @@ class EventsStore(SQLBaseStore): (metadata_json, event.event_id,) ) + # Add an entry to the ex_outlier_stream table to replicate the + # change in outlier status to our workers. stream_order = event.internal_metadata.stream_ordering state_group_id = context.state_group or context.new_state_group_id self._simple_insert_txn( @@ -499,6 +513,8 @@ class EventsStore(SQLBaseStore): (False, event.event_id,) ) + # Update the event_backward_extremities table now that this + # event isn't an outlier any more. self._update_extremeties(txn, [event]) events_and_contexts = [ @@ -506,38 +522,12 @@ class EventsStore(SQLBaseStore): ] if not events_and_contexts: + # Make sure we don't pass an empty list to functions that expect to + # be storing at least one element. return - self._store_mult_state_groups_txn(txn, events_and_contexts) - - self._handle_mult_prev_events( - txn, - events=[event for event, _ in events_and_contexts], - ) - - for event, _ in events_and_contexts: - if event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Message: - self._store_room_message_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) - elif event.type == EventTypes.RoomHistoryVisibility: - self._store_history_visibility_txn(txn, event) - elif event.type == EventTypes.GuestAccess: - self._store_guest_access_txn(txn, event) - - self._store_room_members_txn( - txn, - [ - event - for event, _ in events_and_contexts - if event.type == EventTypes.Member - ], - backfilled=backfilled, - ) + # From this point onwards the events are only events that we haven't + # seen before. def event_dict(event): return { @@ -591,10 +581,41 @@ class EventsStore(SQLBaseStore): ], ) - if context.rejected: - self._store_rejections_txn( - txn, event.event_id, context.rejected - ) + # Remove the rejected events from the list now that we've added them + # to the events table and the events_json table. + to_remove = set() + for event, context in events_and_contexts: + if context.rejected: + # Insert the event_id into the rejections table + self._store_rejections_txn( + txn, event.event_id, context.rejected + ) + to_remove.add(event) + + events_and_contexts = [ + ec for ec in events_and_contexts if ec[0] not in to_remove + ] + + if not events_and_contexts: + # Make sure we don't pass an empty list to functions that expect to + # be storing at least one element. + return + + # From this point onwards the events are only ones that weren't rejected. + + for event, context in events_and_contexts: + # Insert all the push actions into the event_push_actions table. + if context.push_actions: + self._set_push_actions_for_event_and_users_txn( + txn, event, context.push_actions + ) + + if event.type == EventTypes.Redaction and event.redacts is not None: + # Remove the entries in the event_push_actions table for the + # redacted event. + self._remove_push_actions_for_event_id_txn( + txn, event.room_id, event.redacts + ) self._simple_insert_many_txn( txn, @@ -610,6 +631,49 @@ class EventsStore(SQLBaseStore): ], ) + # Insert into the state_groups, state_groups_state, and + # event_to_state_groups tables. + self._store_mult_state_groups_txn(txn, events_and_contexts) + + # Update the event_forward_extremities, event_backward_extremities and + # event_edges tables. + self._handle_mult_prev_events( + txn, + events=[event for event, _ in events_and_contexts], + ) + + for event, _ in events_and_contexts: + if event.type == EventTypes.Name: + # Insert into the room_names and event_search tables. + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + # Insert into the topics table and event_search table. + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Message: + # Insert into the event_search table. + self._store_room_message_txn(txn, event) + elif event.type == EventTypes.Redaction: + # Insert into the redactions table. + self._store_redaction(txn, event) + elif event.type == EventTypes.RoomHistoryVisibility: + # Insert into the event_search table. + self._store_history_visibility_txn(txn, event) + elif event.type == EventTypes.GuestAccess: + # Insert into the event_search table. + self._store_guest_access_txn(txn, event) + + # Insert into the room_memberships table. + self._store_room_members_txn( + txn, + [ + event + for event, _ in events_and_contexts + if event.type == EventTypes.Member + ], + backfilled=backfilled, + ) + + # Insert event_reference_hashes table. self._store_event_reference_hashes_txn( txn, [event for event, _ in events_and_contexts] ) @@ -654,6 +718,7 @@ class EventsStore(SQLBaseStore): ], ) + # Prefill the event cache self._add_to_cache(txn, events_and_contexts) if backfilled: @@ -666,11 +731,6 @@ class EventsStore(SQLBaseStore): # Outlier events shouldn't clobber the current state. continue - if context.rejected: - # If the event failed it's auth checks then it shouldn't - # clobbler the current state. - continue - txn.call_after( self._get_current_state_for_key.invalidate, (event.room_id, event.type, event.state_key,) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index a495a8a7d9..86b37b9ddd 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -22,6 +22,10 @@ import OpenSSL from signedjson.key import decode_verify_key_bytes import hashlib +import logging + +logger = logging.getLogger(__name__) + class KeyStore(SQLBaseStore): """Persistence for signature verification keys and tls X.509 certificates @@ -74,22 +78,22 @@ class KeyStore(SQLBaseStore): ) @cachedInlineCallbacks() - def get_all_server_verify_keys(self, server_name): - rows = yield self._simple_select_list( + def _get_server_verify_key(self, server_name, key_id): + verify_key_bytes = yield self._simple_select_one_onecol( table="server_signature_keys", keyvalues={ "server_name": server_name, + "key_id": key_id, }, - retcols=["key_id", "verify_key"], - desc="get_all_server_verify_keys", + retcol="verify_key", + desc="_get_server_verify_key", + allow_none=True, ) - defer.returnValue({ - row["key_id"]: decode_verify_key_bytes( - row["key_id"], str(row["verify_key"]) - ) - for row in rows - }) + if verify_key_bytes: + defer.returnValue(decode_verify_key_bytes( + key_id, str(verify_key_bytes) + )) @defer.inlineCallbacks def get_server_verify_keys(self, server_name, key_ids): @@ -101,12 +105,12 @@ class KeyStore(SQLBaseStore): Returns: (list of VerifyKey): The verification keys. """ - keys = yield self.get_all_server_verify_keys(server_name) - defer.returnValue({ - k: keys[k] - for k in key_ids - if k in keys and keys[k] - }) + keys = {} + for key_id in key_ids: + key = yield self._get_server_verify_key(server_name, key_id) + if key: + keys[key_id] = key + defer.returnValue(keys) @defer.inlineCallbacks def store_server_verify_key(self, server_name, from_server, time_now_ms, @@ -133,8 +137,6 @@ class KeyStore(SQLBaseStore): desc="store_server_verify_key", ) - self.get_all_server_verify_keys.invalidate((server_name,)) - def store_server_keys_json(self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes): """Stores the JSON bytes for a set of keys from a server diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 9a92b35361..7e7d32eb66 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -18,18 +18,31 @@ import re from twisted.internet import defer from synapse.api.errors import StoreError, Codes - -from ._base import SQLBaseStore +from synapse.storage import background_updates from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -class RegistrationStore(SQLBaseStore): +class RegistrationStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): super(RegistrationStore, self).__init__(hs) self.clock = hs.get_clock() + self.register_background_index_update( + "access_tokens_device_index", + index_name="access_tokens_device_id", + table="access_tokens", + columns=["user_id", "device_id"], + ) + + self.register_background_index_update( + "refresh_tokens_device_index", + index_name="refresh_tokens_device_id", + table="refresh_tokens", + columns=["user_id", "device_id"], + ) + @defer.inlineCallbacks def add_access_token_to_user(self, user_id, token, device_id=None): """Adds an access token for the given user. @@ -238,16 +251,37 @@ class RegistrationStore(SQLBaseStore): self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks - def user_delete_access_tokens(self, user_id, except_token_ids=[]): - def f(txn): - sql = "SELECT token FROM access_tokens WHERE user_id = ?" + def user_delete_access_tokens(self, user_id, except_token_ids=[], + device_id=None, + delete_refresh_tokens=False): + """ + Invalidate access/refresh tokens belonging to a user + + Args: + user_id (str): ID of user the tokens belong to + except_token_ids (list[str]): list of access_tokens which should + *not* be deleted + device_id (str|None): ID of device the tokens are associated with. + If None, tokens associated with any device (or no device) will + be deleted + delete_refresh_tokens (bool): True to delete refresh tokens as + well as access tokens. + Returns: + defer.Deferred: + """ + def f(txn, table, except_tokens, call_after_delete): + sql = "SELECT token FROM %s WHERE user_id = ?" % table clauses = [user_id] - if except_token_ids: + if device_id is not None: + sql += " AND device_id = ?" + clauses.append(device_id) + + if except_tokens: sql += " AND id NOT IN (%s)" % ( - ",".join(["?" for _ in except_token_ids]), + ",".join(["?" for _ in except_tokens]), ) - clauses += except_token_ids + clauses += except_tokens txn.execute(sql, clauses) @@ -256,16 +290,33 @@ class RegistrationStore(SQLBaseStore): n = 100 chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)] for chunk in chunks: - for row in chunk: - txn.call_after(self.get_user_by_access_token.invalidate, (row[0],)) + if call_after_delete: + for row in chunk: + txn.call_after(call_after_delete, (row[0],)) txn.execute( - "DELETE FROM access_tokens WHERE token in (%s)" % ( + "DELETE FROM %s WHERE token in (%s)" % ( + table, ",".join(["?" for _ in chunk]), ), [r[0] for r in chunk] ) - yield self.runInteraction("user_delete_access_tokens", f) + # delete refresh tokens first, to stop new access tokens being + # allocated while our backs are turned + if delete_refresh_tokens: + yield self.runInteraction( + "user_delete_access_tokens", f, + table="refresh_tokens", + except_tokens=[], + call_after_delete=None, + ) + + yield self.runInteraction( + "user_delete_access_tokens", f, + table="access_tokens", + except_tokens=except_token_ids, + call_after_delete=self.get_user_by_access_token.invalidate, + ) def delete_access_token(self, access_token): def f(txn): @@ -288,9 +339,8 @@ class RegistrationStore(SQLBaseStore): Args: token (str): The access token of a user. Returns: - dict: Including the name (user_id) and the ID of their access token. - Raises: - StoreError if no user was found. + defer.Deferred: None, if the token did not match, otherwise dict + including the keys `name`, `is_guest`, `device_id`, `token_id`. """ return self.runInteraction( "get_user_by_access_token", diff --git a/synapse/storage/schema/delta/33/access_tokens_device_index.sql b/synapse/storage/schema/delta/33/access_tokens_device_index.sql new file mode 100644 index 0000000000..61ad3fe3e8 --- /dev/null +++ b/synapse/storage/schema/delta/33/access_tokens_device_index.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('access_tokens_device_index', '{}'); diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql new file mode 100644 index 0000000000..140f2b63e0 --- /dev/null +++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql @@ -0,0 +1,19 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- make sure that we have a device record for each set of E2E keys, so that the +-- user can delete them if they like. +INSERT INTO devices + SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json; diff --git a/synapse/storage/schema/delta/33/refreshtoken_device_index.sql b/synapse/storage/schema/delta/33/refreshtoken_device_index.sql new file mode 100644 index 0000000000..bb225dafbf --- /dev/null +++ b/synapse/storage/schema/delta/33/refreshtoken_device_index.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('refresh_tokens_device_index', '{}'); diff --git a/synapse/storage/schema/delta/33/user_ips_index.sql b/synapse/storage/schema/delta/33/user_ips_index.sql new file mode 100644 index 0000000000..473f75a78e --- /dev/null +++ b/synapse/storage/schema/delta/33/user_ips_index.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('user_ips_device_index', '{}'); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6c7481a728..6258ff1725 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -24,6 +24,7 @@ from collections import namedtuple import itertools import logging +import ujson as json logger = logging.getLogger(__name__) @@ -101,7 +102,7 @@ class TransactionStore(SQLBaseStore): ) if result and result["response_code"]: - return result["response_code"], result["response_json"] + return result["response_code"], json.loads(str(result["response_json"])) else: return None diff --git a/synapse/types.py b/synapse/types.py index f639651a73..5349b0c450 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,7 +18,38 @@ from synapse.api.errors import SynapseError from collections import namedtuple -Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) +Requester = namedtuple("Requester", + ["user", "access_token_id", "is_guest", "device_id"]) +""" +Represents the user making a request + +Attributes: + user (UserID): id of the user making the request + access_token_id (int|None): *ID* of the access token used for this + request, or None if it came via the appservice API or similar + is_guest (bool): True if the user making this request is a guest user + device_id (str|None): device_id which was set at authentication time +""" + + +def create_requester(user_id, access_token_id=None, is_guest=False, + device_id=None): + """ + Create a new ``Requester`` object + + Args: + user_id (str|UserID): id of the user making the request + access_token_id (int|None): *ID* of the access token used for this + request, or None if it came via the appservice API or similar + is_guest (bool): True if the user making this request is a guest user + device_id (str|None): device_id which was set at authentication time + + Returns: + Requester + """ + if not isinstance(user_id, UserID): + user_id = UserID.from_string(user_id) + return Requester(user_id, access_token_id, is_guest, device_id) def get_domain_from_id(string): diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index e1f374807e..0b944d3e63 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -84,7 +84,7 @@ class Measure(object): if context != self.start_context: logger.warn( - "Context have unexpectedly changed from '%s' to '%s'. (%r)", + "Context has unexpectedly changed from '%s' to '%s'. (%r)", context, self.start_context, self.name ) return diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py index 4c54812e6f..f68676e9e7 100644 --- a/synapse/util/presentable_names.py +++ b/synapse/util/presentable_names.py @@ -83,7 +83,10 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True, ): if ("m.room.member", my_member_event.sender) in room_state: inviter_member_event = room_state[("m.room.member", my_member_event.sender)] - return "Invite from %s" % (name_from_member_event(inviter_member_event),) + if fallback_to_single_member: + return "Invite from %s" % (name_from_member_event(inviter_member_event),) + else: + return None else: return "Room Invite" diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 43cf11f3f6..49527f4d21 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -128,7 +128,7 @@ class RetryDestinationLimiter(object): ) valid_err_code = False - if exc_type is CodeMessageException: + if exc_type is not None and issubclass(exc_type, CodeMessageException): valid_err_code = 0 <= exc_val.code < 500 if exc_type is None or valid_err_code: diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index cc6512ccc7..85a970a6c9 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -15,22 +15,30 @@ from twisted.internet import defer -from synapse.handlers.device import DeviceHandler -from tests import unittest -from tests.utils import setup_test_homeserver +import synapse.api.errors +import synapse.handlers.device +import synapse.storage +from synapse import types +from tests import unittest, utils -class DeviceHandlers(object): - def __init__(self, hs): - self.device_handler = DeviceHandler(hs) +user1 = "@boris:aaa" +user2 = "@theresa:bbb" class DeviceTestCase(unittest.TestCase): + def __init__(self, *args, **kwargs): + super(DeviceTestCase, self).__init__(*args, **kwargs) + self.store = None # type: synapse.storage.DataStore + self.handler = None # type: synapse.handlers.device.DeviceHandler + self.clock = None # type: utils.MockClock + @defer.inlineCallbacks def setUp(self): - self.hs = yield setup_test_homeserver(handlers=None) - self.hs.handlers = handlers = DeviceHandlers(self.hs) - self.handler = handlers.device_handler + hs = yield utils.setup_test_homeserver(handlers=None) + self.handler = synapse.handlers.device.DeviceHandler(hs) + self.store = hs.get_datastore() + self.clock = hs.get_clock() @defer.inlineCallbacks def test_device_is_created_if_doesnt_exist(self): @@ -73,3 +81,104 @@ class DeviceTestCase(unittest.TestCase): dev = yield self.handler.store.get_device("theresa", device_id) self.assertEqual(dev["display_name"], "display") + + @defer.inlineCallbacks + def test_get_devices_by_user(self): + yield self._record_users() + + res = yield self.handler.get_devices_by_user(user1) + self.assertEqual(3, len(res)) + device_map = { + d["device_id"]: d for d in res + } + self.assertDictContainsSubset({ + "user_id": user1, + "device_id": "xyz", + "display_name": "display 0", + "last_seen_ip": None, + "last_seen_ts": None, + }, device_map["xyz"]) + self.assertDictContainsSubset({ + "user_id": user1, + "device_id": "fco", + "display_name": "display 1", + "last_seen_ip": "ip1", + "last_seen_ts": 1000000, + }, device_map["fco"]) + self.assertDictContainsSubset({ + "user_id": user1, + "device_id": "abc", + "display_name": "display 2", + "last_seen_ip": "ip3", + "last_seen_ts": 3000000, + }, device_map["abc"]) + + @defer.inlineCallbacks + def test_get_device(self): + yield self._record_users() + + res = yield self.handler.get_device(user1, "abc") + self.assertDictContainsSubset({ + "user_id": user1, + "device_id": "abc", + "display_name": "display 2", + "last_seen_ip": "ip3", + "last_seen_ts": 3000000, + }, res) + + @defer.inlineCallbacks + def test_delete_device(self): + yield self._record_users() + + # delete the device + yield self.handler.delete_device(user1, "abc") + + # check the device was deleted + with self.assertRaises(synapse.api.errors.NotFoundError): + yield self.handler.get_device(user1, "abc") + + # we'd like to check the access token was invalidated, but that's a + # bit of a PITA. + + @defer.inlineCallbacks + def test_update_device(self): + yield self._record_users() + + update = {"display_name": "new display"} + yield self.handler.update_device(user1, "abc", update) + + res = yield self.handler.get_device(user1, "abc") + self.assertEqual(res["display_name"], "new display") + + @defer.inlineCallbacks + def test_update_unknown_device(self): + update = {"display_name": "new_display"} + with self.assertRaises(synapse.api.errors.NotFoundError): + yield self.handler.update_device("user_id", "unknown_device_id", + update) + + @defer.inlineCallbacks + def _record_users(self): + # check this works for both devices which have a recorded client_ip, + # and those which don't. + yield self._record_user(user1, "xyz", "display 0") + yield self._record_user(user1, "fco", "display 1", "token1", "ip1") + yield self._record_user(user1, "abc", "display 2", "token2", "ip2") + yield self._record_user(user1, "abc", "display 2", "token3", "ip3") + + yield self._record_user(user2, "def", "dispkay", "token4", "ip4") + + @defer.inlineCallbacks + def _record_user(self, user_id, device_id, display_name, + access_token=None, ip=None): + device_id = yield self.handler.check_device_registered( + user_id=user_id, + device_id=device_id, + initial_device_display_name=display_name + ) + + if ip is not None: + yield self.store.insert_client_ip( + types.UserID.from_string(user_id), + access_token, ip, "user_agent", device_id) + self.clock.advance_time(1000) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 4f2c14e4ff..f1f664275f 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -19,11 +19,12 @@ from twisted.internet import defer from mock import Mock, NonCallableMock +import synapse.types from synapse.api.errors import AuthError from synapse.handlers.profile import ProfileHandler from synapse.types import UserID -from tests.utils import setup_test_homeserver, requester_for_user +from tests.utils import setup_test_homeserver class ProfileHandlers(object): @@ -86,7 +87,7 @@ class ProfileTestCase(unittest.TestCase): def test_set_my_name(self): yield self.handler.set_displayname( self.frank, - requester_for_user(self.frank), + synapse.types.create_requester(self.frank), "Frank Jr." ) @@ -99,7 +100,7 @@ class ProfileTestCase(unittest.TestCase): def test_set_my_name_noauth(self): d = self.handler.set_displayname( self.frank, - requester_for_user(self.bob), + synapse.types.create_requester(self.bob), "Frank Jr." ) @@ -144,7 +145,8 @@ class ProfileTestCase(unittest.TestCase): @defer.inlineCallbacks def test_set_my_avatar(self): yield self.handler.set_avatar_url( - self.frank, requester_for_user(self.frank), "http://my.server/pic.gif" + self.frank, synapse.types.create_requester(self.frank), + "http://my.server/pic.gif" ) self.assertEquals( diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index 842e3d29d7..e70ac6f14d 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -13,15 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.replication.resource import ReplicationResource -from synapse.types import Requester, UserID +import contextlib +import json +from mock import Mock, NonCallableMock from twisted.internet import defer + +import synapse.types +from synapse.replication.resource import ReplicationResource +from synapse.types import UserID from tests import unittest -from tests.utils import setup_test_homeserver, requester_for_user -from mock import Mock, NonCallableMock -import json -import contextlib +from tests.utils import setup_test_homeserver class ReplicationResourceCase(unittest.TestCase): @@ -61,7 +63,7 @@ class ReplicationResourceCase(unittest.TestCase): def test_events_and_state(self): get = self.get(events="-1", state="-1", timeout="0") yield self.hs.get_handlers().room_creation_handler.create_room( - Requester(self.user, "", False), {} + synapse.types.create_requester(self.user), {} ) code, body = yield get self.assertEquals(code, 200) @@ -144,7 +146,7 @@ class ReplicationResourceCase(unittest.TestCase): def send_text_message(self, room_id, message): handler = self.hs.get_handlers().message_handler event = yield handler.create_and_send_nonmember_event( - requester_for_user(self.user), + synapse.types.create_requester(self.user), { "type": "m.room.message", "content": {"body": "message", "msgtype": "m.text"}, @@ -157,7 +159,7 @@ class ReplicationResourceCase(unittest.TestCase): @defer.inlineCallbacks def create_room(self): result = yield self.hs.get_handlers().room_creation_handler.create_room( - Requester(self.user, "", False), {} + synapse.types.create_requester(self.user), {} ) defer.returnValue(result["room_id"]) diff --git a/tests/rest/client/v1/test_profile.py b/tests/rest/client/v1/test_profile.py index af02fce8fb..1e95e97538 100644 --- a/tests/rest/client/v1/test_profile.py +++ b/tests/rest/client/v1/test_profile.py @@ -14,17 +14,14 @@ # limitations under the License. """Tests REST events for /profile paths.""" -from tests import unittest -from twisted.internet import defer - from mock import Mock +from twisted.internet import defer -from ....utils import MockHttpResource, setup_test_homeserver - +import synapse.types from synapse.api.errors import SynapseError, AuthError -from synapse.types import Requester, UserID - from synapse.rest.client.v1 import profile +from tests import unittest +from ....utils import MockHttpResource, setup_test_homeserver myid = "@1234ABCD:test" PATH_PREFIX = "/_matrix/client/api/v1" @@ -52,7 +49,7 @@ class ProfileTestCase(unittest.TestCase): ) def _get_user_by_req(request=None, allow_guest=False): - return Requester(UserID.from_string(myid), "", False) + return synapse.types.create_requester(myid) hs.get_v1auth().get_user_by_req = _get_user_by_req diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 3bd7065e32..8ac56a1fb2 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -65,13 +65,16 @@ class RegisterRestServletTestCase(unittest.TestCase): self.registration_handler.appservice_register = Mock( return_value=user_id ) - self.auth_handler.issue_access_token = Mock(return_value=token) + self.auth_handler.get_login_tuple_for_user_id = Mock( + return_value=(token, "kermits_refresh_token") + ) (code, result) = yield self.servlet.on_POST(self.request) self.assertEquals(code, 200) det_data = { "user_id": user_id, "access_token": token, + "refresh_token": "kermits_refresh_token", "home_server": self.hs.hostname } self.assertDictContainsSubset(det_data, result) @@ -121,7 +124,9 @@ class RegisterRestServletTestCase(unittest.TestCase): "password": "monkey" }, None) self.registration_handler.register = Mock(return_value=(user_id, None)) - self.auth_handler.issue_access_token = Mock(return_value=token) + self.auth_handler.get_login_tuple_for_user_id = Mock( + return_value=(token, "kermits_refresh_token") + ) self.device_handler.check_device_registered = \ Mock(return_value=device_id) @@ -130,13 +135,14 @@ class RegisterRestServletTestCase(unittest.TestCase): det_data = { "user_id": user_id, "access_token": token, + "refresh_token": "kermits_refresh_token", "home_server": self.hs.hostname, "device_id": device_id, } self.assertDictContainsSubset(det_data, result) self.assertIn("refresh_token", result) - self.auth_handler.issue_access_token.assert_called_once_with( - user_id, device_id=device_id) + self.auth_handler.get_login_tuple_for_user_id( + user_id, device_id=device_id, initial_device_display_name=None) def test_POST_disabled_registration(self): self.hs.config.enable_registration = False diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 6e4d9b1373..1286b4ce2d 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -10,7 +10,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - hs = yield setup_test_homeserver() + hs = yield setup_test_homeserver() # type: synapse.server.HomeServer self.store = hs.get_datastore() self.clock = hs.get_clock() @@ -20,11 +20,20 @@ class BackgroundUpdateTestCase(unittest.TestCase): "test_update", self.update_handler ) + # run the real background updates, to get them out the way + # (perhaps we should run them as part of the test HS setup, since we + # run all of the other schema setup stuff there?) + while True: + res = yield self.store.do_next_background_update(1000) + if res is None: + break + @defer.inlineCallbacks def test_do_background_update(self): desired_count = 1000 duration_ms = 42 + # first step: make a bit of progress @defer.inlineCallbacks def update(progress, count): self.clock.advance_time_msec(count * duration_ms) @@ -42,7 +51,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): yield self.store.start_background_update("test_update", {"my_key": 1}) self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNotNone(result) @@ -50,15 +59,15 @@ class BackgroundUpdateTestCase(unittest.TestCase): {"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE ) + # second step: complete the update @defer.inlineCallbacks def update(progress, count): yield self.store._end_background_update("test_update") defer.returnValue(count) self.update_handler.side_effect = update - self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNotNone(result) @@ -66,8 +75,9 @@ class BackgroundUpdateTestCase(unittest.TestCase): {"my_key": 2}, desired_count ) + # third step: we don't expect to be called any more self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNone(result) diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py new file mode 100644 index 0000000000..1f0c0e7c37 --- /dev/null +++ b/tests/storage/test_client_ips.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import synapse.server +import synapse.storage +import synapse.types +import tests.unittest +import tests.utils + + +class ClientIpStoreTestCase(tests.unittest.TestCase): + def __init__(self, *args, **kwargs): + super(ClientIpStoreTestCase, self).__init__(*args, **kwargs) + self.store = None # type: synapse.storage.DataStore + self.clock = None # type: tests.utils.MockClock + + @defer.inlineCallbacks + def setUp(self): + hs = yield tests.utils.setup_test_homeserver() + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + @defer.inlineCallbacks + def test_insert_new_client_ip(self): + self.clock.now = 12345678 + user_id = "@user:id" + yield self.store.insert_client_ip( + synapse.types.UserID.from_string(user_id), + "access_token", "ip", "user_agent", "device_id", + ) + + # deliberately use an iterable here to make sure that the lookup + # method doesn't iterate it twice + device_list = iter(((user_id, "device_id"),)) + result = yield self.store.get_last_client_ip_by_device(device_list) + + r = result[(user_id, "device_id")] + self.assertDictContainsSubset( + { + "user_id": user_id, + "device_id": "device_id", + "access_token": "access_token", + "ip": "ip", + "user_agent": "user_agent", + "last_seen": 12345678000, + }, + r + ) diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py new file mode 100644 index 0000000000..f8725acea0 --- /dev/null +++ b/tests/storage/test_devices.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import synapse.api.errors +import tests.unittest +import tests.utils + + +class DeviceStoreTestCase(tests.unittest.TestCase): + def __init__(self, *args, **kwargs): + super(DeviceStoreTestCase, self).__init__(*args, **kwargs) + self.store = None # type: synapse.storage.DataStore + + @defer.inlineCallbacks + def setUp(self): + hs = yield tests.utils.setup_test_homeserver() + + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def test_store_new_device(self): + yield self.store.store_device( + "user_id", "device_id", "display_name" + ) + + res = yield self.store.get_device("user_id", "device_id") + self.assertDictContainsSubset({ + "user_id": "user_id", + "device_id": "device_id", + "display_name": "display_name", + }, res) + + @defer.inlineCallbacks + def test_get_devices_by_user(self): + yield self.store.store_device( + "user_id", "device1", "display_name 1" + ) + yield self.store.store_device( + "user_id", "device2", "display_name 2" + ) + yield self.store.store_device( + "user_id2", "device3", "display_name 3" + ) + + res = yield self.store.get_devices_by_user("user_id") + self.assertEqual(2, len(res.keys())) + self.assertDictContainsSubset({ + "user_id": "user_id", + "device_id": "device1", + "display_name": "display_name 1", + }, res["device1"]) + self.assertDictContainsSubset({ + "user_id": "user_id", + "device_id": "device2", + "display_name": "display_name 2", + }, res["device2"]) + + @defer.inlineCallbacks + def test_update_device(self): + yield self.store.store_device( + "user_id", "device_id", "display_name 1" + ) + + res = yield self.store.get_device("user_id", "device_id") + self.assertEqual("display_name 1", res["display_name"]) + + # do a no-op first + yield self.store.update_device( + "user_id", "device_id", + ) + res = yield self.store.get_device("user_id", "device_id") + self.assertEqual("display_name 1", res["display_name"]) + + # do the update + yield self.store.update_device( + "user_id", "device_id", + new_display_name="display_name 2", + ) + + # check it worked + res = yield self.store.get_device("user_id", "device_id") + self.assertEqual("display_name 2", res["display_name"]) + + @defer.inlineCallbacks + def test_update_unknown_device(self): + with self.assertRaises(synapse.api.errors.StoreError) as cm: + yield self.store.update_device( + "user_id", "unknown_device_id", + new_display_name="display_name 2", + ) + self.assertEqual(404, cm.exception.code) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py new file mode 100644 index 0000000000..e9044afa2e --- /dev/null +++ b/tests/storage/test_event_push_actions.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import tests.unittest +import tests.utils + +USER_ID = "@user:example.com" + + +class EventPushActionsStoreTestCase(tests.unittest.TestCase): + + @defer.inlineCallbacks + def setUp(self): + hs = yield tests.utils.setup_test_homeserver() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def test_get_unread_push_actions_for_user_in_range_for_http(self): + yield self.store.get_unread_push_actions_for_user_in_range_for_http( + USER_ID, 0, 1000, 20 + ) + + @defer.inlineCallbacks + def test_get_unread_push_actions_for_user_in_range_for_email(self): + yield self.store.get_unread_push_actions_for_user_in_range_for_email( + USER_ID, 0, 1000, 20 + ) diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index b03ca303a2..f7d74dea8e 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -128,6 +128,40 @@ class RegistrationStoreTestCase(unittest.TestCase): with self.assertRaises(StoreError): yield self.store.exchange_refresh_token(last_token, generator.generate) + @defer.inlineCallbacks + def test_user_delete_access_tokens(self): + # add some tokens + generator = TokenGenerator() + refresh_token = generator.generate(self.user_id) + yield self.store.register(self.user_id, self.tokens[0], self.pwhash) + yield self.store.add_access_token_to_user(self.user_id, self.tokens[1], + self.device_id) + yield self.store.add_refresh_token_to_user(self.user_id, refresh_token, + self.device_id) + + # now delete some + yield self.store.user_delete_access_tokens( + self.user_id, device_id=self.device_id, delete_refresh_tokens=True) + + # check they were deleted + user = yield self.store.get_user_by_access_token(self.tokens[1]) + self.assertIsNone(user, "access token was not deleted by device_id") + with self.assertRaises(StoreError): + yield self.store.exchange_refresh_token(refresh_token, + generator.generate) + + # check the one not associated with the device was not deleted + user = yield self.store.get_user_by_access_token(self.tokens[0]) + self.assertEqual(self.user_id, user["name"]) + + # now delete the rest + yield self.store.user_delete_access_tokens( + self.user_id, delete_refresh_tokens=True) + + user = yield self.store.get_user_by_access_token(self.tokens[0]) + self.assertIsNone(user, + "access token was not deleted without device_id") + class TokenGenerator: def __init__(self): diff --git a/tests/unittest.py b/tests/unittest.py index 5b22abfe74..38715972dd 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -17,13 +17,18 @@ from twisted.trial import unittest import logging - # logging doesn't have a "don't log anything at all EVARRRR setting, # but since the highest value is 50, 1000000 should do ;) NEVER = 1000000 -logging.getLogger().addHandler(logging.StreamHandler()) +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter( + "%(levelname)s:%(name)s:%(message)s [%(pathname)s:%(lineno)d]" +)) +logging.getLogger().addHandler(handler) logging.getLogger().setLevel(NEVER) +logging.getLogger("synapse.storage.SQL").setLevel(NEVER) +logging.getLogger("synapse.storage.txn").setLevel(NEVER) def around(target): @@ -70,8 +75,6 @@ class TestCase(unittest.TestCase): return ret logging.getLogger().setLevel(level) - # Don't set SQL logging - logging.getLogger("synapse.storage").setLevel(old_level) return orig() def assertObjectHasAttributes(self, attrs, obj): diff --git a/tests/utils.py b/tests/utils.py index ed547bc39b..915b934e94 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,7 +20,6 @@ from synapse.storage.prepare_database import prepare_database from synapse.storage.engines import create_engine from synapse.server import HomeServer from synapse.federation.transport import server -from synapse.types import Requester from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.logcontext import LoggingContext @@ -512,7 +511,3 @@ class DeferredMockCallable(object): "call(%s)" % _format_call(c[0], c[1]) for c in calls ]) ) - - -def requester_for_user(user): - return Requester(user, None, False) |