summary refs log tree commit diff
path: root/synapse/storage/databases/main/account_data.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-09-19 21:51:51 +0300
committerGitHub <noreply@github.com>2024-09-19 19:51:51 +0100
commita851f6b237ad7b4488d2d80c0b0777436d24da6a (patch)
tree23dbfbd70fa08566a7de2a7b1146ccf080019809 /synapse/storage/databases/main/account_data.py
parentSliding Sync: Avoid fetching left rooms and add back `newly_left` rooms (#17725) (diff)
downloadsynapse-a851f6b237ad7b4488d2d80c0b0777436d24da6a.tar.xz
Sliding sync: Add connection tracking to the `account_data` extension (#17695)
This is basically exactly the same logic as for receipts. Essentially we
just need to track which room account data we have and haven't sent down
to clients, and use that when we pull stuff out.

I think this just needs a couple of extra tests written

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
Diffstat (limited to 'synapse/storage/databases/main/account_data.py')
-rw-r--r--synapse/storage/databases/main/account_data.py50
1 files changed, 50 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py

index b30639b4e6..e583c182ba 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py
@@ -467,6 +467,56 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) get_updated_room_account_data_for_user_txn, ) + async def get_updated_room_account_data_for_user_for_room( + self, + # Since there are multiple arguments with the same type, force keyword arguments + # so people don't accidentally swap the order + *, + user_id: str, + room_id: str, + from_stream_id: int, + to_stream_id: int, + ) -> Dict[str, JsonMapping]: + """Get the room account_data that's changed for a user in a room. + + (> `from_stream_id` and <= `to_stream_id`) + + Args: + user_id: The user to get the account_data for. + room_id: The room to check + from_stream_id: The point in the stream to fetch from + to_stream_id: The point in the stream to fetch to + + Returns: + A dict of the room account data. + """ + + def get_updated_room_account_data_for_user_for_room_txn( + txn: LoggingTransaction, + ) -> Dict[str, JsonMapping]: + sql = """ + SELECT account_data_type, content FROM room_account_data + WHERE user_id = ? AND room_id = ? AND stream_id > ? AND stream_id <= ? + """ + txn.execute(sql, (user_id, room_id, from_stream_id, to_stream_id)) + + room_account_data: Dict[str, JsonMapping] = {} + for row in txn: + room_account_data[row[0]] = db_to_json(row[1]) + + return room_account_data + + changed = self._account_data_stream_cache.has_entity_changed( + user_id, int(from_stream_id) + ) + if not changed: + return {} + + return await self.db_pool.runInteraction( + "get_updated_room_account_data_for_user_for_room", + get_updated_room_account_data_for_user_for_room_txn, + ) + @cached(max_entries=5000, iterable=True) async def ignored_by(self, user_id: str) -> FrozenSet[str]: """