Add optimisation to `StreamChangeCache` (#17130)
When there have been lots of changes compared with the number of
entities being queried, we can take a fast(er) path: look up each
entity's last-change position individually instead of computing the
full set of changed entities (see the sketch below).
Locally I ran some benchmarks, and this comparison seems to be the
best heuristic for choosing which method to use.
1 file changed, 19 insertions(+), 1 deletion(-)
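For illustration, the heuristic works roughly like the following standalone
sketch. This is not the Synapse code itself: `entities_changed` and
`last_change` are invented stand-ins for the method and for the cache's
entity-to-position map.

    from typing import Collection, Dict, Set

    def entities_changed(
        last_change: Dict[str, int],
        entities: Collection[str],
        stream_pos: int,
        perf_factor: int = 1,
    ) -> Set[str]:
        # last_change maps each entity to the stream position at which it
        # last changed (a stand-in for the cache's internal state).
        if not last_change:
            # Nothing recorded: conservatively report everything as changed.
            return set(entities)
        max_stream_pos = max(last_change.values())
        if max_stream_pos - stream_pos > perf_factor * len(entities):
            # Many changes relative to the query size: probe each entity's
            # last-change position one by one.
            return {e for e in entities if last_change.get(e, -1) > stream_pos}
        # Otherwise compute everything changed after stream_pos and intersect.
        changed = {e for e, pos in last_change.items() if pos > stream_pos}
        return changed & set(entities)

Both branches return the same subset; the threshold only decides which is
cheaper to evaluate for a given query.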
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 2079ca789c..91c335f85b 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -165,7 +165,7 @@ class StreamChangeCache:
         return False
 
     def get_entities_changed(
-        self, entities: Collection[EntityType], stream_pos: int
+        self, entities: Collection[EntityType], stream_pos: int, _perf_factor: int = 1
     ) -> Union[Set[EntityType], FrozenSet[EntityType]]:
         """
         Returns the subset of the given entities that have had changes after the given position.
@@ -177,6 +177,8 @@
         Args:
             entities: Entities to check for changes.
             stream_pos: The stream position to check for changes after.
+            _perf_factor: Used by unit tests to choose when to use each
+                optimisation.
 
         Return:
             A subset of entities which have changed after the given stream position.
@@ -184,6 +186,22 @@
             This will be all entities if the given stream position is at or earlier
             than the earliest known stream position.
         """
+        if not self._cache or stream_pos <= self._earliest_known_stream_pos:
+            self.metrics.inc_misses()
+            return set(entities)
+
+        # If there have been tonnes of changes compared with the number of
+        # entities, it is faster to check each entity's stream position
+        # one by one.
+        max_stream_pos, _ = self._cache.peekitem()
+        if max_stream_pos - stream_pos > _perf_factor * len(entities):
+            self.metrics.inc_hits()
+            return {
+                entity
+                for entity in entities
+                if self._entity_to_key.get(entity, -1) > stream_pos
+            }
+
         cache_result = self.get_all_entities_changed(stream_pos)
         if cache_result.hit:
             # We now do an intersection, trying to do so in the most efficient
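As a usage illustration, a hypothetical test along these lines could pin
down when each path runs. The entities and stream positions here are
invented, not taken from the Synapse test suite:

    from synapse.util.caches.stream_change_cache import StreamChangeCache

    cache = StreamChangeCache("test", 1)
    cache.entity_has_changed("@a:test", 2)
    cache.entity_has_changed("@b:test", 3)
    cache.entity_has_changed("@c:test", 4)

    # Per-entity path: max_stream_pos - stream_pos = 4 - 2 = 2, which is
    # greater than _perf_factor * len(entities) = 1 * 1.
    assert cache.get_entities_changed(["@c:test"], 2, _perf_factor=1) == {"@c:test"}

    # Intersection path: an outsized _perf_factor keeps the threshold
    # unreachable, so the full changed set is computed and intersected.
    # Both paths must return the same answer.
    assert cache.get_entities_changed(["@c:test"], 2, _perf_factor=100) == {"@c:test"}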