diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index e6a66dc041..f7423f2fab 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -37,7 +37,7 @@ class LruCache(object):
"""
def __init__(self, max_size, keylen=1, cache_type=dict):
cache = cache_type()
- self.size = 0
+ self.cache = cache # Used for introspection.
list_root = []
list_root[:] = [list_root, list_root, None, None]
@@ -60,7 +60,6 @@ class LruCache(object):
prev_node[NEXT] = node
next_node[PREV] = node
cache[key] = node
- self.size += 1
def move_node_to_front(node):
prev_node = node[PREV]
@@ -79,7 +78,6 @@ class LruCache(object):
next_node = node[NEXT]
prev_node[NEXT] = next_node
next_node[PREV] = prev_node
- self.size -= 1
@synchronized
def cache_get(key, default=None):
@@ -98,7 +96,7 @@ class LruCache(object):
node[VALUE] = value
else:
add_node(key, value)
- if self.size > max_size:
+ if len(cache) > max_size:
todelete = list_root[PREV]
delete_node(todelete)
cache.pop(todelete[KEY], None)
@@ -110,7 +108,7 @@ class LruCache(object):
return node[VALUE]
else:
add_node(key, value)
- if self.size > max_size:
+ if len(cache) > max_size:
todelete = list_root[PREV]
delete_node(todelete)
cache.pop(todelete[KEY], None)
@@ -145,7 +143,7 @@ class LruCache(object):
@synchronized
def cache_len():
- return self.size
+ return len(cache)
@synchronized
def cache_contains(key):
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
new file mode 100644
index 0000000000..c673b1bdfc
--- /dev/null
+++ b/synapse/util/caches/stream_change_cache.py
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.util.caches import cache_counter, caches_by_name
+
+
+from blist import sorteddict
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class StreamChangeCache(object):
+ """Keeps track of the stream positions of the latest change in a set of entities.
+
+ Typically the entity will be a room or user id.
+
+ Given a list of entities and a stream position, it will give a subset of
+ entities that may have changed since that position. If position key is too
+ old then the cache will simply return all given entities.
+ """
+ def __init__(self, name, current_stream_pos, max_size=10000):
+ self._max_size = max_size
+ self._entity_to_key = {}
+ self._cache = sorteddict()
+ self._earliest_known_stream_pos = current_stream_pos
+ self.name = name
+ caches_by_name[self.name] = self._cache
+
+ def has_entity_changed(self, entity, stream_pos):
+ """Returns True if the entity may have been updated since stream_pos
+ """
+ assert type(stream_pos) is int
+
+ if stream_pos < self._earliest_known_stream_pos:
+ cache_counter.inc_misses(self.name)
+ return True
+
+ if stream_pos == self._earliest_known_stream_pos:
+ # If the same as the earliest key, assume nothing has changed.
+ cache_counter.inc_hits(self.name)
+ return False
+
+ latest_entity_change_pos = self._entity_to_key.get(entity, None)
+ if latest_entity_change_pos is None:
+ cache_counter.inc_misses(self.name)
+ return True
+
+ if stream_pos < latest_entity_change_pos:
+ cache_counter.inc_misses(self.name)
+ return True
+
+ cache_counter.inc_hits(self.name)
+ return False
+
+ def get_entities_changed(self, entities, stream_pos):
+ """Returns subset of entities that have had new things since the
+ given position. If the position is too old it will just return the given list.
+ """
+ assert type(stream_pos) is int
+
+ if stream_pos >= self._earliest_known_stream_pos:
+ keys = self._cache.keys()
+ i = keys.bisect_right(stream_pos)
+
+ result = set(
+ self._cache[k] for k in keys[i:]
+ ).intersection(entities)
+
+ cache_counter.inc_hits(self.name)
+ else:
+ result = entities
+ cache_counter.inc_misses(self.name)
+
+ return result
+
+ def entity_has_changed(self, entity, stream_pos):
+ """Informs the cache that the entity has been changed at the given
+ position.
+ """
+ assert type(stream_pos) is int
+
+ if stream_pos > self._earliest_known_stream_pos:
+ old_pos = self._entity_to_key.get(entity, None)
+ if old_pos:
+ stream_pos = max(stream_pos, old_pos)
+ self._cache.pop(old_pos, None)
+ self._cache[stream_pos] = entity
+ self._entity_to_key[entity] = stream_pos
+
+ while len(self._cache) > self._max_size:
+ k, r = self._cache.popitem()
+ self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
+ self._entity_to_key.pop(r, None)
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index 3b58860910..29d02f7e95 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -8,6 +8,7 @@ class TreeCache(object):
Keys must be tuples.
"""
def __init__(self):
+ self.size = 0
self.root = {}
def __setitem__(self, key, value):
@@ -20,7 +21,8 @@ class TreeCache(object):
node = self.root
for k in key[:-1]:
node = node.setdefault(k, {})
- node[key[-1]] = value
+ node[key[-1]] = _Entry(value)
+ self.size += 1
def get(self, key, default=None):
node = self.root
@@ -28,9 +30,10 @@ class TreeCache(object):
node = node.get(k, None)
if node is None:
return default
- return node.get(key[-1], default)
+ return node.get(key[-1], _Entry(default)).value
def clear(self):
+ self.size = 0
self.root = {}
def pop(self, key, default=None):
@@ -57,4 +60,33 @@ class TreeCache(object):
break
node_and_keys[i+1][0].pop(k)
+ popped, cnt = _strip_and_count_entires(popped)
+ self.size -= cnt
return popped
+
+ def __len__(self):
+ return self.size
+
+
+class _Entry(object):
+ __slots__ = ["value"]
+
+ def __init__(self, value):
+ self.value = value
+
+
+def _strip_and_count_entires(d):
+ """Takes an _Entry or dict with leaves of _Entry's, and either returns the
+ value or a dictionary with _Entry's replaced by their values.
+
+ Also returns the count of _Entry's
+ """
+ if isinstance(d, dict):
+ cnt = 0
+ for key, value in d.items():
+ v, n = _strip_and_count_entires(value)
+ d[key] = v
+ cnt += n
+ return d, cnt
+ else:
+ return d.value, 1
|