summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/__init__.py32
-rw-r--r--synapse/util/async.py186
-rw-r--r--synapse/util/caches/__init__.py16
-rw-r--r--synapse/util/caches/descriptors.py26
-rw-r--r--synapse/util/caches/dictionary_cache.py34
-rw-r--r--synapse/util/caches/expiringcache.py11
-rw-r--r--synapse/util/caches/lrucache.py2
-rw-r--r--synapse/util/caches/stream_change_cache.py61
-rw-r--r--synapse/util/distributor.py48
-rw-r--r--synapse/util/file_consumer.py20
-rw-r--r--synapse/util/frozenutils.py6
-rw-r--r--synapse/util/httpresourcetree.py4
-rw-r--r--synapse/util/logcontext.py187
-rw-r--r--synapse/util/logformatter.py3
-rw-r--r--synapse/util/logutils.py8
-rw-r--r--synapse/util/manhole.py6
-rw-r--r--synapse/util/metrics.py40
-rw-r--r--synapse/util/msisdn.py1
-rw-r--r--synapse/util/ratelimitutils.py47
-rw-r--r--synapse/util/retryutils.py9
-rw-r--r--synapse/util/rlimit.py3
-rw-r--r--synapse/util/stringutils.py1
-rw-r--r--synapse/util/versionstring.py4
23 files changed, 435 insertions, 320 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index fc11e26623..680ea928c7 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,14 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.logcontext import PreserveLoggingContext
+import logging
+from itertools import islice
 
-from twisted.internet import defer, reactor, task
+import attr
 
-import time
-import logging
+from twisted.internet import defer, task
 
-from itertools import islice
+from synapse.util.logcontext import PreserveLoggingContext
 
 logger = logging.getLogger(__name__)
 
@@ -31,16 +31,27 @@ def unwrapFirstError(failure):
     return failure.value.subFailure
 
 
+@attr.s
 class Clock(object):
-    """A small utility that obtains current time-of-day so that time may be
-    mocked during unit-tests.
+    """
+    A Clock wraps a Twisted reactor and provides utilities on top of it.
 
-    TODO(paul): Also move the sleep() functionality into it
+    Args:
+        reactor: The Twisted reactor to use.
     """
+    _reactor = attr.ib()
+
+    @defer.inlineCallbacks
+    def sleep(self, seconds):
+        d = defer.Deferred()
+        with PreserveLoggingContext():
+            self._reactor.callLater(seconds, d.callback, seconds)
+            res = yield d
+        defer.returnValue(res)
 
     def time(self):
         """Returns the current system time in seconds since epoch."""
-        return time.time()
+        return self._reactor.seconds()
 
     def time_msec(self):
         """Returns the current system time in miliseconds since epoch."""
@@ -56,6 +67,7 @@ class Clock(object):
             msec(float): How long to wait between calls in milliseconds.
         """
         call = task.LoopingCall(f)
+        call.clock = self._reactor
         call.start(msec / 1000.0, now=False)
         return call
 
@@ -73,7 +85,7 @@ class Clock(object):
                 callback(*args, **kwargs)
 
         with PreserveLoggingContext():
-            return reactor.callLater(delay, wrapped_callback, *args, **kwargs)
+            return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
 
     def cancel_call_later(self, timer, ignore_errs=False):
         try:
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 9dd4e6b5bc..a7094e2fb4 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,42 +13,27 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import collections
+import logging
+from contextlib import contextmanager
 
+from six.moves import range
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.defer import CancelledError
 from twisted.python import failure
 
+from synapse.util import Clock, logcontext, unwrapFirstError
+
 from .logcontext import (
-    PreserveLoggingContext, make_deferred_yieldable, run_in_background
+    PreserveLoggingContext,
+    make_deferred_yieldable,
+    run_in_background,
 )
-from synapse.util import logcontext, unwrapFirstError
-
-from contextlib import contextmanager
-
-import logging
-
-from six.moves import range
 
 logger = logging.getLogger(__name__)
 
 
-@defer.inlineCallbacks
-def sleep(seconds):
-    d = defer.Deferred()
-    with PreserveLoggingContext():
-        reactor.callLater(seconds, d.callback, seconds)
-        res = yield d
-    defer.returnValue(res)
-
-
-def run_on_reactor():
-    """ This will cause the rest of the function to be invoked upon the next
-    iteration of the main loop
-    """
-    return sleep(0)
-
-
 class ObservableDeferred(object):
     """Wraps a deferred object so that we can add observer deferreds. These
     observer deferreds do not affect the callback chain of the original
@@ -171,49 +157,72 @@ def concurrently_execute(func, args, limit):
 
 
 class Linearizer(object):
-    """Linearizes access to resources based on a key. Useful to ensure only one
-    thing is happening at a time on a given resource.
+    """Limits concurrent access to resources based on a key. Useful to ensure
+    only a few things happen at a time on a given resource.
 
     Example:
 
-        with (yield linearizer.queue("test_key")):
+        with (yield limiter.queue("test_key")):
             # do some work.
 
     """
-    def __init__(self, name=None):
+    def __init__(self, name=None, max_count=1, clock=None):
+        """
+        Args:
+            max_count(int): The maximum number of concurrent accesses
+        """
         if name is None:
             self.name = id(self)
         else:
             self.name = name
