summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-08-04 15:56:56 +0100
committerErik Johnston <erik@matrix.org>2015-08-04 15:56:56 +0100
commite7768e77f56709897215c13ee3fd187550d20fd2 (patch)
tree4a01064901771b2a02c23bf2e4b90b4ec0e2d120 /synapse/storage
parentMerge branch 'develop' of github.com:matrix-org/synapse into erikj/acl_perf (diff)
downloadsynapse-e7768e77f56709897215c13ee3fd187550d20fd2.tar.xz
Add basic dictionary cache
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/util/caches.py94
1 files changed, 94 insertions, 0 deletions
diff --git a/synapse/storage/util/caches.py b/synapse/storage/util/caches.py
new file mode 100644
index 0000000000..0877cc79f6
--- /dev/null
+++ b/synapse/storage/util/caches.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.util.lrucache import LruCache
+from collections import namedtuple
+import threading
+
+
+DictionaryEntry = namedtuple("DictionaryEntry", ("full", "value"))
+
+
+class DictionaryCache(object):
+
+    def __init__(self, name, max_entries=1000):
+        self.cache = LruCache(max_size=max_entries)
+
+        self.name = name
+        self.sequence = 0
+        self.thread = None
+        # caches_by_name[name] = self.cache
+
+        class Sentinel(object):
+            __slots__ = []
+
+        self.sentinel = Sentinel()
+
+    def check_thread(self):
+        expected_thread = self.thread
+        if expected_thread is None:
+            self.thread = threading.current_thread()
+        else:
+            if expected_thread is not threading.current_thread():
+                raise ValueError(
+                    "Cache objects can only be accessed from the main thread"
+                )
+
+    def get(self, key, dict_keys=None):
+        entry = self.cache.get(key, self.sentinel)
+        if entry is not self.sentinel:
+            # cache_counter.inc_hits(self.name)
+
+            if dict_keys is None:
+                return DictionaryEntry(entry.full, dict(entry.value))
+            else:
+                return DictionaryEntry(entry.full, {
+                    k: entry.value[k]
+                    for k in dict_keys
+                    if k in entry.value
+                })
+
+        # cache_counter.inc_misses(self.name)
+        return DictionaryEntry(False, {})
+
+    def invalidate(self, key):
+        self.check_thread()
+
+        # Increment the sequence number so that any SELECT statements that
+        # raced with the INSERT don't update the cache (SYN-369)
+        self.sequence += 1
+        self.cache.pop(key, None)
+
+    def invalidate_all(self):
+        self.check_thread()
+        self.sequence += 1
+        self.cache.clear()
+
+    def update(self, sequence, key, value, full=False):
+        self.check_thread()
+        if self.sequence == sequence:
+            # Only update the cache if the caches sequence number matches the
+            # number that the cache had before the SELECT was started (SYN-369)
+            if full:
+                self._insert(key, value)
+            else:
+                self._update_or_insert(key, value)
+
+    def _update_or_insert(self, key, value):
+        entry = self.cache.setdefault(key, DictionaryEntry(False, {}))
+        entry.value.update(value)
+
+    def _insert(self, key, value):
+        self.cache[key] = DictionaryEntry(True, value)