diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index d8253bd942..91c335f85b 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -115,7 +115,7 @@ class StreamChangeCache:
"""
new_size = math.floor(self._original_max_size * factor)
if new_size != self._max_size:
- self.max_size = new_size
+ self._max_size = new_size
self._evict()
return True
return False
@@ -165,7 +165,7 @@ class StreamChangeCache:
return False
def get_entities_changed(
- self, entities: Collection[EntityType], stream_pos: int
+ self, entities: Collection[EntityType], stream_pos: int, _perf_factor: int = 1
) -> Union[Set[EntityType], FrozenSet[EntityType]]:
"""
Returns the subset of the given entities that have had changes after the given position.
@@ -177,6 +177,8 @@ class StreamChangeCache:
Args:
entities: Entities to check for changes.
stream_pos: The stream position to check for changes after.
+ _perf_factor: Used by unit tests to choose when to use each
+ optimisation.
Return:
A subset of entities which have changed after the given stream position.
@@ -184,6 +186,22 @@ class StreamChangeCache:
This will be all entities if the given stream position is at or earlier
than the earliest known stream position.
"""
+ if not self._cache or stream_pos <= self._earliest_known_stream_pos:
+ self.metrics.inc_misses()
+ return set(entities)
+
+ # If there have been tonnes of changes compared with the number of
+ # entities, it is faster to check each entities stream ordering
+ # one-by-one.
+ max_stream_pos, _ = self._cache.peekitem()
+ if max_stream_pos - stream_pos > _perf_factor * len(entities):
+ self.metrics.inc_hits()
+ return {
+ entity
+ for entity in entities
+ if self._entity_to_key.get(entity, -1) > stream_pos
+ }
+
cache_result = self.get_all_entities_changed(stream_pos)
if cache_result.hit:
# We now do an intersection, trying to do so in the most efficient
|