+
+        if not clock:
+            from twisted.internet import reactor
+            clock = Clock(reactor)
+        self._clock = clock
+        self.max_count = max_count
+
+        # key_to_defer is a map from the key to a 2 element list where
+        # the first element is the number of things executing, and
+        # the second element is an OrderedDict, where the keys are deferreds for the
+        # things blocked from executing.
         self.key_to_defer = {}
 
     @defer.inlineCallbacks
     def queue(self, key):
-        # If there is already a deferred in the queue, we pull it out so that
-        # we can wait on it later.
-        # Then we replace it with a deferred that we resolve *after* the
-        # context manager has exited.
-        # We only return the context manager after the previous deferred has
-        # resolved.
-        # This all has the net effect of creating a chain of deferreds that
-        # wait for the previous deferred before starting their work.
-        current_defer = self.key_to_defer.get(key)
+        entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
 
-        new_defer = defer.Deferred()
-        self.key_to_defer[key] = new_defer
+        # If the number of things executing is greater than the maximum
+        # then add a deferred to the list of blocked items
+        # When on of the things currently executing finishes it will callback
+        # this item so that it can continue executing.
+        if entry[0] >= self.max_count:
+            new_defer = defer.Deferred()
+            entry[1][new_defer] = 1
 
-        if current_defer:
             logger.info(
-                "Waiting to acquire linearizer lock %r for key %r", self.name, key
+                "Waiting to acquire linearizer lock %r for key %r", self.name, key,
             )
             try:
