summary refs log tree commit diff
path: root/synapse/util/caches/stream_change_cache.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-05 17:07:59 +0100
committerErik Johnston <erik@matrix.org>2020-05-05 17:40:29 +0100
commitf9073893af82eec64b594dbcaef37c407a291c52 (patch)
tree6b57fce16886aef99c6e796900822290866a38df /synapse/util/caches/stream_change_cache.py
parentWorkaround for assertion errors from db_query_to_update_function (#7378) (diff)
downloadsynapse-f9073893af82eec64b594dbcaef37c407a291c52.tar.xz
Speed up fetching device lists changes in sync.
Currently we copy `users_who_share_room` needlessly about three times,
which is expensive when the set is large (which it can easily be).
Diffstat (limited to 'synapse/util/caches/stream_change_cache.py')
-rw-r--r--synapse/util/caches/stream_change_cache.py19
1 files changed, 15 insertions, 4 deletions
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 38dc3f501e..e54f80d76e 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -14,12 +14,13 @@
 # limitations under the License.
 
 import logging
-from typing import Dict, Iterable, List, Mapping, Optional, Set
+from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union
 
 from six import integer_types
 
 from sortedcontainers import SortedDict
 
+from synapse.types import Collection
 from synapse.util import caches
 
 logger = logging.getLogger(__name__)
@@ -85,8 +86,8 @@ class StreamChangeCache:
         return False
 
     def get_entities_changed(
-        self, entities: Iterable[EntityType], stream_pos: int
-    ) -> Set[EntityType]:
+        self, entities: Collection[EntityType], stream_pos: int
+    ) -> Union[Set[EntityType], FrozenSet[EntityType]]:
         """
         Returns subset of entities that have had new things since the given
         position.  Entities unknown to the cache will be returned.  If the
@@ -94,7 +95,17 @@ class StreamChangeCache:
         """
         changed_entities = self.get_all_entities_changed(stream_pos)
         if changed_entities is not None:
-            result = set(changed_entities).intersection(entities)
+            # We now do an intersection, trying to do so in the most efficient
+            # way possible (some of these sets are *large*). First check in the
+            # given iterable is already set that we can reuse, otherwise we
+            # create a set of the *smallest* of the two iterables and call
+            # `intersection(..)` on it (this can be twice as fast as the reverse).
+            if isinstance(entities, (set, frozenset)):
+                result = entities.intersection(changed_entities)
+            elif len(changed_entities) < len(entities):
+                result = set(changed_entities).intersection(entities)
+            else:
+                result = set(entities).intersection(changed_entities)
             self.metrics.inc_hits()
         else:
             result = set(entities)