summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/__init__.py2
-rw-r--r--synapse/util/caches/descriptors.py6
-rw-r--r--synapse/util/caches/expiringcache.py16
-rw-r--r--synapse/util/caches/stream_change_cache.py22
-rw-r--r--synapse/util/wheel_timer.py97
5 files changed, 139 insertions, 4 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py

index 133671e238..3b9da5b34a 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py
@@ -42,7 +42,7 @@ class Clock(object): def time_msec(self): """Returns the current system time in miliseconds since epoch.""" - return self.time() * 1000 + return int(self.time() * 1000) def looping_call(self, f, msec): l = task.LoopingCall(f) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 277854ccbc..35544b19fd 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py
@@ -28,6 +28,7 @@ from twisted.internet import defer from collections import OrderedDict +import os import functools import inspect import threading @@ -38,6 +39,9 @@ logger = logging.getLogger(__name__) _CacheSentinel = object() +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) + + class Cache(object): def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False): @@ -140,6 +144,8 @@ class CacheDescriptor(object): """ def __init__(self, orig, max_entries=1000, num_args=1, lru=True, tree=False, inlineCallbacks=False): + max_entries = int(max_entries * CACHE_SIZE_FACTOR) + self.orig = orig if inlineCallbacks: diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 62cae99649..e863a8f8a9 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py
@@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.util.caches import cache_counter, caches_by_name + import logging @@ -47,6 +49,8 @@ class ExpiringCache(object): self._cache = {} + caches_by_name[cache_name] = self._cache + def start(self): if not self._expiry_ms: # Don't bother starting the loop if things never expire @@ -72,7 +76,12 @@ class ExpiringCache(object): self._cache.pop(k) def __getitem__(self, key): - entry = self._cache[key] + try: + entry = self._cache[key] + cache_counter.inc_hits(self._cache_name) + except KeyError: + cache_counter.inc_misses(self._cache_name) + raise if self._reset_expiry_on_get: entry.time = self._clock.time_msec() @@ -105,9 +114,12 @@ class ExpiringCache(object): logger.debug( "[%s] _prune_cache before: %d, after len: %d", - self._cache_name, begin_length, len(self._cache.keys()) + self._cache_name, begin_length, len(self._cache) ) + def __len__(self): + return len(self._cache) + class _CacheEntry(object): def __init__(self, time, value): diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index b37f1c0725..a1aec7aa55 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py
@@ -18,11 +18,15 @@ from synapse.util.caches import cache_counter, caches_by_name from blist import sorteddict import logging +import os logger = logging.getLogger(__name__) +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) + + class StreamChangeCache(object): """Keeps track of the stream positions of the latest change in a set of entities. @@ -33,7 +37,7 @@ class StreamChangeCache(object): old then the cache will simply return all given entities. """ def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}): - self._max_size = max_size + self._max_size = int(max_size * CACHE_SIZE_FACTOR) self._entity_to_key = {} self._cache = sorteddict() self._earliest_known_stream_pos = current_stream_pos @@ -85,6 +89,22 @@ class StreamChangeCache(object): return result + def get_all_entities_changed(self, stream_pos): + """Returns all entites that have had new things since the given + position. If the position is too old it will return None. + """ + assert type(stream_pos) is int + + if stream_pos >= self._earliest_known_stream_pos: + keys = self._cache.keys() + i = keys.bisect_right(stream_pos) + + return ( + self._cache[k] for k in keys[i:] + ) + else: + return None + def entity_has_changed(self, entity, stream_pos): """Informs the cache that the entity has been changed at the given position. diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py new file mode 100644
index 0000000000..7412fc57a4 --- /dev/null +++ b/synapse/util/wheel_timer.py
@@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class _Entry(object): + __slots__ = ["end_key", "queue"] + + def __init__(self, end_key): + self.end_key = end_key + self.queue = [] + + +class WheelTimer(object): + """Stores arbitrary objects that will be returned after their timers have + expired. + """ + + def __init__(self, bucket_size=5000): + """ + Args: + bucket_size (int): Size of buckets in ms. Corresponds roughly to the + accuracy of the timer. + """ + self.bucket_size = bucket_size + self.entries = [] + self.current_tick = 0 + + def insert(self, now, obj, then): + """Inserts object into timer. + + Args: + now (int): Current time in msec + obj (object): Object to be inserted + then (int): When to return the object strictly after. + """ + then_key = int(then / self.bucket_size) + 1 + + if self.entries: + min_key = self.entries[0].end_key + max_key = self.entries[-1].end_key + + if then_key <= max_key: + # The max here is to protect against inserts for times in the past + self.entries[max(min_key, then_key) - min_key].queue.append(obj) + return + + next_key = int(now / self.bucket_size) + 1 + if self.entries: + last_key = self.entries[-1].end_key + else: + last_key = next_key + + # Handle the case when `then` is in the past and `entries` is empty. + then_key = max(last_key, then_key) + + # Add empty entries between the end of the current list and when we want + # to insert. This ensures there are no gaps. + self.entries.extend( + _Entry(key) for key in xrange(last_key, then_key + 1) + ) + + self.entries[-1].queue.append(obj) + + def fetch(self, now): + """Fetch any objects that have timed out + + Args: + now (ms): Current time in msec + + Returns: + list: List of objects that have timed out + """ + now_key = int(now / self.bucket_size) + + ret = [] + while self.entries and self.entries[0].end_key <= now_key: + ret.extend(self.entries.pop(0).queue) + + return ret + + def __len__(self): + l = 0 + for entry in self.entries: + l += len(entry.queue) + return l