diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index cf039e7f7d..82aac2bbf3 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -16,15 +16,16 @@
import abc
import logging
-from typing import List, Tuple
+from typing import List, Optional, Tuple

from twisted.internet import defer

from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.types import JsonDict
from synapse.util import json_encoder
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.caches.stream_change_cache import StreamChangeCache

logger = logging.getLogger(__name__)

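For context on the decorator swap above: cachedInlineCallbacks wrapped a Twisted generator-based method, whereas cached is applied here to a native coroutine. A loose, self-contained sketch of the memoisation idea only (Synapse's real cached decorator additionally deduplicates concurrent lookups, enforces max_entries, and supports invalidation via cache contexts):

    import functools

    def cached_sketch(func):
        # Illustrative stand-in, not Synapse's implementation: memoise a
        # coroutine function by its positional arguments.
        cache = {}

        @functools.wraps(func)
        async def wrapper(*args):
            if args not in cache:
                cache[args] = await func(*args)
            return cache[args]

        return wrapper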
@@ -97,13 +98,15 @@ class AccountDataWorkerStore(SQLBaseStore):
"get_account_data_for_user", get_account_data_for_user_txn
)
- @cachedInlineCallbacks(num_args=2, max_entries=5000)
- def get_global_account_data_by_type_for_user(self, data_type, user_id):
+ @cached(num_args=2, max_entries=5000)
+ async def get_global_account_data_by_type_for_user(
+ self, data_type: str, user_id: str
+ ) -> Optional[JsonDict]:
"""
Returns:
- Deferred: A dict
+            The account data, or None if no account data of this type exists.
"""
- result = yield self.db_pool.simple_select_one_onecol(
+ result = await self.db_pool.simple_select_one_onecol(
table="account_data",
keyvalues={"user_id": user_id, "account_data_type": data_type},
retcol="content",
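The caller-side effect of this conversion: the method now returns a coroutine rather than a Deferred, so call sites move from yield under @defer.inlineCallbacks to a plain await. A hypothetical caller, with store standing in for an AccountDataWorkerStore instance:

    async def get_ignored_users_content(store, user_id: str):
        # Previously: result = yield store.get_global_account_data_by_type_for_user(...)
        # Returns the stored dict, or None if the user has never set this type.
        return await store.get_global_account_data_by_type_for_user(
            "m.ignored_user_list", user_id
        )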
@@ -280,9 +283,11 @@ class AccountDataWorkerStore(SQLBaseStore):
"get_updated_account_data_for_user", get_updated_account_data_for_user_txn
)
- @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
- def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
- ignored_account_data = yield self.get_global_account_data_by_type_for_user(
+ @cached(num_args=2, cache_context=True, max_entries=5000)
+ async def is_ignored_by(
+ self, ignored_user_id: str, ignorer_user_id: str, cache_context: _CacheContext
+ ) -> bool:
+ ignored_account_data = await self.get_global_account_data_by_type_for_user(
"m.ignored_user_list",
ignorer_user_id,
on_invalidate=cache_context.invalidate,
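The cache_context/on_invalidate pairing survives the async conversion unchanged: passing cache_context.invalidate into the inner cached call registers the outer is_ignored_by entry as a dependent, so invalidating the inner entry also drops the outer one. A loose, non-Synapse sketch of that dependency chaining:

    class CacheEntrySketch:
        # Illustrative only: a cached value keeps callbacks that evict
        # entries derived from it, mirroring the role of on_invalidate.
        def __init__(self, value):
            self.value = value
            self._on_invalidate = []

        def add_dependent(self, callback):
            self._on_invalidate.append(callback)

        def invalidate(self):
            for callback in self._on_invalidate:
                callback()
            self._on_invalidate.clear()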
@@ -307,24 +312,27 @@ class AccountDataStore(AccountDataWorkerStore):
super(AccountDataStore, self).__init__(database, db_conn, hs)
- def get_max_account_data_stream_id(self):
+ def get_max_account_data_stream_id(self) -> int:
"""Get the current max stream id for the private user data stream
Returns:
- A deferred int.
+ The maximum stream ID.
"""
return self._account_data_id_gen.get_current_token()
- @defer.inlineCallbacks
- def add_account_data_to_room(self, user_id, room_id, account_data_type, content):
+ async def add_account_data_to_room(
+ self, user_id: str, room_id: str, account_data_type: str, content: JsonDict
+ ) -> int:
"""Add some account_data to a room for a user.
+
Args:
- user_id(str): The user to add a tag for.
- room_id(str): The room to add a tag for.
- account_data_type(str): The type of account_data to add.
- content(dict): A json object to associate with the tag.
+            user_id: The user to add the account data for.
+            room_id: The room to add the account data to.
+            account_data_type: The type of account data to add.
+            content: A JSON dict to associate with the account data.
+
Returns:
- A deferred that completes once the account_data has been added.
+ The maximum stream ID.
"""
content_json = json_encoder.encode(content)
@@ -332,7 +340,7 @@ class AccountDataStore(AccountDataWorkerStore):
# no need to lock here as room_account_data has a unique constraint
# on (user_id, room_id, account_data_type) so simple_upsert will
# retry if there is a conflict.
- yield self.db_pool.simple_upsert(
+ await self.db_pool.simple_upsert(
desc="add_room_account_data",
table="room_account_data",
keyvalues={
@@ -350,7 +358,7 @@ class AccountDataStore(AccountDataWorkerStore):
# doesn't sound any worse than the whole update getting lost,
# which is what would happen if we combined the two into one
# transaction.
- yield self._update_max_stream_id(next_id)
+ await self._update_max_stream_id(next_id)
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
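As the comment above notes, the unique constraint lets simple_upsert run without an explicit lock. On PostgreSQL the call reduces to conflict-handling SQL along these lines (a sketch of the semantics, not the literal statement Synapse generates; the stream_id and content columns are assumed from the surrounding code rather than shown in this hunk):

    ROOM_ACCOUNT_DATA_UPSERT_SKETCH = """
        INSERT INTO room_account_data
            (user_id, room_id, account_data_type, stream_id, content)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT (user_id, room_id, account_data_type)
        DO UPDATE SET stream_id = EXCLUDED.stream_id,
                      content = EXCLUDED.content
    """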
@@ -359,18 +367,20 @@ class AccountDataStore(AccountDataWorkerStore):
(user_id, room_id, account_data_type), content
)
- result = self._account_data_id_gen.get_current_token()
- return result
+ return self._account_data_id_gen.get_current_token()
- @defer.inlineCallbacks
- def add_account_data_for_user(self, user_id, account_data_type, content):
+ async def add_account_data_for_user(
+ self, user_id: str, account_data_type: str, content: JsonDict
+ ) -> int:
"""Add some account_data to a room for a user.
+
Args:
- user_id(str): The user to add a tag for.
- account_data_type(str): The type of account_data to add.
- content(dict): A json object to associate with the tag.
+            user_id: The user to add the account data for.
+            account_data_type: The type of account data to add.
+            content: A JSON dict to associate with the account data.
+
Returns:
- A deferred that completes once the account_data has been added.
+ The maximum stream ID.
"""
content_json = json_encoder.encode(content)
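A hypothetical call site for the converted method (the event type and content here are invented for illustration); the returned stream ID is what callers pass on to wake up /sync listeners:

    async def save_user_setting(store, user_id: str) -> int:
        # Upserts the global account_data row and returns the new max stream ID.
        return await store.add_account_data_for_user(
            user_id, "im.example.settings", {"theme": "dark"}
        )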
@@ -378,7 +388,7 @@ class AccountDataStore(AccountDataWorkerStore):
# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so simple_upsert will retry if
# there is a conflict.
- yield self.db_pool.simple_upsert(
+ await self.db_pool.simple_upsert(
desc="add_user_account_data",
table="account_data",
keyvalues={"user_id": user_id, "account_data_type": account_data_type},
@@ -396,7 +406,7 @@ class AccountDataStore(AccountDataWorkerStore):
# Note: This is only here for backwards compat to allow admins to
# roll back to a previous Synapse version. Next time we update the
# database version we can remove this table.
- yield self._update_max_stream_id(next_id)
+ await self._update_max_stream_id(next_id)
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
@@ -404,14 +414,13 @@ class AccountDataStore(AccountDataWorkerStore):
(account_data_type, user_id)
)
- result = self._account_data_id_gen.get_current_token()
- return result
+ return self._account_data_id_gen.get_current_token()
- def _update_max_stream_id(self, next_id):
+ def _update_max_stream_id(self, next_id: int):
"""Update the max stream_id
Args:
- next_id(int): The the revision to advance to.
+            next_id: The revision to advance to.
"""
# Note: This is only here for backwards compat to allow admins to
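_update_max_stream_id stays synchronous because it merely schedules a database interaction; its body lies outside this hunk. Per the note above, it only maintains a legacy table so that older Synapse versions can still read a max stream ID after a rollback. Conceptually the interaction amounts to something like this (a sketch; the table name and query are assumptions, not taken from this diff):

    def _update_max_stream_id_sketch(txn, next_id: int) -> None:
        # Advance the stored maximum, never moving it backwards.
        txn.execute(
            "UPDATE account_data_max_stream_id SET stream_id = ? WHERE stream_id < ?",
            (next_id, next_id),
        )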