diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 133671e238..3b9da5b34a 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -42,7 +42,7 @@ class Clock(object):
def time_msec(self):
"""Returns the current system time in miliseconds since epoch."""
- return self.time() * 1000
+ return int(self.time() * 1000)
def looping_call(self, f, msec):
l = task.LoopingCall(f)
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 277854ccbc..35544b19fd 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -28,6 +28,7 @@ from twisted.internet import defer
from collections import OrderedDict
+import os
import functools
import inspect
import threading
@@ -38,6 +39,9 @@ logger = logging.getLogger(__name__)
_CacheSentinel = object()
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
class Cache(object):
def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
@@ -140,6 +144,8 @@ class CacheDescriptor(object):
"""
def __init__(self, orig, max_entries=1000, num_args=1, lru=True, tree=False,
inlineCallbacks=False):
+ max_entries = int(max_entries * CACHE_SIZE_FACTOR)
+
self.orig = orig
if inlineCallbacks:
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 62cae99649..e863a8f8a9 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from synapse.util.caches import cache_counter, caches_by_name
+
import logging
@@ -47,6 +49,8 @@ class ExpiringCache(object):
self._cache = {}
+ caches_by_name[cache_name] = self._cache
+
def start(self):
if not self._expiry_ms:
# Don't bother starting the loop if things never expire
@@ -72,7 +76,12 @@ class ExpiringCache(object):
self._cache.pop(k)
def __getitem__(self, key):
- entry = self._cache[key]
+ try:
+ entry = self._cache[key]
+ cache_counter.inc_hits(self._cache_name)
+ except KeyError:
+ cache_counter.inc_misses(self._cache_name)
+ raise
if self._reset_expiry_on_get:
entry.time = self._clock.time_msec()
@@ -105,9 +114,12 @@ class ExpiringCache(object):
logger.debug(
"[%s] _prune_cache before: %d, after len: %d",
- self._cache_name, begin_length, len(self._cache.keys())
+ self._cache_name, begin_length, len(self._cache)
)
+ def __len__(self):
+ return len(self._cache)
+
class _CacheEntry(object):
def __init__(self, time, value):
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index b37f1c0725..a1aec7aa55 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -18,11 +18,15 @@ from synapse.util.caches import cache_counter, caches_by_name
from blist import sorteddict
import logging
+import os
logger = logging.getLogger(__name__)
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
class StreamChangeCache(object):
"""Keeps track of the stream positions of the latest change in a set of entities.
@@ -33,7 +37,7 @@ class StreamChangeCache(object):
old then the cache will simply return all given entities.
"""
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
- self._max_size = max_size
+ self._max_size = int(max_size * CACHE_SIZE_FACTOR)
self._entity_to_key = {}
self._cache = sorteddict()
self._earliest_known_stream_pos = current_stream_pos
@@ -85,6 +89,22 @@ class StreamChangeCache(object):
return result
+ def get_all_entities_changed(self, stream_pos):
+        """Returns all entities that have had new things since the given
+ position. If the position is too old it will return None.
+ """
+ assert type(stream_pos) is int
+
+ if stream_pos >= self._earliest_known_stream_pos:
+ keys = self._cache.keys()
+ i = keys.bisect_right(stream_pos)
+
+ return (
+ self._cache[k] for k in keys[i:]
+ )
+ else:
+ return None
+
def entity_has_changed(self, entity, stream_pos):
"""Informs the cache that the entity has been changed at the given
position.
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
new file mode 100644
index 0000000000..7412fc57a4
--- /dev/null
+++ b/synapse/util/wheel_timer.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class _Entry(object):
+ __slots__ = ["end_key", "queue"]
+
+ def __init__(self, end_key):
+ self.end_key = end_key
+ self.queue = []
+
+
+class WheelTimer(object):
+ """Stores arbitrary objects that will be returned after their timers have
+ expired.
+ """
+
+ def __init__(self, bucket_size=5000):
+ """
+ Args:
+ bucket_size (int): Size of buckets in ms. Corresponds roughly to the
+ accuracy of the timer.
+ """
+ self.bucket_size = bucket_size
+ self.entries = []
+ self.current_tick = 0
+
+ def insert(self, now, obj, then):
+ """Inserts object into timer.
+
+ Args:
+ now (int): Current time in msec
+ obj (object): Object to be inserted
+            then (int): Time in msec; the object is returned strictly after this.
+ """
+ then_key = int(then / self.bucket_size) + 1
+
+ if self.entries:
+ min_key = self.entries[0].end_key
+ max_key = self.entries[-1].end_key
+
+ if then_key <= max_key:
+ # The max here is to protect against inserts for times in the past
+ self.entries[max(min_key, then_key) - min_key].queue.append(obj)
+ return
+
+ next_key = int(now / self.bucket_size) + 1
+ if self.entries:
+ last_key = self.entries[-1].end_key
+ else:
+ last_key = next_key
+
+ # Handle the case when `then` is in the past and `entries` is empty.
+ then_key = max(last_key, then_key)
+
+ # Add empty entries between the end of the current list and when we want
+ # to insert. This ensures there are no gaps.
+ self.entries.extend(
+ _Entry(key) for key in xrange(last_key, then_key + 1)
+ )
+
+ self.entries[-1].queue.append(obj)
+
+ def fetch(self, now):
+ """Fetch any objects that have timed out
+
+ Args:
+            now (int): Current time in msec
+
+ Returns:
+ list: List of objects that have timed out
+ """
+ now_key = int(now / self.bucket_size)
+
+ ret = []
+ while self.entries and self.entries[0].end_key <= now_key:
+ ret.extend(self.entries.pop(0).queue)
+
+ return ret
+
+ def __len__(self):
+ l = 0
+ for entry in self.entries:
+ l += len(entry.queue)
+ return l
|