summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-05-06 12:56:52 +0100
committerGitHub <noreply@github.com>2024-05-06 12:56:52 +0100
commit3e6ee8ff88c41ad1fca8c055520be952ab21b705 (patch)
tree01e66e1e7da694e4786626ac89fd6e69ea8a8349 /synapse/util
parentFix bug where `StreamChangeCache` would not respect cache factors (#17152) (diff)
downloadsynapse-3e6ee8ff88c41ad1fca8c055520be952ab21b705.tar.xz
Add optimisation to `StreamChangeCache` (#17130)
When there have been lots of changes compared with the number of
entities, we can do a fast(er) path.

Locally I ran some benchmarking, and the comparison seems to give the
best determination of which method we use.
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/caches/stream_change_cache.py20
1 files changed, 19 insertions, 1 deletions
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 2079ca789c..91c335f85b 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -165,7 +165,7 @@ class StreamChangeCache:
         return False
 
     def get_entities_changed(
-        self, entities: Collection[EntityType], stream_pos: int
+        self, entities: Collection[EntityType], stream_pos: int, _perf_factor: int = 1
     ) -> Union[Set[EntityType], FrozenSet[EntityType]]:
         """
         Returns the subset of the given entities that have had changes after the given position.
@@ -177,6 +177,8 @@ class StreamChangeCache:
         Args:
             entities: Entities to check for changes.
             stream_pos: The stream position to check for changes after.
+            _perf_factor: Used by unit tests to choose when to use each
+                optimisation.
 
         Return:
             A subset of entities which have changed after the given stream position.
@@ -184,6 +186,22 @@ class StreamChangeCache:
             This will be all entities if the given stream position is at or earlier
             than the earliest known stream position.
         """
+        if not self._cache or stream_pos <= self._earliest_known_stream_pos:
+            self.metrics.inc_misses()
+            return set(entities)
+
+        # If there have been tonnes of changes compared with the number of
+        # entities, it is faster to check each entities stream ordering
+        # one-by-one.
+        max_stream_pos, _ = self._cache.peekitem()
+        if max_stream_pos - stream_pos > _perf_factor * len(entities):
+            self.metrics.inc_hits()
+            return {
+                entity
+                for entity in entities
+                if self._entity_to_key.get(entity, -1) > stream_pos
+            }
+
         cache_result = self.get_all_entities_changed(stream_pos)
         if cache_result.hit:
             # We now do an intersection, trying to do so in the most efficient