summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/app/homeserver.py43
-rw-r--r--synapse/config/__main__.py2
-rw-r--r--synapse/config/_base.py121
-rw-r--r--synapse/handlers/auth.py18
-rw-r--r--synapse/handlers/deactivate_account.py4
-rw-r--r--synapse/handlers/directory.py77
-rw-r--r--synapse/handlers/groups_local.py18
-rw-r--r--synapse/handlers/room.py5
-rw-r--r--synapse/handlers/user_directory.py14
-rw-r--r--synapse/http/matrixfederationclient.py12
-rw-r--r--synapse/python_dependencies.py4
-rw-r--r--synapse/rest/client/v1/directory.py37
-rw-r--r--synapse/rest/media/v1/media_repository.py24
-rw-r--r--synapse/rest/media/v1/media_storage.py8
-rw-r--r--synapse/rest/media/v1/storage_provider.py6
-rw-r--r--synapse/storage/_base.py4
-rw-r--r--synapse/storage/keys.py2
-rw-r--r--synapse/storage/pusher.py2
-rw-r--r--synapse/storage/signatures.py2
-rw-r--r--synapse/storage/transactions.py2
-rw-r--r--synapse/util/caches/stream_change_cache.py4
-rw-r--r--synapse/util/logcontext.py120
22 files changed, 281 insertions, 248 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e3f0d99a3f..0b85b377e3 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -20,6 +20,7 @@ import sys
 
 from six import iteritems
 
+import psutil
 from prometheus_client import Gauge
 
 from twisted.application import service
@@ -502,7 +503,6 @@ def run(hs):
 
     def performance_stats_init():
         try:
-            import psutil
             process = psutil.Process()
             # Ensure we can fetch both, and make the initial request for cpu_percent
             # so the next request will use this as the initial point.
@@ -510,12 +510,9 @@ def run(hs):
             process.cpu_percent(interval=None)
             logger.info("report_stats can use psutil")
             stats_process.append(process)
