summary refs log tree commit diff
path: root/synapse/util/caches/stream_change_cache.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/caches/stream_change_cache.py')
-rw-r--r--synapse/util/caches/stream_change_cache.py20
1 files changed, 19 insertions, 1 deletions
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index b37f1c0725..ea8a74ca69 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -18,11 +18,15 @@ from synapse.util.caches import cache_counter, caches_by_name
 
 from blist import sorteddict
 import logging
+import os
 
 
 logger = logging.getLogger(__name__)
 
 
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
 class StreamChangeCache(object):
     """Keeps track of the stream positions of the latest change in a set of entities.
 
@@ -33,7 +37,7 @@ class StreamChangeCache(object):
     old then the cache will simply return all given entities.
     """
     def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
-        self._max_size = max_size
+        self._max_size = int(max_size * CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
         self._cache = sorteddict()
         self._earliest_known_stream_pos = current_stream_pos
@@ -85,6 +89,20 @@ class StreamChangeCache(object):
 
         return result
 
+    def get_all_entities_changed(self, stream_pos):
+        """Returns all entites that have had new things since the given
+        position. If the position is too old it will return None.
+        """
+        assert type(stream_pos) is int
+
+        if stream_pos >= self._earliest_known_stream_pos:
+            keys = self._cache.keys()
+            i = keys.bisect_right(stream_pos)
+
+            return [self._cache[k] for k in keys[i:]]
+        else:
+            return None
+
     def entity_has_changed(self, entity, stream_pos):
         """Informs the cache that the entity has been changed at the given
         position.