1 files changed, 21 insertions, 1 deletions
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index b37f1c0725..a1aec7aa55 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -18,11 +18,15 @@ from synapse.util.caches import cache_counter, caches_by_name
from blist import sorteddict
import logging
+import os
logger = logging.getLogger(__name__)
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
class StreamChangeCache(object):
"""Keeps track of the stream positions of the latest change in a set of entities.
@@ -33,7 +37,7 @@ class StreamChangeCache(object):
old then the cache will simply return all given entities.
"""
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
- self._max_size = max_size
+ self._max_size = int(max_size * CACHE_SIZE_FACTOR)
self._entity_to_key = {}
self._cache = sorteddict()
self._earliest_known_stream_pos = current_stream_pos
@@ -85,6 +89,22 @@ class StreamChangeCache(object):
return result
+ def get_all_entities_changed(self, stream_pos):
+ """Returns all entites that have had new things since the given
+ position. If the position is too old it will return None.
+ """
+ assert type(stream_pos) is int
+
+ if stream_pos >= self._earliest_known_stream_pos:
+ keys = self._cache.keys()
+ i = keys.bisect_right(stream_pos)
+
+ return (
+ self._cache[k] for k in keys[i:]
+ )
+ else:
+ return None
+
def entity_has_changed(self, entity, stream_pos):
"""Informs the cache that the entity has been changed at the given
position.
|