From acea4d7a2ff61b5beda420b54a8451088060a8cd Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 2 Dec 2022 12:58:56 -0500 Subject: Add missing types to tests.util. (#14597) Removes files under tests.util from the ignored by list, then fully types all tests/util/*.py files. --- tests/util/test_stream_change_cache.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'tests/util/test_stream_change_cache.py') diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index 9ed01f7e0c..1b0fa52ad1 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -8,7 +8,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): Tests for StreamChangeCache. """ - def test_prefilled_cache(self): + def test_prefilled_cache(self) -> None: """ Providing a prefilled cache to StreamChangeCache will result in a cache with the prefilled-cache entered in. @@ -16,7 +16,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2}) self.assertTrue(cache.has_entity_changed("user@foo.com", 1)) - def test_has_entity_changed(self): + def test_has_entity_changed(self) -> None: """ StreamChangeCache.entity_has_changed will mark entities as changed, and has_entity_changed will observe the changed entities. @@ -52,7 +52,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): self.assertTrue(cache.has_entity_changed("user@foo.com", 0)) self.assertTrue(cache.has_entity_changed("not@here.website", 0)) - def test_entity_has_changed_pops_off_start(self): + def test_entity_has_changed_pops_off_start(self) -> None: """ StreamChangeCache.entity_has_changed will respect the max size and purge the oldest items upon reaching that max size. @@ -86,7 +86,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): ) self.assertIsNone(cache.get_all_entities_changed(1)) - def test_get_all_entities_changed(self): + def test_get_all_entities_changed(self) -> None: """ StreamChangeCache.get_all_entities_changed will return all changed entities since the given position. If the position is before the start @@ -142,7 +142,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): r = cache.get_all_entities_changed(3) self.assertTrue(r == ok1 or r == ok2) - def test_has_any_entity_changed(self): + def test_has_any_entity_changed(self) -> None: """ StreamChangeCache.has_any_entity_changed will return True if any entities have been changed since the provided stream position, and @@ -168,7 +168,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): self.assertFalse(cache.has_any_entity_changed(2)) self.assertFalse(cache.has_any_entity_changed(3)) - def test_get_entities_changed(self): + def test_get_entities_changed(self) -> None: """ StreamChangeCache.get_entities_changed will return the entities in the given list that have changed since the provided stream ID. If the @@ -228,7 +228,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): {"bar@baz.net"}, ) - def test_max_pos(self): + def test_max_pos(self) -> None: """ StreamChangeCache.get_max_pos_of_last_change will return the most recent point where the entity could have changed. If the entity is not -- cgit 1.5.1 From 6a8310f3dfe77acf59df2fe3e88a71b85b9b3ecc Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 5 Dec 2022 09:00:59 -0500 Subject: Compare to the earliest known stream pos in the stream change cache. (#14435) The internal methods of the StreamChangeCache were inconsistently treating the earliest known stream position as valid. It is now treated as invalid, meaning the cache cannot determine if an entity at the earliest known stream position has changed or not. --- changelog.d/14435.bugfix | 1 + poetry.lock | 2 +- pyproject.toml | 3 +- synapse/util/caches/stream_change_cache.py | 142 +++++++++++++++++++++++------ tests/util/test_stream_change_cache.py | 38 +++----- 5 files changed, 133 insertions(+), 53 deletions(-) create mode 100644 changelog.d/14435.bugfix (limited to 'tests/util/test_stream_change_cache.py') diff --git a/changelog.d/14435.bugfix b/changelog.d/14435.bugfix new file mode 100644 index 0000000000..149ee99dd7 --- /dev/null +++ b/changelog.d/14435.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances. diff --git a/poetry.lock b/poetry.lock index 8c63134578..90b363a548 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1639,7 +1639,7 @@ url-preview = ["lxml"] [metadata] lock-version = "1.1" python-versions = "^3.7.1" -content-hash = "27811bd21d56ceeb0f68ded5a00375efcd1a004928f0736f5b02927ce8594cb0" +content-hash = "8c44ceeb9df5c3ab43040400e0a6b895de49417e61293a1ba027640b34f03263" [metadata.files] attrs = [ diff --git a/pyproject.toml b/pyproject.toml index af5ce2aa03..1368e4e688 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -141,7 +141,8 @@ pyasn1 = ">=0.1.9" pyasn1-modules = ">=0.0.7" bcrypt = ">=3.1.7" Pillow = ">=5.4.0" -sortedcontainers = ">=1.4.4" +# We use SortedDict.peekitem(), which was added in sortedcontainers 1.5.2. +sortedcontainers = ">=1.5.2" pymacaroons = ">=0.13.0" msgpack = ">=0.5.2" phonenumbers = ">=8.2.0" diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 666f4b6895..042de8d7c8 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -27,13 +27,17 @@ EntityType = str class StreamChangeCache: - """Keeps track of the stream positions of the latest change in a set of entities. + """ + Keeps track of the stream positions of the latest change in a set of entities. + + The entity will is typically a room ID or user ID, but can be any string. - Typically the entity will be a room or user id. + Can be queried for whether a specific entity has changed after a stream position + or for a list of changed entities after a stream position. See the individual + methods for more information. - Given a list of entities and a stream position, it will give a subset of - entities that may have changed since that position. If position key is too - old then the cache will simply return all given entities. + Only tracks to a maximum cache size, any position earlier than the earliest + known stream position must be treated as unknown. """ def __init__( @@ -45,16 +49,20 @@ class StreamChangeCache: ) -> None: self._original_max_size: int = max_size self._max_size = math.floor(max_size) - self._entity_to_key: Dict[EntityType, int] = {} - # map from stream id to the a set of entities which changed at that stream id. + # map from stream id to the set of entities which changed at that stream id. self._cache: SortedDict[int, Set[EntityType]] = SortedDict() + # map from entity to the stream ID of the latest change for that entity. + # + # Must be kept in sync with _cache. + self._entity_to_key: Dict[EntityType, int] = {} # the earliest stream_pos for which we can reliably answer # get_all_entities_changed. In other words, one less than the earliest # stream_pos for which we know _cache is valid. # self._earliest_known_stream_pos = current_stream_pos + self.name = name self.metrics = caches.register_cache( "cache", self.name, self._cache, resize_callback=self.set_cache_factor @@ -82,22 +90,46 @@ class StreamChangeCache: return False def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool: - """Returns True if the entity may have been updated since stream_pos""" + """ + Returns True if the entity may have been updated after stream_pos. + + Args: + entity: The entity to check for changes. + stream_pos: The stream position to check for changes after. + + Return: + True if the entity may have been updated, this happens if: + * The given stream position is at or earlier than the earliest + known stream position. + * The given stream position is earlier than the latest change for + the entity. + + False otherwise: + * The entity is unknown. + * The given stream position is at or later than the latest change + for the entity. + """ assert isinstance(stream_pos, int) - if stream_pos < self._earliest_known_stream_pos: + # _cache is not valid at or before the earliest known stream position, so + # return that the entity has changed. + if stream_pos <= self._earliest_known_stream_pos: self.metrics.inc_misses() return True + # If the entity is unknown, it hasn't changed. latest_entity_change_pos = self._entity_to_key.get(entity, None) if latest_entity_change_pos is None: self.metrics.inc_hits() return False + # This is a known entity, return true if the stream position is earlier + # than the last change. if stream_pos < latest_entity_change_pos: self.metrics.inc_misses() return True + # Otherwise, the stream position is after the latest change: return false. self.metrics.inc_hits() return False @@ -105,15 +137,27 @@ class StreamChangeCache: self, entities: Collection[EntityType], stream_pos: int ) -> Union[Set[EntityType], FrozenSet[EntityType]]: """ - Returns subset of entities that have had new things since the given - position. Entities unknown to the cache will be returned. If the - position is too old it will just return the given list. + Returns the subset of the given entities that have had changes after the given position. + + Entities unknown to the cache will be returned. + + If the position is too old it will just return the given list. + + Args: + entities: Entities to check for changes. + stream_pos: The stream position to check for changes after. + + Return: + A subset of entities which have changed after the given stream position. + + This will be all entities if the given stream position is at or earlier + than the earliest known stream position. """ changed_entities = self.get_all_entities_changed(stream_pos) if changed_entities is not None: # We now do an intersection, trying to do so in the most efficient # way possible (some of these sets are *large*). First check in the - # given iterable is already set that we can reuse, otherwise we + # given iterable is already a set that we can reuse, otherwise we # create a set of the *smallest* of the two iterables and call # `intersection(..)` on it (this can be twice as fast as the reverse). if isinstance(entities, (set, frozenset)): @@ -130,29 +174,57 @@ class StreamChangeCache: return result def has_any_entity_changed(self, stream_pos: int) -> bool: - """Returns if any entity has changed""" - assert type(stream_pos) is int + """ + Returns true if any entity has changed after the given stream position. + + Args: + stream_pos: The stream position to check for changes after. + + Return: + True if any entity has changed after the given stream position or + if the given stream position is at or earlier than the earliest + known stream position. + + False otherwise. + """ + assert isinstance(stream_pos, int) if not self._cache: # If the cache is empty, nothing can have changed. return False - if stream_pos >= self._earliest_known_stream_pos: - self.metrics.inc_hits() - return self._cache.bisect_right(stream_pos) < len(self._cache) - else: + # _cache is not valid at or before the earliest known stream position, so + # return that an entity has changed. + if stream_pos <= self._earliest_known_stream_pos: self.metrics.inc_misses() return True + self.metrics.inc_hits() + return stream_pos < self._cache.peekitem()[0] + def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]: - """Returns all entities that have had new things since the given - position. If the position is too old it will return None. + """ + Returns all entities that have had changes after the given position. + + If the stream change cache does not go far enough back, i.e. the position + is too old, it will return None. Returns the entities in the order that they were changed. + + Args: + stream_pos: The stream position to check for changes after. + + Return: + Entities which have changed after the given stream position. + + None if the given stream position is at or earlier than the earliest + known stream position. """ - assert type(stream_pos) is int + assert isinstance(stream_pos, int) - if stream_pos < self._earliest_known_stream_pos: + # _cache is not valid at or before the earliest known stream position, so + # return None to mark that it is unknown if an entity has changed. + if stream_pos <= self._earliest_known_stream_pos: return None changed_entities: List[EntityType] = [] @@ -162,11 +234,17 @@ class StreamChangeCache: return changed_entities def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: - """Informs the cache that the entity has been changed at the given - position. """ - assert type(stream_pos) is int + Informs the cache that the entity has been changed at the given position. + + Args: + entity: The entity to mark as changed. + stream_pos: The stream position to update the entity to. + """ + assert isinstance(stream_pos, int) + # For a change before _cache is valid (e.g. at or before the earliest known + # stream position) there's nothing to do. if stream_pos <= self._earliest_known_stream_pos: return @@ -189,6 +267,11 @@ class StreamChangeCache: self._evict() def _evict(self) -> None: + """ + Ensure the cache has not exceeded the maximum size. + + Evicts entries until it is at the maximum size. + """ # if the cache is too big, remove entries while len(self._cache) > self._max_size: k, r = self._cache.popitem(0) @@ -199,5 +282,12 @@ class StreamChangeCache: def get_max_pos_of_last_change(self, entity: EntityType) -> int: """Returns an upper bound of the stream id of the last change to an entity. + + Args: + entity: The entity to check. + + Return: + The stream position of the latest change for the given entity or + the earliest known stream position if the entitiy is unknown. """ return self._entity_to_key.get(entity, self._earliest_known_stream_pos) diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index 1b0fa52ad1..a29cc872f9 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -51,6 +51,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): # return True, whether it's a known entity or not. self.assertTrue(cache.has_entity_changed("user@foo.com", 0)) self.assertTrue(cache.has_entity_changed("not@here.website", 0)) + self.assertTrue(cache.has_entity_changed("user@foo.com", 3)) + self.assertTrue(cache.has_entity_changed("not@here.website", 3)) def test_entity_has_changed_pops_off_start(self) -> None: """ @@ -65,15 +67,14 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): # The cache is at the max size, 2 self.assertEqual(len(cache._cache), 2) + # The cache's earliest known position is 2. + self.assertEqual(cache._earliest_known_stream_pos, 2) # The oldest item has been popped off self.assertTrue("user@foo.com" not in cache._entity_to_key) - self.assertEqual( - cache.get_all_entities_changed(2), - ["bar@baz.net", "user@elsewhere.org"], - ) - self.assertIsNone(cache.get_all_entities_changed(1)) + self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"]) + self.assertIsNone(cache.get_all_entities_changed(2)) # If we update an existing entity, it keeps the two existing entities cache.entity_has_changed("bar@baz.net", 5) @@ -81,10 +82,10 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): {"bar@baz.net", "user@elsewhere.org"}, set(cache._entity_to_key) ) self.assertEqual( - cache.get_all_entities_changed(2), + cache.get_all_entities_changed(3), ["user@elsewhere.org", "bar@baz.net"], ) - self.assertIsNone(cache.get_all_entities_changed(1)) + self.assertIsNone(cache.get_all_entities_changed(2)) def test_get_all_entities_changed(self) -> None: """ @@ -99,28 +100,15 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): cache.entity_has_changed("anotheruser@foo.com", 3) cache.entity_has_changed("user@elsewhere.org", 4) - r = cache.get_all_entities_changed(1) + r = cache.get_all_entities_changed(2) - # either of these are valid - ok1 = [ - "user@foo.com", - "bar@baz.net", - "anotheruser@foo.com", - "user@elsewhere.org", - ] - ok2 = [ - "user@foo.com", - "anotheruser@foo.com", - "bar@baz.net", - "user@elsewhere.org", - ] + # Results are ordered so either of these are valid. + ok1 = ["bar@baz.net", "anotheruser@foo.com", "user@elsewhere.org"] + ok2 = ["anotheruser@foo.com", "bar@baz.net", "user@elsewhere.org"] self.assertTrue(r == ok1 or r == ok2) - r = cache.get_all_entities_changed(2) - self.assertTrue(r == ok1[1:] or r == ok2[1:]) - self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"]) - self.assertEqual(cache.get_all_entities_changed(0), None) + self.assertEqual(cache.get_all_entities_changed(1), None) # ... later, things gest more updates cache.entity_has_changed("user@foo.com", 5) -- cgit 1.5.1 From cee9445884eb62c070fb0b03a112a862e8dea7c4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Dec 2022 20:19:14 +0000 Subject: Better return type for `get_all_entities_changed` (#14604) Help callers from using the return value incorrectly by ensuring that callers explicitly check if there was a cache hit or not. --- changelog.d/14604.bugfix | 1 + synapse/handlers/appservice.py | 4 +- synapse/handlers/presence.py | 12 ++-- synapse/handlers/sync.py | 6 +- synapse/handlers/typing.py | 8 +-- synapse/storage/databases/main/devices.py | 111 ++++++++++++++++++----------- synapse/util/caches/stream_change_cache.py | 52 ++++++++++---- tests/util/test_stream_change_cache.py | 20 +++--- 8 files changed, 138 insertions(+), 76 deletions(-) create mode 100644 changelog.d/14604.bugfix (limited to 'tests/util/test_stream_change_cache.py') diff --git a/changelog.d/14604.bugfix b/changelog.d/14604.bugfix new file mode 100644 index 0000000000..149ee99dd7 --- /dev/null +++ b/changelog.d/14604.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 66f5b8d108..f68027aaed 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -615,8 +615,8 @@ class ApplicationServicesHandler: ) # Fetch the users who have modified their device list since then. - users_with_changed_device_lists = ( - await self.store.get_users_whose_devices_changed(from_key, to_key=new_key) + users_with_changed_device_lists = await self.store.get_all_devices_changed( + from_key, to_key=new_key ) # Filter out any users the application service is not interested in diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1799174c2f..2af90b25a3 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1692,10 +1692,12 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): if from_key is not None: # First get all users that have had a presence update - updated_users = stream_change_cache.get_all_entities_changed(from_key) + result = stream_change_cache.get_all_entities_changed(from_key) # Cross-reference users we're interested in with those that have had updates. - if updated_users is not None: + if result.hit: + updated_users = result.entities + # If we have the full list of changes for presence we can # simply check which ones share a room with the user. get_updates_counter.labels("stream").inc() @@ -1767,9 +1769,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): updated_users = None if from_key: # Only return updates since the last sync - updated_users = self.store.presence_stream_cache.get_all_entities_changed( - from_key - ) + result = self.store.presence_stream_cache.get_all_entities_changed(from_key) + if result.hit: + updated_users = result.entities if updated_users is not None: # Get the actual presence update for each change diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c8858b22dd..0b395a104d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1528,10 +1528,12 @@ class SyncHandler: # # If we don't have that info cached then we get all the users that # share a room with our user and check if those users have changed. - changed_users = self.store.get_cached_device_list_changes( + cache_result = self.store.get_cached_device_list_changes( since_token.device_list_key ) - if changed_users is not None: + if cache_result.hit: + changed_users = cache_result.entities + result = await self.store.get_rooms_for_users(changed_users) for changed_user_id, entries in result.items(): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index a0ea719430..3f656ea4f5 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -420,11 +420,11 @@ class TypingWriterHandler(FollowerTypingHandler): if last_id == current_id: return [], current_id, False - changed_rooms: Optional[ - Iterable[str] - ] = self._typing_stream_change_cache.get_all_entities_changed(last_id) + result = self._typing_stream_change_cache.get_all_entities_changed(last_id) - if changed_rooms is None: + if result.hit: + changed_rooms: Iterable[str] = result.entities + else: changed_rooms = self._room_serials rows = [] diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 8ba995df3b..a5bb4d404e 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -58,7 +58,10 @@ from synapse.types import JsonDict, get_verify_key_from_cross_signing_key from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache -from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.util.caches.stream_change_cache import ( + AllEntitiesChangedResult, + StreamChangeCache, +) from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.stringutils import shortstr @@ -799,18 +802,66 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): def get_cached_device_list_changes( self, from_key: int, - ) -> Optional[List[str]]: + ) -> AllEntitiesChangedResult: """Get set of users whose devices have changed since `from_key`, or None if that information is not in our cache. """ return self._device_list_stream_cache.get_all_entities_changed(from_key) + @cancellable + async def get_all_devices_changed( + self, + from_key: int, + to_key: int, + ) -> Set[str]: + """Get all users whose devices have changed in the given range. + + Args: + from_key: The minimum device lists stream token to query device list + changes for, exclusive. + to_key: The maximum device lists stream token to query device list + changes for, inclusive. + + Returns: + The set of user_ids whose devices have changed since `from_key` + (exclusive) until `to_key` (inclusive). + """ + + result = self._device_list_stream_cache.get_all_entities_changed(from_key) + + if result.hit: + # We know which users might have changed devices. + if not result.entities: + # If no users then we can return early. + return set() + + # Otherwise we need to filter down the list + return await self.get_users_whose_devices_changed( + from_key, result.entities, to_key + ) + + # If the cache didn't tell us anything, we just need to query the full + # range. + sql = """ + SELECT DISTINCT user_id FROM device_lists_stream + WHERE ? < stream_id AND stream_id <= ? + """ + + rows = await self.db_pool.execute( + "get_all_devices_changed", + None, + sql, + from_key, + to_key, + ) + return {u for u, in rows} + @cancellable async def get_users_whose_devices_changed( self, from_key: int, - user_ids: Optional[Collection[str]] = None, + user_ids: Collection[str], to_key: Optional[int] = None, ) -> Set[str]: """Get set of users whose devices have changed since `from_key` that @@ -830,52 +881,32 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): """ # Get set of users who *may* have changed. Users not in the returned # list have definitely not changed. - user_ids_to_check: Optional[Collection[str]] - if user_ids is None: - # Get set of all users that have had device list changes since 'from_key' - user_ids_to_check = self._device_list_stream_cache.get_all_entities_changed( - from_key - ) - else: - # The same as above, but filter results to only those users in 'user_ids' - user_ids_to_check = self._device_list_stream_cache.get_entities_changed( - user_ids, from_key - ) + user_ids_to_check = self._device_list_stream_cache.get_entities_changed( + user_ids, from_key + ) # If an empty set was returned, there's nothing to do. - if user_ids_to_check is not None and not user_ids_to_check: + if not user_ids_to_check: return set() - def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]: - stream_id_where_clause = "stream_id > ?" - sql_args = [from_key] - - if to_key: - stream_id_where_clause += " AND stream_id <= ?" - sql_args.append(to_key) + if to_key is None: + to_key = self._device_list_id_gen.get_current_token() - sql = f""" + def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]: + sql = """ SELECT DISTINCT user_id FROM device_lists_stream - WHERE {stream_id_where_clause} + WHERE ? < stream_id AND stream_id <= ? AND %s """ - # If the stream change cache gave us no information, fetch *all* - # users between the stream IDs. - if user_ids_to_check is None: - txn.execute(sql, sql_args) - return {user_id for user_id, in txn} + changes: Set[str] = set() - # Otherwise, fetch changes for the given users. - else: - changes: Set[str] = set() - - # Query device changes with a batch of users at a time - for chunk in batch_iter(user_ids_to_check, 100): - clause, args = make_in_list_sql_clause( - txn.database_engine, "user_id", chunk - ) - txn.execute(sql + " AND " + clause, sql_args + args) - changes.update(user_id for user_id, in txn) + # Query device changes with a batch of users at a time + for chunk in batch_iter(user_ids_to_check, 100): + clause, args = make_in_list_sql_clause( + txn.database_engine, "user_id", chunk + ) + txn.execute(sql % (clause,), [from_key, to_key] + args) + changes.update(user_id for user_id, in txn) return changes diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 042de8d7c8..c8b17acb59 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -16,6 +16,7 @@ import logging import math from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Union +import attr from sortedcontainers import SortedDict from synapse.util import caches @@ -26,6 +27,29 @@ logger = logging.getLogger(__name__) EntityType = str +@attr.s(auto_attribs=True, frozen=True, slots=True) +class AllEntitiesChangedResult: + """Return type of `get_all_entities_changed`. + + Callers must check that there was a cache hit, via `result.hit`, before + using the entities in `result.entities`. + + This specifically does *not* implement helpers such as `__bool__` to ensure + that callers do the correct checks. + """ + + _entities: Optional[List[EntityType]] + + @property + def hit(self) -> bool: + return self._entities is not None + + @property + def entities(self) -> List[EntityType]: + assert self._entities is not None + return self._entities + + class StreamChangeCache: """ Keeps track of the stream positions of the latest change in a set of entities. @@ -153,19 +177,19 @@ class StreamChangeCache: This will be all entities if the given stream position is at or earlier than the earliest known stream position. """ - changed_entities = self.get_all_entities_changed(stream_pos) - if changed_entities is not None: + cache_result = self.get_all_entities_changed(stream_pos) + if cache_result.hit: # We now do an intersection, trying to do so in the most efficient # way possible (some of these sets are *large*). First check in the # given iterable is already a set that we can reuse, otherwise we # create a set of the *smallest* of the two iterables and call # `intersection(..)` on it (this can be twice as fast as the reverse). if isinstance(entities, (set, frozenset)): - result = entities.intersection(changed_entities) - elif len(changed_entities) < len(entities): - result = set(changed_entities).intersection(entities) + result = entities.intersection(cache_result.entities) + elif len(cache_result.entities) < len(entities): + result = set(cache_result.entities).intersection(entities) else: - result = set(entities).intersection(changed_entities) + result = set(entities).intersection(cache_result.entities) self.metrics.inc_hits() else: result = set(entities) @@ -202,12 +226,12 @@ class StreamChangeCache: self.metrics.inc_hits() return stream_pos < self._cache.peekitem()[0] - def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]: + def get_all_entities_changed(self, stream_pos: int) -> AllEntitiesChangedResult: """ Returns all entities that have had changes after the given position. - If the stream change cache does not go far enough back, i.e. the position - is too old, it will return None. + If the stream change cache does not go far enough back, i.e. the + position is too old, it will return None. Returns the entities in the order that they were changed. @@ -215,23 +239,21 @@ class StreamChangeCache: stream_pos: The stream position to check for changes after. Return: - Entities which have changed after the given stream position. - - None if the given stream position is at or earlier than the earliest - known stream position. + A class indicating if we have the requested data cached, and if so + includes the entities in the order they were changed. """ assert isinstance(stream_pos, int) # _cache is not valid at or before the earliest known stream position, so # return None to mark that it is unknown if an entity has changed. if stream_pos <= self._earliest_known_stream_pos: - return None + return AllEntitiesChangedResult(None) changed_entities: List[EntityType] = [] for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)): changed_entities.extend(self._cache[k]) - return changed_entities + return AllEntitiesChangedResult(changed_entities) def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: """ diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index a29cc872f9..0305741c99 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -73,8 +73,10 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): # The oldest item has been popped off self.assertTrue("user@foo.com" not in cache._entity_to_key) - self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"]) - self.assertIsNone(cache.get_all_entities_changed(2)) + self.assertEqual( + cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"] + ) + self.assertFalse(cache.get_all_entities_changed(2).hit) # If we update an existing entity, it keeps the two existing entities cache.entity_has_changed("bar@baz.net", 5) @@ -82,10 +84,10 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): {"bar@baz.net", "user@elsewhere.org"}, set(cache._entity_to_key) ) self.assertEqual( - cache.get_all_entities_changed(3), + cache.get_all_entities_changed(3).entities, ["user@elsewhere.org", "bar@baz.net"], ) - self.assertIsNone(cache.get_all_entities_changed(2)) + self.assertFalse(cache.get_all_entities_changed(2).hit) def test_get_all_entities_changed(self) -> None: """ @@ -105,10 +107,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): # Results are ordered so either of these are valid. ok1 = ["bar@baz.net", "anotheruser@foo.com", "user@elsewhere.org"] ok2 = ["anotheruser@foo.com", "bar@baz.net", "user@elsewhere.org"] - self.assertTrue(r == ok1 or r == ok2) + self.assertTrue(r.entities == ok1 or r.entities == ok2) - self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"]) - self.assertEqual(cache.get_all_entities_changed(1), None) + self.assertEqual( + cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"] + ) + self.assertFalse(cache.get_all_entities_changed(1).hit) # ... later, things gest more updates cache.entity_has_changed("user@foo.com", 5) @@ -128,7 +132,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): "anotheruser@foo.com", ] r = cache.get_all_entities_changed(3) - self.assertTrue(r == ok1 or r == ok2) + self.assertTrue(r.entities == ok1 or r.entities == ok2) def test_has_any_entity_changed(self) -> None: """ -- cgit 1.5.1 From da777207528513c858395758bf4c023da2c2c1a3 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 8 Dec 2022 11:35:49 -0500 Subject: Check the stream position before checking if the cache is empty. (#14639) An empty cache does not mean the entity has no changed, if it is earlier than the earliest known stream position return that the entity *has* changed since the cache cannot accurately answer that query. --- changelog.d/14639.bugfix | 1 + synapse/util/caches/stream_change_cache.py | 9 +++++---- tests/util/test_stream_change_cache.py | 7 ++++--- 3 files changed, 10 insertions(+), 7 deletions(-) create mode 100644 changelog.d/14639.bugfix (limited to 'tests/util/test_stream_change_cache.py') diff --git a/changelog.d/14639.bugfix b/changelog.d/14639.bugfix new file mode 100644 index 0000000000..8730b10afe --- /dev/null +++ b/changelog.d/14639.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where the user directory and room/user stats might be out of sync. diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index c8b17acb59..1657459549 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -213,16 +213,17 @@ class StreamChangeCache: """ assert isinstance(stream_pos, int) - if not self._cache: - # If the cache is empty, nothing can have changed. - return False - # _cache is not valid at or before the earliest known stream position, so # return that an entity has changed. if stream_pos <= self._earliest_known_stream_pos: self.metrics.inc_misses() return True + # If the cache is empty, nothing can have changed. + if not self._cache: + self.metrics.inc_misses() + return False + self.metrics.inc_hits() return stream_pos < self._cache.peekitem()[0] diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index 0305741c99..3df053493b 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -144,9 +144,10 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): """ cache = StreamChangeCache("#test", 1) - # With no entities, it returns False for the past, present, and future. - self.assertFalse(cache.has_any_entity_changed(0)) - self.assertFalse(cache.has_any_entity_changed(1)) + # With no entities, it returns True for the past, present, and False for + # the future. + self.assertTrue(cache.has_any_entity_changed(0)) + self.assertTrue(cache.has_any_entity_changed(1)) self.assertFalse(cache.has_any_entity_changed(2)) # We add an entity -- cgit 1.5.1