summary refs log tree commit diff
path: root/synapse/util/caches
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-03-30 12:36:40 +0100
committerErik Johnston <erik@matrix.org>2016-03-30 12:36:40 +0100
commit5fbdf2bcec40bf2f24fc0698440ee384595ff027 (patch)
treede838c7f39544ba52cd94a429bb65d7222a4a7cb /synapse/util/caches
parentMerge pull request #672 from nikriek/new-author (diff)
parentBump version and changelog (diff)
downloadsynapse-5fbdf2bcec40bf2f24fc0698440ee384595ff027.tar.xz
Merge branch 'release-v0.14.0' of github.com:matrix-org/synapse v0.14.0
Diffstat (limited to 'synapse/util/caches')
-rw-r--r--synapse/util/caches/__init__.py57
-rw-r--r--synapse/util/caches/descriptors.py6
-rw-r--r--synapse/util/caches/expiringcache.py18
-rw-r--r--synapse/util/caches/lrucache.py73
-rw-r--r--synapse/util/caches/stream_change_cache.py20
5 files changed, 138 insertions, 36 deletions
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 1a14904194..d53569ca49 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -14,6 +14,10 @@
 # limitations under the License.
 
 import synapse.metrics
+from lrucache import LruCache
+import os
+
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
 
 DEBUG_CACHES = False
 
@@ -25,3 +29,56 @@ cache_counter = metrics.register_cache(
     lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
     labels=["name"],
 )
+
+_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
+caches_by_name["string_cache"] = _string_cache
+
+
+KNOWN_KEYS = {
+    key: key for key in
+    (
+        "auth_events",
+        "content",
+        "depth",
+        "event_id",
+        "hashes",
+        "origin",
+        "origin_server_ts",
+        "prev_events",
+        "room_id",
+        "sender",
+        "signatures",
+        "state_key",
+        "type",
+        "unsigned",
+        "user_id",
+    )
+}
+
+
+def intern_string(string):
+    """Takes a (potentially) unicode string and interns using custom cache
+    """
+    return _string_cache.setdefault(string, string)
+
+
+def intern_dict(dictionary):
+    """Takes a dictionary and interns well known keys and their values
+    """
+    return {
+        KNOWN_KEYS.get(key, key): _intern_known_values(key, value)
+        for key, value in dictionary.items()
+    }
+
+
+def _intern_known_values(key, value):
+    intern_str_keys = ("event_id", "room_id")
+    intern_unicode_keys = ("sender", "user_id", "type", "state_key")
+
+    if key in intern_str_keys:
+        return intern(value.encode('ascii'))
+
+    if key in intern_unicode_keys:
+        return intern_string(value)
+
+    return value
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 277854ccbc..35544b19fd 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -28,6 +28,7 @@ from twisted.internet import defer
 
 from collections import OrderedDict
 
+import os
 import functools
 import inspect
 import threading
@@ -38,6 +39,9 @@ logger = logging.getLogger(__name__)
 _CacheSentinel = object()
 
 
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
 class Cache(object):
 
     def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
@@ -140,6 +144,8 @@ class CacheDescriptor(object):
     """
     def __init__(self, orig, max_entries=1000, num_args=1, lru=True, tree=False,
                  inlineCallbacks=False):
+        max_entries = int(max_entries * CACHE_SIZE_FACTOR)
+
         self.orig = orig
 
         if inlineCallbacks:
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 62cae99649..2b68c1ac93 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.util.caches import cache_counter, caches_by_name
+
 import logging
 
 
@@ -47,6 +49,8 @@ class ExpiringCache(object):
 
         self._cache = {}
 
+        caches_by_name[cache_name] = self._cache
+
     def start(self):
         if not self._expiry_ms:
             # Don't bother starting the loop if things never expire
@@ -65,14 +69,19 @@ class ExpiringCache(object):
         if self._max_len and len(self._cache.keys()) > self._max_len:
             sorted_entries = sorted(
                 self._cache.items(),
-                key=lambda (k, v): v.time,
+                key=lambda item: item[1].time,
             )
 
             for k, _ in sorted_entries[self._max_len:]:
                 self._cache.pop(k)
 
     def __getitem__(self, key):
-        entry = self._cache[key]
+        try:
+            entry = self._cache[key]
+            cache_counter.inc_hits(self._cache_name)
+        except KeyError:
+            cache_counter.inc_misses(self._cache_name)
+            raise
 
         if self._reset_expiry_on_get:
             entry.time = self._clock.time_msec()
@@ -105,9 +114,12 @@ class ExpiringCache(object):
 
         logger.debug(
             "[%s] _prune_cache before: %d, after len: %d",
-            self._cache_name, begin_length, len(self._cache.keys())
+            self._cache_name, begin_length, len(self._cache)
         )
 
+    def __len__(self):
+        return len(self._cache)
+
 
 class _CacheEntry(object):
     def __init__(self, time, value):
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index f7423f2fab..f9df445a8d 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -29,6 +29,16 @@ def enumerate_leaves(node, depth):
                 yield m
 
 
+class _Node(object):
+    __slots__ = ["prev_node", "next_node", "key", "value"]
+
+    def __init__(self, prev_node, next_node, key, value):
+        self.prev_node = prev_node
+        self.next_node = next_node
+        self.key = key
+        self.value = value
+
+
 class LruCache(object):
     """
     Least-recently-used cache.
