diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 16fcb00206..5ac8643eef 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -142,9 +142,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
- # _cache is not valid at or before the earliest known stream position, so
+ # _cache is not valid before the earliest known stream position, so
# return that the entity has changed.
- if stream_pos <= self._earliest_known_stream_pos:
+ if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
@@ -186,7 +186,7 @@ class StreamChangeCache:
This will be all entities if the given stream position is at or earlier
than the earliest known stream position.
"""
- if not self._cache or stream_pos <= self._earliest_known_stream_pos:
+ if not self._cache or stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return set(entities)
@@ -238,9 +238,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
- # _cache is not valid at or before the earliest known stream position, so
+ # _cache is not valid before the earliest known stream position, so
# return that an entity has changed.
- if stream_pos <= self._earliest_known_stream_pos:
+ if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
@@ -270,9 +270,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
- # _cache is not valid at or before the earliest known stream position, so
+ # _cache is not valid before the earliest known stream position, so
# return None to mark that it is unknown if an entity has changed.
- if stream_pos <= self._earliest_known_stream_pos:
+ if stream_pos < self._earliest_known_stream_pos:
return AllEntitiesChangedResult(None)
changed_entities: List[EntityType] = []
@@ -314,6 +314,15 @@ class StreamChangeCache:
self._entity_to_key[entity] = stream_pos
self._evict()
+ def all_entities_changed(self, stream_pos: int) -> None:
+ """
+ Mark all entities as changed. This is useful when the cache is invalidated and
+ there may be some potential change for all of the entities.
+ """
+ self._cache.clear()
+ self._entity_to_key.clear()
+ self._earliest_known_stream_pos = stream_pos
+
def _evict(self) -> None:
"""
Ensure the cache has not exceeded the maximum size.
|