-        except (ImportError, AttributeError):
-            logger.warn(
-                "report_stats enabled but psutil is not installed or incorrect version."
-                " Disabling reporting of memory/cpu stats."
-                " Ensuring psutil is available will help matrix.org track performance"
-                " changes across releases."
+        except (AttributeError):
+            logger.warning(
+                "Unable to read memory/cpu stats. Disabling reporting."
             )
 
     def generate_user_daily_visit_stats():
@@ -530,10 +527,13 @@ def run(hs):
     clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
 
     # monthly active user limiting functionality
-    clock.looping_call(
-        hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
-    )
-    hs.get_datastore().reap_monthly_active_users()
+    def reap_monthly_active_users():
+        return run_as_background_process(
+            "reap_monthly_active_users",
+            hs.get_datastore().reap_monthly_active_users,
+        )
+    clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
+    reap_monthly_active_users()
 
     @defer.inlineCallbacks
     def generate_monthly_active_users():
@@ -547,12 +547,23 @@ def run(hs):
         registered_reserved_users_mau_gauge.set(float(reserved_count))
         max_mau_gauge.set(float(hs.config.max_mau_value))
 
-    hs.get_datastore().initialise_reserved_users(
-        hs.config.mau_limits_reserved_threepids
+    def start_generate_monthly_active_users():
+        return run_as_background_process(
+            "generate_monthly_active_users",
+            generate_monthly_active_users,
+        )
+
+    # XXX is this really supposed to be a background process? it looks
+    # like it needs to complete before some of the other stuff runs.
+    run_as_background_process(
+        "initialise_reserved_users",
+        hs.get_datastore().initialise_reserved_users,
+        hs.config.mau_limits_reserved_threepids,
     )
-    generate_monthly_active_users()
+
+    start_generate_monthly_active_users()
     if hs.config.limit_usage_by_mau:
-        clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+        clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
     # End of monthly active user settings
 
     if hs.config.report_stats:
@@ -568,7 +579,7 @@ def run(hs):
         clock.call_later(5 * 60, start_phone_stats_home)
 
     if hs.config.daemonize and hs.config.print_pidfile:
-        print (hs.config.pid_file)
+        print(hs.config.pid_file)
 
     _base.start_reactor(
         "synapse-homeserver",
diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py
index 8fccf573ee..79fe9c3dac 100644
--- a/synapse/config/__main__.py
+++ b/synapse/config/__main__.py
@@ -28,7 +28,7 @@ if __name__ == "__main__":
             sys.stderr.write("\n" + str(e) + "\n")
             sys.exit(1)
 
-        print (getattr(config, key))
+        print(getattr(config, key))
         sys.exit(0)
     else:
         sys.stderr.write("Unknown command %r\n" % (action,))
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 3d2e90dd5b..14dae65ea0 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -106,10 +106,7 @@ class Config(object):
     @classmethod
     def check_file(cls, file_path, config_name):
         if file_path is None:
-            raise ConfigError(
-                "Missing config for %s."
-                % (config_name,)
-            )
+            raise ConfigError("Missing config for %s." % (config_name,))
         try:
             os.stat(file_path)
         except OSError as e:
@@ -128,9 +125,7 @@ class Config(object):
             if e.errno != errno.EEXIST:
                 raise
         if not os.path.isdir(dir_path):
-            raise ConfigError(
-                "%s is not a directory" % (dir_path,)
-            )
+            raise ConfigError("%s is not a directory" % (dir_path,))
         return dir_path
 
     @classmethod
@@ -156,21 +151,20 @@ class Config(object):
         return results
 
     def generate_config(
-            self,
-            config_dir_path,
-            server_name,
-            is_generating_file,
-            report_stats=None,
+        self, config_dir_path, server_name, is_generating_file, report_stats=None
     ):
         default_config = "# vim:ft=yaml\n"
 
-        default_config += "\n\n".join(dedent(conf) for conf in self.invoke_all(
-            "default_config",
-            config_dir_path=config_dir_path,
-            server_name=server_name,
-            is_generating_file=is_generating_file,
-            report_stats=report_stats,
-        ))
+        default_config += "\n\n".join(
+            dedent(conf)
+            for conf in self.invoke_all(
+                "default_config",
+                config_dir_path=config_dir_path,
+                server_name=server_name,
+                is_generating_file=is_generating_file,
+                report_stats=report_stats,
+            )
+        )
 
         config = yaml.load(default_config)
 
@@ -178,23 +172,22 @@ class Config(object):
 
     @classmethod
     def load_config(cls, description, argv):
-        config_parser = argparse.ArgumentParser(
-            description=description,
-        )
+        config_parser = argparse.ArgumentParser(description=description)
         config_parser.add_argument(
-            "-c", "--config-path",
+            "-c",
+            "--config-path",
             action="append",
             metavar="CONFIG_FILE",
             help="Specify config file. Can be given multiple times and"
-                 " may specify directories containing *.yaml files."
+            " may specify directories containing *.yaml files.",
         )
 
         config_parser.add_argument(
             "--keys-directory",
             metavar="DIRECTORY",
             help="Where files such as certs and signing keys are stored when"
-                 " their location is given explicitly in the config."
-                 " Defaults to the directory containing the last config file",
+            " their location is given explicitly in the config."
+            " Defaults to the directory containing the last config file",
         )
 
         config_args = config_parser.parse_args(argv)
@@ -203,9 +196,7 @@ class Config(object):
 
         obj = cls()
         obj.read_config_files(
-            config_files,
-            keys_directory=config_args.keys_directory,
-            generate_keys=False,
+            config_files, keys_directory=config_args.keys_directory, generate_keys=False
         )
         return obj
 
@@ -213,38 +204,38 @@ class Config(object):
     def load_or_generate_config(cls, description, argv):
         config_parser = argparse.ArgumentParser(add_help=False)
         config_parser.add_argument(
-            "-c", "--config-path",
+            "-c",
+            "--config-path",
             action="append",
             metavar="CONFIG_FILE",
             help="Specify config file. Can be given multiple times and"
-                 " may specify directories containing *.yaml files."
+            " may specify directories containing *.yaml files.",
         )
         config_parser.add_argument(
             "--generate-config",
             action="store_true",
-            help="Generate a config file for the server name"
+            help="Generate a config file for the server name",
         )
         config_parser.add_argument(
             "--report-stats",
             action="store",
             help="Whether the generated config reports anonymized usage statistics",
-            choices=["yes", "no"]
+            choices=["yes", "no"],
         )
         config_parser.add_argument(
             "--generate-keys",
             action="store_true",
-            help="Generate any missing key files then exit"
+            help="Generate any missing key files then exit",
         )
         config_parser.add_argument(
             "--keys-directory",
             metavar="DIRECTORY",
             help="Used with 'generate-*' options to specify where files such as"
-                 " certs and signing keys should be stored in, unless explicitly"
-                 " specified in the config."
+            " certs and signing keys should be stored in, unless explicitly"
+            " specified in the config.",
         )
         config_parser.add_argument(
-            "-H", "--server-name",
-            help="The server name to generate a config file for"
+            "-H", "--server-name", help="The server name to generate a config file for"
         )
         config_args, remaining_args = config_parser.parse_known_args(argv)
 
@@ -257,8 +248,8 @@ class Config(object):
         if config_args.generate_config:
             if config_args.report_stats is None:
                 config_parser.error(
-                    "Please specify either --report-stats=yes or --report-stats=no\n\n" +
-                    MISSING_REPORT_STATS_SPIEL
+                    "Please specify either --report-stats=yes or --report-stats=no\n\n"
+                    + MISSING_REPORT_STATS_SPIEL
                 )
             if not config_files:
                 config_parser.error(
@@ -287,26 +278,32 @@ class Config(object):
                         config_dir_path=config_dir_path,
                         server_name=server_name,
                         report_stats=(config_args.report_stats == "yes"),
-                        is_generating_file=True
+                        is_generating_file=True,
                     )
                     obj.invoke_all("generate_files", config)
                     config_file.write(config_str)
-                print((
-                    "A config file has been generated in %r for server name"
-                    " %r with corresponding SSL keys and self-signed"
-                    " certificates. Please review this file and customise it"
-                    " to your needs."
-                ) % (config_path, server_name))
+                print(
+                    (
+                        "A config file has been generated in %r for server name"
+                        " %r with corresponding SSL keys and self-signed"
+                        " certificates. Please review this file and customise it"
+                        " to your needs."
+                    )
+                    % (config_path, server_name)
+                )
                 print(
                     "If this server name is incorrect, you will need to"
                     " regenerate the SSL certificates"
                 )
                 return
             else:
-                print((
-                    "Config file %r already exists. Generating any missing key"
-                    " files."
-                ) % (config_path,))
+                print(
+                    (
+                        "Config file %r already exists. Generating any missing key"
+                        " files."
+                    )
+                    % (config_path,)
+                )
                 generate_keys = True
 
         parser = argparse.ArgumentParser(
@@ -338,8 +335,7 @@ class Config(object):
 
         return obj
 
-    def read_config_files(self, config_files, keys_directory=None,
-                          generate_keys=False):
+    def read_config_files(self, config_files, keys_directory=None, generate_keys=False):
         if not keys_directory:
             keys_directory = os.path.dirname(config_files[-1])
 
@@ -364,8 +360,9 @@ class Config(object):
 
         if "report_stats" not in config:
             raise ConfigError(
-                MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" +
-                MISSING_REPORT_STATS_SPIEL
+                MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS
+                + "\n"
+                + MISSING_REPORT_STATS_SPIEL
             )
 
         if generate_keys:
@@ -399,16 +396,16 @@ def find_config_files(search_paths):
                 for entry in os.listdir(config_path):
                     entry_path = os.path.join(config_path, entry)
                     if not os.path.isfile(entry_path):
-                        print (
-                            "Found subdirectory in config directory: %r. IGNORING."
-                        ) % (entry_path, )
+                        err = "Found subdirectory in config directory: %r. IGNORING."
+                        print(err % (entry_path,))
                         continue
 
                     if not entry.endswith(".yaml"):
-                        print (
-                            "Found file in config directory that does not"
-                            " end in '.yaml': %r. IGNORING."
-                        ) % (entry_path, )
+                        err = (
+                            "Found file in config directory that does not end in "
+                            "'.yaml': %r. IGNORING."
+                        )
+                        print(err % (entry_path,))
                         continue
 
                     files.append(entry_path)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2a5eab124f..329e3c7d71 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
 import pymacaroons
 from canonicaljson import json
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.web.client import PartialDownloadError
 
 import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
+from synapse.util import logcontext
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
 
 from ._base import BaseHandler
 
@@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
                 bcrypt.gensalt(self.bcrypt_rounds),
             ).decode('ascii')
 
-        return make_deferred_yieldable(
-            threads.deferToThreadPool(
-                self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
-            ),
-        )
+        return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
@@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
             if not isinstance(stored_hash, bytes):
                 stored_hash = stored_hash.encode('ascii')
 
-            return make_deferred_yieldable(
-                threads.deferToThreadPool(
-                    self.hs.get_reactor(),
-                    self.hs.get_reactor().getThreadPool(),
-                    _do_validate_hash,
-                ),
-            )
+            return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
         else:
             return defer.succeed(False)
 
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b078df4a76..75fe50c42c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,8 +17,8 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, create_requester
-from synapse.util.logcontext import run_in_background
 
 from ._base import BaseHandler
 
@@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
             None
         """
         if not self._user_parter_running:
-            run_in_background(self._user_parter_loop)
+            run_as_background_process("user_parter_loop", self._user_parter_loop)
 
     @defer.inlineCallbacks
     def _user_parter_loop(self):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 18741c5fac..02f12f6645 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -80,42 +80,60 @@ class DirectoryHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def create_association(self, user_id, room_alias, room_id, servers=None):
-        # association creation for human users
-        # TODO(erikj): Do user auth.
+    def create_association(self, requester, room_alias, room_id, servers=None,
+                           send_event=True):
+        """Attempt to create a new alias
 
-        if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
-            raise SynapseError(
-                403, "This user is not permitted to create this alias",
-            )
+        Args:
+            requester (Requester)
+            room_alias (RoomAlias)
+            room_id (str)
+            servers (list[str]|None): List of servers that others servers
+                should try and join via
+            send_event (bool): Whether to send an updated m.room.aliases event
 
-        can_create = yield self.can_modify_alias(
-            room_alias,
-            user_id=user_id
-        )
-        if not can_create:
-            raise SynapseError(
-                400, "This alias is reserved by an application service.",
-                errcode=Codes.EXCLUSIVE
-            )
-        yield self._create_association(room_alias, room_id, servers, creator=user_id)
+        Returns:
+            Deferred
+        """
 
-    @defer.inlineCallbacks
-    def create_appservice_association(self, service, room_alias, room_id,
-                                      servers=None):
-        if not service.is_interested_in_alias(room_alias.to_string()):
-            raise SynapseError(
-                400, "This application service has not reserved"
-                " this kind of alias.", errcode=Codes.EXCLUSIVE
+        user_id = requester.user.to_string()
+
+        service = requester.app_service
+        if service:
+            if not service.is_interested_in_alias(room_alias.to_string()):
+                raise SynapseError(
+                    400, "This application service has not reserved"
+                    " this kind of alias.", errcode=Codes.EXCLUSIVE
+                )
+        else:
+            if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
+                raise AuthError(
+                    403, "This user is not permitted to create this alias",
+                )
+
+            can_create = yield self.can_modify_alias(
+                room_alias,
+                user_id=user_id
             )
+            if not can_create:
+                raise AuthError(
+                    400, "This alias is reserved by an application service.",
+                    errcode=Codes.EXCLUSIVE
+                )
 
-        # association creation for app services
-        yield self._create_association(room_alias, room_id, servers)
+        yield self._create_association(room_alias, room_id, servers, creator=user_id)
+        if send_event:
+            yield self.send_room_alias_update_event(
+                requester,
+                room_id
+            )
 
     @defer.inlineCallbacks
-    def delete_association(self, requester, user_id, room_alias):
+    def delete_association(self, requester, room_alias):
         # association deletion for human users
 
+        user_id = requester.user.to_string()
+
         try:
             can_delete = yield self._user_can_delete_alias(room_alias, user_id)
         except StoreError as e:
@@ -143,7 +161,6 @@ class DirectoryHandler(BaseHandler):
         try:
             yield self.send_room_alias_update_event(
                 requester,
-                requester.user.to_string(),
                 room_id
             )
 
@@ -261,7 +278,7 @@ class DirectoryHandler(BaseHandler):
             )
 
     @defer.inlineCallbacks
-    def send_room_alias_update_event(self, requester, user_id, room_id):
+    def send_room_alias_update_event(self, requester, room_id):
         aliases = yield self.store.get_aliases_for_room(room_id)
 
         yield self.event_creation_handler.create_and_send_nonmember_event(
@@ -270,7 +287,7 @@ class DirectoryHandler(BaseHandler):
                 "type": EventTypes.Aliases,
                 "state_key": self.hs.hostname,
                 "room_id": room_id,
-                "sender": user_id,
+                "sender": requester.user.to_string(),
                 "content": {"aliases": aliases},
             },
             ratelimit=False
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 53e5e2648b..173315af6c 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.types import get_domain_from_id
 
 logger = logging.getLogger(__name__)
@@ -37,9 +37,23 @@ def _create_rerouter(func_name):
             )
         else:
             destination = get_domain_from_id(group_id)
-            return getattr(self.transport_client, func_name)(
+            d = getattr(self.transport_client, func_name)(
                 destination, group_id, *args, **kwargs
             )
+
+            # Capture errors returned by the remote homeserver and
+            # re-throw specific errors as SynapseErrors. This is so
+            # when the remote end responds with things like 403 Not
+            # In Group, we can communicate that to the client instead
+            # of a 500.
+            def h(failure):
+                failure.trap(HttpResponseException)
+                e = failure.value
+                if e.code == 403:
+                    raise e.to_synapse_error()
+                return failure
+            d.addErrback(h)
+            return d
     return f
 
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index c3f820b975..ab1571b27b 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -190,10 +190,11 @@ class RoomCreationHandler(BaseHandler):
         if room_alias:
             directory_handler = self.hs.get_handlers().directory_handler
             yield directory_handler.create_association(
-                user_id=user_id,
+                requester=requester,
                 room_id=room_id,
                 room_alias=room_alias,
                 servers=[self.hs.hostname],
+                send_event=False,
             )
 
         preset_config = config.get(
@@ -289,7 +290,7 @@ class RoomCreationHandler(BaseHandler):
         if room_alias:
             result["room_alias"] = room_alias.to_string()
             yield directory_handler.send_room_alias_update_event(
-                requester, user_id, room_id
+                requester, room_id
             )
 
         defer.returnValue(result)
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d8413d6aa7..f11b430126 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,6 +20,7 @@ from six import iteritems
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.roommember import ProfileInfo
 from synapse.types import get_localpart_from_id
 from synapse.util.metrics import Measure
@@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
         """
         return self.store.search_user_dir(user_id, search_term, limit)
 
-    @defer.inlineCallbacks
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
@@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
         if self._is_processing:
             return
 
+        @defer.inlineCallbacks
+        def process():
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._is_processing = False
+
         self._is_processing = True
-        try:
-            yield self._unsafe_process()
-        finally:
-            self._is_processing = False
+        run_as_background_process("user_directory.notify_new_event", process)
 
     @defer.inlineCallbacks
     def handle_local_profile_change(self, user_id, profile):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index fcc02fc77d..24b6110c20 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -230,7 +230,7 @@ class MatrixFederationHttpClient(object):
         Returns:
             Deferred: resolves with the http response object on success.
 
-            Fails with ``HTTPRequestException``: if we get an HTTP response
+            Fails with ``HttpResponseException``: if we get an HTTP response
                 code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -480,7 +480,7 @@ class MatrixFederationHttpClient(object):
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body.
 
-            Fails with ``HTTPRequestException`` if we get an HTTP response
+            Fails with ``HttpResponseException`` if we get an HTTP response
             code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -534,7 +534,7 @@ class MatrixFederationHttpClient(object):
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body.
 
-            Fails with ``HTTPRequestException`` if we get an HTTP response
+            Fails with ``HttpResponseException`` if we get an HTTP response
             code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -589,7 +589,7 @@ class MatrixFederationHttpClient(object):
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body.
 
-            Fails with ``HTTPRequestException`` if we get an HTTP response
+            Fails with ``HttpResponseException`` if we get an HTTP response
             code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -640,7 +640,7 @@ class MatrixFederationHttpClient(object):
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body.
 
-            Fails with ``HTTPRequestException`` if we get an HTTP response
+            Fails with ``HttpResponseException`` if we get an HTTP response
             code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -684,7 +684,7 @@ class MatrixFederationHttpClient(object):
             Deferred: resolves with an (int,dict) tuple of the file length and
             a dict of the response headers.
 
-            Fails with ``HTTPRequestException`` if we get an HTTP response code
+            Fails with ``HttpResponseException`` if we get an HTTP response code
             >= 300
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index f51184b50d..943876456b 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -53,6 +53,7 @@ REQUIREMENTS = {
     "pillow>=3.1.2": ["PIL"],
     "pydenticon>=0.2": ["pydenticon"],
     "sortedcontainers>=1.4.4": ["sortedcontainers"],
+    "psutil>=2.0.0": ["psutil>=2.0.0"],
     "pysaml2>=3.0.0": ["saml2"],
     "pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
     "msgpack-python>=0.4.2": ["msgpack"],
@@ -79,9 +80,6 @@ CONDITIONAL_REQUIREMENTS = {
     "matrix-synapse-ldap3": {
         "matrix-synapse-ldap3>=0.1": ["ldap_auth_provider"],
     },
-    "psutil": {
-        "psutil>=2.0.0": ["psutil>=2.0.0"],
-    },
     "postgres": {
         "psycopg2>=2.6": ["psycopg2"]
     }
diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py
index 97733f3026..0220acf644 100644
--- a/synapse/rest/client/v1/directory.py
+++ b/synapse/rest/client/v1/directory.py
@@ -74,38 +74,11 @@ class ClientDirectoryServer(ClientV1RestServlet):
         if room is None:
             raise SynapseError(400, "Room does not exist")
 
-        dir_handler = self.handlers.directory_handler
+        requester = yield self.auth.get_user_by_req(request)
 
-        try:
-            # try to auth as a user
-            requester = yield self.auth.get_user_by_req(request)
-            try:
-                user_id = requester.user.to_string()
-                yield dir_handler.create_association(
-                    user_id, room_alias, room_id, servers
-                )
-                yield dir_handler.send_room_alias_update_event(
-                    requester,
-                    user_id,
-                    room_id
-                )
-            except SynapseError as e:
-                raise e
-            except Exception:
-                logger.exception("Failed to create association")
-                raise
-        except AuthError:
-            # try to auth as an application service
-            service = yield self.auth.get_appservice_by_req(request)
-            yield dir_handler.create_appservice_association(
-                service, room_alias, room_id, servers
-            )
-            logger.info(
-                "Application service at %s created alias %s pointing to %s",
-                service.url,
-                room_alias.to_string(),
-                room_id
-            )
+        yield self.handlers.directory_handler.create_association(
+            requester, room_alias, room_id, servers
+        )
 
         defer.returnValue((200, {}))
 
@@ -135,7 +108,7 @@ class ClientDirectoryServer(ClientV1RestServlet):
         room_alias = RoomAlias.from_string(room_alias)
 
         yield dir_handler.delete_association(
-            requester, user.to_string(), room_alias
+            requester, room_alias
         )
 
         logger.info(
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index a828ff4438..08b1867fab 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -25,7 +25,7 @@ from six.moves.urllib import parse as urlparse
 
 import twisted.internet.error
 import twisted.web.http
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.web.resource import Resource
 
 from synapse.api.errors import (
@@ -36,8 +36,8 @@ from synapse.api.errors import (
 )
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
 from synapse.util.async_helpers import Linearizer
-from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import is_ascii, random_string
 
@@ -492,10 +492,11 @@ class MediaRepository(object):
         ))
 
         thumbnailer = Thumbnailer(input_path)
-        t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+        t_byte_source = yield logcontext.defer_to_thread(
+            self.hs.get_reactor(),
             self._generate_thumbnail,
             thumbnailer, t_width, t_height, t_method, t_type
-        ))
+        )
 
         if t_byte_source:
             try:
@@ -534,10 +535,11 @@ class MediaRepository(object):
         ))
 
         thumbnailer = Thumbnailer(input_path)
-        t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+        t_byte_source = yield logcontext.defer_to_thread(
+            self.hs.get_reactor(),
             self._generate_thumbnail,
             thumbnailer, t_width, t_height, t_method, t_type
-        ))
+        )
 
         if t_byte_source:
             try:
