summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/app/homeserver.py43
-rw-r--r--synapse/config/__main__.py2
-rw-r--r--synapse/config/_base.py121
-rw-r--r--synapse/config/emailconfig.py40
-rw-r--r--synapse/handlers/auth.py18
-rw-r--r--synapse/handlers/deactivate_account.py4
-rw-r--r--synapse/handlers/groups_local.py18
-rw-r--r--synapse/handlers/user_directory.py14
-rw-r--r--synapse/http/matrixfederationclient.py12
-rw-r--r--synapse/http/request_metrics.py31
-rw-r--r--synapse/notifier.py6
-rw-r--r--synapse/push/mailer.py8
-rw-r--r--synapse/python_dependencies.py4
-rw-r--r--synapse/rest/media/v1/media_repository.py24
-rw-r--r--synapse/rest/media/v1/media_storage.py8
-rw-r--r--synapse/rest/media/v1/storage_provider.py6
-rw-r--r--synapse/storage/_base.py4
-rw-r--r--synapse/storage/keys.py2
-rw-r--r--synapse/storage/pusher.py2
-rw-r--r--synapse/storage/signatures.py2
-rw-r--r--synapse/storage/transactions.py2
-rw-r--r--synapse/util/caches/stream_change_cache.py4
-rw-r--r--synapse/util/logcontext.py120
-rw-r--r--synapse/util/manhole.py6
24 files changed, 271 insertions, 230 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e3f0d99a3f..0b85b377e3 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -20,6 +20,7 @@ import sys
 
 from six import iteritems
 
+import psutil
 from prometheus_client import Gauge
 
 from twisted.application import service
@@ -502,7 +503,6 @@ def run(hs):
 
     def performance_stats_init():
         try:
-            import psutil
             process = psutil.Process()
             # Ensure we can fetch both, and make the initial request for cpu_percent
             # so the next request will use this as the initial point.
@@ -510,12 +510,9 @@ def run(hs):
             process.cpu_percent(interval=None)
             logger.info("report_stats can use psutil")
             stats_process.append(process)
