summary refs log tree commit diff
path: root/synapse/util/caches
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/caches')
-rw-r--r--synapse/util/caches/__init__.py95
-rw-r--r--synapse/util/caches/descriptors.py102
-rw-r--r--synapse/util/caches/dictionary_cache.py42
-rw-r--r--synapse/util/caches/expiringcache.py21
-rw-r--r--synapse/util/caches/lrucache.py45
-rw-r--r--synapse/util/caches/response_cache.py107
-rw-r--r--synapse/util/caches/stream_change_cache.py61
-rw-r--r--synapse/util/caches/treecache.py6
8 files changed, 355 insertions, 124 deletions
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 4adae96681..7b065b195e 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -13,28 +13,87 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import synapse.metrics
 import os
 
+import six
+from six.moves import intern
+
+from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily
+
 CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
 
-metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
+
+def get_cache_factor_for(cache_name):
+    env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
+    factor = os.environ.get(env_var)
+    if factor:
+        return float(factor)
+
+    return CACHE_SIZE_FACTOR
+
 
 caches_by_name = {}
-# cache_counter = metrics.register_cache(
-#     "cache",
-#     lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
-#     labels=["name"],
-# )
-
-
-def register_cache(name, cache):
-    caches_by_name[name] = cache
-    return metrics.register_cache(
-        "cache",
-        lambda: len(cache),
-        name,
-    )
+collectors_by_name = {}
+
+cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
+cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
+cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
+cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
+
+response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
+response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
+response_cache_evicted = Gauge(
+    "synapse_util_caches_response_cache:evicted_size", "", ["name"]
+)
+response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"])
+
+
+def register_cache(cache_type, cache_name, cache):
+
+    # Check if the metric is already registered. Unregister it, if so.
+    # This usually happens during tests, as at runtime these caches are
+    # effectively singletons.
+    metric_name = "cache_%s_%s" % (cache_type, cache_name)
+    if metric_name in collectors_by_name.keys():
+        REGISTRY.unregister(collectors_by_name[metric_name])
+
+    class CacheMetric(object):
+
+        hits = 0
+        misses = 0
+        evicted_size = 0
+
+        def inc_hits(self):
+            self.hits += 1
+
+        def inc_misses(self):
+            self.misses += 1
+
+        def inc_evictions(self, size=1):
+            self.evicted_size += size
+
+        def describe(self):
+            return []
+
+        def collect(self):
+            if cache_type == "response_cache":
+                response_cache_size.labels(cache_name).set(len(cache))
+                response_cache_hits.labels(cache_name).set(self.hits)
+                response_cache_evicted.labels(cache_name).set(self.evicted_size)
+                response_cache_total.labels(cache_name).set(self.hits + self.misses)
+            else:
+                cache_size.labels(cache_name).set(len(cache))
+                cache_hits.labels(cache_name).set(self.hits)
+                cache_evicted.labels(cache_name).set(self.evicted_size)
+                cache_total.labels(cache_name).set(self.hits + self.misses)
+
+            yield GaugeMetricFamily("__unused", "")
+
+    metric = CacheMetric()
+    REGISTRY.register(metric)
+    caches_by_name[cache_name] = cache
+    collectors_by_name[metric_name] = metric
+    return metric
 
 
 KNOWN_KEYS = {
@@ -66,7 +125,9 @@ def intern_string(string):
         return None
 
     try:
-        string = string.encode("ascii")
+        if six.PY2:
+            string = string.encode("ascii")
+
         return intern(string)
     except UnicodeEncodeError:
         return string
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index af65bfe7b8..f8a07df6b8 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,25 +13,26 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import functools
+import inspect
 import logging
+import threading
+from collections import namedtuple
+
+import six
+from six import itervalues, string_types
+
+from twisted.internet import defer
 
+from synapse.util import logcontext, unwrapFirstError
 from synapse.util.async import ObservableDeferred
-from synapse.util import unwrapFirstError, logcontext
-from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches import get_cache_factor_for
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
 from synapse.util.stringutils import to_ascii
 
 from . import register_cache
 
-from twisted.internet import defer
-from collections import namedtuple
-
-import functools
-import inspect
-import threading
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -39,12 +41,11 @@ _CacheSentinel = object()
 
 class CacheEntry(object):
     __slots__ = [
-        "deferred", "sequence", "callbacks", "invalidated"
+        "deferred", "callbacks", "invalidated"
     ]
 
-    def __init__(self, deferred, sequence, callbacks):
+    def __init__(self, deferred, callbacks):
         self.deferred = deferred
-        self.sequence = sequence
         self.callbacks = set(callbacks)
         self.invalidated = False
 
@@ -62,7 +63,6 @@ class Cache(object):
         "max_entries",
         "name",
         "keylen",
-        "sequence",
         "thread",
         "metrics",
         "_pending_deferred_cache",
@@ -75,13 +75,16 @@ class Cache(object):
         self.cache = LruCache(
             max_size=max_entries, keylen=keylen, cache_type=cache_type,
             size_callback=(lambda d: len(d)) if iterable else None,
+            evicted_callback=self._on_evicted,
         )
 
         self.name = name
         self.keylen = keylen
-        self.sequence = 0
         self.thread = None
-        self.metrics = register_cache(name, self.cache)
+        self.metrics = register_cache("cache", name, self.cache)
+
+    def _on_evicted(self, evicted_count):
+        self.metrics.inc_evictions(evicted_count)
 
     def check_thread(self):
         expected_thread = self.thread
@@ -109,11 +112,10 @@ class Cache(object):
         callbacks = [callback] if callback else []
         val = self._pending_deferred_cache.get(key, _CacheSentinel)
         if val is not _CacheSentinel:
-            if val.sequence == self.sequence:
-                val.callbacks.update(callbacks)
-                if update_metrics:
-                    self.metrics.inc_hits()
-                return val.deferred
+            val.callbacks.update(callbacks)
+            if update_metrics:
+                self.metrics.inc_hits()
+            return val.deferred
 
         val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
         if val is not _CacheSentinel:
@@ -133,12 +135,9 @@ class Cache(object):
         self.check_thread()
         entry = CacheEntry(
             deferred=value,
-            sequence=self.sequence,
             callbacks=callbacks,
         )
 
-        entry.callbacks.update(callbacks)
-
         existing_entry = self._pending_deferred_cache.pop(key, None)
         if existing_entry:
             existing_entry.invalidate()
@@ -146,13 +145,25 @@ class Cache(object):
         self._pending_deferred_cache[key] = entry
 
         def shuffle(result):
-            if self.sequence == entry.sequence:
-                existing_entry = self._pending_deferred_cache.pop(key, None)
-                if existing_entry is entry:
-                    self.cache.set(key, result, entry.callbacks)
-                else:
-                    entry.invalidate()
+            existing_entry = self._pending_deferred_cache.pop(key, None)
+            if existing_entry is entry:
+                self.cache.set(key, result, entry.callbacks)
             else:
+                # oops, the _pending_deferred_cache has been updated since
+                # we started our query, so we are out of date.
+                #
+                # Better put back whatever we took out. (We do it this way
+                # round, rather than peeking into the _pending_deferred_cache
+                # and then removing on a match, to make the common case faster)
+                if existing_entry is not None:
+                    self._pending_deferred_cache[key] = existing_entry
+
+                # we're not going to put this entry into the cache, so need
+                # to make sure that the invalidation callbacks are called.
+                # That was probably done when _pending_deferred_cache was
+                # updated, but it's possible that `set` was called without
+                # `invalidate` being previously called, in which case it may
+                # not have been. Either way, let's double-check now.
                 entry.invalidate()
             return result
 
@@ -164,25 +175,29 @@ class Cache(object):
 
     def invalidate(self, key):
         self.check_thread()
+        self.cache.pop(key, None)
 
-        # Increment the sequence number so that any SELECT statements that
-        # raced with the INSERT don't update the cache (SYN-369)
-        self.sequence += 1
+        # if we have a pending lookup for this key, remove it from the
+        # _pending_deferred_cache, which will (a) stop it being returned
+        # for future queries and (b) stop it being persisted as a proper entry
+        # in self.cache.
         entry = self._pending_deferred_cache.pop(key, None)
+
+        # run the invalidation callbacks now, rather than waiting for the
+        # deferred to resolve.
         if entry:
             entry.invalidate()
 
-        self.cache.pop(key, None)
-
     def invalidate_many(self, key):
         self.check_thread()
         if not isinstance(key, tuple):
             raise TypeError(
                 "The cache key must be a tuple not %r" % (type(key),)
             )
-        self.sequence += 1
         self.cache.del_multi(key)
 
+        # if we have a pending lookup for this key, remove it from the
+        # _pending_deferred_cache, as above
         entry_dict = self._pending_deferred_cache.pop(key, None)
         if entry_dict is not None:
             for entry in iterate_tree_cache_entry(entry_dict):
@@ -190,8 +205,10 @@ class Cache(object):
 
     def invalidate_all(self):
         self.check_thread()
-        self.sequence += 1
         self.cache.clear()
+        for entry in itervalues(self._pending_deferred_cache):
+            entry.invalidate()
+        self._pending_deferred_cache.clear()
 
 
 class _CacheDescriptorBase(object):
@@ -294,7 +311,7 @@ class CacheDescriptor(_CacheDescriptorBase):
             orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
             cache_context=cache_context)
 
