From a851f6b237ad7b4488d2d80c0b0777436d24da6a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Sep 2024 21:51:51 +0300 Subject: Sliding sync: Add connection tracking to the `account_data` extension (#17695) This is basically exactly the same logic as for receipts. Essentially we just need to track which room account data we have and haven't sent down to clients, and use that when we pull stuff out. I think this just needs a couple of extra tests written --------- Co-authored-by: Eric Eastwood --- synapse/storage/databases/main/account_data.py | 50 ++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) (limited to 'synapse/storage/databases/main/account_data.py') diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index b30639b4e6..e583c182ba 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -467,6 +467,56 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) get_updated_room_account_data_for_user_txn, ) + async def get_updated_room_account_data_for_user_for_room( + self, + # Since there are multiple arguments with the same type, force keyword arguments + # so people don't accidentally swap the order + *, + user_id: str, + room_id: str, + from_stream_id: int, + to_stream_id: int, + ) -> Dict[str, JsonMapping]: + """Get the room account_data that's changed for a user in a room. + + (> `from_stream_id` and <= `to_stream_id`) + + Args: + user_id: The user to get the account_data for. + room_id: The room to check + from_stream_id: The point in the stream to fetch from + to_stream_id: The point in the stream to fetch to + + Returns: + A dict of the room account data. + """ + + def get_updated_room_account_data_for_user_for_room_txn( + txn: LoggingTransaction, + ) -> Dict[str, JsonMapping]: + sql = """ + SELECT account_data_type, content FROM room_account_data + WHERE user_id = ? AND room_id = ? AND stream_id > ? AND stream_id <= ? + """ + txn.execute(sql, (user_id, room_id, from_stream_id, to_stream_id)) + + room_account_data: Dict[str, JsonMapping] = {} + for row in txn: + room_account_data[row[0]] = db_to_json(row[1]) + + return room_account_data + + changed = self._account_data_stream_cache.has_entity_changed( + user_id, int(from_stream_id) + ) + if not changed: + return {} + + return await self.db_pool.runInteraction( + "get_updated_room_account_data_for_user_for_room", + get_updated_room_account_data_for_user_for_room_txn, + ) + @cached(max_entries=5000, iterable=True) async def ignored_by(self, user_id: str) -> FrozenSet[str]: """ -- cgit 1.5.1