@@ -620,15 +622,17 @@ class MediaRepository(object):
         for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
             # Generate the thumbnail
             if t_method == "crop":
-                t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+                t_byte_source = yield logcontext.defer_to_thread(
+                    self.hs.get_reactor(),
                     thumbnailer.crop,
                     t_width, t_height, t_type,
-                ))
+                )
             elif t_method == "scale":
-                t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+                t_byte_source = yield logcontext.defer_to_thread(
+                    self.hs.get_reactor(),
                     thumbnailer.scale,
                     t_width, t_height, t_type,
-                ))
+                )
             else:
                 logger.error("Unrecognized method: %r", t_method)
                 continue
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index a6189224ee..896078fe76 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -21,9 +21,10 @@ import sys
 
 import six
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.protocols.basic import FileSender
 
+from synapse.util import logcontext
 from synapse.util.file_consumer import BackgroundFileConsumer
 from synapse.util.logcontext import make_deferred_yieldable
 
@@ -64,9 +65,10 @@ class MediaStorage(object):
 
         with self.store_into_file(file_info) as (f, fname, finish_cb):
             # Write to the main repository
-            yield make_deferred_yieldable(threads.deferToThread(
+            yield logcontext.defer_to_thread(
+                self.hs.get_reactor(),
                 _write_file_synchronously, source, f,
-            ))
+            )
             yield finish_cb()
 
         defer.returnValue(fname)
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 7b9f8b4d79..5aa03031f6 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -17,9 +17,10 @@ import logging
 import os
 import shutil
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 
 from synapse.config._base import Config
