diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index d2e1e36e7f..ab48052cdc 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -23,8 +23,6 @@ from typing import Any, Dict, List, Optional, Tuple
from canonicaljson import json
-from twisted.internet import defer
-
from synapse.api.constants import EventTypes
from synapse.api.errors import StoreError
from synapse.api.room_versions import RoomVersion, RoomVersions
@@ -32,7 +30,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.data_stores.main.search import SearchStore
from synapse.storage.database import Database, LoggingTransaction
from synapse.types import ThirdPartyInstanceID
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
@@ -192,8 +190,7 @@ class RoomWorkerStore(SQLBaseStore):
return self.db.runInteraction("count_public_rooms", _count_public_rooms_txn)
- @defer.inlineCallbacks
- def get_largest_public_rooms(
+ async def get_largest_public_rooms(
self,
network_tuple: Optional[ThirdPartyInstanceID],
search_filter: Optional[dict],
@@ -330,10 +327,10 @@ class RoomWorkerStore(SQLBaseStore):
return results
- ret_val = yield self.db.runInteraction(
+ ret_val = await self.db.runInteraction(
"get_largest_public_rooms", _get_largest_public_rooms_txn
)
- defer.returnValue(ret_val)
+ return ret_val
@cached(max_entries=10000)
def is_room_blocked(self, room_id):
@@ -509,8 +506,8 @@ class RoomWorkerStore(SQLBaseStore):
"get_rooms_paginate", _get_rooms_paginate_txn,
)
- @cachedInlineCallbacks(max_entries=10000)
- def get_ratelimit_for_user(self, user_id):
+ @cached(max_entries=10000)
+ async def get_ratelimit_for_user(self, user_id):
"""Check if there are any overrides for ratelimiting for the given
user
@@ -522,7 +519,7 @@ class RoomWorkerStore(SQLBaseStore):
of RatelimitOverride are None or 0 then ratelimitng has been
disabled for that user entirely.
"""
- row = yield self.db.simple_select_one(
+ row = await self.db.simple_select_one(
table="ratelimit_override",
keyvalues={"user_id": user_id},
retcols=("messages_per_second", "burst_count"),
@@ -538,8 +535,8 @@ class RoomWorkerStore(SQLBaseStore):
else:
return None
- @cachedInlineCallbacks()
- def get_retention_policy_for_room(self, room_id):
+ @cached()
+ async def get_retention_policy_for_room(self, room_id):
"""Get the retention policy for a given room.
If no retention policy has been found for this room, returns a policy defined
@@ -566,19 +563,17 @@ class RoomWorkerStore(SQLBaseStore):
return self.db.cursor_to_dict(txn)
- ret = yield self.db.runInteraction(
+ ret = await self.db.runInteraction(
"get_retention_policy_for_room", get_retention_policy_for_room_txn,
)
# If we don't know this room ID, ret will be None, in this case return the default
# policy.
if not ret:
- defer.returnValue(
- {
- "min_lifetime": self.config.retention_default_min_lifetime,
- "max_lifetime": self.config.retention_default_max_lifetime,
- }
- )
+ return {
+ "min_lifetime": self.config.retention_default_min_lifetime,
+ "max_lifetime": self.config.retention_default_max_lifetime,
+ }
row = ret[0]
@@ -592,7 +587,7 @@ class RoomWorkerStore(SQLBaseStore):
if row["max_lifetime"] is None:
row["max_lifetime"] = self.config.retention_default_max_lifetime
- defer.returnValue(row)
+ return row
def get_media_mxcs_in_room(self, room_id):
"""Retrieves all the local and remote media MXC URIs in a given room
@@ -881,8 +876,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
self._background_add_rooms_room_version_column,
)
- @defer.inlineCallbacks
- def _background_insert_retention(self, progress, batch_size):
+ async def _background_insert_retention(self, progress, batch_size):
"""Retrieves a list of all rooms within a range and inserts an entry for each of
them into the room_retention table.
NULLs the property's columns if missing from the retention event in the room's
@@ -940,14 +934,14 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
else:
return False
- end = yield self.db.runInteraction(
+ end = await self.db.runInteraction(
"insert_room_retention", _background_insert_retention_txn,
)
if end:
- yield self.db.updates._end_background_update("insert_room_retention")
+ await self.db.updates._end_background_update("insert_room_retention")
- defer.returnValue(batch_size)
+ return batch_size
async def _background_add_rooms_room_version_column(
self, progress: dict, batch_size: int
@@ -1096,8 +1090,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
lock=False,
)
- @defer.inlineCallbacks
- def store_room(
+ async def store_room(
self,
room_id: str,
room_creator_user_id: str,
@@ -1140,7 +1133,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
)
with self._public_room_id_gen.get_next() as next_id:
- yield self.db.runInteraction("store_room_txn", store_room_txn, next_id)
+ await self.db.runInteraction("store_room_txn", store_room_txn, next_id)
except Exception as e:
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
@@ -1165,8 +1158,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
lock=False,
)
- @defer.inlineCallbacks
- def set_room_is_public(self, room_id, is_public):
+ async def set_room_is_public(self, room_id, is_public):
def set_room_is_public_txn(txn, next_id):
self.db.simple_update_one_txn(
txn,
@@ -1206,13 +1198,12 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
)
with self._public_room_id_gen.get_next() as next_id:
- yield self.db.runInteraction(
+ await self.db.runInteraction(
"set_room_is_public", set_room_is_public_txn, next_id
)
self.hs.get_notifier().on_new_replication_data()
- @defer.inlineCallbacks
- def set_room_is_public_appservice(
+ async def set_room_is_public_appservice(
self, room_id, appservice_id, network_id, is_public
):
"""Edit the appservice/network specific public room list.
@@ -1287,7 +1278,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
)
with self._public_room_id_gen.get_next() as next_id:
- yield self.db.runInteraction(
+ await self.db.runInteraction(
"set_room_is_public_appservice",
set_room_is_public_appservice_txn,
next_id,
@@ -1327,52 +1318,47 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
def get_current_public_room_stream_id(self):
return self._public_room_id_gen.get_current_token()
- @defer.inlineCallbacks
- def block_room(self, room_id, user_id):
+ async def block_room(self, room_id: str, user_id: str) -> None:
"""Marks the room as blocked. Can be called multiple times.
Args:
- room_id (str): Room to block
- user_id (str): Who blocked it
-
- Returns:
- Deferred
+ room_id: Room to block
+ user_id: Who blocked it
"""
- yield self.db.simple_upsert(
+ await self.db.simple_upsert(
table="blocked_rooms",
keyvalues={"room_id": room_id},
values={},
insertion_values={"user_id": user_id},
desc="block_room",
)
- yield self.db.runInteraction(
+ await self.db.runInteraction(
"block_room_invalidation",
self._invalidate_cache_and_stream,
self.is_room_blocked,
(room_id,),
)
- @defer.inlineCallbacks
- def get_rooms_for_retention_period_in_range(
- self, min_ms, max_ms, include_null=False
- ):
+ async def get_rooms_for_retention_period_in_range(
+ self, min_ms: Optional[int], max_ms: Optional[int], include_null: bool = False
+ ) -> Dict[str, dict]:
"""Retrieves all of the rooms within the given retention range.
Optionally includes the rooms which don't have a retention policy.
Args:
- min_ms (int|None): Duration in milliseconds that define the lower limit of
+ min_ms: Duration in milliseconds that define the lower limit of
the range to handle (exclusive). If None, doesn't set a lower limit.
- max_ms (int|None): Duration in milliseconds that define the upper limit of
+ max_ms: Duration in milliseconds that define the upper limit of
the range to handle (inclusive). If None, doesn't set an upper limit.
- include_null (bool): Whether to include rooms which retention policy is NULL
+ include_null: Whether to include rooms which retention policy is NULL
in the returned set.
Returns:
- dict[str, dict]: The rooms within this range, along with their retention
- policy. The key is "room_id", and maps to a dict describing the retention
- policy associated with this room ID. The keys for this nested dict are
- "min_lifetime" (int|None), and "max_lifetime" (int|None).
+ The rooms within this range, along with their retention
+ policy. The key is "room_id", and maps to a dict describing the retention
+ policy associated with this room ID. The keys for this nested dict are
+ "min_lifetime" (int|None), and "max_lifetime" (int|None).
"""
def get_rooms_for_retention_period_in_range_txn(txn):
@@ -1431,9 +1417,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
return rooms_dict
- rooms = yield self.db.runInteraction(
+ rooms = await self.db.runInteraction(
"get_rooms_for_retention_period_in_range",
get_rooms_for_retention_period_in_range_txn,
)
- defer.returnValue(rooms)
+ return rooms
|