@@ -38,10 +48,9 @@ class LruCache(object):
     def __init__(self, max_size, keylen=1, cache_type=dict):
         cache = cache_type()
         self.cache = cache  # Used for introspection.
-        list_root = []
-        list_root[:] = [list_root, list_root, None, None]
-
-        PREV, NEXT, KEY, VALUE = 0, 1, 2, 3
+        list_root = _Node(None, None, None, None)
+        list_root.next_node = list_root
+        list_root.prev_node = list_root
 
         lock = threading.Lock()
 
@@ -55,36 +64,36 @@ class LruCache(object):
 
         def add_node(key, value):
             prev_node = list_root
-            next_node = prev_node[NEXT]
-            node = [prev_node, next_node, key, value]
-            prev_node[NEXT] = node
-            next_node[PREV] = node
+            next_node = prev_node.next_node
+            node = _Node(prev_node, next_node, key, value)
+            prev_node.next_node = node
+            next_node.prev_node = node
             cache[key] = node
 
         def move_node_to_front(node):
-            prev_node = node[PREV]
-            next_node = node[NEXT]
-            prev_node[NEXT] = next_node
-            next_node[PREV] = prev_node
+            prev_node = node.prev_node
+            next_node = node.next_node
+            prev_node.next_node = next_node
+            next_node.prev_node = prev_node
             prev_node = list_root
-            next_node = prev_node[NEXT]
-            node[PREV] = prev_node
-            node[NEXT] = next_node
-            prev_node[NEXT] = node
-            next_node[PREV] = node
+            next_node = prev_node.next_node
+            node.prev_node = prev_node
+            node.next_node = next_node
+            prev_node.next_node = node
+            next_node.prev_node = node
 
         def delete_node(node):
-            prev_node = node[PREV]
-            next_node = node[NEXT]
-            prev_node[NEXT] = next_node
-            next_node[PREV] = prev_node
+            prev_node = node.prev_node
+            next_node = node.next_node
+            prev_node.next_node = next_node
+            next_node.prev_node = prev_node
 
         @synchronized
         def cache_get(key, default=None):
             node = cache.get(key, None)
             if node is not None:
                 move_node_to_front(node)
-                return node[VALUE]
+                return node.value
             else:
                 return default
 
@@ -93,25 +102,25 @@ class LruCache(object):
             node = cache.get(key, None)
             if node is not None:
                 move_node_to_front(node)
-                node[VALUE] = value
+                node.value = value
             else:
                 add_node(key, value)
                 if len(cache) > max_size:
-                    todelete = list_root[PREV]
+                    todelete = list_root.prev_node
                     delete_node(todelete)
-                    cache.pop(todelete[KEY], None)
+                    cache.pop(todelete.key, None)
 
         @synchronized
         def cache_set_default(key, value):
             node = cache.get(key, None)
             if node is not None:
-                return node[VALUE]
+                return node.value
             else:
                 add_node(key, value)
                 if len(cache) > max_size:
-                    todelete = list_root[PREV]
+                    todelete = list_root.prev_node
                     delete_node(todelete)
-                    cache.pop(todelete[KEY], None)
+                    cache.pop(todelete.key, None)
                 return value
 
         @synchronized
@@ -119,8 +128,8 @@ class LruCache(object):
             node = cache.get(key, None)
             if node:
                 delete_node(node)
-                cache.pop(node[KEY], None)
-                return node[VALUE]
+                cache.pop(node.key, None)
+                return node.value
             else:
                 return default
 
@@ -137,8 +146,8 @@ class LruCache(object):
 
         @synchronized
         def cache_clear():
-            list_root[NEXT] = list_root
-            list_root[PREV] = list_root
+            list_root.next_node = list_root
+            list_root.prev_node = list_root
             cache.clear()
 
         @synchronized
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index b37f1c0725..ea8a74ca69 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -18,11 +18,15 @@ from synapse.util.caches import cache_counter, caches_by_name
 
 from blist import sorteddict
 import logging
+import os
 
 
 logger = logging.getLogger(__name__)
 
 
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
 class StreamChangeCache(object):
     """Keeps track of the stream positions of the latest change in a set of entities.
 
@@ -33,7 +37,7 @@ class StreamChangeCache(object):
     old then the cache will simply return all given entities.
     """
     def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
-        self._max_size = max_size
+        self._max_size = int(max_size * CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
         self._cache = sorteddict()
         self._earliest_known_stream_pos = current_stream_pos
@@ -85,6 +89,20 @@ class StreamChangeCache(object):
 
         return result
 
+    def get_all_entities_changed(self, stream_pos):
+        """Returns all entites that have had new things since the given
+        position. If the position is too old it will return None.
+        """
+        assert type(stream_pos) is int
+
+        if stream_pos >= self._earliest_known_stream_pos:
+            keys = self._cache.keys()
+            i = keys.bisect_right(stream_pos)
+
+            return [self._cache[k] for k in keys[i:]]
+        else:
+            return None
+
     def entity_has_changed(self, entity, stream_pos):
         """Informs the cache that the entity has been changed at the given
         position.