summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/auth.py18
-rw-r--r--synapse/handlers/deactivate_account.py4
-rw-r--r--synapse/handlers/e2e_room_keys.py289
-rw-r--r--synapse/handlers/federation.py12
-rw-r--r--synapse/handlers/room_list.py11
-rw-r--r--synapse/handlers/user_directory.py14
6 files changed, 318 insertions, 30 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2a5eab124f..329e3c7d71 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
 import pymacaroons
 from canonicaljson import json
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.web.client import PartialDownloadError
 
 import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
+from synapse.util import logcontext
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
 
 from ._base import BaseHandler
 
@@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
                 bcrypt.gensalt(self.bcrypt_rounds),
             ).decode('ascii')
 
-        return make_deferred_yieldable(
-            threads.deferToThreadPool(
-                self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
-            ),
-        )
+        return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
@@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
             if not isinstance(stored_hash, bytes):
                 stored_hash = stored_hash.encode('ascii')
 
-            return make_deferred_yieldable(
-                threads.deferToThreadPool(
-                    self.hs.get_reactor(),
-                    self.hs.get_reactor().getThreadPool(),
-                    _do_validate_hash,
-                ),
-            )
+            return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
         else:
             return defer.succeed(False)
 
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b078df4a76..75fe50c42c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,8 +17,8 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, create_requester
-from synapse.util.logcontext import run_in_background
 
 from ._base import BaseHandler
 