-        except (ImportError, AttributeError):
-            logger.warn(
-                "report_stats enabled but psutil is not installed or incorrect version."
-                " Disabling reporting of memory/cpu stats."
-                " Ensuring psutil is available will help matrix.org track performance"
-                " changes across releases."
+        except (AttributeError):
+            logger.warning(
+                "Unable to read memory/cpu stats. Disabling reporting."
             )
 
     def generate_user_daily_visit_stats():
@@ -530,10 +527,13 @@ def run(hs):
     clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
 
     # monthly active user limiting functionality
-    clock.looping_call(
-        hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
-    )
-    hs.get_datastore().reap_monthly_active_users()
+    def reap_monthly_active_users():
+        return run_as_background_process(
+            "reap_monthly_active_users",
+            hs.get_datastore().reap_monthly_active_users,
+        )
+    clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
+    reap_monthly_active_users()
 
     @defer.inlineCallbacks
     def generate_monthly_active_users():
@@ -547,12 +547,23 @@ def run(hs):
         registered_reserved_users_mau_gauge.set(float(reserved_count))
         max_mau_gauge.set(float(hs.config.max_mau_value))
 
-    hs.get_datastore().initialise_reserved_users(
-        hs.config.mau_limits_reserved_threepids
+    def start_generate_monthly_active_users():
+        return run_as_background_process(
+            "generate_monthly_active_users",
+            generate_monthly_active_users,
+        )
+
+    # XXX is this really supposed to be a background process? it looks
+    # like it needs to complete before some of the other stuff runs.
+    run_as_background_process(
+        "initialise_reserved_users",
+        hs.get_datastore().initialise_reserved_users,
+        hs.config.mau_limits_reserved_threepids,
     )
-    generate_monthly_active_users()
+
+    start_generate_monthly_active_users()
     if hs.config.limit_usage_by_mau:
-        clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+        clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
     # End of monthly active user settings
 
     if hs.config.report_stats:
@@ -568,7 +579,7 @@ def run(hs):
         clock.call_later(5 * 60, start_phone_stats_home)
 
     if hs.config.daemonize and hs.config.print_pidfile:
-        print (hs.config.pid_file)
+        print(hs.config.pid_file)
 
     _base.start_reactor(
         "synapse-homeserver",
diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py
index 8fccf573ee..79fe9c3dac 100644
--- a/synapse/config/__main__.py
+++ b/synapse/config/__main__.py
@@ -28,7 +28,7 @@ if __name__ == "__main__":
             sys.stderr.write("\n" + str(e) + "\n")
             sys.exit(1)
 
-        print (getattr(config, key))
+        print(getattr(config, key))
         sys.exit(0)
     else:
         sys.stderr.write("Unknown command %r\n" % (action,))
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 3d2e90dd5b..14dae65ea0 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -106,10 +106,7 @@ class Config(object):
     @classmethod
     def check_file(cls, file_path, config_name):
         if file_path is None:
-            raise ConfigError(
-                "Missing config for %s."
-                % (config_name,)
-            )
+            raise ConfigError("Missing config for %s." % (config_name,))
         try:
             os.stat(file_path)
         except OSError as e:
@@ -128,9 +125,7 @@ class Config(object):
             if e.errno != errno.EEXIST:
                 raise
         if not os.path.isdir(dir_path):
-            raise ConfigError(
-                "%s is not a directory" % (dir_path,)
-            )
+            raise ConfigError("%s is not a directory" % (dir_path,))
         return dir_path
 
     @classmethod
@@ -156,21 +151,20 @@ class Config(object):
         return results
 
     def generate_config(
-            self,
-            config_dir_path,
-            server_name,
-            is_generating_file,
-            report_stats=None,
+        self, config_dir_path, server_name, is_generating_file, report_stats=None
     ):
         default_config = "# vim:ft=yaml\n"
 
-        default_config += "\n\n".join(dedent(conf) for conf in self.invoke_all(
-            "default_config",
-            config_dir_path=config_dir_path,
-            server_name=server_name,
-            is_generating_file=is_generating_file,
-            report_stats=report_stats,
-        ))
+        default_config += "\n\n".join(
+            dedent(conf)
+            for conf in self.invoke_all(
+                "default_config",
+                config_dir_path=config_dir_path,
+                server_name=server_name,
+                is_generating_file=is_generating_file,
+                report_stats=report_stats,
+            )
+        )
 
         config = yaml.load(default_config)
 
@@ -178,23 +172,22 @@ class Config(object):
 
     @classmethod
     def load_config(cls, description, argv):
-        config_parser = argparse.ArgumentParser(
-            description=description,
-        )
+        config_parser = argparse.ArgumentParser(description=description)
         config_parser.add_argument(
-            "-c", "--config-path",
+            "-c",
+            "--config-path",
             action="append",
             metavar="CONFIG_FILE",
             help="Specify config file. Can be given multiple times and"
-                 " may specify directories containing *.yaml files."
+            " may specify directories containing *.yaml files.",
         )
 
         config_parser.add_argument(
             "--keys-directory",
             metavar="DIRECTORY",
             help="Where files such as certs and signing keys are stored when"
-                 " their location is given explicitly in the config."
-                 " Defaults to the directory containing the last config file",
+            " their location is given explicitly in the config."
+            " Defaults to the directory containing the last config file",
         )
 
         config_args = config_parser.parse_args(argv)
@@ -203,9 +196,7 @@ class Config(object):
 
         obj = cls()
         obj.read_config_files(
-            config_files,
-            keys_directory=config_args.keys_directory,
-            generate_keys=False,
+            config_files, keys_directory=config_args.keys_directory, generate_keys=False
         )
         return obj
 
@@ -213,38 +204,38 @@ class Config(object):
     def load_or_generate_config(cls, description, argv):
         config_parser = argparse.ArgumentParser(add_help=False)
         config_parser.add_argument(
-            "-c", "--config-path",
+            "-c",
+            "--config-path",
             action="append",
             metavar="CONFIG_FILE",
             help="Specify config file. Can be given multiple times and"
-                 " may specify directories containing *.yaml files."
+            " may specify directories containing *.yaml files.",
         )
         config_parser.add_argument(
             "--generate-config",
             action="store_true",
-            help="Generate a config file for the server name"
+            help="Generate a config file for the server name",
         )
         config_parser.add_argument(
             "--report-stats",
             action="store",
             help="Whether the generated config reports anonymized usage statistics",
-            choices=["yes", "no"]
+            choices=["yes", "no"],
         )
         config_parser.add_argument(
             "--generate-keys",
             action="store_true",
-            help="Generate any missing key files then exit"
+            help="Generate any missing key files then exit",
         )
         config_parser.add_argument(
             "--keys-directory",
             metavar="DIRECTORY",
             help="Used with 'generate-*' options to specify where files such as"
-                 " certs and signing keys should be stored in, unless explicitly"
-                 " specified in the config."
+            " certs and signing keys should be stored in, unless explicitly"
+            " specified in the config.",
         )
         config_parser.add_argument(
-            "-H", "--server-name",
-            help="The server name to generate a config file for"
+            "-H", "--server-name", help="The server name to generate a config file for"
         )
         config_args, remaining_args = config_parser.parse_known_args(argv)
 
@@ -257,8 +248,8 @@ class Config(object):
         if config_args.generate_config:
             if config_args.report_stats is None:
                 config_parser.error(
-                    "Please specify either --report-stats=yes or --report-stats=no\n\n" +
-                    MISSING_REPORT_STATS_SPIEL
+                    "Please specify either --report-stats=yes or --report-stats=no\n\n"
+                    + MISSING_REPORT_STATS_SPIEL
                 )
             if not config_files:
                 config_parser.error(
@@ -287,26 +278,32 @@ class Config(object):
                         config_dir_path=config_dir_path,
                         server_name=server_name,
                         report_stats=(config_args.report_stats == "yes"),
-                        is_generating_file=True
+                        is_generating_file=True,
                     )
                     obj.invoke_all("generate_files", config)
                     config_file.write(config_str)
-                print((
-                    "A config file has been generated in %r for server name"
-                    " %r with corresponding SSL keys and self-signed"
-                    " certificates. Please review this file and customise it"
-                    " to your needs."
-                ) % (config_path, server_name))
+                print(
+                    (
+                        "A config file has been generated in %r for server name"
+                        " %r with corresponding SSL keys and self-signed"
+                        " certificates. Please review this file and customise it"
+                        " to your needs."
+                    )
+                    % (config_path, server_name)
+                )
                 print(
                     "If this server name is incorrect, you will need to"
                     " regenerate the SSL certificates"
                 )
                 return
             else:
-                print((
-                    "Config file %r already exists. Generating any missing key"
-                    " files."
-                ) % (config_path,))
+                print(
+                    (
+                        "Config file %r already exists. Generating any missing key"
+                        " files."
+                    )
+                    % (config_path,)
+                )
                 generate_keys = True
 
         parser = argparse.ArgumentParser(
@@ -338,8 +335,7 @@ class Config(object):
 
         return obj
 
-    def read_config_files(self, config_files, keys_directory=None,
-                          generate_keys=False):
+    def read_config_files(self, config_files, keys_directory=None, generate_keys=False):
         if not keys_directory:
             keys_directory = os.path.dirname(config_files[-1])
 
@@ -364,8 +360,9 @@ class Config(object):
 
         if "report_stats" not in config:
             raise ConfigError(
-                MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" +
-                MISSING_REPORT_STATS_SPIEL
+                MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS
+                + "\n"
+                + MISSING_REPORT_STATS_SPIEL
             )
 
         if generate_keys:
@@ -399,16 +396,16 @@ def find_config_files(search_paths):
                 for entry in os.listdir(config_path):
                     entry_path = os.path.join(config_path, entry)
                     if not os.path.isfile(entry_path):
-                        print (
-                            "Found subdirectory in config directory: %r. IGNORING."
-                        ) % (entry_path, )
+                        err = "Found subdirectory in config directory: %r. IGNORING."
+                        print(err % (entry_path,))
                         continue
 
                     if not entry.endswith(".yaml"):
-                        print (
-                            "Found file in config directory that does not"
-                            " end in '.yaml': %r. IGNORING."
-                        ) % (entry_path, )
+                        err = (
+                            "Found file in config directory that does not end in "
+                            "'.yaml': %r. IGNORING."
+                        )
+                        print(err % (entry_path,))
                         continue
 
                     files.append(entry_path)
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index e2582cfecc..93d70cff14 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -19,18 +19,12 @@ from __future__ import print_function
 import email.utils
 import logging
 import os
-import sys
-import textwrap
 
-from ._base import Config
+import pkg_resources
 
-logger = logging.getLogger(__name__)
+from ._base import Config, ConfigError
 
-TEMPLATE_DIR_WARNING = """\
-WARNING: The email notifier is configured to look for templates in '%(template_dir)s',
-but no templates could be found there. We will fall back to using the example templates;
-to get rid of this warning, leave 'email.template_dir' unset.
-"""
+logger = logging.getLogger(__name__)
 
 
 class EmailConfig(Config):
@@ -78,20 +72,22 @@ class EmailConfig(Config):
             self.email_notif_template_html = email_config["notif_template_html"]
             self.email_notif_template_text = email_config["notif_template_text"]
 
-            self.email_template_dir = email_config.get("template_dir")
-
-            # backwards-compatibility hack
-            if (
-                self.email_template_dir == "res/templates"
-                and not os.path.isfile(
-                    os.path.join(self.email_template_dir, self.email_notif_template_text)
+            template_dir = email_config.get("template_dir")
+            # we need an absolute path, because we change directory after starting (and
+            # we don't yet know what auxilliary templates like mail.css we will need).
+            # (Note that loading as package_resources with jinja.PackageLoader doesn't
+            # work for the same reason.)
+            if not template_dir:
+                template_dir = pkg_resources.resource_filename(
+                    'synapse', 'res/templates'
                 )
-            ):
-                t = TEMPLATE_DIR_WARNING % {
-                    "template_dir": self.email_template_dir,
-                }
-                print(textwrap.fill(t, width=80) + "\n", file=sys.stderr)
-                self.email_template_dir = None
+            template_dir = os.path.abspath(template_dir)
+
+            for f in self.email_notif_template_text, self.email_notif_template_html:
+                p = os.path.join(template_dir, f)
+                if not os.path.isfile(p):
+                    raise ConfigError("Unable to find email template file %s" % (p, ))
+            self.email_template_dir = template_dir
 
             self.email_notif_for_new_users = email_config.get(
                 "notif_for_new_users", True
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2a5eab124f..329e3c7d71 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
 import pymacaroons
 from canonicaljson import json
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.web.client import PartialDownloadError
 
 import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
+from synapse.util import logcontext
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
 
 from ._base import BaseHandler
 
@@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
                 bcrypt.gensalt(self.bcrypt_rounds),
             ).decode('ascii')
 
-        return make_deferred_yieldable(
-            threads.deferToThreadPool(
-                self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
-            ),
-        )
+        return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
@@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
             if not isinstance(stored_hash, bytes):
                 stored_hash = stored_hash.encode('ascii')
 
-            return make_deferred_yieldable(
-                threads.deferToThreadPool(
-                    self.hs.get_reactor(),
-                    self.hs.get_reactor().getThreadPool(),
-                    _do_validate_hash,
-                ),
-            )
+            return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
         else:
             return defer.succeed(False)
 
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b078df4a76..75fe50c42c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,8 +17,8 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, create_requester
-from synapse.util.logcontext import run_in_background
 
 from ._base import BaseHandler
 
@@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
             None
         """
         if not self._user_parter_running:
-            run_in_background(self._user_parter_loop)
+            run_as_background_process("user_parter_loop", self._user_parter_loop)
 
     @defer.inlineCallbacks
     def _user_parter_loop(self):
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 53e5e2648b..173315af6c 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.types import get_domain_from_id
 
 logger = logging.getLogger(__name__)
@@ -37,9 +37,23 @@ def _create_rerouter(func_name):
             )
         else:
             destination = get_domain_from_id(group_id)
-            return getattr(self.transport_client, func_name)(
+            d = getattr(self.transport_client, func_name)(
                 destination, group_id, *args, **kwargs
             )
+
+            # Capture errors returned by the remote homeserver and
+            # re-throw specific errors as SynapseErrors. This is so
+            # when the remote end responds with things like 403 Not
+            # In Group, we can communicate that to the client instead
+            # of a 500.
+            def h(failure):
+                failure.trap(HttpResponseException)
+                e = failure.value
+                if e.code == 403:
+                    raise e.to_synapse_error()
+                return failure
+            d.addErrback(h)
+            return d
     return f
 
 
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d8413d6aa7..f11b430126 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,6 +20,7 @@ from six import iteritems
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.roommember import ProfileInfo
 from synapse.types import get_localpart_from_id
 from synapse.util.metrics import Measure
@@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
         """
         return self.store.search_user_dir(user_id, search_term, limit)
 
-    @defer.inlineCallbacks
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
@@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
         if self._is_processing:
             return
 
+        @defer.inlineCallbacks
+        def process():
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._is_processing = False
+
         self._is_processing = True
-        try:
-            yield self._unsafe_process()
-        finally:
-            self._is_processing = False
+        run_as_background_process("user_directory.notify_new_event", process)
 
     @defer.inlineCallbacks
     def handle_local_profile_change(self, user_id, profile):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index fcc02fc77d..24b6110c20 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -230,7 +230,7 @@ class MatrixFederationHttpClient(object):
         Returns:
             Deferred: resolves with the http response object on success.
 
-            Fails with ``HTTPRequestException``: if we get an HTTP response
+            Fails with ``HttpResponseException``: if we get an HTTP response
                 code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -480,7 +480,7 @@ class MatrixFederationHttpClient(object):
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body.
 
-            Fails with ``HTTPRequestException`` if we get an HTTP response
+            Fails with ``HttpResponseException`` if we get an HTTP response
             code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -534,7 +534,7 @@ class MatrixFederationHttpClient(object):
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body.
 
-            Fails with ``HTTPRequestException`` if we get an HTTP response
+            Fails with ``HttpResponseException`` if we get an HTTP response
             code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -589,7 +589,7 @@ class MatrixFederationHttpClient(object):
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body.
 
-            Fails with ``HTTPRequestException`` if we get an HTTP response
+            Fails with ``HttpResponseException`` if we get an HTTP response
             code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -640,7 +640,7 @@ class MatrixFederationHttpClient(object):
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body.
 
-            Fails with ``HTTPRequestException`` if we get an HTTP response
+            Fails with ``HttpResponseException`` if we get an HTTP response
             code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -684,7 +684,7 @@ class MatrixFederationHttpClient(object):
             Deferred: resolves with an (int,dict) tuple of the file length and
             a dict of the response headers.
 
-            Fails with ``HTTPRequestException`` if we get an HTTP response code
+            Fails with ``HttpResponseException`` if we get an HTTP response code
             >= 300
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index fedb4e6b18..62045a918b 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -39,7 +39,8 @@ outgoing_responses_counter = Counter(
 )
 
 response_timer = Histogram(
-    "synapse_http_server_response_time_seconds", "sec",
+    "synapse_http_server_response_time_seconds",
+    "sec",
     ["method", "servlet", "tag", "code"],
 )
 
@@ -79,15 +80,11 @@ response_size = Counter(
 # than when the response was written.
 
 in_flight_requests_ru_utime = Counter(
-    "synapse_http_server_in_flight_requests_ru_utime_seconds",
-    "",
-    ["method", "servlet"],
+    "synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"]
 )
 
 in_flight_requests_ru_stime = Counter(
-    "synapse_http_server_in_flight_requests_ru_stime_seconds",
-    "",
-    ["method", "servlet"],
+    "synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"]
 )
 
 in_flight_requests_db_txn_count = Counter(
@@ -134,7 +131,7 @@ def _get_in_flight_counts():
     # type
     counts = {}
     for rm in reqs:
-        key = (rm.method, rm.name,)
+        key = (rm.method, rm.name)
         counts[key] = counts.get(key, 0) + 1
 
     return counts
@@ -175,7 +172,8 @@ class RequestMetrics(object):
             if context != self.start_context:
                 logger.warn(
                     "Context have unexpectedly changed %r, %r",
-                    context, self.start_context
+                    context,
+                    self.start_context,
                 )
                 return
 
@@ -192,10 +190,10 @@ class RequestMetrics(object):
         resource_usage = context.get_resource_usage()
 
         response_ru_utime.labels(self.method, self.name, tag).inc(
-            resource_usage.ru_utime,
+            resource_usage.ru_utime
         )
         response_ru_stime.labels(self.method, self.name, tag).inc(
-            resource_usage.ru_stime,
+            resource_usage.ru_stime
         )
         response_db_txn_count.labels(self.method, self.name, tag).inc(
             resource_usage.db_txn_count
@@ -222,8 +220,15 @@ class RequestMetrics(object):
         diff = new_stats - self._request_stats
         self._request_stats = new_stats
 
-        in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime)
-        in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime)
+        # max() is used since rapid use of ru_stime/ru_utime can end up with the
+        # count going backwards due to NTP, time smearing, fine-grained
+        # correction, or floating points. Who knows, really?
+        in_flight_requests_ru_utime.labels(self.method, self.name).inc(
+            max(diff.ru_utime, 0)
+        )
+        in_flight_requests_ru_stime.labels(self.method, self.name).inc(
+            max(diff.ru_stime, 0)
+        )
 
         in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
             diff.db_txn_count
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 340b16ce25..de02b1017e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -186,9 +186,9 @@ class Notifier(object):
         def count_listeners():
             all_user_streams = set()
 
-            for x in self.room_to_user_streams.values():
+            for x in list(self.room_to_user_streams.values()):
                 all_user_streams |= x
-            for x in self.user_to_user_stream.values():
+            for x in list(self.user_to_user_stream.values()):
                 all_user_streams.add(x)
 
             return sum(stream.count_listeners() for stream in all_user_streams)
@@ -196,7 +196,7 @@ class Notifier(object):
 
         LaterGauge(
             "synapse_notifier_rooms", "", [],
-            lambda: count(bool, self.room_to_user_streams.values()),
+            lambda: count(bool, list(self.room_to_user_streams.values())),
         )
         LaterGauge(
             "synapse_notifier_users", "", [],
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index b9dcfee740..16fb5e8471 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -526,12 +526,8 @@ def load_jinja2_templates(config):
     Returns:
         (notif_template_html, notif_template_text)
     """
-    logger.info("loading jinja2")
-
-    if config.email_template_dir:
-        loader = jinja2.FileSystemLoader(config.email_template_dir)
-    else:
-        loader = jinja2.PackageLoader('synapse', 'res/templates')
+    logger.info("loading email templates from '%s'", config.email_template_dir)
+    loader = jinja2.FileSystemLoader(config.email_template_dir)
     env = jinja2.Environment(loader=loader)
     env.filters["format_ts"] = format_ts_filter
     env.filters["mxc_to_http"] = _create_mxc_to_http_filter(config)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index f51184b50d..943876456b 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -53,6 +53,7 @@ REQUIREMENTS = {
     "pillow>=3.1.2": ["PIL"],
     "pydenticon>=0.2": ["pydenticon"],
     "sortedcontainers>=1.4.4": ["sortedcontainers"],
+    "psutil>=2.0.0": ["psutil>=2.0.0"],
     "pysaml2>=3.0.0": ["saml2"],
     "pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
     "msgpack-python>=0.4.2": ["msgpack"],
@@ -79,9 +80,6 @@ CONDITIONAL_REQUIREMENTS = {
     "matrix-synapse-ldap3": {
         "matrix-synapse-ldap3>=0.1": ["ldap_auth_provider"],
     },
-    "psutil": {
-        "psutil>=2.0.0": ["psutil>=2.0.0"],
-    },
     "postgres": {
         "psycopg2>=2.6": ["psycopg2"]
     }
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index a828ff4438..08b1867fab 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -25,7 +25,7 @@ from six.moves.urllib import parse as urlparse
 
 import twisted.internet.error
 import twisted.web.http
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.web.resource import Resource
 
 from synapse.api.errors import (
@@ -36,8 +36,8 @@ from synapse.api.errors import (
 )
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
 from synapse.util.async_helpers import Linearizer
-from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import is_ascii, random_string
 
@@ -492,10 +492,11 @@ class MediaRepository(object):
         ))
 
         thumbnailer = Thumbnailer(input_path)
-        t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+        t_byte_source = yield logcontext.defer_to_thread(
+            self.hs.get_reactor(),
             self._generate_thumbnail,
             thumbnailer, t_width, t_height, t_method, t_type
-        ))
+        )
 
         if t_byte_source:
             try:
@@ -534,10 +535,11 @@ class MediaRepository(object):
         ))
 
         thumbnailer = Thumbnailer(input_path)
-        t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+        t_byte_source = yield logcontext.defer_to_thread(
+            self.hs.get_reactor(),
             self._generate_thumbnail,
             thumbnailer, t_width, t_height, t_method, t_type
-        ))
+        )
 
         if t_byte_source:
             try:
@@ -620,15 +622,17 @@ class MediaRepository(object):
         for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
             # Generate the thumbnail
             if t_method == "crop":
-                t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+                t_byte_source = yield logcontext.defer_to_thread(
+                    self.hs.get_reactor(),
                     thumbnailer.crop,
                     t_width, t_height, t_type,
-                ))
+                )
             elif t_method == "scale":
-                t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+                t_byte_source = yield logcontext.defer_to_thread(
+                    self.hs.get_reactor(),
                     thumbnailer.scale,
                     t_width, t_height, t_type,
-                ))
+                )
             else:
                 logger.error("Unrecognized method: %r", t_method)
                 continue
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index a6189224ee..896078fe76 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -21,9 +21,10 @@ import sys
 
 import six
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.protocols.basic import FileSender
 
+from synapse.util import logcontext
 from synapse.util.file_consumer import BackgroundFileConsumer
 from synapse.util.logcontext import make_deferred_yieldable
 
@@ -64,9 +65,10 @@ class MediaStorage(object):
 
         with self.store_into_file(file_info) as (f, fname, finish_cb):
             # Write to the main repository
-            yield make_deferred_yieldable(threads.deferToThread(
+            yield logcontext.defer_to_thread(
+                self.hs.get_reactor(),
                 _write_file_synchronously, source, f,
-            ))
+            )
             yield finish_cb()
 
         defer.returnValue(fname)
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 7b9f8b4d79..5aa03031f6 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -17,9 +17,10 @@ import logging
 import os
 import shutil
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 
 from synapse.config._base import Config
+from synapse.util import logcontext
 from synapse.util.logcontext import run_in_background
 
 from .media_storage import FileResponder
@@ -120,7 +121,8 @@ class FileStorageProviderBackend(StorageProvider):
         if not os.path.exists(dirname):
             os.makedirs(dirname)
 
-        return threads.deferToThread(
+        return logcontext.defer_to_thread(
+            self.hs.get_reactor(),
             shutil.copyfile, primary_fname, backup_fname,
         )
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index be61147b9b..d9d0255d0b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,7 +18,7 @@ import threading
 import time
 
 from six import PY2, iteritems, iterkeys, itervalues
-from six.moves import intern, range
+from six.moves import builtins, intern, range
 
 from canonicaljson import json
 from prometheus_client import Histogram
@@ -1233,7 +1233,7 @@ def db_to_json(db_content):
 
     # psycopg2 on Python 2 returns buffer objects, which we need to cast to
     # bytes to decode
-    if PY2 and isinstance(db_content, buffer):
+    if PY2 and isinstance(db_content, builtins.buffer):
         db_content = bytes(db_content)
 
     # Decode it to a Unicode string before feeding it to json.loads, so we
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index a1331c1a61..8af17921e3 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview
 
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index c7987bfcdd..2743b52bad 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -29,7 +29,7 @@ from ._base import SQLBaseStore
 logger = logging.getLogger(__name__)
 
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview
 
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 5623391f6e..158e9dbe7b 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -27,7 +27,7 @@ from ._base import SQLBaseStore
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview
 
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index a3032cdce9..d8bf953ec0 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -30,7 +30,7 @@ from ._base import SQLBaseStore, db_to_json
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview
 
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index f2bde74dc5..625aedc940 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -15,6 +15,8 @@
 
 import logging
 
+from six import integer_types
+
 from sortedcontainers import SortedDict
 
 from synapse.util import caches
@@ -47,7 +49,7 @@ class StreamChangeCache(object):
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
         """
-        assert type(stream_pos) is int or type(stream_pos) is long
+        assert type(stream_pos) in integer_types
 
         if stream_pos < self._earliest_known_stream_pos:
             self.metrics.inc_misses()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 89224b26cc..4c6e92beb8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works.
 import logging
 import threading
 
-from twisted.internet import defer
+from twisted.internet import defer, threads
 
 logger = logging.getLogger(__name__)
 
@@ -562,58 +562,76 @@ def _set_context_cb(result, context):
     return result
 
 
-# modules to ignore in `logcontext_tracer`
-_to_ignore = [
-    "synapse.util.logcontext",
-    "synapse.http.server",
-    "synapse.storage._base",
-    "synapse.util.async_helpers",
-]
+def defer_to_thread(reactor, f, *args, **kwargs):
+    """
+    Calls the function `f` using a thread from the reactor's default threadpool and
+    returns the result as a Deferred.
+
+    Creates a new logcontext for `f`, which is created as a child of the current
+    logcontext (so its CPU usage metrics will get attributed to the current
+    logcontext). `f` should preserve the logcontext it is given.
+
+    The result deferred follows the Synapse logcontext rules: you should `yield`
+    on it.
+
+    Args:
+        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+            the Deferred will be invoked, and whose threadpool we should use for the
+            function.
+
+            Normally this will be hs.get_reactor().
+
+        f (callable): The function to call.
 
+        args: positional arguments to pass to f.
 
-def logcontext_tracer(frame, event, arg):
-    """A tracer that logs whenever a logcontext "unexpectedly" changes within
-    a function. Probably inaccurate.
+        kwargs: keyword arguments to pass to f.
 
-    Use by calling `sys.settrace(logcontext_tracer)` in the main thread.
+    Returns:
+        Deferred: A Deferred which fires a callback with the result of `f`, or an
+            errback if `f` throws an exception.
     """
-    if event == 'call':
-        name = frame.f_globals["__name__"]
-        if name.startswith("synapse"):
-            if name == "synapse.util.logcontext":
-                if frame.f_code.co_name in ["__enter__", "__exit__"]:
-                    tracer = frame.f_back.f_trace
-                    if tracer:
-                        tracer.just_changed = True
-
-            tracer = frame.f_trace
-            if tracer:
-                return tracer
-
-            if not any(name.startswith(ig) for ig in _to_ignore):
-                return LineTracer()
-
-
-class LineTracer(object):
-    __slots__ = ["context", "just_changed"]
-
-    def __init__(self):
-        self.context = LoggingContext.current_context()
-        self.just_changed = False
-
-    def __call__(self, frame, event, arg):
-        if event in 'line':
-            if self.just_changed:
-                self.context = LoggingContext.current_context()
-                self.just_changed = False
-            else:
-                c = LoggingContext.current_context()
-                if c != self.context:
-                    logger.info(
-                        "Context changed! %s -> %s, %s, %s",
-                        self.context, c,
-                        frame.f_code.co_filename, frame.f_lineno
-                    )
-                    self.context = c
+    return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
 
-        return self
+
+def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
+    """
+    A wrapper for twisted.internet.threads.deferToThreadpool, which handles
+    logcontexts correctly.
+
+    Calls the function `f` using a thread from the given threadpool and returns
+    the result as a Deferred.
+
+    Creates a new logcontext for `f`, which is created as a child of the current
+    logcontext (so its CPU usage metrics will get attributed to the current
+    logcontext). `f` should preserve the logcontext it is given.
+
+    The result deferred follows the Synapse logcontext rules: you should `yield`
+    on it.
+
+    Args:
+        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+            the Deferred will be invoked. Normally this will be hs.get_reactor().
+
+        threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
+            running `f`. Normally this will be hs.get_reactor().getThreadPool().
+
+        f (callable): The function to call.
+
+        args: positional arguments to pass to f.
+
+        kwargs: keyword arguments to pass to f.
+
+    Returns:
+        Deferred: A Deferred which fires a callback with the result of `f`, or an
+            errback if `f` throws an exception.
+    """
+    logcontext = LoggingContext.current_context()
+
+    def g():
+        with LoggingContext(parent_context=logcontext):
+            return f(*args, **kwargs)
+
+    return make_deferred_yieldable(
+        threads.deferToThreadPool(reactor, threadpool, g)
+    )
diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py
index 8d0f2a8918..9cb7e9c9ab 100644
--- a/synapse/util/manhole.py
+++ b/synapse/util/manhole.py
@@ -70,6 +70,8 @@ def manhole(username, password, globals):
     Returns:
         twisted.internet.protocol.Factory: A factory to pass to ``listenTCP``
     """
+    if not isinstance(password, bytes):
+        password = password.encode('ascii')
 
     checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(
         **{username: password}
@@ -82,7 +84,7 @@ def manhole(username, password, globals):
     )
 
     factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker]))
-    factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY)
-    factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY)
+    factory.publicKeys[b'ssh-rsa'] = Key.fromString(PUBLIC_KEY)
+    factory.privateKeys[b'ssh-rsa'] = Key.fromString(PRIVATE_KEY)
 
     return factory