+from synapse.util import logcontext
 from synapse.util.logcontext import run_in_background
 
 from .media_storage import FileResponder
@@ -120,7 +121,8 @@ class FileStorageProviderBackend(StorageProvider):
         if not os.path.exists(dirname):
             os.makedirs(dirname)
 
-        return threads.deferToThread(
+        return logcontext.defer_to_thread(
+            self.hs.get_reactor(),
             shutil.copyfile, primary_fname, backup_fname,
         )
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index be61147b9b..d9d0255d0b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,7 +18,7 @@ import threading
 import time
 
 from six import PY2, iteritems, iterkeys, itervalues
-from six.moves import intern, range
+from six.moves import builtins, intern, range
 
 from canonicaljson import json
 from prometheus_client import Histogram
@@ -1233,7 +1233,7 @@ def db_to_json(db_content):
 
     # psycopg2 on Python 2 returns buffer objects, which we need to cast to
     # bytes to decode
-    if PY2 and isinstance(db_content, buffer):
+    if PY2 and isinstance(db_content, builtins.buffer):
         db_content = bytes(db_content)
 
     # Decode it to a Unicode string before feeding it to json.loads, so we
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index a1331c1a61..8af17921e3 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview
 
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index c7987bfcdd..2743b52bad 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -29,7 +29,7 @@ from ._base import SQLBaseStore
 logger = logging.getLogger(__name__)
 
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview
 
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 5623391f6e..158e9dbe7b 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -27,7 +27,7 @@ from ._base import SQLBaseStore
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview
 
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index a3032cdce9..d8bf953ec0 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -30,7 +30,7 @@ from ._base import SQLBaseStore, db_to_json
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview
 
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index f2bde74dc5..625aedc940 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -15,6 +15,8 @@
 
 import logging
 
+from six import integer_types
+
 from sortedcontainers import SortedDict
 
 from synapse.util import caches
@@ -47,7 +49,7 @@ class StreamChangeCache(object):
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
         """
-        assert type(stream_pos) is int or type(stream_pos) is long
+        assert type(stream_pos) in integer_types
 
         if stream_pos < self._earliest_known_stream_pos:
             self.metrics.inc_misses()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 89224b26cc..4c6e92beb8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works.
 import logging
 import threading
 
-from twisted.internet import defer
+from twisted.internet import defer, threads
 
 logger = logging.getLogger(__name__)
 
@@ -562,58 +562,76 @@ def _set_context_cb(result, context):
     return result
 
 
-# modules to ignore in `logcontext_tracer`
-_to_ignore = [
-    "synapse.util.logcontext",
-    "synapse.http.server",
-    "synapse.storage._base",
-    "synapse.util.async_helpers",
-]
+def defer_to_thread(reactor, f, *args, **kwargs):
+    """
+    Calls the function `f` using a thread from the reactor's default threadpool and
+    returns the result as a Deferred.
+
+    Creates a new logcontext for `f`, which is created as a child of the current
+    logcontext (so its CPU usage metrics will get attributed to the current
+    logcontext). `f` should preserve the logcontext it is given.
+
+    The result deferred follows the Synapse logcontext rules: you should `yield`
+    on it.
+
+    Args:
+        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+            the Deferred will be invoked, and whose threadpool we should use for the
+            function.
+
+            Normally this will be hs.get_reactor().
+
+        f (callable): The function to call.
 
+        args: positional arguments to pass to f.
 
-def logcontext_tracer(frame, event, arg):
-    """A tracer that logs whenever a logcontext "unexpectedly" changes within
-    a function. Probably inaccurate.
+        kwargs: keyword arguments to pass to f.
 
-    Use by calling `sys.settrace(logcontext_tracer)` in the main thread.
+    Returns:
+        Deferred: A Deferred which fires a callback with the result of `f`, or an
+            errback if `f` throws an exception.
     """
-    if event == 'call':
-        name = frame.f_globals["__name__"]
-        if name.startswith("synapse"):
-            if name == "synapse.util.logcontext":
-                if frame.f_code.co_name in ["__enter__", "__exit__"]:
-                    tracer = frame.f_back.f_trace
-                    if tracer:
-                        tracer.just_changed = True
-
-            tracer = frame.f_trace
-            if tracer:
-                return tracer
-
-            if not any(name.startswith(ig) for ig in _to_ignore):
-                return LineTracer()
-
-
-class LineTracer(object):
-    __slots__ = ["context", "just_changed"]
-
-    def __init__(self):
-        self.context = LoggingContext.current_context()
-        self.just_changed = False
-
-    def __call__(self, frame, event, arg):
-        if event in 'line':
-            if self.just_changed:
-                self.context = LoggingContext.current_context()
-                self.just_changed = False
-            else:
-                c = LoggingContext.current_context()
-                if c != self.context:
-                    logger.info(
-                        "Context changed! %s -> %s, %s, %s",
-                        self.context, c,
-                        frame.f_code.co_filename, frame.f_lineno
-                    )
-                    self.context = c
+    return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
 
-        return self
+
+def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
+    """
+    A wrapper for twisted.internet.threads.deferToThreadpool, which handles
+    logcontexts correctly.
+
+    Calls the function `f` using a thread from the given threadpool and returns
+    the result as a Deferred.
+
+    Creates a new logcontext for `f`, which is created as a child of the current
+    logcontext (so its CPU usage metrics will get attributed to the current
+    logcontext). `f` should preserve the logcontext it is given.
+
+    The result deferred follows the Synapse logcontext rules: you should `yield`
+    on it.
+
+    Args:
+        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+            the Deferred will be invoked. Normally this will be hs.get_reactor().
+
+        threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
+            running `f`. Normally this will be hs.get_reactor().getThreadPool().
+
+        f (callable): The function to call.
+
+        args: positional arguments to pass to f.
+
+        kwargs: keyword arguments to pass to f.
+
+    Returns:
+        Deferred: A Deferred which fires a callback with the result of `f`, or an
+            errback if `f` throws an exception.
+    """
+    logcontext = LoggingContext.current_context()
+
+    def g():
+        with LoggingContext(parent_context=logcontext):
+            return f(*args, **kwargs)
+
+    return make_deferred_yieldable(
+        threads.deferToThreadPool(reactor, threadpool, g)
+    )