@@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
             None
         """
         if not self._user_parter_running:
-            run_in_background(self._user_parter_loop)
+            run_as_background_process("user_parter_loop", self._user_parter_loop)
 
     @defer.inlineCallbacks
     def _user_parter_loop(self):
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
new file mode 100644
index 0000000000..5edb3cfe04
--- /dev/null
+++ b/synapse/handlers/e2e_room_keys.py
@@ -0,0 +1,289 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017, 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from six import iteritems
+
+from twisted.internet import defer
+
+from synapse.api.errors import RoomKeysVersionError, StoreError, SynapseError
+from synapse.util.async_helpers import Linearizer
+
+logger = logging.getLogger(__name__)
+
+
+class E2eRoomKeysHandler(object):
+    """
+    Implements an optional realtime backup mechanism for encrypted E2E megolm room keys.
+    This gives a way for users to store and recover their megolm keys if they lose all
+    their clients. It should also extend easily to future room key mechanisms.
+    The actual payload of the encrypted keys is completely opaque to the handler.
+    """
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+        # Used to lock whenever a client is uploading key data.  This prevents collisions
+        # between clients trying to upload the details of a new session, given all
+        # clients belonging to a user will receive and try to upload a new session at
+        # roughly the same time.  Also used to lock out uploads when the key is being
+        # changed.
+        self._upload_linearizer = Linearizer("upload_room_keys_lock")
+
+    @defer.inlineCallbacks
+    def get_room_keys(self, user_id, version, room_id=None, session_id=None):
+        """Bulk get the E2E room keys for a given backup, optionally filtered to a given
+        room, or a given session.
+        See EndToEndRoomKeyStore.get_e2e_room_keys for full details.
+
+        Args:
+            user_id(str): the user whose keys we're getting
+            version(str): the version ID of the backup we're getting keys from
+            room_id(string): room ID to get keys for, for None to get keys for all rooms
+            session_id(string): session ID to get keys for, for None to get keys for all
+                sessions
+        Returns:
+            A deferred list of dicts giving the session_data and message metadata for
+            these room keys.
+        """
+
+        # we deliberately take the lock to get keys so that changing the version
+        # works atomically
+        with (yield self._upload_linearizer.queue(user_id)):
+            results = yield self.store.get_e2e_room_keys(
+                user_id, version, room_id, session_id
+            )
+
+            if results['rooms'] == {}:
+                raise SynapseError(404, "No room_keys found")
+
+            defer.returnValue(results)
+
+    @defer.inlineCallbacks
+    def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
+        """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
+        room or a given session.
+        See EndToEndRoomKeyStore.delete_e2e_room_keys for full details.
+
+        Args:
+            user_id(str): the user whose backup we're deleting
+            version(str): the version ID of the backup we're deleting
+            room_id(string): room ID to delete keys for, for None to delete keys for all
+                rooms
+            session_id(string): session ID to delete keys for, for None to delete keys
+                for all sessions
+        Returns:
+            A deferred of the deletion transaction
+        """
+
+        # lock for consistency with uploading
+        with (yield self._upload_linearizer.queue(user_id)):
+            yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
+
+    @defer.inlineCallbacks
+    def upload_room_keys(self, user_id, version, room_keys):
+        """Bulk upload a list of room keys into a given backup version, asserting
+        that the given version is the current backup version.  room_keys are merged
+        into the current backup as described in RoomKeysServlet.on_PUT().
+
+        Args:
+            user_id(str): the user whose backup we're setting
+            version(str): the version ID of the backup we're updating
+            room_keys(dict): a nested dict describing the room_keys we're setting:
+
+        {
+            "rooms": {
+                "!abc:matrix.org": {
+                    "sessions": {
+                        "c0ff33": {
+                            "first_message_index": 1,
+                            "forwarded_count": 1,
+                            "is_verified": false,
+                            "session_data": "SSBBTSBBIEZJU0gK"
+                        }
+                    }
+                }
+            }
+        }
+
+        Raises:
+            SynapseError: with code 404 if there are no versions defined
+            RoomKeysVersionError: if the uploaded version is not the current version
+        """
+
+        # TODO: Validate the JSON to make sure it has the right keys.
+
+        # XXX: perhaps we should use a finer grained lock here?
+        with (yield self._upload_linearizer.queue(user_id)):
+
+            # Check that the version we're trying to upload is the current version
+            try:
+                version_info = yield self.store.get_e2e_room_keys_version_info(user_id)
+            except StoreError as e:
+                if e.code == 404:
+                    raise SynapseError(404, "Version '%s' not found" % (version,))
+                else:
+                    raise
+
+            if version_info['version'] != version:
+                # Check that the version we're trying to upload actually exists
+                try:
+                    version_info = yield self.store.get_e2e_room_keys_version_info(
+                        user_id, version,
+                    )
+                    # if we get this far, the version must exist
+                    raise RoomKeysVersionError(current_version=version_info['version'])
+                except StoreError as e:
+                    if e.code == 404:
+                        raise SynapseError(404, "Version '%s' not found" % (version,))
+                    else:
+                        raise
+
+            # go through the room_keys.
+            # XXX: this should/could be done concurrently, given we're in a lock.
+            for room_id, room in iteritems(room_keys['rooms']):
+                for session_id, session in iteritems(room['sessions']):
+                    yield self._upload_room_key(
+                        user_id, version, room_id, session_id, session
+                    )
+
+    @defer.inlineCallbacks
+    def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
+        """Upload a given room_key for a given room and session into a given
+        version of the backup.  Merges the key with any which might already exist.
+
+        Args:
+            user_id(str): the user whose backup we're setting
+            version(str): the version ID of the backup we're updating
+            room_id(str): the ID of the room whose keys we're setting
+            session_id(str): the session whose room_key we're setting
+            room_key(dict): the room_key being set
+        """
+
+        # get the room_key for this particular row
+        current_room_key = None
+        try:
+            current_room_key = yield self.store.get_e2e_room_key(
+                user_id, version, room_id, session_id
+            )
+        except StoreError as e:
+            if e.code == 404:
+                pass
+            else:
+                raise
+
+        if self._should_replace_room_key(current_room_key, room_key):
+            yield self.store.set_e2e_room_key(
+                user_id, version, room_id, session_id, room_key
+            )
+
+    @staticmethod
+    def _should_replace_room_key(current_room_key, room_key):
+        """
+        Determine whether to replace a given current_room_key (if any)
+        with a newly uploaded room_key backup
+
+        Args:
+            current_room_key (dict): Optional, the current room_key dict if any
+            room_key (dict): The new room_key dict which may or may not be fit to
+                replace the current_room_key
+
+        Returns:
+            True if current_room_key should be replaced by room_key in the backup
+        """
+
+        if current_room_key:
+            # spelt out with if/elifs rather than nested boolean expressions
+            # purely for legibility.
+
+            if room_key['is_verified'] and not current_room_key['is_verified']:
+                return True
+            elif (
+                room_key['first_message_index'] <
+                current_room_key['first_message_index']
+            ):
+                return True
+            elif room_key['forwarded_count'] < current_room_key['forwarded_count']:
+                return True
+            else:
+                return False
+        return True
+
+    @defer.inlineCallbacks
+    def create_version(self, user_id, version_info):
+        """Create a new backup version.  This automatically becomes the new
+        backup version for the user's keys; previous backups will no longer be
+        writeable to.
+
+        Args:
+            user_id(str): the user whose backup version we're creating
+            version_info(dict): metadata about the new version being created
+
+        {
+            "algorithm": "m.megolm_backup.v1",
+            "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+        }
+
+        Returns:
+            A deferred of a string that gives the new version number.
+        """
+
+        # TODO: Validate the JSON to make sure it has the right keys.
+
+        # lock everyone out until we've switched version
+        with (yield self._upload_linearizer.queue(user_id)):
+            new_version = yield self.store.create_e2e_room_keys_version(
+                user_id, version_info
+            )
+            defer.returnValue(new_version)
+
+    @defer.inlineCallbacks
+    def get_version_info(self, user_id, version=None):
+        """Get the info about a given version of the user's backup
+
+        Args:
+            user_id(str): the user whose current backup version we're querying
+            version(str): Optional; if None gives the most recent version
+                otherwise a historical one.
+        Raises:
+            StoreError: code 404 if the requested backup version doesn't exist
+        Returns:
+            A deferred of a info dict that gives the info about the new version.
+
+        {
+            "version": "1234",
+            "algorithm": "m.megolm_backup.v1",
+            "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+        }
+        """
+
+        with (yield self._upload_linearizer.queue(user_id)):
+            res = yield self.store.get_e2e_room_keys_version_info(user_id, version)
+            defer.returnValue(res)
+
+    @defer.inlineCallbacks
+    def delete_version(self, user_id, version=None):
+        """Deletes a given version of the user's e2e_room_keys backup
+
+        Args:
+            user_id(str): the user whose current backup version we're deleting
+            version(str): the version id of the backup being deleted
+        Raises:
+            StoreError: code 404 if this backup version doesn't exist
+        """
+
+        with (yield self._upload_linearizer.queue(user_id)):
+            yield self.store.delete_e2e_room_keys_version(user_id, version)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 45d955e6f5..cab57a8849 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -309,8 +309,8 @@ class FederationHandler(BaseHandler):
 
                 if sent_to_us_directly:
                     logger.warn(
-                        "[%s %s] Failed to fetch %d prev events: rejecting",
-                        room_id, event_id, len(prevs - seen),
+                        "[%s %s] Rejecting: failed to fetch %d prev events: %s",
+                        room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
                     )
                     raise FederationError(
                         "ERROR",
@@ -452,8 +452,8 @@ class FederationHandler(BaseHandler):
         latest |= seen
 
         logger.info(
-            "[%s %s]: Requesting %d prev_events: %s",
-            room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
+            "[%s %s]: Requesting missing events between %s and %s",
+            room_id, event_id, shortstr(latest), event_id,
         )
 
         # XXX: we set timeout to 10s to help workaround
@@ -1852,7 +1852,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def on_get_missing_events(self, origin, room_id, earliest_events,
-                              latest_events, limit, min_depth):
+                              latest_events, limit):
         in_room = yield self.auth.check_host_in_room(
             room_id,
             origin
@@ -1861,14 +1861,12 @@ class FederationHandler(BaseHandler):
             raise AuthError(403, "Host not in room.")
 
         limit = min(limit, 20)
-        min_depth = max(min_depth, 0)
 
         missing_events = yield self.store.get_missing_events(
             room_id=room_id,
             earliest_events=earliest_events,
             latest_events=latest_events,
             limit=limit,
-            min_depth=min_depth,
         )
 
         missing_events = yield filter_events_for_server(
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 38e1737ec9..dc88620885 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -16,7 +16,7 @@
 import logging
 from collections import namedtuple
 
-from six import iteritems
+from six import PY3, iteritems
 from six.moves import range
 
 import msgpack
@@ -444,9 +444,16 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", (
 
     @classmethod
     def from_token(cls, token):
+        if PY3:
+            # The argument raw=False is only available on new versions of
+            # msgpack, and only really needed on Python 3. Gate it behind
+            # a PY3 check to avoid causing issues on Debian-packaged versions.
+            decoded = msgpack.loads(decode_base64(token), raw=False)
+        else:
+            decoded = msgpack.loads(decode_base64(token))
         return RoomListNextBatch(**{
             cls.REVERSE_KEY_DICT[key]: val
-            for key, val in msgpack.loads(decode_base64(token)).items()
+            for key, val in decoded.items()
         })
 
     def to_token(self):
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d8413d6aa7..f11b430126 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,6 +20,7 @@ from six import iteritems
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.roommember import ProfileInfo
 from synapse.types import get_localpart_from_id
 from synapse.util.metrics import Measure
@@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
         """
         return self.store.search_user_dir(user_id, search_term, limit)
 
-    @defer.inlineCallbacks
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
@@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
         if self._is_processing:
             return
 
+        @defer.inlineCallbacks
+        def process():
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._is_processing = False
+
         self._is_processing = True
-        try:
-            yield self._unsafe_process()
-        finally:
-            self._is_processing = False
+        run_as_background_process("user_directory.notify_new_event", process)
 
     @defer.inlineCallbacks
     def handle_local_profile_change(self, user_id, profile):