-                with PreserveLoggingContext():
-                    yield current_defer
-            except Exception:
-                logger.exception("Unexpected exception in Linearizer")
-
-            logger.info("Acquired linearizer lock %r for key %r", self.name,
-                        key)
+                yield make_deferred_yieldable(new_defer)
+            except Exception as e:
+                if isinstance(e, CancelledError):
+                    logger.info(
+                        "Cancelling wait for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+                else:
+                    logger.warn(
+                        "Unexpected exception waiting for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+
+                # we just have to take ourselves back out of the queue.
+                del entry[1][new_defer]
+                raise
+
+            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+            entry[0] += 1
 
             # if the code holding the lock completes synchronously, then it
             # will recursively run the next claimant on the list. That can
@@ -223,15 +232,15 @@ class Linearizer(object):
             # In order to break the cycle, we add a cheeky sleep(0) here to
             # ensure that we fall back to the reactor between each iteration.
             #
-            # (There's no particular need for it to happen before we return
-            # the context manager, but it needs to happen while we hold the
-            # lock, and the context manager's exit code must be synchronous,
-            # so actually this is the only sensible place.
-            yield run_on_reactor()
+            # (This needs to happen while we hold the lock, and the context manager's exit
+            # code must be synchronous, so this is the only sensible place.)
+            yield self._clock.sleep(0)
 
         else:
-            logger.info("Acquired uncontended linearizer lock %r for key %r",
-                        self.name, key)
+            logger.info(
+                "Acquired uncontended linearizer lock %r for key %r", self.name, key,
+            )
+            entry[0] += 1
 
         @contextmanager
         def _ctx_manager():
@@ -239,73 +248,15 @@ class Linearizer(object):
                 yield
             finally:
                 logger.info("Releasing linearizer lock %r for key %r", self.name, key)
-                with PreserveLoggingContext():
-                    new_defer.callback(None)
-                current_d = self.key_to_defer.get(key)
-                if current_d is new_defer:
-                    self.key_to_defer.pop(key, None)
-
-        defer.returnValue(_ctx_manager())
-
-
-class Limiter(object):
-    """Limits concurrent access to resources based on a key. Useful to ensure
-    only a few thing happen at a time on a given resource.
-
-    Example:
-
-        with (yield limiter.queue("test_key")):
-            # do some work.
-
-    """
-    def __init__(self, max_count):
-        """
-        Args:
-            max_count(int): The maximum number of concurrent access
-        """
-        self.max_count = max_count
-
-        # key_to_defer is a map from the key to a 2 element list where
-        # the first element is the number of things executing
-        # the second element is a list of deferreds for the things blocked from
-        # executing.
-        self.key_to_defer = {}
-
-    @defer.inlineCallbacks
-    def queue(self, key):
-        entry = self.key_to_defer.setdefault(key, [0, []])
-
-        # If the number of things executing is greater than the maximum
-        # then add a deferred to the list of blocked items
-        # When on of the things currently executing finishes it will callback
-        # this item so that it can continue executing.
-        if entry[0] >= self.max_count:
-            new_defer = defer.Deferred()
-            entry[1].append(new_defer)
-
-            logger.info("Waiting to acquire limiter lock for key %r", key)
-            with PreserveLoggingContext():
-                yield new_defer
-            logger.info("Acquired limiter lock for key %r", key)
-        else:
-            logger.info("Acquired uncontended limiter lock for key %r", key)
-
-        entry[0] += 1
-
-        @contextmanager
-        def _ctx_manager():
-            try:
-                yield
-            finally:
-                logger.info("Releasing limiter lock for key %r", key)
 
                 # We've finished executing so check if there are any things
                 # blocked waiting to execute and start one of them
                 entry[0] -= 1
 
                 if entry[1]:
-                    next_def = entry[1].pop(0)
+                    (next_def, _) = entry[1].popitem(last=False)
 
+                    # we need to run the next thing in the sentinel context.
                     with PreserveLoggingContext():
                         next_def.callback(None)
                 elif entry[0] == 0:
@@ -404,7 +355,7 @@ class DeferredTimeoutError(Exception):
     """
 
 
-def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
+def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
     """
     Add a timeout to a deferred by scheduling it to be cancelled after
     timeout seconds.
@@ -419,6 +370,7 @@ def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
     Args:
         deferred (defer.Deferred): deferred to be timed out
         timeout (Number): seconds to time out after
+        reactor (twisted.internet.reactor): the Twisted reactor to use
 
         on_timeout_cancel (callable): A callable which is called immediately
             after the deferred times out, and not if this deferred is
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 183faf75a1..7b065b195e 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -13,15 +13,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily
-
 import os
 
-from six.moves import intern
 import six
+from six.moves import intern
+
+from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily
 
 CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
 
+
+def get_cache_factor_for(cache_name):
+    env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
+    factor = os.environ.get(env_var)
+    if factor:
+        return float(factor)
+
+    return CACHE_SIZE_FACTOR
+
+
 caches_by_name = {}
 collectors_by_name = {}
 
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index fc1874b65b..f8a07df6b8 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -13,28 +13,26 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import functools
+import inspect
 import logging
+import threading
+from collections import namedtuple
 
+import six
+from six import itervalues, string_types
+
+from twisted.internet import defer
+
+from synapse.util import logcontext, unwrapFirstError
 from synapse.util.async import ObservableDeferred
-from synapse.util import unwrapFirstError, logcontext
-from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches import get_cache_factor_for
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
 from synapse.util.stringutils import to_ascii
 
 from . import register_cache
 
-from twisted.internet import defer
-from collections import namedtuple
-
-import functools
-import inspect
-import threading
-
-from six import string_types, itervalues
-import six
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -313,7 +311,7 @@ class CacheDescriptor(_CacheDescriptorBase):
             orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
             cache_context=cache_context)
 
-        max_entries = int(max_entries * CACHE_SIZE_FACTOR)
+        max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
 
         self.max_entries = max_entries
         self.tree = tree
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index bdc21e348f..6c0b5a4094 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -13,12 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches.lrucache import LruCache
-from collections import namedtuple
-from . import register_cache
-import threading
 import logging
+import threading
+from collections import namedtuple
 
+from synapse.util.caches.lrucache import LruCache
+
+from . import register_cache
 
 logger = logging.getLogger(__name__)
 
@@ -107,29 +108,28 @@ class DictionaryCache(object):
         self.sequence += 1
         self.cache.clear()
 
-    def update(self, sequence, key, value, full=False, known_absent=None):
+    def update(self, sequence, key, value, fetched_keys=None):
         """Updates the entry in the cache
 
         Args:
             sequence
-            key
-            value (dict): The value to update the cache with.
-            full (bool): Whether the given value is the full dict, or just a
-                partial subset there of. If not full then any existing entries
-                for the key will be updated.
-            known_absent (set): Set of keys that we know don't exist in the full
-                dict.
+            key (K)
+            value (dict[X,Y]): The value to update the cache with.
+            fetched_keys (None|set[X]): All of the dictionary keys which were
+                fetched from the database.
+
+                If None, this is the complete value for key K. Otherwise, it
+                is used to infer a list of keys which we know don't exist in
+                the full dict.
         """
         self.check_thread()
         if self.sequence == sequence:
             # Only update the cache if the caches sequence number matches the
             # number that the cache had before the SELECT was started (SYN-369)
-            if known_absent is None:
-                known_absent = set()
-            if full:
-                self._insert(key, value, known_absent)
+            if fetched_keys is None:
+                self._insert(key, value, set())
             else:
-                self._update_or_insert(key, value, known_absent)
+                self._update_or_insert(key, value, fetched_keys)
 
     def _update_or_insert(self, key, value, known_absent):
         # We pop and reinsert as we need to tell the cache the size may have
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index ff04c91955..ce85b2ae11 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -13,11 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches import register_cache
-
-from collections import OrderedDict
 import logging
+from collections import OrderedDict
 
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.caches import register_cache
 
 logger = logging.getLogger(__name__)
 
@@ -64,7 +64,10 @@ class ExpiringCache(object):
             return
 
         def f():
-            self._prune_cache()
+            return run_as_background_process(
+                "prune_cache_%s" % self._cache_name,
+                self._prune_cache,
+            )
 
         self._clock.looping_call(f, self._expiry_ms / 2)
 
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 1c5a982094..b684f24e7b 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -14,8 +14,8 @@
 # limitations under the License.
 
 
-from functools import wraps
 import threading
+from functools import wraps
 
 from synapse.util.caches.treecache import TreeCache
 
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index a7fe0397fa..f2bde74dc5 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,12 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
-
-
-from blist import sorteddict
 import logging
 
+from sortedcontainers import SortedDict
+
+from synapse.util import caches
 
 logger = logging.getLogger(__name__)
 
@@ -32,16 +31,18 @@ class StreamChangeCache(object):
     entities that may have changed since that position. If position key is too
     old then the cache will simply return all given entities.
     """
-    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
-        self._max_size = int(max_size * CACHE_SIZE_FACTOR)
+
+    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
+        self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
-        self._cache = sorteddict()
+        self._cache = SortedDict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
-        self.metrics = register_cache("cache", self.name, self._cache)
+        self.metrics = caches.register_cache("cache", self.name, self._cache)
 
-        for entity, stream_pos in prefilled_cache.items():
-            self.entity_has_changed(entity, stream_pos)
+        if prefilled_cache:
+            for entity, stream_pos in prefilled_cache.items():
+                self.entity_has_changed(entity, stream_pos)
 
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
@@ -65,22 +66,25 @@ class StreamChangeCache(object):
         return False
 
     def get_entities_changed(self, entities, stream_pos):
-        """Returns subset of entities that have had new things since the
-        given position. If the position is too old it will just return the given list.
+        """
+        Returns subset of entities that have had new things since the given
+        position.  Entities unknown to the cache will be returned.  If the
+        position is too old it will just return the given list.
         """
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
+            changed_entities = {
+                self._cache[k] for k in self._cache.islice(
+                    start=self._cache.bisect_right(stream_pos),
+                )
+            }
 
-            result = set(
-                self._cache[k] for k in keys[i:]
-            ).intersection(entities)
+            result = changed_entities.intersection(entities)
 
             self.metrics.inc_hits()
         else:
-            result = entities
+            result = set(entities)
             self.metrics.inc_misses()
 
         return result
@@ -90,12 +94,13 @@ class StreamChangeCache(object):
         """
         assert type(stream_pos) is int
 
+        if not self._cache:
+            # If we have no cache, nothing can have changed.
+            return False
+
         if stream_pos >= self._earliest_known_stream_pos:
             self.metrics.inc_hits()
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return i < len(keys)
+            return self._cache.bisect_right(stream_pos) < len(self._cache)
         else:
             self.metrics.inc_misses()
             return True
@@ -107,10 +112,8 @@ class StreamChangeCache(object):
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return [self._cache[k] for k in keys[i:]]
+            return [self._cache[k] for k in self._cache.islice(
+                start=self._cache.bisect_right(stream_pos))]
         else:
             return None
 
@@ -129,8 +132,10 @@ class StreamChangeCache(object):
             self._entity_to_key[entity] = stream_pos
 
             while len(self._cache) > self._max_size:
-                k, r = self._cache.popitem()
-                self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
+                k, r = self._cache.popitem(0)
+                self._earliest_known_stream_pos = max(
+                    k, self._earliest_known_stream_pos,
+                )
                 self._entity_to_key.pop(r, None)
 
     def get_max_pos_of_last_change(self, entity):
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 734331caaa..194da87639 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -17,20 +17,18 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
 
 
 def user_left_room(distributor, user, room_id):
-    with PreserveLoggingContext():
-        distributor.fire("user_left_room", user=user, room_id=room_id)
+    distributor.fire("user_left_room", user=user, room_id=room_id)
 
 
 def user_joined_room(distributor, user, room_id):
-    with PreserveLoggingContext():
-        distributor.fire("user_joined_room", user=user, room_id=room_id)
+    distributor.fire("user_joined_room", user=user, room_id=room_id)
 
 
 class Distributor(object):
@@ -44,9 +42,7 @@ class Distributor(object):
       model will do for today.
     """
 
-    def __init__(self, suppress_failures=True):
-        self.suppress_failures = suppress_failures
-
+    def __init__(self):
         self.signals = {}
         self.pre_registration = {}
 
@@ -56,7 +52,6 @@ class Distributor(object):
 
         self.signals[name] = Signal(
             name,
-            suppress_failures=self.suppress_failures,
         )
 
         if name in self.pre_registration:
@@ -75,10 +70,18 @@ class Distributor(object):
             self.pre_registration[name].append(observer)
 
     def fire(self, name, *args, **kwargs):
+        """Dispatches the given signal to the registered observers.
+
+        Runs the observers as a background process. Does not return a deferred.
+        """
         if name not in self.signals:
             raise KeyError("%r does not have a signal named %s" % (self, name))
 
-        return self.signals[name].fire(*args, **kwargs)
+        run_as_background_process(
+            name,
+            self.signals[name].fire,
+            *args, **kwargs
+        )
 
 
 class Signal(object):
@@ -91,9 +94,8 @@ class Signal(object):
     method into all of the observers.
     """
 
-    def __init__(self, name, suppress_failures):
+    def __init__(self, name):
         self.name = name
-        self.suppress_failures = suppress_failures
         self.observers = []
 
     def observe(self, observer):
@@ -103,7 +105,6 @@ class Signal(object):
         Each observer callable may return a Deferred."""
         self.observers.append(observer)
 
-    @defer.inlineCallbacks
     def fire(self, *args, **kwargs):
         """Invokes every callable in the observer list, passing in the args and
         kwargs. Exceptions thrown by observers are logged but ignored. It is
@@ -121,22 +122,17 @@ class Signal(object):
                         failure.type,
                         failure.value,
                         failure.getTracebackObject()))
-                if not self.suppress_failures:
-                    return failure
 
             return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
 
-        with PreserveLoggingContext():
-            deferreds = [
-                do(observer)
-                for observer in self.observers
-            ]
-
-            res = yield defer.gatherResults(
-                deferreds, consumeErrors=True
-            ).addErrback(unwrapFirstError)
+        deferreds = [
+            run_in_background(do, o)
+            for o in self.observers
+        ]
 
-        defer.returnValue(res)
+        return make_deferred_yieldable(defer.gatherResults(
+            deferreds, consumeErrors=True,
+        ))
 
     def __repr__(self):
         return "<Signal name=%r>" % (self.name,)
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 3380970e4e..629ed44149 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -13,11 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import threads, reactor
+from six.moves import queue
 
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from twisted.internet import threads
 
-from six.moves import queue
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 
 class BackgroundFileConsumer(object):
@@ -27,6 +27,7 @@ class BackgroundFileConsumer(object):
     Args:
         file_obj (file): The file like object to write to. Closed when
             finished.
+        reactor (twisted.internet.reactor): the Twisted reactor to use
     """
 
     # For PushProducers pause if we have this many unwritten slices
@@ -34,9 +35,11 @@ class BackgroundFileConsumer(object):
     # And resume once the size of the queue is less than this
     _RESUME_ON_QUEUE_SIZE = 2
 
-    def __init__(self, file_obj):
+    def __init__(self, file_obj, reactor):
         self._file_obj = file_obj
 
+        self._reactor = reactor
+
         # Producer we're registered with
         self._producer = None
 
@@ -71,7 +74,10 @@ class BackgroundFileConsumer(object):
         self._producer = producer
         self.streaming = streaming
         self._finished_deferred = run_in_background(
-            threads.deferToThread, self._writer
+            threads.deferToThreadPool,
+            self._reactor,
+            self._reactor.getThreadPool(),
+            self._writer,
         )
         if not streaming:
             self._producer.resumeProducing()
@@ -109,7 +115,7 @@ class BackgroundFileConsumer(object):
                 # producer.
                 if self._producer and self._paused_producer:
                     if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
-                        reactor.callFromThread(self._resume_paused_producer)
+                        self._reactor.callFromThread(self._resume_paused_producer)
 
                 bytes = self._bytes_queue.get()
 
@@ -121,7 +127,7 @@ class BackgroundFileConsumer(object):
                 # If its a pull producer then we need to explicitly ask for
                 # more stuff.
                 if not self.streaming and self._producer:
-                    reactor.callFromThread(self._producer.resumeProducing)
+                    self._reactor.callFromThread(self._producer.resumeProducing)
         except Exception as e:
             self._write_exception = e
             raise
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 15f0a7ba9e..581c6052ac 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -13,11 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from frozendict import frozendict
-import simplejson as json
-
 from six import string_types
 
+from canonicaljson import json
+from frozendict import frozendict
+
 
 def freeze(o):
     if isinstance(o, dict):
diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py
index e9f0f292ee..2d7ddc1cbe 100644
--- a/synapse/util/httpresourcetree.py
+++ b/synapse/util/httpresourcetree.py
@@ -12,10 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.web.resource import NoResource
-
 import logging
 
+from twisted.web.resource import NoResource
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a58c723403..8dcae50b39 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -22,10 +22,10 @@ them.
 See doc/log_contexts.rst for details on how this works.
 """
 
-from twisted.internet import defer
-
-import threading
 import logging
+import threading
+
+from twisted.internet import defer
 
 logger = logging.getLogger(__name__)
 
@@ -49,17 +49,118 @@ except Exception:
         return None
 
 
+class ContextResourceUsage(object):
+    """Object for tracking the resources used by a log context
+
+    Attributes:
+        ru_utime (float): user CPU time (in seconds)
+        ru_stime (float): system CPU time (in seconds)
+        db_txn_count (int): number of database transactions done
+        db_sched_duration_sec (float): amount of time spent waiting for a
+            database connection
+        db_txn_duration_sec (float): amount of time spent doing database
+            transactions (excluding scheduling time)
+        evt_db_fetch_count (int): number of events requested from the database
+    """
+
+    __slots__ = [
+        "ru_stime", "ru_utime",
+        "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
+        "evt_db_fetch_count",
+    ]
+
+    def __init__(self, copy_from=None):
+        """Create a new ContextResourceUsage
+
+        Args:
+            copy_from (ContextResourceUsage|None): if not None, an object to
+                copy stats from
+        """
+        if copy_from is None:
+            self.reset()
+        else:
+            self.ru_utime = copy_from.ru_utime
+            self.ru_stime = copy_from.ru_stime
+            self.db_txn_count = copy_from.db_txn_count
+
+            self.db_txn_duration_sec = copy_from.db_txn_duration_sec
+            self.db_sched_duration_sec = copy_from.db_sched_duration_sec
+            self.evt_db_fetch_count = copy_from.evt_db_fetch_count
+
+    def copy(self):
+        return ContextResourceUsage(copy_from=self)
+
+    def reset(self):
+        self.ru_stime = 0.
+        self.ru_utime = 0.
+        self.db_txn_count = 0
+
+        self.db_txn_duration_sec = 0
+        self.db_sched_duration_sec = 0
+        self.evt_db_fetch_count = 0
+
+    def __repr__(self):
+        return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
+                "db_txn_count='%r', db_txn_duration_sec='%r', "
+                "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % (
+                    self.ru_stime,
+                    self.ru_utime,
+                    self.db_txn_count,
+                    self.db_txn_duration_sec,
+                    self.db_sched_duration_sec,
+                    self.evt_db_fetch_count,)
+
+    def __iadd__(self, other):
+        """Add another ContextResourceUsage's stats to this one's.
+
+        Args:
+            other (ContextResourceUsage): the other resource usage object
+        """
+        self.ru_utime += other.ru_utime
+        self.ru_stime += other.ru_stime
+        self.db_txn_count += other.db_txn_count
+        self.db_txn_duration_sec += other.db_txn_duration_sec
+        self.db_sched_duration_sec += other.db_sched_duration_sec
+        self.evt_db_fetch_count += other.evt_db_fetch_count
+        return self
+
+    def __isub__(self, other):
+        self.ru_utime -= other.ru_utime
+        self.ru_stime -= other.ru_stime
+        self.db_txn_count -= other.db_txn_count
+        self.db_txn_duration_sec -= other.db_txn_duration_sec
+        self.db_sched_duration_sec -= other.db_sched_duration_sec
+        self.evt_db_fetch_count -= other.evt_db_fetch_count
+        return self
+
+    def __add__(self, other):
+        res = ContextResourceUsage(copy_from=self)
+        res += other
+        return res
+
+    def __sub__(self, other):
+        res = ContextResourceUsage(copy_from=self)
+        res -= other
+        return res
+
+
 class LoggingContext(object):
     """Additional context for log formatting. Contexts are scoped within a
     "with" block.
 
+    If a parent is given when creating a new context, then:
+        - logging fields are copied from the parent to the new context on entry
+        - when the new context exits, the cpu usage stats are copied from the
+          child to the parent
+
     Args:
         name (str): Name for the context for debugging.
+        parent_context (LoggingContext|None): The parent of the new context
     """
 
     __slots__ = [
-        "previous_context", "name", "ru_stime", "ru_utime",
-        "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
+        "previous_context", "name", "parent_context",
+        "_resource_usage",
         "usage_start",
         "main_thread", "alive",
         "request", "tag",
@@ -90,24 +191,21 @@ class LoggingContext(object):
         def add_database_scheduled(self, sched_sec):
             pass
 
+        def record_event_fetch(self, event_count):
+            pass
+
         def __nonzero__(self):
             return False
         __bool__ = __nonzero__  # python3
 
     sentinel = Sentinel()
 
-    def __init__(self, name=None):
+    def __init__(self, name=None, parent_context=None):
         self.previous_context = LoggingContext.current_context()
         self.name = name
-        self.ru_stime = 0.
-        self.ru_utime = 0.
-        self.db_txn_count = 0
-
-        # sec spent waiting for db txns, excluding scheduling time
-        self.db_txn_duration_sec = 0
 
-        # sec spent waiting for db txns to be scheduled
-        self.db_sched_duration_sec = 0
+        # track the resources used by this context so far
+        self._resource_usage = ContextResourceUsage()
 
         # If alive has the thread resource usage when the logcontext last
         # became active.
@@ -118,6 +216,8 @@ class LoggingContext(object):
         self.tag = ""
         self.alive = True
 
+        self.parent_context = parent_context
+
     def __str__(self):
         return "%s@%x" % (self.name, id(self))
 
@@ -155,6 +255,10 @@ class LoggingContext(object):
                 self.previous_context, old_context
             )
         self.alive = True
+
+        if self.parent_context is not None:
+            self.parent_context.copy_to(self)
+
         return self
 
     def __exit__(self, type, value, traceback):
@@ -176,6 +280,13 @@ class LoggingContext(object):
         self.previous_context = None
         self.alive = False
 
+        # if we have a parent, pass our CPU usage stats on
+        if self.parent_context is not None:
+            self.parent_context._resource_usage += self._resource_usage
+
+            # reset them in case we get entered again
+            self._resource_usage.reset()
+
     def copy_to(self, record):
         """Copy logging fields from this context to a log record or
         another LoggingContext
@@ -200,39 +311,43 @@ class LoggingContext(object):
             logger.warning("Stopped logcontext %s on different thread", self)
             return
 
-        # When we stop, let's record the resource used since we started
-        if self.usage_start:
-            usage_end = get_thread_resource_usage()
+        # When we stop, let's record the cpu used since we started
+        if not self.usage_start:
+            logger.warning(
+                "Called stop on logcontext %s without calling start", self,
+            )
+            return
+
+        usage_end = get_thread_resource_usage()
 
-            self.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime
-            self.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime
+        self._resource_usage.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime
+        self._resource_usage.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime
 
-            self.usage_start = None
-        else:
-            logger.warning("Called stop on logcontext %s without calling start", self)
+        self.usage_start = None
 
     def get_resource_usage(self):
-        """Get CPU time used by this logcontext so far.
+        """Get resources used by this logcontext so far.
 
         Returns:
-            tuple[float, float]: The user and system CPU usage in seconds
+            ContextResourceUsage: a *copy* of the object tracking resource
+                usage so far
         """
-        ru_utime = self.ru_utime
-        ru_stime = self.ru_stime
+        # we always return a copy, for consistency
+        res = self._resource_usage.copy()
 
         # If we are on the correct thread and we're currently running then we
         # can include resource usage so far.
         is_main_thread = threading.current_thread() is self.main_thread
         if self.alive and self.usage_start and is_main_thread:
             current = get_thread_resource_usage()
-            ru_utime += current.ru_utime - self.usage_start.ru_utime
-            ru_stime += current.ru_stime - self.usage_start.ru_stime
+            res.ru_utime += current.ru_utime - self.usage_start.ru_utime
+            res.ru_stime += current.ru_stime - self.usage_start.ru_stime
 
-        return ru_utime, ru_stime
+        return res
 
     def add_database_transaction(self, duration_sec):
-        self.db_txn_count += 1
-        self.db_txn_duration_sec += duration_sec
+        self._resource_usage.db_txn_count += 1
+        self._resource_usage.db_txn_duration_sec += duration_sec
 
     def add_database_scheduled(self, sched_sec):
         """Record a use of the database pool
@@ -241,7 +356,15 @@ class LoggingContext(object):
             sched_sec (float): number of seconds it took us to get a
                 connection
         """
-        self.db_sched_duration_sec += sched_sec
+        self._resource_usage.db_sched_duration_sec += sched_sec
+
+    def record_event_fetch(self, event_count):
+        """Record a number of events being fetched from the db
+
+        Args:
+            event_count (int): number of events being fetched
+        """
+        self._resource_usage.evt_db_fetch_count += event_count
 
 
 class LoggingContextFilter(logging.Filter):
diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py
index 3e42868ea9..a46bc47ce3 100644
--- a/synapse/util/logformatter.py
+++ b/synapse/util/logformatter.py
@@ -14,10 +14,11 @@
 # limitations under the License.
 
 
-from six import StringIO
 import logging
 import traceback
 
+from six import StringIO
+
 
 class LogFormatter(logging.Formatter):
     """Log formatter which gives more detail for exceptions
diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py
index 03249c5dc8..62a00189cc 100644
--- a/synapse/util/logutils.py
+++ b/synapse/util/logutils.py
@@ -14,13 +14,11 @@
 # limitations under the License.
 
 
-from inspect import getcallargs
-from functools import wraps
-
-import logging
 import inspect
+import logging
 import time
-
+from functools import wraps
+from inspect import getcallargs
 
 _TIME_FUNC_ID = 0
 
diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py
index 97e0f00b67..14be3c7396 100644
--- a/synapse/util/manhole.py
+++ b/synapse/util/manhole.py
@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.conch.manhole import ColoredManhole
-from twisted.conch.insults import insults
 from twisted.conch import manhole_ssh
-from twisted.cred import checkers, portal
+from twisted.conch.insults import insults
+from twisted.conch.manhole import ColoredManhole
 from twisted.conch.ssh.keys import Key
+from twisted.cred import checkers, portal
 
 PUBLIC_KEY = (
     "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az"
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 1ba7d65c7c..97f1267380 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -13,14 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
+from functools import wraps
 
 from prometheus_client import Counter
-from synapse.util.logcontext import LoggingContext
 
-from functools import wraps
-import logging
+from twisted.internet import defer
 
+from synapse.util.logcontext import LoggingContext
 
 logger = logging.getLogger(__name__)
 
@@ -60,10 +60,9 @@ def measure_func(name):
 
 class Measure(object):
     __slots__ = [
-        "clock", "name", "start_context", "start", "new_context", "ru_utime",
-        "ru_stime",
-        "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
+        "clock", "name", "start_context", "start",
         "created_context",
+        "start_usage",
     ]
 
     def __init__(self, clock, name):
@@ -81,10 +80,7 @@ class Measure(object):
             self.start_context.__enter__()
             self.created_context = True
 
-        self.ru_utime, self.ru_stime = self.start_context.get_resource_usage()
-        self.db_txn_count = self.start_context.db_txn_count
-        self.db_txn_duration_sec = self.start_context.db_txn_duration_sec
-        self.db_sched_duration_sec = self.start_context.db_sched_duration_sec
+        self.start_usage = self.start_context.get_resource_usage()
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         if isinstance(exc_type, Exception) or not self.start_context:
@@ -108,15 +104,19 @@ class Measure(object):
             logger.warn("Expected context. (%r)", self.name)
             return
 
-        ru_utime, ru_stime = context.get_resource_usage()
-
-        block_ru_utime.labels(self.name).inc(ru_utime - self.ru_utime)
-        block_ru_stime.labels(self.name).inc(ru_stime - self.ru_stime)
-        block_db_txn_count.labels(self.name).inc(context.db_txn_count - self.db_txn_count)
-        block_db_txn_duration.labels(self.name).inc(
-            context.db_txn_duration_sec - self.db_txn_duration_sec)
-        block_db_sched_duration.labels(self.name).inc(
-            context.db_sched_duration_sec - self.db_sched_duration_sec)
+        current = context.get_resource_usage()
+        usage = current - self.start_usage
+        try:
+            block_ru_utime.labels(self.name).inc(usage.ru_utime)
+            block_ru_stime.labels(self.name).inc(usage.ru_stime)
+            block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
+            block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
+            block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
+        except ValueError:
+            logger.warn(
+                "Failed to save metrics! OLD: %r, NEW: %r",
+                self.start_usage, current
+            )
 
         if self.created_context:
             self.start_context.__exit__(exc_type, exc_val, exc_tb)
diff --git a/synapse/util/msisdn.py b/synapse/util/msisdn.py
index 607161e7f0..a6c30e5265 100644
--- a/synapse/util/msisdn.py
+++ b/synapse/util/msisdn.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import phonenumbers
+
 from synapse.api.errors import SynapseError
 
 
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 0ab63c3d7d..7deb38f2a7 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -13,21 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import collections
+import contextlib
+import logging
+
 from twisted.internet import defer
 
 from synapse.api.errors import LimitExceededError
-
-from synapse.util.async import sleep
 from synapse.util.logcontext import (
-    run_in_background, make_deferred_yieldable,
     PreserveLoggingContext,
+    make_deferred_yieldable,
+    run_in_background,
 )
 
-import collections
-import contextlib
-import logging
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -94,13 +92,22 @@ class _PerHostRatelimiter(object):
 
         self.window_size = window_size
         self.sleep_limit = sleep_limit
-        self.sleep_msec = sleep_msec
+        self.sleep_sec = sleep_msec / 1000.0
         self.reject_limit = reject_limit
         self.concurrent_requests = concurrent_requests
 
+        # request_id objects for requests which have been slept
         self.sleeping_requests = set()
+
+        # map from request_id object to Deferred for requests which are ready
+        # for processing but have been queued
         self.ready_request_queue = collections.OrderedDict()
+
+        # request id objects for requests which are in progress
         self.current_processing = set()
+
+        # times at which we have recently (within the last window_size ms)
+        # received requests.
         self.request_times = []
 
     @contextlib.contextmanager
@@ -119,11 +126,15 @@ class _PerHostRatelimiter(object):
 
     def _on_enter(self, request_id):
         time_now = self.clock.time_msec()
+
+        # remove any entries from request_times which aren't within the window
         self.request_times[:] = [
             r for r in self.request_times
             if time_now - r < self.window_size
         ]
 
+        # reject the request if we already have too many queued up (either
+        # sleeping or in the ready queue).
         queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
         if queue_size > self.reject_limit:
             raise LimitExceededError(
@@ -136,9 +147,13 @@ class _PerHostRatelimiter(object):
 
         def queue_request():
             if len(self.current_processing) > self.concurrent_requests:
-                logger.debug("Ratelimit [%s]: Queue req", id(request_id))
                 queue_defer = defer.Deferred()
                 self.ready_request_queue[request_id] = queue_defer
+                logger.info(
+                    "Ratelimiter: queueing request (queue now %i items)",
+                    len(self.ready_request_queue),
+                )
+
                 return queue_defer
             else:
                 return defer.succeed(None)
@@ -150,10 +165,9 @@ class _PerHostRatelimiter(object):
 
         if len(self.request_times) > self.sleep_limit:
             logger.debug(
-                "Ratelimit [%s]: sleeping req",
-                id(request_id),
+                "Ratelimiter: sleeping request for %f sec", self.sleep_sec,
             )
-            ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0)
+            ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
 
             self.sleeping_requests.add(request_id)
 
@@ -202,11 +216,8 @@ class _PerHostRatelimiter(object):
         )
         self.current_processing.discard(request_id)
         try:
-            request_id, deferred = self.ready_request_queue.popitem()
-
-            # XXX: why do we do the following? the on_start callback above will
-            # do it for us.
-            self.current_processing.add(request_id)
+            # start processing the next item on the queue.
+            _, deferred = self.ready_request_queue.popitem(last=False)
 
             with PreserveLoggingContext():
                 deferred.callback(None)
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 4e93f69d3a..8a3a06fd74 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -12,14 +12,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import synapse.util.logcontext
-from twisted.internet import defer
-
-from synapse.api.errors import CodeMessageException
-
 import logging
 import random
 
+from twisted.internet import defer
+
+import synapse.util.logcontext
+from synapse.api.errors import CodeMessageException
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/util/rlimit.py b/synapse/util/rlimit.py
index f4a9abf83f..6c0f2bb0cf 100644
--- a/synapse/util/rlimit.py
+++ b/synapse/util/rlimit.py
@@ -13,9 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import resource
 import logging
-
+import resource
 
 logger = logging.getLogger("synapse.app.homeserver")
 
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index b98b9dc6e4..43d9db67ec 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -15,6 +15,7 @@
 
 import random
 import string
+
 from six.moves import range
 
 _string_with_symbols = (
diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py
index 52086df465..1fbcd41115 100644
--- a/synapse/util/versionstring.py
+++ b/synapse/util/versionstring.py
@@ -14,9 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import subprocess
-import os
 import logging
+import os
+import subprocess
 
 logger = logging.getLogger(__name__)