diff options
author | Erik Johnston <erik@matrix.org> | 2016-01-29 13:52:12 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-01-29 13:52:12 +0000 |
commit | fd142c29d9af53fe76dcf0cb4e1ebe7aac5d28f2 (patch) | |
tree | fd4650925097b4014e8ce32619ab4956a0a89879 /synapse/util | |
parent | Bump version and changelog (diff) | |
parent | Bump AccountDataAndTagsChangeCache size (diff) | |
download | synapse-fd142c29d9af53fe76dcf0cb4e1ebe7aac5d28f2.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into release-v0.12.1
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/caches/lrucache.py | 10 | ||||
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 107 | ||||
-rw-r--r-- | synapse/util/caches/treecache.py | 36 |
3 files changed, 145 insertions, 8 deletions
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index e6a66dc041..f7423f2fab 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -37,7 +37,7 @@ class LruCache(object): """ def __init__(self, max_size, keylen=1, cache_type=dict): cache = cache_type() - self.size = 0 + self.cache = cache # Used for introspection. list_root = [] list_root[:] = [list_root, list_root, None, None] @@ -60,7 +60,6 @@ class LruCache(object): prev_node[NEXT] = node next_node[PREV] = node cache[key] = node - self.size += 1 def move_node_to_front(node): prev_node = node[PREV] @@ -79,7 +78,6 @@ class LruCache(object): next_node = node[NEXT] prev_node[NEXT] = next_node next_node[PREV] = prev_node - self.size -= 1 @synchronized def cache_get(key, default=None): @@ -98,7 +96,7 @@ class LruCache(object): node[VALUE] = value else: add_node(key, value) - if self.size > max_size: + if len(cache) > max_size: todelete = list_root[PREV] delete_node(todelete) cache.pop(todelete[KEY], None) @@ -110,7 +108,7 @@ class LruCache(object): return node[VALUE] else: add_node(key, value) - if self.size > max_size: + if len(cache) > max_size: todelete = list_root[PREV] delete_node(todelete) cache.pop(todelete[KEY], None) @@ -145,7 +143,7 @@ class LruCache(object): @synchronized def cache_len(): - return self.size + return len(cache) @synchronized def cache_contains(key): diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py new file mode 100644 index 0000000000..c673b1bdfc --- /dev/null +++ b/synapse/util/caches/stream_change_cache.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.caches import cache_counter, caches_by_name + + +from blist import sorteddict +import logging + + +logger = logging.getLogger(__name__) + + +class StreamChangeCache(object): + """Keeps track of the stream positions of the latest change in a set of entities. + + Typically the entity will be a room or user id. + + Given a list of entities and a stream position, it will give a subset of + entities that may have changed since that position. If position key is too + old then the cache will simply return all given entities. + """ + def __init__(self, name, current_stream_pos, max_size=10000): + self._max_size = max_size + self._entity_to_key = {} + self._cache = sorteddict() + self._earliest_known_stream_pos = current_stream_pos + self.name = name + caches_by_name[self.name] = self._cache + + def has_entity_changed(self, entity, stream_pos): + """Returns True if the entity may have been updated since stream_pos + """ + assert type(stream_pos) is int + + if stream_pos < self._earliest_known_stream_pos: + cache_counter.inc_misses(self.name) + return True + + if stream_pos == self._earliest_known_stream_pos: + # If the same as the earliest key, assume nothing has changed. + cache_counter.inc_hits(self.name) + return False + + latest_entity_change_pos = self._entity_to_key.get(entity, None) + if latest_entity_change_pos is None: + cache_counter.inc_misses(self.name) + return True + + if stream_pos < latest_entity_change_pos: + cache_counter.inc_misses(self.name) + return True + + cache_counter.inc_hits(self.name) + return False + + def get_entities_changed(self, entities, stream_pos): + """Returns subset of entities that have had new things since the + given position. If the position is too old it will just return the given list. + """ + assert type(stream_pos) is int + + if stream_pos >= self._earliest_known_stream_pos: + keys = self._cache.keys() + i = keys.bisect_right(stream_pos) + + result = set( + self._cache[k] for k in keys[i:] + ).intersection(entities) + + cache_counter.inc_hits(self.name) + else: + result = entities + cache_counter.inc_misses(self.name) + + return result + + def entity_has_changed(self, entity, stream_pos): + """Informs the cache that the entity has been changed at the given + position. + """ + assert type(stream_pos) is int + + if stream_pos > self._earliest_known_stream_pos: + old_pos = self._entity_to_key.get(entity, None) + if old_pos: + stream_pos = max(stream_pos, old_pos) + self._cache.pop(old_pos, None) + self._cache[stream_pos] = entity + self._entity_to_key[entity] = stream_pos + + while len(self._cache) > self._max_size: + k, r = self._cache.popitem() + self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos) + self._entity_to_key.pop(r, None) diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 3b58860910..29d02f7e95 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -8,6 +8,7 @@ class TreeCache(object): Keys must be tuples. """ def __init__(self): + self.size = 0 self.root = {} def __setitem__(self, key, value): @@ -20,7 +21,8 @@ class TreeCache(object): node = self.root for k in key[:-1]: node = node.setdefault(k, {}) - node[key[-1]] = value + node[key[-1]] = _Entry(value) + self.size += 1 def get(self, key, default=None): node = self.root @@ -28,9 +30,10 @@ class TreeCache(object): node = node.get(k, None) if node is None: return default - return node.get(key[-1], default) + return node.get(key[-1], _Entry(default)).value def clear(self): + self.size = 0 self.root = {} def pop(self, key, default=None): @@ -57,4 +60,33 @@ class TreeCache(object): break node_and_keys[i+1][0].pop(k) + popped, cnt = _strip_and_count_entires(popped) + self.size -= cnt return popped + + def __len__(self): + return self.size + + +class _Entry(object): + __slots__ = ["value"] + + def __init__(self, value): + self.value = value + + +def _strip_and_count_entires(d): + """Takes an _Entry or dict with leaves of _Entry's, and either returns the + value or a dictionary with _Entry's replaced by their values. + + Also returns the count of _Entry's + """ + if isinstance(d, dict): + cnt = 0 + for key, value in d.items(): + v, n = _strip_and_count_entires(value) + d[key] = v + cnt += n + return d, cnt + else: + return d.value, 1 |