summary refs log tree commit diff
path: root/synapse/storage/databases/main/account_data.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/account_data.py')
-rw-r--r--synapse/storage/databases/main/account_data.py121
1 files changed, 92 insertions, 29 deletions
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py

index 49ee23470d..bff51e92b9 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py
@@ -16,7 +16,7 @@ import abc import logging -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Set, Tuple from synapse.api.constants import AccountDataTypes from synapse.storage._base import SQLBaseStore, db_to_json @@ -24,7 +24,7 @@ from synapse.storage.database import DatabasePool from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import JsonDict from synapse.util import json_encoder -from synapse.util.caches.descriptors import _CacheContext, cached +from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) @@ -287,23 +287,25 @@ class AccountDataWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta): "get_updated_account_data_for_user", get_updated_account_data_for_user_txn ) - @cached(num_args=2, cache_context=True, max_entries=5000) - async def is_ignored_by( - self, ignored_user_id: str, ignorer_user_id: str, cache_context: _CacheContext - ) -> bool: - ignored_account_data = await self.get_global_account_data_by_type_for_user( - AccountDataTypes.IGNORED_USER_LIST, - ignorer_user_id, - on_invalidate=cache_context.invalidate, - ) - if not ignored_account_data: - return False + @cached(max_entries=5000, iterable=True) + async def ignored_by(self, user_id: str) -> Set[str]: + """ + Get users which ignore the given user. - try: - return ignored_user_id in ignored_account_data.get("ignored_users", {}) - except TypeError: - # The type of the ignored_users field is invalid. - return False + Params: + user_id: The user ID which might be ignored. + + Return: + The user IDs which ignore the given user. + """ + return set( + await self.db_pool.simple_select_onecol( + table="ignored_users", + keyvalues={"ignored_user_id": user_id}, + retcol="ignorer_user_id", + desc="ignored_by", + ) + ) class AccountDataStore(AccountDataWorkerStore): @@ -390,18 +392,14 @@ class AccountDataStore(AccountDataWorkerStore): Returns: The maximum stream ID. """ - content_json = json_encoder.encode(content) - async with self._account_data_id_gen.get_next() as next_id: - # no need to lock here as account_data has a unique constraint on - # (user_id, account_data_type) so simple_upsert will retry if - # there is a conflict. - await self.db_pool.simple_upsert( - desc="add_user_account_data", - table="account_data", - keyvalues={"user_id": user_id, "account_data_type": account_data_type}, - values={"stream_id": next_id, "content": content_json}, - lock=False, + await self.db_pool.runInteraction( + "add_user_account_data", + self._add_account_data_for_user, + next_id, + user_id, + account_data_type, + content, ) # it's theoretically possible for the above to succeed and the @@ -424,6 +422,71 @@ class AccountDataStore(AccountDataWorkerStore): return self._account_data_id_gen.get_current_token() + def _add_account_data_for_user( + self, + txn, + next_id: int, + user_id: str, + account_data_type: str, + content: JsonDict, + ) -> None: + content_json = json_encoder.encode(content) + + # no need to lock here as account_data has a unique constraint on + # (user_id, account_data_type) so simple_upsert will retry if + # there is a conflict. + self.db_pool.simple_upsert_txn( + txn, + table="account_data", + keyvalues={"user_id": user_id, "account_data_type": account_data_type}, + values={"stream_id": next_id, "content": content_json}, + lock=False, + ) + + # Ignored users get denormalized into a separate table as an optimisation. + if account_data_type != AccountDataTypes.IGNORED_USER_LIST: + return + + # Insert / delete to sync the list of ignored users. + previously_ignored_users = set( + self.db_pool.simple_select_onecol_txn( + txn, + table="ignored_users", + keyvalues={"ignorer_user_id": user_id}, + retcol="ignored_user_id", + ) + ) + + # If the data is invalid, no one is ignored. + ignored_users_content = content.get("ignored_users", {}) + if isinstance(ignored_users_content, dict): + currently_ignored_users = set(ignored_users_content) + else: + currently_ignored_users = set() + + # Delete entries which are no longer ignored. + self.db_pool.simple_delete_many_txn( + txn, + table="ignored_users", + column="ignored_user_id", + iterable=previously_ignored_users - currently_ignored_users, + keyvalues={"ignorer_user_id": user_id}, + ) + + # Add entries which are newly ignored. + self.db_pool.simple_insert_many_txn( + txn, + table="ignored_users", + values=[ + {"ignorer_user_id": user_id, "ignored_user_id": u} + for u in currently_ignored_users - previously_ignored_users + ], + ) + + # Invalidate the cache for any ignored users which were added or removed. + for ignored_user_id in previously_ignored_users ^ currently_ignored_users: + self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,)) + async def _update_max_stream_id(self, next_id: int) -> None: """Update the max stream_id