diff options
author | Erik Johnston <erikj@element.io> | 2024-05-06 12:56:52 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-06 12:56:52 +0100 |
commit | 3e6ee8ff88c41ad1fca8c055520be952ab21b705 (patch) | |
tree | 01e66e1e7da694e4786626ac89fd6e69ea8a8349 /synapse | |
parent | Fix bug where `StreamChangeCache` would not respect cache factors (#17152) (diff) | |
download | synapse-3e6ee8ff88c41ad1fca8c055520be952ab21b705.tar.xz |
Add optimisation to `StreamChangeCache` (#17130)
When there have been lots of changes compared with the number of entities, we can do a fast(er) path. Locally I ran some benchmarking, and the comparison seems to give the best determination of which method we use.
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 20 |
1 files changed, 19 insertions, 1 deletions
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 2079ca789c..91c335f85b 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -165,7 +165,7 @@ class StreamChangeCache: return False def get_entities_changed( - self, entities: Collection[EntityType], stream_pos: int + self, entities: Collection[EntityType], stream_pos: int, _perf_factor: int = 1 ) -> Union[Set[EntityType], FrozenSet[EntityType]]: """ Returns the subset of the given entities that have had changes after the given position. @@ -177,6 +177,8 @@ class StreamChangeCache: Args: entities: Entities to check for changes. stream_pos: The stream position to check for changes after. + _perf_factor: Used by unit tests to choose when to use each + optimisation. Return: A subset of entities which have changed after the given stream position. @@ -184,6 +186,22 @@ class StreamChangeCache: This will be all entities if the given stream position is at or earlier than the earliest known stream position. """ + if not self._cache or stream_pos <= self._earliest_known_stream_pos: + self.metrics.inc_misses() + return set(entities) + + # If there have been tonnes of changes compared with the number of + # entities, it is faster to check each entities stream ordering + # one-by-one. + max_stream_pos, _ = self._cache.peekitem() + if max_stream_pos - stream_pos > _perf_factor * len(entities): + self.metrics.inc_hits() + return { + entity + for entity in entities + if self._entity_to_key.get(entity, -1) > stream_pos + } + cache_result = self.get_all_entities_changed(stream_pos) if cache_result.hit: # We now do an intersection, trying to do so in the most efficient |