author     Patrick Cloke <patrickc@matrix.org>  2023-10-17 11:56:14 -0400
committer  Patrick Cloke <patrickc@matrix.org>  2023-10-17 11:56:14 -0400
commit     07b3b9a95e979c0125d069b8841fd7ce4917e559 (patch)
tree       f4c9e521e2cd0ab54b03282032a44e6d007b1cb0 /synapse/storage
parent     Revert "TEMPORARY Measure and log test cases" (diff)
parent     1.95.0rc1 (diff)
download   synapse-07b3b9a95e979c0125d069b8841fd7ce4917e559.tar.xz

Merge branch 'release-v1.95' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/controllers/state.py | 14
-rw-r--r--  synapse/storage/database.py | 86
-rw-r--r--  synapse/storage/databases/main/__init__.py | 66
-rw-r--r--  synapse/storage/databases/main/account_data.py | 18
-rw-r--r--  synapse/storage/databases/main/appservice.py | 29
-rw-r--r--  synapse/storage/databases/main/client_ips.py | 46
-rw-r--r--  synapse/storage/databases/main/deviceinbox.py | 47
-rw-r--r--  synapse/storage/databases/main/devices.py | 61
-rw-r--r--  synapse/storage/databases/main/e2e_room_keys.py | 9
-rw-r--r--  synapse/storage/databases/main/end_to_end_keys.py | 41
-rw-r--r--  synapse/storage/databases/main/event_federation.py | 107
-rw-r--r--  synapse/storage/databases/main/events.py | 88
-rw-r--r--  synapse/storage/databases/main/events_bg_updates.py | 62
-rw-r--r--  synapse/storage/databases/main/events_worker.py | 36
-rw-r--r--  synapse/storage/databases/main/keys.py | 46
-rw-r--r--  synapse/storage/databases/main/media_repository.py | 30
-rw-r--r--  synapse/storage/databases/main/presence.py | 127
-rw-r--r--  synapse/storage/databases/main/purge_events.py | 5
-rw-r--r--  synapse/storage/databases/main/push_rule.py | 97
-rw-r--r--  synapse/storage/databases/main/pusher.py | 121
-rw-r--r--  synapse/storage/databases/main/receipts.py | 78
-rw-r--r--  synapse/storage/databases/main/registration.py | 152
-rw-r--r--  synapse/storage/databases/main/relations.py | 19
-rw-r--r--  synapse/storage/databases/main/room.py | 61
-rw-r--r--  synapse/storage/databases/main/roommember.py | 88
-rw-r--r--  synapse/storage/databases/main/search.py | 20
-rw-r--r--  synapse/storage/databases/main/state.py | 62
-rw-r--r--  synapse/storage/databases/main/state_deltas.py | 52
-rw-r--r--  synapse/storage/databases/main/stats.py | 37
-rw-r--r--  synapse/storage/databases/main/stream.py | 22
-rw-r--r--  synapse/storage/databases/main/task_scheduler.py | 44
-rw-r--r--  synapse/storage/databases/main/transactions.py | 55
-rw-r--r--  synapse/storage/databases/main/ui_auth.py | 41
-rw-r--r--  synapse/storage/databases/main/user_directory.py | 54
-rw-r--r--  synapse/storage/databases/main/user_erasure_store.py | 19
-rw-r--r--  synapse/storage/databases/state/store.py | 54
-rw-r--r--  synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql | 20
37 files changed, 1171 insertions(+), 843 deletions(-)
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py

index 46957723a1..9f7959c45d 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -16,7 +16,6 @@ from itertools import chain
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
-    Any,
     Callable,
     Collection,
     Dict,
@@ -32,6 +31,7 @@ from typing import (
 from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.logging.opentracing import tag_args, trace
+from synapse.storage.databases.main.state_deltas import StateDelta
 from synapse.storage.roommember import ProfileInfo
 from synapse.storage.util.partial_state_events_tracker import (
     PartialCurrentStateTracker,
@@ -531,19 +531,9 @@ class StateStorageController:
     @tag_args
     async def get_current_state_deltas(
         self, prev_stream_id: int, max_stream_id: int
-    ) -> Tuple[int, List[Dict[str, Any]]]:
+    ) -> Tuple[int, List[StateDelta]]:
         """Fetch a list of room state changes since the given stream id
 
-        Each entry in the result contains the following fields:
-        - stream_id (int)
-        - room_id (str)
-        - type (str): event type
-        - state_key (str):
-        - event_id (str|None): new event_id for this state key. None if the
-            state has been deleted.
-        - prev_event_id (str|None): previous event_id for this state key. None
-            if it's new state.
-
         Args:
             prev_stream_id: point to get changes since (exclusive)
            max_stream_id: the point that we know has been correctly persisted
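For orientation, a minimal standalone sketch of how a caller might consume the new List[StateDelta] return value of get_current_state_deltas from the controllers/state.py hunk above. The attrs class below is only a stand-in mirroring the fields the removed docstring listed; the real StateDelta lives in synapse.storage.databases.main.state_deltas and may differ.

import attr
from typing import List, Optional

@attr.s(slots=True, frozen=True, auto_attribs=True)
class StateDelta:  # stand-in mirroring the removed docstring, not the real class
    stream_id: int
    room_id: str
    event_type: str
    state_key: str
    event_id: Optional[str]       # None if the state has been deleted
    prev_event_id: Optional[str]  # None if this is new state

def deleted_state_keys(deltas: List[StateDelta]) -> List[str]:
    # Attribute access replaces the old row["event_id"] style dict lookups.
    return [d.state_key for d in deltas if d.event_id is None]

print(deleted_state_keys([
    StateDelta(5, "!room:hs", "m.room.member", "@alice:hs", None, "$old_event"),
]))  # ['@alice:hs']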
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index ca894edd5a..81f661160c 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1874,9 +1874,9 @@ class DatabasePool: keyvalues: Optional[Dict[str, Any]] = None, desc: str = "simple_select_many_batch", batch_size: int = 100, - ) -> List[Dict[str, Any]]: + ) -> List[Tuple[Any, ...]]: """Executes a SELECT query on the named table, which may return zero or - more rows, returning the result as a list of dicts. + more rows. Filters rows by whether the value of `column` is in `iterable`. @@ -1888,10 +1888,13 @@ class DatabasePool: keyvalues: dict of column names and values to select the rows with desc: description of the transaction, for logging and metrics batch_size: the number of rows for each select query + + Returns: + The results as a list of tuples. """ keyvalues = keyvalues or {} - results: List[Dict[str, Any]] = [] + results: List[Tuple[Any, ...]] = [] for chunk in batch_iter(iterable, batch_size): rows = await self.runInteraction( @@ -1918,9 +1921,9 @@ class DatabasePool: iterable: Collection[Any], keyvalues: Dict[str, Any], retcols: Iterable[str], - ) -> List[Dict[str, Any]]: + ) -> List[Tuple[Any, ...]]: """Executes a SELECT query on the named table, which may return zero or - more rows, returning the result as a list of dicts. + more rows. Filters rows by whether the value of `column` is in `iterable`. @@ -1931,6 +1934,9 @@ class DatabasePool: iterable: list keyvalues: dict of column names and values to select the rows with retcols: list of strings giving the names of the columns to return + + Returns: + The results as a list of tuples. """ if not iterable: return [] @@ -1949,7 +1955,7 @@ class DatabasePool: ) txn.execute(sql, values) - return cls.cursor_to_dict(txn) + return txn.fetchall() async def simple_update( self, @@ -2418,7 +2424,7 @@ class DatabasePool: keyvalues: Optional[Dict[str, Any]] = None, exclude_keyvalues: Optional[Dict[str, Any]] = None, order_direction: str = "ASC", - ) -> List[Dict[str, Any]]: + ) -> List[Tuple[Any, ...]]: """ Executes a SELECT query on the named table with start and limit, of row numbers, which may return zero or number of rows from start to limit, @@ -2447,7 +2453,7 @@ class DatabasePool: order_direction: Whether the results should be ordered "ASC" or "DESC". Returns: - The result as a list of dictionaries. + The result as a list of tuples. """ if order_direction not in ["ASC", "DESC"]: raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") @@ -2474,69 +2480,7 @@ class DatabasePool: ) txn.execute(sql, arg_list + [limit, start]) - return cls.cursor_to_dict(txn) - - async def simple_search_list( - self, - table: str, - term: Optional[str], - col: str, - retcols: Collection[str], - desc: str = "simple_search_list", - ) -> Optional[List[Dict[str, Any]]]: - """Executes a SELECT query on the named table, which may return zero or - more rows, returning the result as a list of dicts. - - Args: - table: the table name - term: term for searching the table matched to a column. - col: column to query term should be matched to - retcols: the names of the columns to return - - Returns: - A list of dictionaries or None. - """ - - return await self.runInteraction( - desc, - self.simple_search_list_txn, - table, - term, - col, - retcols, - db_autocommit=True, - ) - - @classmethod - def simple_search_list_txn( - cls, - txn: LoggingTransaction, - table: str, - term: Optional[str], - col: str, - retcols: Iterable[str], - ) -> Optional[List[Dict[str, Any]]]: - """Executes a SELECT query on the named table, which may return zero or - more rows, returning the result as a list of dicts. 
- - Args: - txn: Transaction object - table: the table name - term: term for searching the table matched to a column. - col: column to query term should be matched to - retcols: the names of the columns to return - - Returns: - None if no term is given, otherwise a list of dictionaries. - """ - if term: - sql = "SELECT %s FROM %s WHERE %s LIKE ?" % (", ".join(retcols), table, col) - termvalues = ["%%" + term + "%%"] - txn.execute(sql, termvalues) - else: - return None - - return cls.cursor_to_dict(txn) + return txn.fetchall() def make_in_list_sql_clause( diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
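A small, self-contained illustration of the calling convention the database.py hunk above changes: simple_select_many_batch and friends now return List[Tuple[Any, ...]] with values in retcols order, so call sites unpack positionally instead of indexing rows by column name. The helper and sample rows below are purely illustrative.

from typing import Any, Dict, List, Tuple

def depths_by_event(rows: List[Tuple[Any, ...]]) -> Dict[str, int]:
    # Rows arrive in retcols order, e.g. retcols=("event_id", "depth"),
    # rather than as dicts keyed by column name.
    return {event_id: depth for event_id, depth in rows}

rows = [("$event_a", 3), ("$event_b", 7)]  # shaped like txn.fetchall()
print(depths_by_event(rows))  # {'$event_a': 3, '$event_b': 7}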
index 101403578c..840d725114 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -15,7 +15,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, List, Optional, Tuple, Union, cast from synapse.api.constants import Direction from synapse.config.homeserver import HomeServerConfig @@ -142,26 +142,6 @@ class DataStore( super().__init__(database, db_conn, hs) - async def get_users(self) -> List[JsonDict]: - """Function to retrieve a list of users in users table. - - Returns: - A list of dictionaries representing users. - """ - return await self.db_pool.simple_select_list( - table="users", - keyvalues={}, - retcols=[ - "name", - "password_hash", - "is_guest", - "admin", - "user_type", - "deactivated", - ], - desc="get_users", - ) - async def get_users_paginate( self, start: int, @@ -316,7 +296,11 @@ class DataStore( "get_users_paginate_txn", get_users_paginate_txn ) - async def search_users(self, term: str) -> Optional[List[JsonDict]]: + async def search_users( + self, term: str + ) -> List[ + Tuple[str, Optional[str], Union[int, bool], Union[int, bool], Optional[str]] + ]: """Function to search users list for one or more users with the matched term. @@ -324,15 +308,37 @@ class DataStore( term: search term Returns: - A list of dictionaries or None. + A list of tuples of name, password_hash, is_guest, admin, user_type or None. """ - return await self.db_pool.simple_search_list( - table="users", - term=term, - col="name", - retcols=["name", "password_hash", "is_guest", "admin", "user_type"], - desc="search_users", - ) + + def search_users( + txn: LoggingTransaction, + ) -> List[ + Tuple[str, Optional[str], Union[int, bool], Union[int, bool], Optional[str]] + ]: + search_term = "%%" + term + "%%" + + sql = """ + SELECT name, password_hash, is_guest, admin, user_type + FROM users + WHERE name LIKE ? + """ + txn.execute(sql, (search_term,)) + + return cast( + List[ + Tuple[ + str, + Optional[str], + Union[int, bool], + Union[int, bool], + Optional[str], + ] + ], + txn.fetchall(), + ) + + return await self.db_pool.runInteraction("search_users", search_users) def check_database_before_upgrade( diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 80f146dd53..39498d52c6 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -103,6 +103,13 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) "AccountDataAndTagsChangeCache", account_max ) + self.db_pool.updates.register_background_index_update( + update_name="room_account_data_index_room_id", + index_name="room_account_data_room_id", + table="room_account_data", + columns=("room_id",), + ) + self.db_pool.updates.register_background_update_handler( "delete_account_data_for_deactivated_users", self._delete_account_data_for_deactivated_users, @@ -151,10 +158,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) sql += " AND content != '{}'" txn.execute(sql, (user_id,)) - rows = self.db_pool.cursor_to_dict(txn) return { - row["account_data_type"]: db_to_json(row["content"]) for row in rows + account_data_type: db_to_json(content) + for account_data_type, content in txn } return await self.db_pool.runInteraction( @@ -196,13 +203,12 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) sql += " AND content != '{}'" txn.execute(sql, (user_id,)) - rows = self.db_pool.cursor_to_dict(txn) by_room: Dict[str, Dict[str, JsonDict]] = {} - for row in rows: - room_data = by_room.setdefault(row["room_id"], {}) + for room_id, account_data_type, content in txn: + room_data = by_room.setdefault(room_id, {}) - room_data[row["account_data_type"]] = db_to_json(row["content"]) + room_data[account_data_type] = db_to_json(content) return by_room diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 0553a0621a..073a99cd84 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -14,17 +14,7 @@ # limitations under the License. import logging import re -from typing import ( - TYPE_CHECKING, - Any, - Dict, - List, - Optional, - Pattern, - Sequence, - Tuple, - cast, -) +from typing import TYPE_CHECKING, List, Optional, Pattern, Sequence, Tuple, cast from synapse.appservice import ( ApplicationService, @@ -353,21 +343,15 @@ class ApplicationServiceTransactionWorkerStore( def _get_oldest_unsent_txn( txn: LoggingTransaction, - ) -> Optional[Dict[str, Any]]: + ) -> Optional[Tuple[int, str]]: # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) txn.execute( - "SELECT * FROM application_services_txns WHERE as_id=?" + "SELECT txn_id, event_ids FROM application_services_txns WHERE as_id=?" " ORDER BY txn_id ASC LIMIT 1", (service.id,), ) - rows = self.db_pool.cursor_to_dict(txn) - if not rows: - return None - - entry = rows[0] - - return entry + return cast(Optional[Tuple[int, str]], txn.fetchone()) entry = await self.db_pool.runInteraction( "get_oldest_unsent_appservice_txn", _get_oldest_unsent_txn @@ -376,8 +360,9 @@ class ApplicationServiceTransactionWorkerStore( if not entry: return None - event_ids = db_to_json(entry["event_ids"]) + txn_id, event_ids_str = entry + event_ids = db_to_json(event_ids_str) events = await self.get_events_as_list(event_ids) # TODO: to-device messages, one-time key counts, device list summaries and unused @@ -385,7 +370,7 @@ class ApplicationServiceTransactionWorkerStore( # We likely want to populate those for reliability. return AppServiceTransaction( service=service, - id=entry["txn_id"], + id=txn_id, events=events, ephemeral=[], to_device_messages=[], diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 16170e0436..bf5b8c804b 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -15,6 +15,7 @@ import logging from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union, cast +import attr from typing_extensions import TypedDict from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -42,7 +43,8 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 10 * 60 * 1000 -class DeviceLastConnectionInfo(TypedDict): +@attr.s(slots=True, frozen=True, auto_attribs=True) +class DeviceLastConnectionInfo: """Metadata for the last connection seen for a user and device combination""" # These types must match the columns in the `devices` table @@ -499,24 +501,29 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke device_id: If None fetches all devices for the user Returns: - A dictionary mapping a tuple of (user_id, device_id) to dicts, with - keys giving the column names from the devices table. + A dictionary mapping a tuple of (user_id, device_id) to DeviceLastConnectionInfo. """ keyvalues = {"user_id": user_id} if device_id is not None: keyvalues["device_id"] = device_id - res = cast( - List[DeviceLastConnectionInfo], - await self.db_pool.simple_select_list( - table="devices", - keyvalues=keyvalues, - retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), - ), + res = await self.db_pool.simple_select_list( + table="devices", + keyvalues=keyvalues, + retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), ) - return {(d["user_id"], d["device_id"]): d for d in res} + return { + (d["user_id"], d["device_id"]): DeviceLastConnectionInfo( + user_id=d["user_id"], + device_id=d["device_id"], + ip=d["ip"], + user_agent=d["user_agent"], + last_seen=d["last_seen"], + ) + for d in res + } async def _get_user_ip_and_agents_from_database( self, user: UserID, since_ts: int = 0 @@ -683,8 +690,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke device_id: If None fetches all devices for the user Returns: - A dictionary mapping a tuple of (user_id, device_id) to dicts, with - keys giving the column names from the devices table. + A dictionary mapping a tuple of (user_id, device_id) to DeviceLastConnectionInfo. """ ret = await self._get_last_client_ip_by_device_from_database(user_id, device_id) @@ -705,13 +711,13 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke continue if not device_id or did == device_id: - ret[(user_id, did)] = { - "user_id": user_id, - "ip": ip, - "user_agent": user_agent, - "device_id": did, - "last_seen": last_seen, - } + ret[(user_id, did)] = DeviceLastConnectionInfo( + user_id=user_id, + ip=ip, + user_agent=user_agent, + device_id=did, + last_seen=last_seen, + ) return ret async def get_user_ip_and_agents( diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
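A standalone sketch of working with the frozen attrs class that the client_ips.py hunk above introduces in place of the old TypedDict. Field names follow the constructor calls in the diff; the exact column types (for example whether ip can be None) are assumptions here.

import attr
from typing import Dict, Optional, Tuple

@attr.s(slots=True, frozen=True, auto_attribs=True)
class DeviceLastConnectionInfo:  # stand-in mirroring the diff, types assumed
    user_id: str
    device_id: str
    ip: Optional[str]
    user_agent: Optional[str]
    last_seen: int

info = DeviceLastConnectionInfo(
    user_id="@alice:example.com",
    device_id="ABCDEFGH",
    ip="198.51.100.7",
    user_agent="Element/1.0",
    last_seen=1697551000000,
)
# Results are keyed by (user_id, device_id), as in the rewritten store method.
by_device: Dict[Tuple[str, str], DeviceLastConnectionInfo] = {
    (info.user_id, info.device_id): info
}
# Unlike a TypedDict row, the instance is immutable: assigning to
# info.last_seen would raise attr.exceptions.FrozenInstanceError.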
index 0be12f0e06..72dc4f54dc 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -344,18 +344,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): # Note that this is more efficient than just dropping `device_id` from the query, # since device_inbox has an index on `(user_id, device_id, stream_id)` if not device_ids_to_query: - user_device_dicts = self.db_pool.simple_select_many_txn( - txn, - table="devices", - column="user_id", - iterable=user_ids_to_query, - keyvalues={"hidden": False}, - retcols=("device_id",), + user_device_dicts = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="devices", + column="user_id", + iterable=user_ids_to_query, + keyvalues={"hidden": False}, + retcols=("device_id",), + ), ) - device_ids_to_query.update( - {row["device_id"] for row in user_device_dicts} - ) + device_ids_to_query.update({row[0] for row in user_device_dicts}) if not device_ids_to_query: # We've ended up with no devices to query. @@ -449,7 +450,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): user_id: str, device_id: Optional[str], up_to_stream_id: int, - limit: int, + limit: Optional[int] = None, ) -> int: """ Args: @@ -480,11 +481,12 @@ class DeviceInboxWorkerStore(SQLBaseStore): ROW_ID_NAME = self.database_engine.row_id_name def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: + limit_statement = "" if limit is None else f"LIMIT {limit}" sql = f""" DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN ( SELECT {ROW_ID_NAME} FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= ? - LIMIT {limit} + {limit_statement} ) """ txn.execute(sql, (user_id, device_id, up_to_stream_id)) @@ -849,20 +851,21 @@ class DeviceInboxWorkerStore(SQLBaseStore): # We exclude hidden devices (such as cross-signing keys) here as they are # not expected to receive to-device messages. - rows = self.db_pool.simple_select_many_txn( - txn, - table="devices", - keyvalues={"user_id": user_id, "hidden": False}, - column="device_id", - iterable=devices, - retcols=("device_id",), + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="devices", + keyvalues={"user_id": user_id, "hidden": False}, + column="device_id", + iterable=devices, + retcols=("device_id",), + ), ) - for row in rows: + for (device_id,) in rows: # Only insert into the local inbox if the device exists on # this server - device_id = row["device_id"] - with start_active_span("serialise_to_device_message"): msg = messages_by_device[device_id] set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"]) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index a8206c6afe..a07086149c 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1054,16 +1054,19 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): async def get_device_list_last_stream_id_for_remotes( self, user_ids: Iterable[str] ) -> Mapping[str, Optional[str]]: - rows = await self.db_pool.simple_select_many_batch( - table="device_lists_remote_extremeties", - column="user_id", - iterable=user_ids, - retcols=("user_id", "stream_id"), - desc="get_device_list_last_stream_id_for_remotes", + rows = cast( + List[Tuple[str, str]], + await self.db_pool.simple_select_many_batch( + table="device_lists_remote_extremeties", + column="user_id", + iterable=user_ids, + retcols=("user_id", "stream_id"), + desc="get_device_list_last_stream_id_for_remotes", + ), ) results: Dict[str, Optional[str]] = {user_id: None for user_id in user_ids} - results.update({row["user_id"]: row["stream_id"] for row in rows}) + results.update(rows) return results @@ -1079,22 +1082,30 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): The IDs of users whose device lists need resync. """ if user_ids: - rows = await self.db_pool.simple_select_many_batch( - table="device_lists_remote_resync", - column="user_id", - iterable=user_ids, - retcols=("user_id",), - desc="get_user_ids_requiring_device_list_resync_with_iterable", + row_tuples = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="device_lists_remote_resync", + column="user_id", + iterable=user_ids, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync_with_iterable", + ), ) + + return {row[0] for row in row_tuples} else: - rows = await self.db_pool.simple_select_list( - table="device_lists_remote_resync", - keyvalues=None, - retcols=("user_id",), - desc="get_user_ids_requiring_device_list_resync", + rows = cast( + List[Dict[str, str]], + await self.db_pool.simple_select_list( + table="device_lists_remote_resync", + keyvalues=None, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync", + ), ) - return {row["user_id"] for row in rows} + return {row["user_id"] for row in rows} async def mark_remote_users_device_caches_as_stale( self, user_ids: StrCollection @@ -1415,13 +1426,13 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): def get_devices_not_accessed_since_txn( txn: LoggingTransaction, - ) -> List[Dict[str, str]]: + ) -> List[Tuple[str, str]]: sql = """ SELECT user_id, device_id FROM devices WHERE last_seen < ? AND hidden = FALSE """ txn.execute(sql, (since_ms,)) - return self.db_pool.cursor_to_dict(txn) + return cast(List[Tuple[str, str]], txn.fetchall()) rows = await self.db_pool.runInteraction( "get_devices_not_accessed_since", @@ -1429,11 +1440,11 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): ) devices: Dict[str, List[str]] = {} - for row in rows: + for user_id, device_id in rows: # Remote devices are never stale from our point of view. - if self.hs.is_mine_id(row["user_id"]): - user_devices = devices.setdefault(row["user_id"], []) - user_devices.append(row["device_id"]) + if self.hs.is_mine_id(user_id): + user_devices = devices.setdefault(user_id, []) + user_devices.append(device_id) return devices diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index d01f28cc80..aac4cfb054 100644
--- a/synapse/storage/databases/main/e2e_room_keys.py
+++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -53,6 +53,13 @@ class EndToEndRoomKeyBackgroundStore(SQLBaseStore): ): super().__init__(database, db_conn, hs) + self.db_pool.updates.register_background_index_update( + update_name="e2e_room_keys_index_room_id", + index_name="e2e_room_keys_room_id", + table="e2e_room_keys", + columns=("room_id",), + ) + self.db_pool.updates.register_background_update_handler( "delete_e2e_backup_keys_for_deactivated_users", self._delete_e2e_backup_keys_for_deactivated_users, @@ -208,7 +215,7 @@ class EndToEndRoomKeyStore(EndToEndRoomKeyBackgroundStore): "message": "Set room key", "room_id": room_id, "session_id": session_id, - StreamKeyType.ROOM: room_key, + StreamKeyType.ROOM.value: room_key, } ) diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 89fac23f93..f13d776b0d 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -493,15 +493,18 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker A map from (algorithm, key_id) to json string for key """ - rows = await self.db_pool.simple_select_many_batch( - table="e2e_one_time_keys_json", - column="key_id", - iterable=key_ids, - retcols=("algorithm", "key_id", "key_json"), - keyvalues={"user_id": user_id, "device_id": device_id}, - desc="add_e2e_one_time_keys_check", + rows = cast( + List[Tuple[str, str, str]], + await self.db_pool.simple_select_many_batch( + table="e2e_one_time_keys_json", + column="key_id", + iterable=key_ids, + retcols=("algorithm", "key_id", "key_json"), + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="add_e2e_one_time_keys_check", + ), ) - result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows} + result = {(algorithm, key_id): key_json for algorithm, key_id, key_json in rows} log_kv({"message": "Fetched one time keys for user", "one_time_keys": result}) return result @@ -921,14 +924,10 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker } txn.execute(sql, params) - rows = self.db_pool.cursor_to_dict(txn) - for row in rows: - user_id = row["user_id"] - key_type = row["keytype"] - key = db_to_json(row["keydata"]) + for user_id, key_type, key_data, _ in txn: user_keys = result.setdefault(user_id, {}) - user_keys[key_type] = key + user_keys[key_type] = db_to_json(key_data) return result @@ -988,13 +987,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker query_params.extend(item) txn.execute(sql, query_params) - rows = self.db_pool.cursor_to_dict(txn) # and add the signatures to the appropriate keys - for row in rows: - key_id: str = row["key_id"] - target_user_id: str = row["target_user_id"] - target_device_id: str = row["target_device_id"] + for target_user_id, target_device_id, key_id, signature in txn: key_type = devices[(target_user_id, target_device_id)] # We need to copy everything, because the result may have come # from the cache. dict.copy only does a shallow copy, so we @@ -1012,13 +1007,11 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker ].copy() if from_user_id in signatures: user_sigs = signatures[from_user_id] = signatures[from_user_id] - user_sigs[key_id] = row["signature"] + user_sigs[key_id] = signature else: - signatures[from_user_id] = {key_id: row["signature"]} + signatures[from_user_id] = {key_id: signature} else: - target_user_key["signatures"] = { - from_user_id: {key_id: row["signature"]} - } + target_user_key["signatures"] = {from_user_id: {key_id: signature}} return keys diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index afffa54985..4f80ce75cc 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1049,15 +1049,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas Args: event_ids: The event IDs to calculate the max depth of. """ - rows = await self.db_pool.simple_select_many_batch( - table="events", - column="event_id", - iterable=event_ids, - retcols=( - "event_id", - "depth", + rows = cast( + List[Tuple[str, int]], + await self.db_pool.simple_select_many_batch( + table="events", + column="event_id", + iterable=event_ids, + retcols=( + "event_id", + "depth", + ), + desc="get_max_depth_of", ), - desc="get_max_depth_of", ) if not rows: @@ -1065,10 +1068,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas else: max_depth_event_id = "" current_max_depth = 0 - for row in rows: - if row["depth"] > current_max_depth: - max_depth_event_id = row["event_id"] - current_max_depth = row["depth"] + for event_id, depth in rows: + if depth > current_max_depth: + max_depth_event_id = event_id + current_max_depth = depth return max_depth_event_id, current_max_depth @@ -1078,15 +1081,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas Args: event_ids: The event IDs to calculate the max depth of. """ - rows = await self.db_pool.simple_select_many_batch( - table="events", - column="event_id", - iterable=event_ids, - retcols=( - "event_id", - "depth", + rows = cast( + List[Tuple[str, int]], + await self.db_pool.simple_select_many_batch( + table="events", + column="event_id", + iterable=event_ids, + retcols=( + "event_id", + "depth", + ), + desc="get_min_depth_of", ), - desc="get_min_depth_of", ) if not rows: @@ -1094,10 +1100,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas else: min_depth_event_id = "" current_min_depth = MAX_DEPTH - for row in rows: - if row["depth"] < current_min_depth: - min_depth_event_id = row["event_id"] - current_min_depth = row["depth"] + for event_id, depth in rows: + if depth < current_min_depth: + min_depth_event_id = event_id + current_min_depth = depth return min_depth_event_id, current_min_depth @@ -1553,19 +1559,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas A filtered down list of `event_ids` that have previous failed pull attempts. """ - rows = await self.db_pool.simple_select_many_batch( - table="event_failed_pull_attempts", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=("event_id",), - desc="get_event_ids_with_failed_pull_attempts", + rows = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("event_id",), + desc="get_event_ids_with_failed_pull_attempts", + ), ) - event_ids_with_failed_pull_attempts: Set[str] = { - row["event_id"] for row in rows - } - - return event_ids_with_failed_pull_attempts + return {row[0] for row in rows} @trace async def get_event_ids_to_not_pull_from_backoff( @@ -1585,32 +1590,34 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas A dictionary of event_ids that should not be attempted to be pulled and the next timestamp at which we may try pulling them again. 
""" - event_failed_pull_attempts = await self.db_pool.simple_select_many_batch( - table="event_failed_pull_attempts", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=( - "event_id", - "last_attempt_ts", - "num_attempts", + event_failed_pull_attempts = cast( + List[Tuple[str, int, int]], + await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=( + "event_id", + "last_attempt_ts", + "num_attempts", + ), + desc="get_event_ids_to_not_pull_from_backoff", ), - desc="get_event_ids_to_not_pull_from_backoff", ) current_time = self._clock.time_msec() event_ids_with_backoff = {} - for event_failed_pull_attempt in event_failed_pull_attempts: - event_id = event_failed_pull_attempt["event_id"] + for event_id, last_attempt_ts, num_attempts in event_failed_pull_attempts: # Exponential back-off (up to the upper bound) so we don't try to # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. backoff_end_time = ( - event_failed_pull_attempt["last_attempt_ts"] + last_attempt_ts + ( 2 ** min( - event_failed_pull_attempt["num_attempts"], + num_attempts, BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, ) ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 790d058c43..ef6766b5e0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -27,6 +27,7 @@ from typing import ( Optional, Set, Tuple, + Union, cast, ) @@ -501,16 +502,19 @@ class PersistEventsStore: # We ignore legacy rooms that we aren't filling the chain cover index # for. - rows = self.db_pool.simple_select_many_txn( - txn, - table="rooms", - column="room_id", - iterable={event.room_id for event in events if event.is_state()}, - keyvalues={}, - retcols=("room_id", "has_auth_chain_index"), + rows = cast( + List[Tuple[str, Optional[Union[int, bool]]]], + self.db_pool.simple_select_many_txn( + txn, + table="rooms", + column="room_id", + iterable={event.room_id for event in events if event.is_state()}, + keyvalues={}, + retcols=("room_id", "has_auth_chain_index"), + ), ) rooms_using_chain_index = { - row["room_id"] for row in rows if row["has_auth_chain_index"] + room_id for room_id, has_auth_chain_index in rows if has_auth_chain_index } state_events = { @@ -571,19 +575,18 @@ class PersistEventsStore: # We check if there are any events that need to be handled in the rooms # we're looking at. These should just be out of band memberships, where # we didn't have the auth chain when we first persisted. - rows = db_pool.simple_select_many_txn( - txn, - table="event_auth_chain_to_calculate", - keyvalues={}, - column="room_id", - iterable=set(event_to_room_id.values()), - retcols=("event_id", "type", "state_key"), + auth_chain_to_calc_rows = cast( + List[Tuple[str, str, str]], + db_pool.simple_select_many_txn( + txn, + table="event_auth_chain_to_calculate", + keyvalues={}, + column="room_id", + iterable=set(event_to_room_id.values()), + retcols=("event_id", "type", "state_key"), + ), ) - for row in rows: - event_id = row["event_id"] - event_type = row["type"] - state_key = row["state_key"] - + for event_id, event_type, state_key in auth_chain_to_calc_rows: # (We could pull out the auth events for all rows at once using # simple_select_many, but this case happens rarely and almost always # with a single row.) @@ -753,23 +756,31 @@ class PersistEventsStore: # Step 1, fetch all existing links from all the chains we've seen # referenced. 
chain_links = _LinkMap() - rows = db_pool.simple_select_many_txn( - txn, - table="event_auth_chain_links", - column="origin_chain_id", - iterable={chain_id for chain_id, _ in chain_map.values()}, - keyvalues={}, - retcols=( - "origin_chain_id", - "origin_sequence_number", - "target_chain_id", - "target_sequence_number", + auth_chain_rows = cast( + List[Tuple[int, int, int, int]], + db_pool.simple_select_many_txn( + txn, + table="event_auth_chain_links", + column="origin_chain_id", + iterable={chain_id for chain_id, _ in chain_map.values()}, + keyvalues={}, + retcols=( + "origin_chain_id", + "origin_sequence_number", + "target_chain_id", + "target_sequence_number", + ), ), ) - for row in rows: + for ( + origin_chain_id, + origin_sequence_number, + target_chain_id, + target_sequence_number, + ) in auth_chain_rows: chain_links.add_link( - (row["origin_chain_id"], row["origin_sequence_number"]), - (row["target_chain_id"], row["target_sequence_number"]), + (origin_chain_id, origin_sequence_number), + (target_chain_id, target_sequence_number), new=False, ) @@ -1654,8 +1665,6 @@ class PersistEventsStore: ) -> None: to_prefill = [] - rows = [] - ev_map = {e.event_id: e for e, _ in events_and_contexts} if not ev_map: return @@ -1676,10 +1685,9 @@ class PersistEventsStore: ) txn.execute(sql + clause, args) - rows = self.db_pool.cursor_to_dict(txn) - for row in rows: - event = ev_map[row["event_id"]] - if not row["rejects"] and not row["redacts"]: + for event_id, redacts, rejects in txn: + event = ev_map[event_id] + if not rejects and not redacts: to_prefill.append(EventCacheEntry(event=event, redacted_event=None)) async def external_prefill() -> None: diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index daef3685b0..c5fce1c82b 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -369,18 +369,20 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)] for chunk in chunks: - ev_rows = self.db_pool.simple_select_many_txn( - txn, - table="event_json", - column="event_id", - iterable=chunk, - retcols=["event_id", "json"], - keyvalues={}, + ev_rows = cast( + List[Tuple[str, str]], + self.db_pool.simple_select_many_txn( + txn, + table="event_json", + column="event_id", + iterable=chunk, + retcols=["event_id", "json"], + keyvalues={}, + ), ) - for row in ev_rows: - event_id = row["event_id"] - event_json = db_to_json(row["json"]) + for event_id, json in ev_rows: + event_json = db_to_json(json) try: origin_server_ts = event_json["origin_server_ts"] except (KeyError, AttributeError): @@ -563,15 +565,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): if deleted: # We now need to invalidate the caches of these rooms - rows = self.db_pool.simple_select_many_txn( - txn, - table="events", - column="event_id", - iterable=to_delete, - keyvalues={}, - retcols=("room_id",), + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="events", + column="event_id", + iterable=to_delete, + keyvalues={}, + retcols=("room_id",), + ), ) - room_ids = {row["room_id"] for row in rows} + room_ids = {row[0] for row in rows} for room_id in room_ids: txn.call_after( self.get_latest_event_ids_in_room.invalidate, (room_id,) # type: ignore[attr-defined] @@ -1038,18 +1043,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): count = len(rows) # We also need to fetch the auth events for them. - auth_events = self.db_pool.simple_select_many_txn( - txn, - table="event_auth", - column="event_id", - iterable=event_to_room_id, - keyvalues={}, - retcols=("event_id", "auth_id"), + auth_events = cast( + List[Tuple[str, str]], + self.db_pool.simple_select_many_txn( + txn, + table="event_auth", + column="event_id", + iterable=event_to_room_id, + keyvalues={}, + retcols=("event_id", "auth_id"), + ), ) event_to_auth_chain: Dict[str, List[str]] = {} - for row in auth_events: - event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"]) + for event_id, auth_id in auth_events: + event_to_auth_chain.setdefault(event_id, []).append(auth_id) # Calculate and persist the chain cover index for this set of events. # diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 8737a1370e..89757eabed 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1584,16 +1584,19 @@ class EventsWorkerStore(SQLBaseStore): """Given a list of event ids, check if we have already processed and stored them as non outliers. """ - rows = await self.db_pool.simple_select_many_batch( - table="events", - retcols=("event_id",), - column="event_id", - iterable=list(event_ids), - keyvalues={"outlier": False}, - desc="have_events_in_timeline", + rows = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="events", + retcols=("event_id",), + column="event_id", + iterable=list(event_ids), + keyvalues={"outlier": False}, + desc="have_events_in_timeline", + ), ) - return {r["event_id"] for r in rows} + return {r[0] for r in rows} @trace @tag_args @@ -2340,15 +2343,18 @@ class EventsWorkerStore(SQLBaseStore): a dict mapping from event id to partial-stateness. We return True for any of the events which are unknown (or are outliers). """ - result = await self.db_pool.simple_select_many_batch( - table="partial_state_events", - column="event_id", - iterable=event_ids, - retcols=["event_id"], - desc="get_partial_state_events", + result = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="partial_state_events", + column="event_id", + iterable=event_ids, + retcols=["event_id"], + desc="get_partial_state_events", + ), ) # convert the result to a dict, to make @cachedList work - partial = {r["event_id"] for r in result} + partial = {r[0] for r in result} return {e_id: e_id in partial for e_id in event_ids} @cached() diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index 889c578b9c..ea797864b9 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -16,7 +16,7 @@ import itertools import json import logging -from typing import Dict, Iterable, Mapping, Optional, Tuple +from typing import Dict, Iterable, List, Mapping, Optional, Tuple, Union, cast from canonicaljson import encode_canonical_json from signedjson.key import decode_verify_key_bytes @@ -205,35 +205,39 @@ class KeyStore(CacheInvalidationWorkerStore): If we have multiple entries for a given key ID, returns the most recent. """ - rows = await self.db_pool.simple_select_many_batch( - table="server_keys_json", - column="key_id", - iterable=key_ids, - keyvalues={"server_name": server_name}, - retcols=( - "key_id", - "from_server", - "ts_added_ms", - "ts_valid_until_ms", - "key_json", + rows = cast( + List[Tuple[str, str, int, int, Union[bytes, memoryview]]], + await self.db_pool.simple_select_many_batch( + table="server_keys_json", + column="key_id", + iterable=key_ids, + keyvalues={"server_name": server_name}, + retcols=( + "key_id", + "from_server", + "ts_added_ms", + "ts_valid_until_ms", + "key_json", + ), + desc="get_server_keys_json_for_remote", ), - desc="get_server_keys_json_for_remote", ) if not rows: return {} - # We sort the rows so that the most recently added entry is picked up. - rows.sort(key=lambda r: r["ts_added_ms"]) + # We sort the rows by ts_added_ms so that the most recently added entry + # will stomp over older entries in the dictionary. + rows.sort(key=lambda r: r[2]) return { - row["key_id"]: FetchKeyResultForRemote( + key_id: FetchKeyResultForRemote( # Cast to bytes since postgresql returns a memoryview. - key_json=bytes(row["key_json"]), - valid_until_ts=row["ts_valid_until_ms"], - added_ts=row["ts_added_ms"], + key_json=bytes(key_json), + valid_until_ts=ts_valid_until_ms, + added_ts=ts_added_ms, ) - for row in rows + for key_id, from_server, ts_added_ms, ts_valid_until_ms, key_json in rows } async def get_all_server_keys_json_for_remote( @@ -260,6 +264,8 @@ class KeyStore(CacheInvalidationWorkerStore): if not rows: return {} + # We sort the rows by ts_added_ms so that the most recently added entry + # will stomp over older entries in the dictionary. rows.sort(key=lambda r: r["ts_added_ms"]) return { diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
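The keys.py hunk above leans on a common "sort ascending, then build a dict" idiom so that the most recently added key wins; a tiny standalone demonstration with made-up rows:

# Rows shaped like (key_id, ts_added_ms, key_json); the values are invented.
rows = [
    ("ed25519:a", 2000, b"newer"),
    ("ed25519:a", 1000, b"older"),
    ("ed25519:b", 1500, b"only"),
]
rows.sort(key=lambda r: r[1])  # oldest first
# Later (more recent) entries overwrite earlier ones for the same key_id.
latest = {key_id: key_json for key_id, _ts, key_json in rows}
print(latest)  # {'ed25519:a': b'newer', 'ed25519:b': b'only'}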
index 8cebeb5189..2e6b176bd2 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -28,6 +28,7 @@ from typing import ( from synapse.api.constants import Direction from synapse.logging.opentracing import trace +from synapse.media._base import ThumbnailInfo from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -435,8 +436,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): desc="store_url_cache", ) - async def get_local_media_thumbnails(self, media_id: str) -> List[Dict[str, Any]]: - return await self.db_pool.simple_select_list( + async def get_local_media_thumbnails(self, media_id: str) -> List[ThumbnailInfo]: + rows = await self.db_pool.simple_select_list( "local_media_repository_thumbnails", {"media_id": media_id}, ( @@ -448,6 +449,16 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): ), desc="get_local_media_thumbnails", ) + return [ + ThumbnailInfo( + width=row["thumbnail_width"], + height=row["thumbnail_height"], + method=row["thumbnail_method"], + type=row["thumbnail_type"], + length=row["thumbnail_length"], + ) + for row in rows + ] @trace async def store_local_thumbnail( @@ -556,8 +567,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): async def get_remote_media_thumbnails( self, origin: str, media_id: str - ) -> List[Dict[str, Any]]: - return await self.db_pool.simple_select_list( + ) -> List[ThumbnailInfo]: + rows = await self.db_pool.simple_select_list( "remote_media_cache_thumbnails", {"media_origin": origin, "media_id": media_id}, ( @@ -566,10 +577,19 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "thumbnail_method", "thumbnail_type", "thumbnail_length", - "filesystem_id", ), desc="get_remote_media_thumbnails", ) + return [ + ThumbnailInfo( + width=row["thumbnail_width"], + height=row["thumbnail_height"], + method=row["thumbnail_method"], + type=row["thumbnail_type"], + length=row["thumbnail_length"], + ) + for row in rows + ] @trace async def get_remote_media_thumbnail( diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 194b4e031f..3b444d2d07 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -20,6 +20,7 @@ from typing import ( Mapping, Optional, Tuple, + Union, cast, ) @@ -260,27 +261,40 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) async def get_presence_for_users( self, user_ids: Iterable[str] ) -> Mapping[str, UserPresenceState]: - rows = await self.db_pool.simple_select_many_batch( - table="presence_stream", - column="user_id", - iterable=user_ids, - keyvalues={}, - retcols=( - "user_id", - "state", - "last_active_ts", - "last_federation_update_ts", - "last_user_sync_ts", - "status_msg", - "currently_active", + # TODO All these columns are nullable, but we don't expect that: + # https://github.com/matrix-org/synapse/issues/16467 + rows = cast( + List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]], + await self.db_pool.simple_select_many_batch( + table="presence_stream", + column="user_id", + iterable=user_ids, + keyvalues={}, + retcols=( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + desc="get_presence_for_users", ), - desc="get_presence_for_users", ) - for row in rows: - row["currently_active"] = bool(row["currently_active"]) - - return {row["user_id"]: UserPresenceState(**row) for row in rows} + return { + user_id: UserPresenceState( + user_id=user_id, + state=state, + last_active_ts=last_active_ts, + last_federation_update_ts=last_federation_update_ts, + last_user_sync_ts=last_user_sync_ts, + status_msg=status_msg, + currently_active=bool(currently_active), + ) + for user_id, state, last_active_ts, last_federation_update_ts, last_user_sync_ts, status_msg, currently_active in rows + } async def should_user_receive_full_presence_with_token( self, @@ -385,28 +399,49 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) limit = 100 offset = 0 while True: - rows = await self.db_pool.runInteraction( - "get_presence_for_all_users", - self.db_pool.simple_select_list_paginate_txn, - "presence_stream", - orderby="stream_id", - start=offset, - limit=limit, - exclude_keyvalues=exclude_keyvalues, - retcols=( - "user_id", - "state", - "last_active_ts", - "last_federation_update_ts", - "last_user_sync_ts", - "status_msg", - "currently_active", + # TODO All these columns are nullable, but we don't expect that: + # https://github.com/matrix-org/synapse/issues/16467 + rows = cast( + List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]], + await self.db_pool.runInteraction( + "get_presence_for_all_users", + self.db_pool.simple_select_list_paginate_txn, + "presence_stream", + orderby="stream_id", + start=offset, + limit=limit, + exclude_keyvalues=exclude_keyvalues, + retcols=( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + order_direction="ASC", ), - order_direction="ASC", ) - for row in rows: - users_to_state[row["user_id"]] = UserPresenceState(**row) + for ( + user_id, + state, + last_active_ts, + last_federation_update_ts, + last_user_sync_ts, + status_msg, + currently_active, + ) in rows: + users_to_state[user_id] = UserPresenceState( + user_id=user_id, + state=state, + last_active_ts=last_active_ts, + last_federation_update_ts=last_federation_update_ts, + last_user_sync_ts=last_user_sync_ts, + status_msg=status_msg, + currently_active=bool(currently_active), + ) # We've run out of updates to query if len(rows) < limit: @@ -434,13 +469,21 @@ class PresenceStore(PresenceBackgroundUpdateStore, 
CacheInvalidationWorkerStore) txn = db_conn.cursor() txn.execute(sql, (PresenceState.OFFLINE,)) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() txn.close() - for row in rows: - row["currently_active"] = bool(row["currently_active"]) - - return [UserPresenceState(**row) for row in rows] + return [ + UserPresenceState( + user_id=user_id, + state=state, + last_active_ts=last_active_ts, + last_federation_update_ts=last_federation_update_ts, + last_user_sync_ts=last_user_sync_ts, + status_msg=status_msg, + currently_active=bool(currently_active), + ) + for user_id, state, last_active_ts, last_federation_update_ts, last_user_sync_ts, status_msg, currently_active in rows + ] def take_presence_startup_info(self) -> List[UserPresenceState]: active_on_startup = self._presence_on_startup diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index dea0e0458c..1e11bf2706 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -89,6 +89,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # furthermore, we might already have the table from a previous (failed) # purge attempt, so let's drop the table first. + if isinstance(self.database_engine, PostgresEngine): + # Disable statement timeouts for this transaction; purging rooms can + # take a while! + txn.execute("SET LOCAL statement_timeout = 0") + txn.execute("DROP TABLE IF EXISTS events_to_purge") txn.execute( diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
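The purge_events.py hunk above relies on SET LOCAL being scoped to the current transaction. A hedged sketch of that behaviour using plain psycopg2 (placeholder DSN, needs a reachable PostgreSQL server; this is not how Synapse itself opens connections):

import psycopg2

conn = psycopg2.connect("dbname=synapse user=synapse")  # placeholder DSN
with conn, conn.cursor() as txn:
    # Only this transaction runs with no statement timeout; the session or
    # server default is restored automatically when the transaction ends.
    txn.execute("SET LOCAL statement_timeout = 0")
    txn.execute("DROP TABLE IF EXISTS events_to_purge")
    # ... long-running purge statements would follow here ...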
index 923166974c..f5356e7f80 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -62,20 +62,34 @@ logger = logging.getLogger(__name__) def _load_rules( - rawrules: List[JsonDict], + rawrules: List[Tuple[str, int, str, str]], enabled_map: Dict[str, bool], experimental_config: ExperimentalConfig, ) -> FilteredPushRules: """Take the DB rows returned from the DB and convert them into a full `FilteredPushRules` object. + + Args: + rawrules: List of tuples of: + * rule ID + * Priority lass + * Conditions (as serialized JSON) + * Actions (as serialized JSON) + enabled_map: A dictionary of rule ID to a boolean of whether the rule is + enabled. This might not include all rule IDs from rawrules. + experimental_config: The `experimental_features` section of the Synapse + config. (Used to check if various features are enabled.) + + Returns: + A new FilteredPushRules object. """ ruleslist = [ PushRule.from_db( - rule_id=rawrule["rule_id"], - priority_class=rawrule["priority_class"], - conditions=rawrule["conditions"], - actions=rawrule["actions"], + rule_id=rawrule[0], + priority_class=rawrule[1], + conditions=rawrule[2], + actions=rawrule[3], ) for rawrule in rawrules ] @@ -183,7 +197,19 @@ class PushRulesWorkerStore( enabled_map = await self.get_push_rules_enabled_for_user(user_id) - return _load_rules(rows, enabled_map, self.hs.config.experimental) + return _load_rules( + [ + ( + row["rule_id"], + row["priority_class"], + row["conditions"], + row["actions"], + ) + for row in rows + ], + enabled_map, + self.hs.config.experimental, + ) async def get_push_rules_enabled_for_user(self, user_id: str) -> Dict[str, bool]: results = await self.db_pool.simple_select_list( @@ -221,21 +247,36 @@ class PushRulesWorkerStore( if not user_ids: return {} - raw_rules: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids} + raw_rules: Dict[str, List[Tuple[str, int, str, str]]] = { + user_id: [] for user_id in user_ids + } - rows = await self.db_pool.simple_select_many_batch( - table="push_rules", - column="user_name", - iterable=user_ids, - retcols=("*",), - desc="bulk_get_push_rules", - batch_size=1000, + rows = cast( + List[Tuple[str, str, int, int, str, str]], + await self.db_pool.simple_select_many_batch( + table="push_rules", + column="user_name", + iterable=user_ids, + retcols=( + "user_name", + "rule_id", + "priority_class", + "priority", + "conditions", + "actions", + ), + desc="bulk_get_push_rules", + batch_size=1000, + ), ) - rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) + # Sort by highest priority_class, then highest priority. 
+ rows.sort(key=lambda row: (-int(row[2]), -int(row[3]))) - for row in rows: - raw_rules.setdefault(row["user_name"], []).append(row) + for user_name, rule_id, priority_class, _, conditions, actions in rows: + raw_rules.setdefault(user_name, []).append( + (rule_id, priority_class, conditions, actions) + ) enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids) @@ -256,17 +297,19 @@ class PushRulesWorkerStore( results: Dict[str, Dict[str, bool]] = {user_id: {} for user_id in user_ids} - rows = await self.db_pool.simple_select_many_batch( - table="push_rules_enable", - column="user_name", - iterable=user_ids, - retcols=("user_name", "rule_id", "enabled"), - desc="bulk_get_push_rules_enabled", - batch_size=1000, + rows = cast( + List[Tuple[str, str, Optional[int]]], + await self.db_pool.simple_select_many_batch( + table="push_rules_enable", + column="user_name", + iterable=user_ids, + retcols=("user_name", "rule_id", "enabled"), + desc="bulk_get_push_rules_enabled", + batch_size=1000, + ), ) - for row in rows: - enabled = bool(row["enabled"]) - results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled + for user_name, rule_id, enabled in rows: + results.setdefault(user_name, {})[rule_id] = bool(enabled) return results async def get_all_push_rule_updates( diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
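A standalone illustration of the re-sort in the push_rule.py hunk above: with rows shaped as (user_name, rule_id, priority_class, priority, conditions, actions), negating the two numeric columns sorts by highest priority_class first, then highest priority. The sample rows are invented.

rows = [
    ("@u:hs", "rule_low", 1, 5, "[]", "[]"),
    ("@u:hs", "rule_high_a", 5, 0, "[]", "[]"),
    ("@u:hs", "rule_high_b", 5, 3, "[]", "[]"),
]
rows.sort(key=lambda row: (-int(row[2]), -int(row[3])))
print([rule_id for _, rule_id, *_ in rows])
# ['rule_high_b', 'rule_high_a', 'rule_low']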
index 87e28e22d3..c7eb7fc478 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -47,6 +47,27 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# The type of a row in the pushers table. +PusherRow = Tuple[ + int, # id + str, # user_name + Optional[int], # access_token + str, # profile_tag + str, # kind + str, # app_id + str, # app_display_name + str, # device_display_name + str, # pushkey + int, # ts + str, # lang + str, # data + int, # last_stream_ordering + int, # last_success + int, # failing_since + bool, # enabled + str, # device_id +] + class PusherWorkerStore(SQLBaseStore): def __init__( @@ -83,30 +104,66 @@ class PusherWorkerStore(SQLBaseStore): self._remove_deleted_email_pushers, ) - def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]: + def _decode_pushers_rows( + self, + rows: Iterable[PusherRow], + ) -> Iterator[PusherConfig]: """JSON-decode the data in the rows returned from the `pushers` table Drops any rows whose data cannot be decoded """ - for r in rows: - data_json = r["data"] + for ( + id, + user_name, + access_token, + profile_tag, + kind, + app_id, + app_display_name, + device_display_name, + pushkey, + ts, + lang, + data, + last_stream_ordering, + last_success, + failing_since, + enabled, + device_id, + ) in rows: try: - r["data"] = db_to_json(data_json) + data_json = db_to_json(data) except Exception as e: logger.warning( "Invalid JSON in data for pusher %d: %s, %s", - r["id"], - data_json, + id, + data, e.args[0], ) continue - # If we're using SQLite, then boolean values are integers. This is - # troublesome since some code using the return value of this method might - # expect it to be a boolean, or will expose it to clients (in responses). - r["enabled"] = bool(r["enabled"]) - - yield PusherConfig(**r) + yield PusherConfig( + id=id, + user_name=user_name, + profile_tag=profile_tag, + kind=kind, + app_id=app_id, + app_display_name=app_display_name, + device_display_name=device_display_name, + pushkey=pushkey, + ts=ts, + lang=lang, + data=data_json, + last_stream_ordering=last_stream_ordering, + last_success=last_success, + failing_since=failing_since, + # If we're using SQLite, then boolean values are integers. This is + # troublesome since some code using the return value of this method might + # expect it to be a boolean, or will expose it to clients (in responses). + enabled=bool(enabled), + device_id=device_id, + access_token=access_token, + ) def get_pushers_stream_token(self) -> int: return self._pushers_id_gen.get_current_token() @@ -136,7 +193,7 @@ class PusherWorkerStore(SQLBaseStore): The pushers for which the given columns have the given values. """ - def get_pushers_by_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def get_pushers_by_txn(txn: LoggingTransaction) -> List[PusherRow]: # We could technically use simple_select_list here, but we need to call # COALESCE on the 'enabled' column. While it is technically possible to give # simple_select_list the whole `COALESCE(...) 
AS ...` as a column name, it @@ -154,7 +211,7 @@ class PusherWorkerStore(SQLBaseStore): txn.execute(sql, list(keyvalues.values())) - return self.db_pool.cursor_to_dict(txn) + return cast(List[PusherRow], txn.fetchall()) ret = await self.db_pool.runInteraction( desc="get_pushers_by", @@ -164,14 +221,22 @@ class PusherWorkerStore(SQLBaseStore): return self._decode_pushers_rows(ret) async def get_enabled_pushers(self) -> Iterator[PusherConfig]: - def get_enabled_pushers_txn(txn: LoggingTransaction) -> Iterator[PusherConfig]: - txn.execute("SELECT * FROM pushers WHERE COALESCE(enabled, TRUE)") - rows = self.db_pool.cursor_to_dict(txn) - - return self._decode_pushers_rows(rows) + def get_enabled_pushers_txn(txn: LoggingTransaction) -> List[PusherRow]: + txn.execute( + """ + SELECT id, user_name, access_token, profile_tag, kind, app_id, + app_display_name, device_display_name, pushkey, ts, lang, data, + last_stream_ordering, last_success, failing_since, + enabled, device_id + FROM pushers WHERE COALESCE(enabled, TRUE) + """ + ) + return cast(List[PusherRow], txn.fetchall()) - return await self.db_pool.runInteraction( - "get_enabled_pushers", get_enabled_pushers_txn + return self._decode_pushers_rows( + await self.db_pool.runInteraction( + "get_enabled_pushers", get_enabled_pushers_txn + ) ) async def get_all_updated_pushers_rows( @@ -304,7 +369,7 @@ class PusherWorkerStore(SQLBaseStore): ) async def get_throttle_params_by_room( - self, pusher_id: str + self, pusher_id: int ) -> Dict[str, ThrottleParams]: res = await self.db_pool.simple_select_list( "pusher_throttle", @@ -323,7 +388,7 @@ class PusherWorkerStore(SQLBaseStore): return params_by_room async def set_throttle_params( - self, pusher_id: str, room_id: str, params: ThrottleParams + self, pusher_id: int, room_id: str, params: ThrottleParams ) -> None: await self.db_pool.simple_upsert( "pusher_throttle", @@ -534,7 +599,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore): (last_pusher_id, batch_size), ) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if len(rows) == 0: return 0 @@ -550,19 +615,19 @@ class PusherBackgroundUpdatesStore(SQLBaseStore): txn=txn, table="pushers", key_names=("id",), - key_values=[(row["pusher_id"],) for row in rows], + key_values=[row[0] for row in rows], value_names=("device_id", "access_token"), # If there was already a device_id on the pusher, we only want to clear # the access_token column, so we keep the existing device_id. Otherwise, # we set the device_id we got from joining the access_tokens table. value_values=[ - (row["pusher_device_id"] or row["token_device_id"], None) - for row in rows + (pusher_device_id or token_device_id, None) + for _, pusher_device_id, token_device_id in rows ], ) self.db_pool.updates._background_update_progress_txn( - txn, "set_device_id_for_pushers", {"pusher_id": rows[-1]["pusher_id"]} + txn, "set_device_id_for_pushers", {"pusher_id": rows[-1][0]} ) return len(rows) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 0231f9407b..b2645ab43c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
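The receipt hunks below unpack each row into named variables and assemble the nested {event_id: {receipt_type: {user_id: data}}} structure with setdefault. A standalone sketch of that nesting, with made-up receipt values and the standard json module standing in for db_to_json:

import json

rows = [
    ("m.read", "@alice:example.org", "$event1", json.dumps({"ts": 1})),
    ("m.read", "@bob:example.org", "$event1", json.dumps({"ts": 2})),
]

content: dict = {}
for receipt_type, user_id, event_id, data in rows:
    # Nested setdefault builds event -> type -> user without explicit key checks.
    content.setdefault(event_id, {}).setdefault(receipt_type, {})[user_id] = json.loads(data)

print(content)
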
@@ -313,25 +313,25 @@ class ReceiptsWorkerStore(SQLBaseStore): ) -> Sequence[JsonMapping]: """See get_linearized_receipts_for_room""" - def f(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str]]: if from_key: sql = ( - "SELECT * FROM receipts_linearized WHERE" + "SELECT receipt_type, user_id, event_id, data" + " FROM receipts_linearized WHERE" " room_id = ? AND stream_id > ? AND stream_id <= ?" ) txn.execute(sql, (room_id, from_key, to_key)) else: sql = ( - "SELECT * FROM receipts_linearized WHERE" + "SELECT receipt_type, user_id, event_id, data" + " FROM receipts_linearized WHERE" " room_id = ? AND stream_id <= ?" ) txn.execute(sql, (room_id, to_key)) - rows = self.db_pool.cursor_to_dict(txn) - - return rows + return cast(List[Tuple[str, str, str, str]], txn.fetchall()) rows = await self.db_pool.runInteraction("get_linearized_receipts_for_room", f) @@ -339,10 +339,10 @@ class ReceiptsWorkerStore(SQLBaseStore): return [] content: JsonDict = {} - for row in rows: - content.setdefault(row["event_id"], {}).setdefault(row["receipt_type"], {})[ - row["user_id"] - ] = db_to_json(row["data"]) + for receipt_type, user_id, event_id, data in rows: + content.setdefault(event_id, {}).setdefault(receipt_type, {})[ + user_id + ] = db_to_json(data) return [{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}] @@ -357,10 +357,13 @@ class ReceiptsWorkerStore(SQLBaseStore): if not room_ids: return {} - def f(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def f( + txn: LoggingTransaction, + ) -> List[Tuple[str, str, str, str, Optional[str], str]]: if from_key: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, thread_id, data + FROM receipts_linearized WHERE stream_id > ? AND stream_id <= ? AND """ clause, args = make_in_list_sql_clause( @@ -370,7 +373,8 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql + clause, [from_key, to_key] + list(args)) else: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, thread_id, data + FROM receipts_linearized WHERE stream_id <= ? AND """ @@ -380,29 +384,31 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql + clause, [to_key] + list(args)) - return self.db_pool.cursor_to_dict(txn) + return cast( + List[Tuple[str, str, str, str, Optional[str], str]], txn.fetchall() + ) txn_results = await self.db_pool.runInteraction( "_get_linearized_receipts_for_rooms", f ) results: JsonDict = {} - for row in txn_results: + for room_id, receipt_type, user_id, event_id, thread_id, data in txn_results: # We want a single event per room, since we want to batch the # receipts by room, event and type. room_event = results.setdefault( - row["room_id"], - {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}}, + room_id, + {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}}, ) # The content is of the form: # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. 
} - event_entry = room_event["content"].setdefault(row["event_id"], {}) - receipt_type = event_entry.setdefault(row["receipt_type"], {}) + event_entry = room_event["content"].setdefault(event_id, {}) + receipt_type_dict = event_entry.setdefault(receipt_type, {}) - receipt_type[row["user_id"]] = db_to_json(row["data"]) - if row["thread_id"]: - receipt_type[row["user_id"]]["thread_id"] = row["thread_id"] + receipt_type_dict[user_id] = db_to_json(data) + if thread_id: + receipt_type_dict[user_id]["thread_id"] = thread_id results = { room_id: [results[room_id]] if room_id in results else [] @@ -428,10 +434,11 @@ class ReceiptsWorkerStore(SQLBaseStore): A dictionary of roomids to a list of receipts. """ - def f(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]: if from_key: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized WHERE stream_id > ? AND stream_id <= ? ORDER BY stream_id DESC LIMIT 100 @@ -439,7 +446,8 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql, [from_key, to_key]) else: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized WHERE stream_id <= ? ORDER BY stream_id DESC LIMIT 100 @@ -447,27 +455,27 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql, [to_key]) - return self.db_pool.cursor_to_dict(txn) + return cast(List[Tuple[str, str, str, str, str]], txn.fetchall()) txn_results = await self.db_pool.runInteraction( "get_linearized_receipts_for_all_rooms", f ) results: JsonDict = {} - for row in txn_results: + for room_id, receipt_type, user_id, event_id, data in txn_results: # We want a single event per room, since we want to batch the # receipts by room, event and type. room_event = results.setdefault( - row["room_id"], - {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}}, + room_id, + {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}}, ) # The content is of the form: # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. } - event_entry = room_event["content"].setdefault(row["event_id"], {}) - receipt_type = event_entry.setdefault(row["receipt_type"], {}) + event_entry = room_event["content"].setdefault(event_id, {}) + receipt_type_dict = event_entry.setdefault(receipt_type, {}) - receipt_type[row["user_id"]] = db_to_json(row["data"]) + receipt_type_dict[user_id] = db_to_json(data) return results @@ -742,7 +750,7 @@ class ReceiptsWorkerStore(SQLBaseStore): event_ids: List[str], thread_id: Optional[str], data: dict, - ) -> Optional[Tuple[int, int]]: + ) -> Optional[int]: """Insert a receipt, either from local client or remote server. Automatically does conversion between linearized and graph @@ -804,9 +812,7 @@ class ReceiptsWorkerStore(SQLBaseStore): data, ) - max_persisted_id = self._receipts_id_gen.get_current_token() - - return stream_id, max_persisted_id + return stream_id async def _insert_graph_receipt( self, diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index cc964604e2..9e8643ae4d 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
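The registration hunks below replace dict rows with attrs classes such as ThreepidResult and explicit tuple unpacking. A rough sketch of converting rows into frozen attrs instances; the mapping rows here are hard-coded stand-ins for what a select helper would return:

import attr

@attr.s(frozen=True, slots=True, auto_attribs=True)
class ThreepidResult:
    medium: str
    address: str
    validated_at: int
    added_at: int

# Stand-in for rows returned as mappings by a generic select helper.
raw_rows = [{"medium": "email", "address": "a@example.org", "validated_at": 10, "added_at": 5}]
threepids = [ThreepidResult(**r) for r in raw_rows]
print(threepids[0].address)
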
@@ -143,6 +143,14 @@ class LoginTokenLookupResult: """The session ID advertised by the SSO Identity Provider.""" +@attr.s(frozen=True, slots=True, auto_attribs=True) +class ThreepidResult: + medium: str + address: str + validated_at: int + added_at: int + + class RegistrationWorkerStore(CacheInvalidationWorkerStore): def __init__( self, @@ -195,7 +203,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]: """Returns info about the user account, if it exists.""" - def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]: + def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[UserInfo]: # We could technically use simple_select_one here, but it would not perform # the COALESCEs (unless hacked into the column names), which could yield # confusing results. @@ -213,35 +221,46 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): (user_id,), ) - rows = self.db_pool.cursor_to_dict(txn) - - if len(rows) == 0: + row = txn.fetchone() + if not row: return None - return rows[0] + ( + name, + is_guest, + admin, + consent_version, + consent_ts, + consent_server_notice_sent, + appservice_id, + creation_ts, + user_type, + deactivated, + shadow_banned, + approved, + locked, + ) = row + + return UserInfo( + appservice_id=appservice_id, + consent_server_notice_sent=consent_server_notice_sent, + consent_version=consent_version, + consent_ts=consent_ts, + creation_ts=creation_ts, + is_admin=bool(admin), + is_deactivated=bool(deactivated), + is_guest=bool(is_guest), + is_shadow_banned=bool(shadow_banned), + user_id=UserID.from_string(name), + user_type=user_type, + approved=bool(approved), + locked=bool(locked), + ) - row = await self.db_pool.runInteraction( + return await self.db_pool.runInteraction( desc="get_user_by_id", func=get_user_by_id_txn, ) - if row is None: - return None - - return UserInfo( - appservice_id=row["appservice_id"], - consent_server_notice_sent=row["consent_server_notice_sent"], - consent_version=row["consent_version"], - consent_ts=row["consent_ts"], - creation_ts=row["creation_ts"], - is_admin=bool(row["admin"]), - is_deactivated=bool(row["deactivated"]), - is_guest=bool(row["is_guest"]), - is_shadow_banned=bool(row["shadow_banned"]), - user_id=UserID.from_string(row["name"]), - user_type=row["user_type"], - approved=bool(row["approved"]), - locked=bool(row["locked"]), - ) async def is_trial_user(self, user_id: str) -> bool: """Checks if user is in the "trial" period, i.e. 
within the first @@ -579,16 +598,31 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """ txn.execute(sql, (token,)) - rows = self.db_pool.cursor_to_dict(txn) - - if rows: - row = rows[0] + row = txn.fetchone() - # This field is nullable, ensure it comes out as a boolean - if row["token_used"] is None: - row["token_used"] = False - - return TokenLookupResult(**row) + if row: + ( + user_id, + is_guest, + shadow_banned, + token_id, + device_id, + valid_until_ms, + token_owner, + token_used, + ) = row + + return TokenLookupResult( + user_id=user_id, + is_guest=is_guest, + shadow_banned=shadow_banned, + token_id=token_id, + device_id=device_id, + valid_until_ms=valid_until_ms, + token_owner=token_owner, + # This field is nullable, ensure it comes out as a boolean + token_used=bool(token_used), + ) return None @@ -833,11 +867,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """Counts all users registered on the homeserver.""" def _count_users(txn: LoggingTransaction) -> int: - txn.execute("SELECT COUNT(*) AS users FROM users") - rows = self.db_pool.cursor_to_dict(txn) - if rows: - return rows[0]["users"] - return 0 + txn.execute("SELECT COUNT(*) FROM users") + row = txn.fetchone() + assert row is not None + return row[0] return await self.db_pool.runInteraction("count_users", _count_users) @@ -891,11 +924,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """Counts all users without a special user_type registered on the homeserver.""" def _count_users(txn: LoggingTransaction) -> int: - txn.execute("SELECT COUNT(*) AS users FROM users where user_type is null") - rows = self.db_pool.cursor_to_dict(txn) - if rows: - return rows[0]["users"] - return 0 + txn.execute("SELECT COUNT(*) FROM users where user_type is null") + row = txn.fetchone() + assert row is not None + return row[0] return await self.db_pool.runInteraction("count_real_users", _count_users) @@ -964,13 +996,14 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): {"user_id": user_id, "validated_at": validated_at, "added_at": added_at}, ) - async def user_get_threepids(self, user_id: str) -> List[Dict[str, Any]]: - return await self.db_pool.simple_select_list( + async def user_get_threepids(self, user_id: str) -> List[ThreepidResult]: + results = await self.db_pool.simple_select_list( "user_threepids", - {"user_id": user_id}, - ["medium", "address", "validated_at", "added_at"], - "user_get_threepids", + keyvalues={"user_id": user_id}, + retcols=["medium", "address", "validated_at", "added_at"], + desc="user_get_threepids", ) + return [ThreepidResult(**r) for r in results] async def user_delete_threepid( self, user_id: str, medium: str, address: str @@ -1252,12 +1285,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): ) txn.execute(sql, []) - res = self.db_pool.cursor_to_dict(txn) - if res: - for user in res: - self.set_expiration_date_for_user_txn( - txn, user["name"], use_delta=True - ) + for (name,) in txn.fetchall(): + self.set_expiration_date_for_user_txn(txn, name, use_delta=True) await self.db_pool.runInteraction( "get_users_with_no_expiration_date", @@ -1963,11 +1992,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): (user_id,), ) - rows = self.db_pool.cursor_to_dict(txn) + row = txn.fetchone() + assert row is not None # We cast to bool because the value returned by the database engine might # be an integer if we're using SQLite. 
- return bool(rows[0]["approved"]) + return bool(row[0]) return await self.db_pool.runInteraction( desc="is_user_pending_approval", @@ -2045,22 +2075,22 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): (last_user, batch_size), ) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return True, 0 rows_processed_nb = 0 - for user in rows: - if not user["count_tokens"] and not user["count_threepids"]: - self.set_user_deactivated_status_txn(txn, user["name"], True) + for name, count_tokens, count_threepids in rows: + if not count_tokens and not count_threepids: + self.set_user_deactivated_status_txn(txn, name, True) rows_processed_nb += 1 logger.info("Marked %d rows as deactivated", rows_processed_nb) self.db_pool.updates._background_update_progress_txn( - txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]} + txn, "users_set_deactivated_flag", {"user_id": rows[-1][0]} ) if batch_size > len(rows): diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 9246b418f5..7f40e2c446 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -349,16 +349,19 @@ class RelationsWorkerStore(SQLBaseStore): def get_all_relation_ids_for_event_with_types_txn( txn: LoggingTransaction, ) -> List[str]: - rows = self.db_pool.simple_select_many_txn( - txn=txn, - table="event_relations", - column="relation_type", - iterable=relation_types, - keyvalues={"relates_to_id": event_id}, - retcols=["event_id"], + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn=txn, + table="event_relations", + column="relation_type", + iterable=relation_types, + keyvalues={"relates_to_id": event_id}, + retcols=["event_id"], + ), ) - return [row["event_id"] for row in rows] + return [row[0] for row in rows] return await self.db_pool.runInteraction( desc="get_all_relation_ids_for_event_with_types", diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 719e11aea6..9d24d2c347 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
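The retention hunks below iterate the cursor directly and build a room_id-keyed dict comprehension, backfilling rooms with no stored policy afterwards. A small illustration of the pattern, using a hypothetical Policy class in place of Synapse's RetentionPolicy:

import attr
from typing import Dict, Optional

@attr.s(frozen=True, slots=True, auto_attribs=True)
class Policy:  # illustrative stand-in for RetentionPolicy
    min_lifetime: Optional[int] = None
    max_lifetime: Optional[int] = None

rows = [("!a:hs", 1000, 2000), ("!b:hs", None, 5000)]
rooms: Dict[str, Policy] = {
    room_id: Policy(min_lifetime=min_lifetime, max_lifetime=max_lifetime)
    for room_id, min_lifetime, max_lifetime in rows
}

# Rooms with no retention state fall back to a default (empty) policy.
for (room_id,) in [("!a:hs",), ("!c:hs",)]:
    rooms.setdefault(room_id, Policy())
print(rooms)
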
@@ -831,7 +831,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): def get_retention_policy_for_room_txn( txn: LoggingTransaction, - ) -> List[Dict[str, Optional[int]]]: + ) -> Optional[Tuple[Optional[int], Optional[int]]]: txn.execute( """ SELECT min_lifetime, max_lifetime FROM room_retention @@ -841,7 +841,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): (room_id,), ) - return self.db_pool.cursor_to_dict(txn) + return cast(Optional[Tuple[Optional[int], Optional[int]]], txn.fetchone()) ret = await self.db_pool.runInteraction( "get_retention_policy_for_room", @@ -856,8 +856,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): max_lifetime=self.config.retention.retention_default_max_lifetime, ) - min_lifetime = ret[0]["min_lifetime"] - max_lifetime = ret[0]["max_lifetime"] + min_lifetime, max_lifetime = ret # If one of the room's policy's attributes isn't defined, use the matching # attribute from the default policy. @@ -1162,14 +1161,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql, args) - rows = self.db_pool.cursor_to_dict(txn) - rooms_dict = {} - - for row in rows: - rooms_dict[row["room_id"]] = RetentionPolicy( - min_lifetime=row["min_lifetime"], - max_lifetime=row["max_lifetime"], + rooms_dict = { + room_id: RetentionPolicy( + min_lifetime=min_lifetime, + max_lifetime=max_lifetime, ) + for room_id, min_lifetime, max_lifetime in txn + } if include_null: # If required, do a second query that retrieves all of the rooms we know @@ -1178,13 +1176,11 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql) - rows = self.db_pool.cursor_to_dict(txn) - # If a room isn't already in the dict (i.e. it doesn't have a retention # policy in its state), add it with a null policy. - for row in rows: - if row["room_id"] not in rooms_dict: - rooms_dict[row["room_id"]] = RetentionPolicy() + for (room_id,) in txn: + if room_id not in rooms_dict: + rooms_dict[room_id] = RetentionPolicy() return rooms_dict @@ -1300,14 +1296,17 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): complete. 
""" - rows: List[Dict[str, str]] = await self.db_pool.simple_select_many_batch( - table="partial_state_rooms", - column="room_id", - iterable=room_ids, - retcols=("room_id",), - desc="is_partial_state_room_batched", - ) - partial_state_rooms = {row_dict["room_id"] for row_dict in rows} + rows = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="partial_state_rooms", + column="room_id", + iterable=room_ids, + retcols=("room_id",), + desc="is_partial_state_room_batched", + ), + ) + partial_state_rooms = {row[0] for row in rows} return {room_id: room_id in partial_state_rooms for room_id in room_ids} async def get_join_event_id_and_device_lists_stream_id_for_partial_state( @@ -1703,24 +1702,24 @@ class RoomBackgroundUpdateStore(SQLBaseStore): (last_room, batch_size), ) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return True - for row in rows: - if not row["json"]: + for room_id, event_id, json in rows: + if not json: retention_policy = {} else: - ev = db_to_json(row["json"]) + ev = db_to_json(json) retention_policy = ev["content"] self.db_pool.simple_insert_txn( txn=txn, table="room_retention", values={ - "room_id": row["room_id"], - "event_id": row["event_id"], + "room_id": room_id, + "event_id": event_id, "min_lifetime": retention_policy.get("min_lifetime"), "max_lifetime": retention_policy.get("max_lifetime"), }, @@ -1729,7 +1728,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore): logger.info("Inserted %d rows into room_retention", len(rows)) self.db_pool.updates._background_update_progress_txn( - txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]} + txn, "insert_room_retention", {"room_id": rows[-1][0]} ) if batch_size > len(rows): diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index e93573f315..3a87eba430 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
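The membership hunks below reorder the selected columns to (event_id, user_id) so the result can be built with a plain dict(rows). A tiny illustration of why the column order matters:

rows = [("$join1", "@alice:example.org"), ("$join2", "@bob:example.org")]

# dict() over 2-tuples maps the first element to the second, so the
# event ID becomes the key and the user ID the value.
joined_by_event = dict(rows)
assert joined_by_event["$join1"] == "@alice:example.org"
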
@@ -27,6 +27,7 @@ from typing import ( Set, Tuple, Union, + cast, ) import attr @@ -683,25 +684,28 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): Map from user_id to set of rooms that is currently in. """ - rows = await self.db_pool.simple_select_many_batch( - table="current_state_events", - column="state_key", - iterable=user_ids, - retcols=( - "state_key", - "room_id", + rows = cast( + List[Tuple[str, str]], + await self.db_pool.simple_select_many_batch( + table="current_state_events", + column="state_key", + iterable=user_ids, + retcols=( + "state_key", + "room_id", + ), + keyvalues={ + "type": EventTypes.Member, + "membership": Membership.JOIN, + }, + desc="get_rooms_for_users", ), - keyvalues={ - "type": EventTypes.Member, - "membership": Membership.JOIN, - }, - desc="get_rooms_for_users", ) user_rooms: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids} - for row in rows: - user_rooms[row["state_key"]].add(row["room_id"]) + for state_key, room_id in rows: + user_rooms[state_key].add(room_id) return {key: frozenset(rooms) for key, rooms in user_rooms.items()} @@ -892,17 +896,20 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): Map from event ID to `user_id`, or None if event is not a join. """ - rows = await self.db_pool.simple_select_many_batch( - table="room_memberships", - column="event_id", - iterable=event_ids, - retcols=("user_id", "event_id"), - keyvalues={"membership": Membership.JOIN}, - batch_size=1000, - desc="_get_user_ids_from_membership_event_ids", + rows = cast( + List[Tuple[str, str]], + await self.db_pool.simple_select_many_batch( + table="room_memberships", + column="event_id", + iterable=event_ids, + retcols=("event_id", "user_id"), + keyvalues={"membership": Membership.JOIN}, + batch_size=1000, + desc="_get_user_ids_from_membership_event_ids", + ), ) - return {row["event_id"]: row["user_id"] for row in rows} + return dict(rows) @cached(max_entries=10000) async def is_host_joined(self, room_id: str, host: str) -> bool: @@ -1202,21 +1209,22 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): membership event, otherwise the value is None. 
""" - rows = await self.db_pool.simple_select_many_batch( - table="room_memberships", - column="event_id", - iterable=member_event_ids, - retcols=("user_id", "membership", "event_id"), - keyvalues={}, - batch_size=500, - desc="get_membership_from_event_ids", + rows = cast( + List[Tuple[str, str, str]], + await self.db_pool.simple_select_many_batch( + table="room_memberships", + column="event_id", + iterable=member_event_ids, + retcols=("user_id", "membership", "event_id"), + keyvalues={}, + batch_size=500, + desc="get_membership_from_event_ids", + ), ) return { - row["event_id"]: EventIdMembership( - membership=row["membership"], user_id=row["user_id"] - ) - for row in rows + event_id: EventIdMembership(membership=membership, user_id=user_id) + for user_id, membership, event_id in rows } async def is_local_host_in_room_ignoring_users( @@ -1349,18 +1357,16 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore): txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return 0 - min_stream_id = rows[-1]["stream_ordering"] + min_stream_id = rows[-1][0] to_update = [] - for row in rows: - event_id = row["event_id"] - room_id = row["room_id"] + for _, event_id, room_id, json in rows: try: - event_json = db_to_json(row["json"]) + event_json = db_to_json(json) content = event_json["content"] except Exception: continue diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index a7aae661d8..1d69c4a5f0 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -179,22 +179,24 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): # store_search_entries_txn with a generator function, but that # would mean having two cursors open on the database at once. # Instead we just build a list of results. - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return 0 - min_stream_id = rows[-1]["stream_ordering"] + min_stream_id = rows[-1][0] event_search_rows = [] - for row in rows: + for ( + stream_ordering, + event_id, + room_id, + etype, + json, + origin_server_ts, + ) in rows: try: - event_id = row["event_id"] - room_id = row["room_id"] - etype = row["type"] - stream_ordering = row["stream_ordering"] - origin_server_ts = row["origin_server_ts"] try: - event_json = db_to_json(row["json"]) + event_json = db_to_json(json) content = event_json["content"] except Exception: continue diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 5eaaff5b68..598025dd91 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -20,10 +20,12 @@ from typing import ( Collection, Dict, Iterable, + List, Mapping, Optional, Set, Tuple, + cast, ) import attr @@ -388,16 +390,19 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): Raises: RuntimeError if the state is unknown at any of the given events """ - rows = await self.db_pool.simple_select_many_batch( - table="event_to_state_groups", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=("event_id", "state_group"), - desc="_get_state_group_for_events", + rows = cast( + List[Tuple[str, int]], + await self.db_pool.simple_select_many_batch( + table="event_to_state_groups", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("event_id", "state_group"), + desc="_get_state_group_for_events", + ), ) - res = {row["event_id"]: row["state_group"] for row in rows} + res = dict(rows) for e in event_ids: if e not in res: raise RuntimeError("No state group for unknown or outlier event %s" % e) @@ -415,16 +420,19 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): The subset of state groups that are referenced. """ - rows = await self.db_pool.simple_select_many_batch( - table="event_to_state_groups", - column="state_group", - iterable=state_groups, - keyvalues={}, - retcols=("DISTINCT state_group",), - desc="get_referenced_state_groups", + rows = cast( + List[Tuple[int]], + await self.db_pool.simple_select_many_batch( + table="event_to_state_groups", + column="state_group", + iterable=state_groups, + keyvalues={}, + retcols=("DISTINCT state_group",), + desc="get_referenced_state_groups", + ), ) - return {row["state_group"] for row in rows} + return {row[0] for row in rows} async def update_state_for_partial_state_event( self, @@ -624,16 +632,22 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): # potentially stale, since there may have been a period where the # server didn't share a room with the remote user and therefore may # have missed any device updates. - rows = self.db_pool.simple_select_many_txn( - txn, - table="current_state_events", - column="room_id", - iterable=to_delete, - keyvalues={"type": EventTypes.Member, "membership": Membership.JOIN}, - retcols=("state_key",), + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="current_state_events", + column="room_id", + iterable=to_delete, + keyvalues={ + "type": EventTypes.Member, + "membership": Membership.JOIN, + }, + retcols=("state_key",), + ), ) - potentially_left_users = {row["state_key"] for row in rows} + potentially_left_users = {row[0] for row in rows} # Now lets actually delete the rooms from the DB. self.db_pool.simple_delete_many_txn( diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 445213e12a..3151186e0c 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
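The hunks below introduce a frozen StateDelta attrs class so callers use attribute access instead of dict keys. A standalone sketch of constructing deltas from raw rows in the same six-column order as the SQL in this file (the values are invented):

import attr
from typing import Optional

@attr.s(slots=True, frozen=True, auto_attribs=True)
class StateDelta:
    stream_id: int
    room_id: str
    event_type: str
    state_key: str
    event_id: Optional[str]
    prev_event_id: Optional[str]

raw = [(7, "!room:hs", "m.room.member", "@alice:example.org", "$new", None)]
deltas = [StateDelta(*row) for row in raw]
print(deltas[0].event_type)  # attribute access replaces row["type"]
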
@@ -13,7 +13,9 @@ # limitations under the License. import logging -from typing import Any, Dict, List, Tuple +from typing import List, Optional, Tuple + +import attr from synapse.storage._base import SQLBaseStore from synapse.storage.database import LoggingTransaction @@ -22,6 +24,20 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class StateDelta: + stream_id: int + room_id: str + event_type: str + state_key: str + + event_id: Optional[str] + """new event_id for this state key. None if the state has been deleted.""" + + prev_event_id: Optional[str] + """previous event_id for this state key. None if it's new state.""" + + class StateDeltasStore(SQLBaseStore): # This class must be mixed in with a child class which provides the following # attribute. TODO: can we get static analysis to enforce this? @@ -29,31 +45,21 @@ class StateDeltasStore(SQLBaseStore): async def get_partial_current_state_deltas( self, prev_stream_id: int, max_stream_id: int - ) -> Tuple[int, List[Dict[str, Any]]]: + ) -> Tuple[int, List[StateDelta]]: """Fetch a list of room state changes since the given stream id - Each entry in the result contains the following fields: - - stream_id (int) - - room_id (str) - - type (str): event type - - state_key (str): - - event_id (str|None): new event_id for this state key. None if the - state has been deleted. - - prev_event_id (str|None): previous event_id for this state key. None - if it's new state. - This may be the partial state if we're lazy joining the room. Args: prev_stream_id: point to get changes since (exclusive) max_stream_id: the point that we know has been correctly persisted - - ie, an upper limit to return changes from. + - ie, an upper limit to return changes from. Returns: A tuple consisting of: - - the stream id which these results go up to - - list of current_state_delta_stream rows. If it is empty, we are - up to date. + - the stream id which these results go up to + - list of current_state_delta_stream rows. If it is empty, we are + up to date. """ prev_stream_id = int(prev_stream_id) @@ -72,7 +78,7 @@ class StateDeltasStore(SQLBaseStore): def get_current_state_deltas_txn( txn: LoggingTransaction, - ) -> Tuple[int, List[Dict[str, Any]]]: + ) -> Tuple[int, List[StateDelta]]: # First we calculate the max stream id that will give us less than # N results. # We arbitrarily limit to 100 stream_id entries to ensure we don't @@ -112,7 +118,17 @@ class StateDeltasStore(SQLBaseStore): ORDER BY stream_id ASC """ txn.execute(sql, (prev_stream_id, clipped_stream_id)) - return clipped_stream_id, self.db_pool.cursor_to_dict(txn) + return clipped_stream_id, [ + StateDelta( + stream_id=row[0], + room_id=row[1], + event_type=row[2], + state_key=row[3], + event_id=row[4], + prev_event_id=row[5], + ) + for row in txn.fetchall() + ] return await self.db_pool.runInteraction( "get_current_state_deltas", get_current_state_deltas_txn diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 9d403919e4..5b2d0ba870 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -506,25 +506,28 @@ class StatsStore(StateDeltasStore): ) -> Tuple[List[str], Dict[str, int], int, List[str], int]: pos = self.get_room_max_stream_ordering() # type: ignore[attr-defined] - rows = self.db_pool.simple_select_many_txn( - txn, - table="current_state_events", - column="type", - iterable=[ - EventTypes.Create, - EventTypes.JoinRules, - EventTypes.RoomHistoryVisibility, - EventTypes.RoomEncryption, - EventTypes.Name, - EventTypes.Topic, - EventTypes.RoomAvatar, - EventTypes.CanonicalAlias, - ], - keyvalues={"room_id": room_id, "state_key": ""}, - retcols=["event_id"], + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="current_state_events", + column="type", + iterable=[ + EventTypes.Create, + EventTypes.JoinRules, + EventTypes.RoomHistoryVisibility, + EventTypes.RoomEncryption, + EventTypes.Name, + EventTypes.Topic, + EventTypes.RoomAvatar, + EventTypes.CanonicalAlias, + ], + keyvalues={"room_id": room_id, "state_key": ""}, + retcols=["event_id"], + ), ) - event_ids = cast(List[str], [row["event_id"] for row in rows]) + event_ids = [row[0] for row in rows] txn.execute( """ diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 5a3611c415..ea06e4eee0 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
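The stream hunks below switch RoomStreamToken construction to keyword arguments, so call sites that omit the topological ordering stay readable. A rough analogue with a made-up attrs class (not Synapse's actual token type), showing the keyword-only style:

import attr
from typing import Optional

@attr.s(slots=True, frozen=True, auto_attribs=True, kw_only=True)
class Token:
    topological: Optional[int] = None
    stream: int = 0

# Keyword construction makes it obvious which ordering is being omitted.
t = Token(stream=42)
print(t.topological, t.stream)
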
@@ -266,7 +266,7 @@ def generate_next_token( # when we are going backwards so we subtract one from the # stream part. last_stream_ordering -= 1 - return RoomStreamToken(last_topo_ordering, last_stream_ordering) + return RoomStreamToken(topological=last_topo_ordering, stream=last_stream_ordering) def _make_generic_sql_bound( @@ -558,7 +558,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if p > min_pos } - return RoomStreamToken(None, min_pos, immutabledict(positions)) + return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions)) async def get_room_events_stream_for_rooms( self, @@ -708,7 +708,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ret.reverse() if rows: - key = RoomStreamToken(None, min(r.stream_ordering for r in rows)) + key = RoomStreamToken(stream=min(r.stream_ordering for r in rows)) else: # Assume we didn't get anything because there was nothing to # get. @@ -969,7 +969,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): topo = await self.db_pool.runInteraction( "_get_max_topological_txn", self._get_max_topological_txn, room_id ) - return RoomStreamToken(topo, stream_ordering) + return RoomStreamToken(topological=topo, stream=stream_ordering) @overload def get_stream_id_for_event_txn( @@ -1033,7 +1033,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): retcols=("stream_ordering", "topological_ordering"), desc="get_topological_token_for_event", ) - return RoomStreamToken(row["topological_ordering"], row["stream_ordering"]) + return RoomStreamToken( + topological=row["topological_ordering"], stream=row["stream_ordering"] + ) async def get_current_topological_token(self, room_id: str, stream_key: int) -> int: """Gets the topological token in a room after or at the given stream @@ -1114,8 +1116,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): else: topo = None internal = event.internal_metadata - internal.before = RoomStreamToken(topo, stream - 1) - internal.after = RoomStreamToken(topo, stream) + internal.before = RoomStreamToken(topological=topo, stream=stream - 1) + internal.after = RoomStreamToken(topological=topo, stream=stream) internal.order = (int(topo) if topo else 0, int(stream)) async def get_events_around( @@ -1191,11 +1193,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # Paginating backwards includes the event at the token, but paginating # forward doesn't. before_token = RoomStreamToken( - results["topological_ordering"] - 1, results["stream_ordering"] + topological=results["topological_ordering"] - 1, + stream=results["stream_ordering"], ) after_token = RoomStreamToken( - results["topological_ordering"], results["stream_ordering"] + topological=results["topological_ordering"], + stream=results["stream_ordering"], ) rows, start_token = self._paginate_room_events_txn( diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py
index 5c5372a825..5555b53575 100644
--- a/synapse/storage/databases/main/task_scheduler.py
+++ b/synapse/storage/databases/main/task_scheduler.py
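The scheduler hunks below convert a positional ScheduledTaskRow into a ScheduledTask, JSON-decoding the optional params/result columns. A simplified, runnable sketch of that conversion using plain dicts and the standard json module in place of Synapse's types and db_to_json:

import json
from typing import Optional, Tuple

Row = Tuple[str, str, str, int, Optional[str], Optional[str], Optional[str], Optional[str]]

def convert(row: Row) -> dict:
    task_id, action, status, timestamp, resource_id, params, result, error = row
    return {
        "id": task_id,
        "action": action,
        "status": status,
        "timestamp": timestamp,
        "resource_id": resource_id,
        # Optional JSON columns are only decoded when present.
        "params": json.loads(params) if params is not None else None,
        "result": json.loads(result) if result is not None else None,
        "error": error,
    }

print(convert(("1", "shutdown_room", "active", 123, "!r:hs", '{"block": true}', None, None)))
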
@@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, List, Optional, Tuple, cast from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( @@ -27,6 +27,8 @@ from synapse.util import json_encoder if TYPE_CHECKING: from synapse.server import HomeServer +ScheduledTaskRow = Tuple[str, str, str, int, str, str, str, str] + class TaskSchedulerWorkerStore(SQLBaseStore): def __init__( @@ -38,13 +40,18 @@ class TaskSchedulerWorkerStore(SQLBaseStore): super().__init__(database, db_conn, hs) @staticmethod - def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask: - row["status"] = TaskStatus(row["status"]) - if row["params"] is not None: - row["params"] = db_to_json(row["params"]) - if row["result"] is not None: - row["result"] = db_to_json(row["result"]) - return ScheduledTask(**row) + def _convert_row_to_task(row: ScheduledTaskRow) -> ScheduledTask: + task_id, action, status, timestamp, resource_id, params, result, error = row + return ScheduledTask( + id=task_id, + action=action, + status=TaskStatus(status), + timestamp=timestamp, + resource_id=resource_id, + params=db_to_json(params) if params is not None else None, + result=db_to_json(result) if result is not None else None, + error=error, + ) async def get_scheduled_tasks( self, @@ -68,7 +75,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore): Returns: a list of `ScheduledTask`, ordered by increasing timestamps """ - def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[ScheduledTaskRow]: clauses: List[str] = [] args: List[Any] = [] if resource_id: @@ -101,7 +108,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore): args.append(limit) txn.execute(sql, args) - return self.db_pool.cursor_to_dict(txn) + return cast(List[ScheduledTaskRow], txn.fetchall()) rows = await self.db_pool.runInteraction( "get_scheduled_tasks", get_scheduled_tasks_txn @@ -193,7 +200,22 @@ class TaskSchedulerWorkerStore(SQLBaseStore): desc="get_scheduled_task", ) - return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None + return ( + TaskSchedulerWorkerStore._convert_row_to_task( + ( + row["id"], + row["action"], + row["status"], + row["timestamp"], + row["resource_id"], + row["params"], + row["result"], + row["error"], + ) + ) + if row + else None + ) async def delete_scheduled_task(self, id: str) -> None: """Delete a specific task from its id. diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 8f70eff809..c4a6475060 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
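The transaction hunks below wrap the generic select helper in typing.cast so mypy sees precise tuple rows before they are destructured. A minimal illustration of the cast pattern; fetch_rows is a hypothetical stand-in for the loosely typed helper:

from typing import Any, List, Tuple, cast

def fetch_rows() -> Any:
    # Stand-in for a generic DB helper whose declared return type is too loose.
    return [("remote.example.org", 100, 200, 300)]

# cast() is purely for the type checker; it performs no conversion at runtime.
rows = cast(List[Tuple[str, int, int, int]], fetch_rows())
timings = {
    destination: (failure_ts, retry_last_ts, retry_interval)
    for destination, failure_ts, retry_last_ts, retry_interval in rows
    if retry_last_ts and failure_ts and retry_interval
}
print(timings)
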
@@ -211,18 +211,28 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): async def get_destination_retry_timings_batch( self, destinations: StrCollection ) -> Mapping[str, Optional[DestinationRetryTimings]]: - rows = await self.db_pool.simple_select_many_batch( - table="destinations", - iterable=destinations, - column="destination", - retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"), - desc="get_destination_retry_timings_batch", + rows = cast( + List[Tuple[str, Optional[int], Optional[int], Optional[int]]], + await self.db_pool.simple_select_many_batch( + table="destinations", + iterable=destinations, + column="destination", + retcols=( + "destination", + "failure_ts", + "retry_last_ts", + "retry_interval", + ), + desc="get_destination_retry_timings_batch", + ), ) return { - row.pop("destination"): DestinationRetryTimings(**row) - for row in rows - if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"] + destination: DestinationRetryTimings( + failure_ts, retry_last_ts, retry_interval + ) + for destination, failure_ts, retry_last_ts, retry_interval in rows + if retry_last_ts and failure_ts and retry_interval } async def set_destination_retry_timings( @@ -526,7 +536,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): start: int, limit: int, direction: Direction = Direction.FORWARDS, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: """Function to retrieve a paginated list of destination's rooms. This will return a json list of rooms and the total number of rooms. @@ -537,12 +547,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): limit: number of rows to retrieve direction: sort ascending or descending by room_id Returns: - A tuple of a dict of rooms and a count of total rooms. + A tuple of a list of room tuples and a count of total rooms. + + Each room tuple is room_id, stream_ordering. """ def get_destination_rooms_paginate_txn( txn: LoggingTransaction, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: if direction == Direction.BACKWARDS: order = "DESC" else: @@ -556,14 +568,17 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql, [destination]) count = cast(Tuple[int], txn.fetchone())[0] - rooms = self.db_pool.simple_select_list_paginate_txn( - txn=txn, - table="destination_rooms", - orderby="room_id", - start=start, - limit=limit, - retcols=("room_id", "stream_ordering"), - order_direction=order, + rooms = cast( + List[Tuple[str, int]], + self.db_pool.simple_select_list_paginate_txn( + txn=txn, + table="destination_rooms", + orderby="room_id", + start=start, + limit=limit, + retcols=("room_id", "stream_ordering"), + order_direction=order, + ), ) return rooms, count diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py
index f38bedbbcd..919c66f553 100644
--- a/synapse/storage/databases/main/ui_auth.py
+++ b/synapse/storage/databases/main/ui_auth.py
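The UI-auth hunks below decode each stage's stored result and count how often each registration token was used, so the pending counters can be decremented. A small sketch of the counting step with invented values, using the standard json module in place of db_to_json:

import json
from typing import Dict

# The stored result is either `true` (registration completed) or the token used.
results = ['"abcd1234"', "true", '"abcd1234"']

token_counts: Dict[str, int] = {}
for raw in results:
    token = json.loads(raw)
    if isinstance(token, str):
        token_counts[token] = token_counts.get(token, 0) + 1

# Each count is how much that token's pending counter should be decremented.
print(token_counts)  # {'abcd1234': 2}
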
@@ -337,13 +337,16 @@ class UIAuthWorkerStore(SQLBaseStore): # If a registration token was used, decrement the pending counter # before deleting the session. - rows = self.db_pool.simple_select_many_txn( - txn, - table="ui_auth_sessions_credentials", - column="session_id", - iterable=session_ids, - keyvalues={"stage_type": LoginType.REGISTRATION_TOKEN}, - retcols=["result"], + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="ui_auth_sessions_credentials", + column="session_id", + iterable=session_ids, + keyvalues={"stage_type": LoginType.REGISTRATION_TOKEN}, + retcols=["result"], + ), ) # Get the tokens used and how much pending needs to be decremented by. @@ -353,23 +356,25 @@ class UIAuthWorkerStore(SQLBaseStore): # registration token stage for that session will be True. # If a token was used to authenticate, but registration was # never completed, the result will be the token used. - token = db_to_json(r["result"]) + token = db_to_json(r[0]) if isinstance(token, str): token_counts[token] = token_counts.get(token, 0) + 1 # Update the `pending` counters. if len(token_counts) > 0: - token_rows = self.db_pool.simple_select_many_txn( - txn, - table="registration_tokens", - column="token", - iterable=list(token_counts.keys()), - keyvalues={}, - retcols=["token", "pending"], + token_rows = cast( + List[Tuple[str, int]], + self.db_pool.simple_select_many_txn( + txn, + table="registration_tokens", + column="token", + iterable=list(token_counts.keys()), + keyvalues={}, + retcols=["token", "pending"], + ), ) - for token_row in token_rows: - token = token_row["token"] - new_pending = token_row["pending"] - token_counts[token] + for token, pending in token_rows: + new_pending = pending - token_counts[token] self.db_pool.simple_update_one_txn( txn, table="registration_tokens", diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index f0dc31fee6..23eb92c514 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -410,25 +410,24 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): ) # Next fetch their profiles. Note that not all users have profiles. - profile_rows = self.db_pool.simple_select_many_txn( - txn, - table="profiles", - column="full_user_id", - iterable=list(users_to_insert), - retcols=( - "full_user_id", - "displayname", - "avatar_url", + profile_rows = cast( + List[Tuple[str, Optional[str], Optional[str]]], + self.db_pool.simple_select_many_txn( + txn, + table="profiles", + column="full_user_id", + iterable=list(users_to_insert), + retcols=( + "full_user_id", + "displayname", + "avatar_url", + ), + keyvalues={}, ), - keyvalues={}, ) profiles = { - row["full_user_id"]: _UserDirProfile( - row["full_user_id"], - row["displayname"], - row["avatar_url"], - ) - for row in profile_rows + full_user_id: _UserDirProfile(full_user_id, displayname, avatar_url) + for full_user_id, displayname, avatar_url in profile_rows } profiles_to_insert = [ @@ -517,18 +516,21 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): and not self.get_if_app_services_interested_in_user(user) # type: ignore[attr-defined] ] - rows = self.db_pool.simple_select_many_txn( - txn, - table="users", - column="name", - iterable=users, - keyvalues={ - "deactivated": 0, - }, - retcols=("name", "user_type"), + rows = cast( + List[Tuple[str, Optional[str]]], + self.db_pool.simple_select_many_txn( + txn, + table="users", + column="name", + iterable=users, + keyvalues={ + "deactivated": 0, + }, + retcols=("name", "user_type"), + ), ) - return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT] + return [name for name, user_type in rows if user_type != UserTypes.SUPPORT] async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool: """Check if the room is either world_readable or publically joinable""" diff --git a/synapse/storage/databases/main/user_erasure_store.py b/synapse/storage/databases/main/user_erasure_store.py
index 06fcbe5e54..8bd58c6e3d 100644
--- a/synapse/storage/databases/main/user_erasure_store.py
+++ b/synapse/storage/databases/main/user_erasure_store.py
@@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Iterable, Mapping +from typing import Iterable, List, Mapping, Tuple, cast from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main import CacheInvalidationWorkerStore @@ -50,14 +50,17 @@ class UserErasureWorkerStore(CacheInvalidationWorkerStore): Returns: for each user, whether the user has requested erasure. """ - rows = await self.db_pool.simple_select_many_batch( - table="erased_users", - column="user_id", - iterable=user_ids, - retcols=("user_id",), - desc="are_users_erased", + rows = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="erased_users", + column="user_id", + iterable=user_ids, + retcols=("user_id",), + desc="are_users_erased", + ), ) - erased_users = {row["user_id"] for row in rows} + erased_users = {row[0] for row in rows} return {u: u in erased_users for u in user_ids} diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 6984d11352..09d2a8c5b3 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -13,7 +13,17 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + Iterable, + List, + Optional, + Set, + Tuple, + cast, +) import attr @@ -730,19 +740,22 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "[purge] found %i state groups to delete", len(state_groups_to_delete) ) - rows = self.db_pool.simple_select_many_txn( - txn, - table="state_group_edges", - column="prev_state_group", - iterable=state_groups_to_delete, - keyvalues={}, - retcols=("state_group",), + rows = cast( + List[Tuple[int]], + self.db_pool.simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=state_groups_to_delete, + keyvalues={}, + retcols=("state_group",), + ), ) remaining_state_groups = { - row["state_group"] - for row in rows - if row["state_group"] not in state_groups_to_delete + state_group + for state_group, in rows + if state_group not in state_groups_to_delete } logger.info( @@ -799,16 +812,19 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): A mapping from state group to previous state group. """ - rows = await self.db_pool.simple_select_many_batch( - table="state_group_edges", - column="prev_state_group", - iterable=state_groups, - keyvalues={}, - retcols=("prev_state_group", "state_group"), - desc="get_previous_state_groups", + rows = cast( + List[Tuple[int, int]], + await self.db_pool.simple_select_many_batch( + table="state_group_edges", + column="prev_state_group", + iterable=state_groups, + keyvalues={}, + retcols=("state_group", "prev_state_group"), + desc="get_previous_state_groups", + ), ) - return {row["state_group"]: row["prev_state_group"] for row in rows} + return dict(rows) async def purge_room_state( self, room_id: str, state_groups_to_delete: Collection[int] diff --git a/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql b/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql new file mode 100644
index 0000000000..fc948166e6
--- /dev/null
+++ b/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql
@@ -0,0 +1,20 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8204, 'e2e_room_keys_index_room_id', '{}'); + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8204, 'room_account_data_index_room_id', '{}');