-        max_entries = int(max_entries * CACHE_SIZE_FACTOR)
+        max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
 
         self.max_entries = max_entries
         self.tree = tree
@@ -376,9 +393,10 @@ class CacheDescriptor(_CacheDescriptorBase):
 
                 ret.addErrback(onErr)
 
-                # If our cache_key is a string, try to convert to ascii to save
-                # a bit of space in large caches
-                if isinstance(cache_key, basestring):
+                # If our cache_key is a string on py2, try to convert to ascii
+                # to save a bit of space in large caches. Py3 does this
+                # internally automatically.
+                if six.PY2 and isinstance(cache_key, string_types):
                     cache_key = to_ascii(cache_key)
 
                 result_d = ObservableDeferred(ret, consumeErrors=True)
@@ -549,7 +567,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
                     return results
 
                 return logcontext.make_deferred_yieldable(defer.gatherResults(
-                    cached_defers.values(),
+                    list(cached_defers.values()),
                     consumeErrors=True,
                 ).addCallback(update_results_dict).addErrback(
                     unwrapFirstError
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index d4105822b3..6c0b5a4094 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -13,12 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches.lrucache import LruCache
-from collections import namedtuple
-from . import register_cache
-import threading
 import logging
+import threading
+from collections import namedtuple
 
+from synapse.util.caches.lrucache import LruCache
+
+from . import register_cache
 
 logger = logging.getLogger(__name__)
 
@@ -55,7 +56,7 @@ class DictionaryCache(object):
             __slots__ = []
 
         self.sentinel = Sentinel()
-        self.metrics = register_cache(name, self.cache)
+        self.metrics = register_cache("dictionary", name, self.cache)
 
     def check_thread(self):
         expected_thread = self.thread
@@ -107,34 +108,37 @@ class DictionaryCache(object):
         self.sequence += 1
         self.cache.clear()
 
-    def update(self, sequence, key, value, full=False, known_absent=None):
+    def update(self, sequence, key, value, fetched_keys=None):
         """Updates the entry in the cache
 
         Args:
             sequence
-            key
-            value (dict): The value to update the cache with.
-            full (bool): Whether the given value is the full dict, or just a
-                partial subset there of. If not full then any existing entries
-                for the key will be updated.
-            known_absent (set): Set of keys that we know don't exist in the full
-                dict.
+            key (K)
+            value (dict[X,Y]): The value to update the cache with.
+            fetched_keys (None|set[X]): All of the dictionary keys which were
+                fetched from the database.
+
+                If None, this is the complete value for key K. Otherwise, it
+                is used to infer a list of keys which we know don't exist in
+                the full dict.
         """
         self.check_thread()
         if self.sequence == sequence:
             # Only update the cache if the caches sequence number matches the
             # number that the cache had before the SELECT was started (SYN-369)
-            if known_absent is None:
-                known_absent = set()
-            if full:
-                self._insert(key, value, known_absent)
+            if fetched_keys is None:
+                self._insert(key, value, set())
             else:
-                self._update_or_insert(key, value, known_absent)
+                self._update_or_insert(key, value, fetched_keys)
 
     def _update_or_insert(self, key, value, known_absent):
-        entry = self.cache.setdefault(key, DictionaryEntry(False, set(), {}))
+        # We pop and reinsert as we need to tell the cache the size may have
+        # changed
+
+        entry = self.cache.pop(key, DictionaryEntry(False, set(), {}))
         entry.value.update(value)
         entry.known_absent.update(known_absent)
+        self.cache[key] = entry
 
     def _insert(self, key, value, known_absent):
         self.cache[key] = DictionaryEntry(True, known_absent, value)
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 6ad53a6390..465adc54a8 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -13,11 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches import register_cache
-
-from collections import OrderedDict
 import logging
+from collections import OrderedDict
 
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.caches import register_cache
 
 logger = logging.getLogger(__name__)
 
@@ -52,19 +52,22 @@ class ExpiringCache(object):
 
         self._cache = OrderedDict()
 
-        self.metrics = register_cache(cache_name, self)
-
         self.iterable = iterable
 
         self._size_estimate = 0
 
+        self.metrics = register_cache("expiring", cache_name, self)
+
     def start(self):
         if not self._expiry_ms:
             # Don't bother starting the loop if things never expire
             return
 
         def f():
-            self._prune_cache()
+            run_as_background_process(
+                "prune_cache_%s" % self._cache_name,
+                self._prune_cache,
+            )
 
         self._clock.looping_call(f, self._expiry_ms / 2)
 
@@ -79,7 +82,11 @@ class ExpiringCache(object):
         while self._max_len and len(self) > self._max_len:
             _key, value = self._cache.popitem(last=False)
             if self.iterable:
-                self._size_estimate -= len(value.value)
+                removed_len = len(value.value)
+                self.metrics.inc_evictions(removed_len)
+                self._size_estimate -= removed_len
+            else:
+                self.metrics.inc_evictions()
 
     def __getitem__(self, key):
         try:
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index cf5fbb679c..b684f24e7b 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -14,8 +14,8 @@
 # limitations under the License.
 
 
-from functools import wraps
 import threading
+from functools import wraps
 
 from synapse.util.caches.treecache import TreeCache
 
@@ -49,7 +49,24 @@ class LruCache(object):
     Can also set callbacks on objects when getting/setting which are fired
     when that key gets invalidated/evicted.
     """
-    def __init__(self, max_size, keylen=1, cache_type=dict, size_callback=None):
+    def __init__(self, max_size, keylen=1, cache_type=dict, size_callback=None,
+                 evicted_callback=None):
+        """
+        Args:
+            max_size (int):
+
+            keylen (int):
+
+            cache_type (type):
+                type of underlying cache to be used. Typically one of dict
+                or TreeCache.
+
+            size_callback (func(V) -> int | None):
+
+            evicted_callback (func(int)|None):
+                if not None, called on eviction with the size of the evicted
+                entry
+        """
         cache = cache_type()
         self.cache = cache  # Used for introspection.
         list_root = _Node(None, None, None, None)
@@ -61,8 +78,10 @@ class LruCache(object):
         def evict():
             while cache_len() > max_size:
                 todelete = list_root.prev_node
-                delete_node(todelete)
+                evicted_len = delete_node(todelete)
                 cache.pop(todelete.key, None)
+                if evicted_callback:
+                    evicted_callback(evicted_len)
 
         def synchronized(f):
             @wraps(f)
@@ -111,12 +130,15 @@ class LruCache(object):
             prev_node.next_node = next_node
             next_node.prev_node = prev_node
 
+            deleted_len = 1
             if size_callback:
-                cached_cache_len[0] -= size_callback(node.value)
+                deleted_len = size_callback(node.value)
+                cached_cache_len[0] -= deleted_len
 
             for cb in node.callbacks:
                 cb()
             node.callbacks.clear()
+            return deleted_len
 
         @synchronized
         def cache_get(key, default=None, callbacks=[]):
@@ -132,14 +154,21 @@ class LruCache(object):
         def cache_set(key, value, callbacks=[]):
             node = cache.get(key, None)
             if node is not None:
-                if value != node.value:
+                # We sometimes store large objects, e.g. dicts, which cause
+                # the inequality check to take a long time. So let's only do
+                # the check if we have some callbacks to call.
+                if node.callbacks and value != node.value:
                     for cb in node.callbacks:
                         cb()
                     node.callbacks.clear()
 
-                    if size_callback:
-                        cached_cache_len[0] -= size_callback(node.value)
-                        cached_cache_len[0] += size_callback(value)
+                # We don't bother to protect this by value != node.value as
+                # generally size_callback will be cheap compared with equality
+                # checks. (For example, taking the size of two dicts is quicker
+                # than comparing them for equality.)
+                if size_callback:
+                    cached_cache_len[0] -= size_callback(node.value)
+                    cached_cache_len[0] += size_callback(value)
 
                 node.callbacks.update(callbacks)
 
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 00af539880..a8491b42d5 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -12,8 +12,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+
+from twisted.internet import defer
 
 from synapse.util.async import ObservableDeferred
+from synapse.util.caches import register_cache
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
+logger = logging.getLogger(__name__)
 
 
 class ResponseCache(object):
@@ -24,20 +31,69 @@ class ResponseCache(object):
     used rather than trying to compute a new response.
     """
 
-    def __init__(self, hs, timeout_ms=0):
+    def __init__(self, hs, name, timeout_ms=0):
         self.pending_result_cache = {}  # Requests that haven't finished yet.
 
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.
 
+        self._name = name
+        self._metrics = register_cache(
+            "response_cache", name, self
+        )
+
+    def size(self):
+        return len(self.pending_result_cache)
+
+    def __len__(self):
+        return self.size()
+
     def get(self, key):
+        """Look up the given key.
+
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if the request has completed, the actual
+        result. You will probably want to make_deferred_yieldable the result.
+
+        If there is no entry for the key, returns None. It is worth noting that
+        this means there is no way to distinguish a completed result of None
+        from an absent cache entry.
+
+        Args:
+            key (hashable):
+
+        Returns:
+            twisted.internet.defer.Deferred|None|E: None if there is no entry
+            for this key; otherwise either a deferred result or the result
+            itself.
+        """
         result = self.pending_result_cache.get(key)
         if result is not None:
+            self._metrics.inc_hits()
             return result.observe()
         else:
+            self._metrics.inc_misses()
             return None
 
     def set(self, key, deferred):
+        """Set the entry for the given key to the given deferred.
+
+        *deferred* should run its callbacks in the sentinel logcontext (ie,
+        you should wrap normal synapse deferreds with
+        logcontext.run_in_background).
+
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if *deferred* was already complete, the actual
+        result. You will probably want to make_deferred_yieldable the result.
+
+        Args:
+            key (hashable):
+            deferred (twisted.internet.defer.Deferred[T):
+
+        Returns:
+            twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
+                result.
+        """
         result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result
 
@@ -53,3 +109,52 @@ class ResponseCache(object):
 
         result.addBoth(remove)
         return result.observe()
+
+    def wrap(self, key, callback, *args, **kwargs):
+        """Wrap together a *get* and *set* call, taking care of logcontexts
+
+        First looks up the key in the cache, and if it is present makes it
+        follow the synapse logcontext rules and returns it.
+
+        Otherwise, makes a call to *callback(*args, **kwargs)*, which should
+        follow the synapse logcontext rules, and adds the result to the cache.
+
+        Example usage:
+
+            @defer.inlineCallbacks
+            def handle_request(request):
+                # etc
+                defer.returnValue(result)
+
+            result = yield response_cache.wrap(
+                key,
+                handle_request,
+                request,
+            )
+
+        Args:
+            key (hashable): key to get/set in the cache
+
+            callback (callable): function to call if the key is not found in
+                the cache
+
+            *args: positional parameters to pass to the callback, if it is used
+
+            **kwargs: named paramters to pass to the callback, if it is used
+
+        Returns:
+            twisted.internet.defer.Deferred: yieldable result
+        """
+        result = self.get(key)
+        if not result:
+            logger.info("[%s]: no cached result for [%s], calculating new one",
+                        self._name, key)
+            d = run_in_background(callback, *args, **kwargs)
+            result = self.set(key, d)
+        elif not isinstance(result, defer.Deferred) or result.called:
+            logger.info("[%s]: using completed cached result for [%s]",
+                        self._name, key)
+        else:
+            logger.info("[%s]: using incomplete cached result for [%s]",
+                        self._name, key)
+        return make_deferred_yieldable(result)
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 941d873ab8..f2bde74dc5 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,12 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
-
-
-from blist import sorteddict
 import logging
 
+from sortedcontainers import SortedDict
+
+from synapse.util import caches
 
 logger = logging.getLogger(__name__)
 
@@ -32,16 +31,18 @@ class StreamChangeCache(object):
     entities that may have changed since that position. If position key is too
     old then the cache will simply return all given entities.
     """
-    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
-        self._max_size = int(max_size * CACHE_SIZE_FACTOR)
+
+    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
+        self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
-        self._cache = sorteddict()
+        self._cache = SortedDict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
-        self.metrics = register_cache(self.name, self._cache)
+        self.metrics = caches.register_cache("cache", self.name, self._cache)
 
-        for entity, stream_pos in prefilled_cache.items():
-            self.entity_has_changed(entity, stream_pos)
+        if prefilled_cache:
+            for entity, stream_pos in prefilled_cache.items():
+                self.entity_has_changed(entity, stream_pos)
 
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
@@ -65,22 +66,25 @@ class StreamChangeCache(object):
         return False
 
     def get_entities_changed(self, entities, stream_pos):
-        """Returns subset of entities that have had new things since the
-        given position. If the position is too old it will just return the given list.
+        """
+        Returns subset of entities that have had new things since the given
+        position.  Entities unknown to the cache will be returned.  If the
+        position is too old it will just return the given list.
         """
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
+            changed_entities = {
+                self._cache[k] for k in self._cache.islice(
+                    start=self._cache.bisect_right(stream_pos),
+                )
+            }
 
-            result = set(
-                self._cache[k] for k in keys[i:]
-            ).intersection(entities)
+            result = changed_entities.intersection(entities)
 
             self.metrics.inc_hits()
         else:
-            result = entities
+            result = set(entities)
             self.metrics.inc_misses()
 
         return result
@@ -90,12 +94,13 @@ class StreamChangeCache(object):
         """
         assert type(stream_pos) is int
 
+        if not self._cache:
+            # If we have no cache, nothing can have changed.
+            return False
+
         if stream_pos >= self._earliest_known_stream_pos:
             self.metrics.inc_hits()
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return i < len(keys)
+            return self._cache.bisect_right(stream_pos) < len(self._cache)
         else:
             self.metrics.inc_misses()
             return True
@@ -107,10 +112,8 @@ class StreamChangeCache(object):
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return [self._cache[k] for k in keys[i:]]
+            return [self._cache[k] for k in self._cache.islice(
+                start=self._cache.bisect_right(stream_pos))]
         else:
             return None
 
@@ -129,8 +132,10 @@ class StreamChangeCache(object):
             self._entity_to_key[entity] = stream_pos
 
             while len(self._cache) > self._max_size:
-                k, r = self._cache.popitem()
-                self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
+                k, r = self._cache.popitem(0)
+                self._earliest_known_stream_pos = max(
+                    k, self._earliest_known_stream_pos,
+                )
                 self._entity_to_key.pop(r, None)
 
     def get_max_pos_of_last_change(self, entity):
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index fcc341a6b7..dd4c9e6067 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -1,3 +1,5 @@
+from six import itervalues
+
 SENTINEL = object()
 
 
@@ -49,7 +51,7 @@ class TreeCache(object):
         if popped is SENTINEL:
             return default
 
-        node_and_keys = zip(nodes, key)
+        node_and_keys = list(zip(nodes, key))
         node_and_keys.reverse()
         node_and_keys.append((self.root, None))
 
@@ -76,7 +78,7 @@ def iterate_tree_cache_entry(d):
     can contain dicts.
     """
     if isinstance(d, dict):
-        for value_d in d.itervalues():
+        for value_d in itervalues(d):
             for value in iterate_tree_cache_entry(value_d):
                 yield value
     else: