diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 46957723a1..9f7959c45d 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -16,7 +16,6 @@ from itertools import chain
from typing import (
TYPE_CHECKING,
AbstractSet,
- Any,
Callable,
Collection,
Dict,
@@ -32,6 +31,7 @@ from typing import (
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.logging.opentracing import tag_args, trace
+from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.roommember import ProfileInfo
from synapse.storage.util.partial_state_events_tracker import (
PartialCurrentStateTracker,
@@ -531,19 +531,9 @@ class StateStorageController:
@tag_args
async def get_current_state_deltas(
self, prev_stream_id: int, max_stream_id: int
- ) -> Tuple[int, List[Dict[str, Any]]]:
+ ) -> Tuple[int, List[StateDelta]]:
"""Fetch a list of room state changes since the given stream id
- Each entry in the result contains the following fields:
- - stream_id (int)
- - room_id (str)
- - type (str): event type
- - state_key (str):
- - event_id (str|None): new event_id for this state key. None if the
- state has been deleted.
- - prev_event_id (str|None): previous event_id for this state key. None
- if it's new state.
-
Args:
prev_stream_id: point to get changes since (exclusive)
max_stream_id: the point that we know has been correctly persisted
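Note on the new return type: the fields listed in the removed docstring now live on the
StateDelta attrs class imported above. A minimal sketch of the shape a caller can expect,
assuming the attribute names mirror the old dict keys (the real class may use event_type
rather than type); attribute access replaces dict subscription:

    import attr
    from typing import List, Optional, Tuple

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class StateDelta:
        stream_id: int
        room_id: str
        event_type: str
        state_key: str
        event_id: Optional[str]       # None if the state has been deleted
        prev_event_id: Optional[str]  # None if this is new state

    def summarise(deltas: List[StateDelta]) -> List[Tuple[str, str, Optional[str]]]:
        # Hypothetical caller: unpack via attributes instead of row["..."] lookups.
        return [(d.room_id, d.event_type, d.event_id) for d in deltas]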
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index ca894edd5a..81f661160c 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1874,9 +1874,9 @@ class DatabasePool:
keyvalues: Optional[Dict[str, Any]] = None,
desc: str = "simple_select_many_batch",
batch_size: int = 100,
- ) -> List[Dict[str, Any]]:
+ ) -> List[Tuple[Any, ...]]:
"""Executes a SELECT query on the named table, which may return zero or
- more rows, returning the result as a list of dicts.
+ more rows.
Filters rows by whether the value of `column` is in `iterable`.
@@ -1888,10 +1888,13 @@ class DatabasePool:
keyvalues: dict of column names and values to select the rows with
desc: description of the transaction, for logging and metrics
batch_size: the number of rows for each select query
+
+ Returns:
+ The results as a list of tuples.
"""
keyvalues = keyvalues or {}
- results: List[Dict[str, Any]] = []
+ results: List[Tuple[Any, ...]] = []
for chunk in batch_iter(iterable, batch_size):
rows = await self.runInteraction(
@@ -1918,9 +1921,9 @@ class DatabasePool:
iterable: Collection[Any],
keyvalues: Dict[str, Any],
retcols: Iterable[str],
- ) -> List[Dict[str, Any]]:
+ ) -> List[Tuple[Any, ...]]:
"""Executes a SELECT query on the named table, which may return zero or
- more rows, returning the result as a list of dicts.
+ more rows.
Filters rows by whether the value of `column` is in `iterable`.
@@ -1931,6 +1934,9 @@ class DatabasePool:
iterable: list
keyvalues: dict of column names and values to select the rows with
retcols: list of strings giving the names of the columns to return
+
+ Returns:
+ The results as a list of tuples.
"""
if not iterable:
return []
@@ -1949,7 +1955,7 @@ class DatabasePool:
)
txn.execute(sql, values)
- return cls.cursor_to_dict(txn)
+ return txn.fetchall()
async def simple_update(
self,
@@ -2418,7 +2424,7 @@ class DatabasePool:
keyvalues: Optional[Dict[str, Any]] = None,
exclude_keyvalues: Optional[Dict[str, Any]] = None,
order_direction: str = "ASC",
- ) -> List[Dict[str, Any]]:
+ ) -> List[Tuple[Any, ...]]:
"""
Executes a SELECT query on the named table with start and limit,
of row numbers, which may return zero or number of rows from start to limit,
@@ -2447,7 +2453,7 @@ class DatabasePool:
order_direction: Whether the results should be ordered "ASC" or "DESC".
Returns:
- The result as a list of dictionaries.
+ The result as a list of tuples.
"""
if order_direction not in ["ASC", "DESC"]:
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
@@ -2474,69 +2480,7 @@ class DatabasePool:
)
txn.execute(sql, arg_list + [limit, start])
- return cls.cursor_to_dict(txn)
-
- async def simple_search_list(
- self,
- table: str,
- term: Optional[str],
- col: str,
- retcols: Collection[str],
- desc: str = "simple_search_list",
- ) -> Optional[List[Dict[str, Any]]]:
- """Executes a SELECT query on the named table, which may return zero or
- more rows, returning the result as a list of dicts.
-
- Args:
- table: the table name
- term: term for searching the table matched to a column.
- col: column to query term should be matched to
- retcols: the names of the columns to return
-
- Returns:
- A list of dictionaries or None.
- """
-
- return await self.runInteraction(
- desc,
- self.simple_search_list_txn,
- table,
- term,
- col,
- retcols,
- db_autocommit=True,
- )
-
- @classmethod
- def simple_search_list_txn(
- cls,
- txn: LoggingTransaction,
- table: str,
- term: Optional[str],
- col: str,
- retcols: Iterable[str],
- ) -> Optional[List[Dict[str, Any]]]:
- """Executes a SELECT query on the named table, which may return zero or
- more rows, returning the result as a list of dicts.
-
- Args:
- txn: Transaction object
- table: the table name
- term: term for searching the table matched to a column.
- col: column to query term should be matched to
- retcols: the names of the columns to return
-
- Returns:
- None if no term is given, otherwise a list of dictionaries.
- """
- if term:
- sql = "SELECT %s FROM %s WHERE %s LIKE ?" % (", ".join(retcols), table, col)
- termvalues = ["%%" + term + "%%"]
- txn.execute(sql, termvalues)
- else:
- return None
-
- return cls.cursor_to_dict(txn)
+ return txn.fetchall()
def make_in_list_sql_clause(
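The same caller-side pattern recurs throughout the rest of this patch: because
simple_select_many_batch / simple_select_many_txn now return the generic
List[Tuple[Any, ...]], each call site narrows the result with cast() to the row shape
implied by retcols and unpacks positionally in that order. A sketch under the assumption
of an events(event_id, depth) query like the ones used later in this patch:

    from typing import Dict, List, Tuple, cast

    async def get_event_depths(db_pool, event_ids: List[str]) -> Dict[str, int]:
        # Narrow the generic tuple rows to the concrete shape implied by retcols,
        # then unpack positionally in the same order the columns were requested.
        rows = cast(
            List[Tuple[str, int]],
            await db_pool.simple_select_many_batch(
                table="events",
                column="event_id",
                iterable=event_ids,
                retcols=("event_id", "depth"),
                desc="get_event_depths",
            ),
        )
        return {event_id: depth for event_id, depth in rows}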
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 101403578c..840d725114 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -15,7 +15,7 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, List, Optional, Tuple, Union, cast
from synapse.api.constants import Direction
from synapse.config.homeserver import HomeServerConfig
@@ -142,26 +142,6 @@ class DataStore(
super().__init__(database, db_conn, hs)
- async def get_users(self) -> List[JsonDict]:
- """Function to retrieve a list of users in users table.
-
- Returns:
- A list of dictionaries representing users.
- """
- return await self.db_pool.simple_select_list(
- table="users",
- keyvalues={},
- retcols=[
- "name",
- "password_hash",
- "is_guest",
- "admin",
- "user_type",
- "deactivated",
- ],
- desc="get_users",
- )
-
async def get_users_paginate(
self,
start: int,
@@ -316,7 +296,11 @@ class DataStore(
"get_users_paginate_txn", get_users_paginate_txn
)
- async def search_users(self, term: str) -> Optional[List[JsonDict]]:
+ async def search_users(
+ self, term: str
+ ) -> List[
+ Tuple[str, Optional[str], Union[int, bool], Union[int, bool], Optional[str]]
+ ]:
"""Function to search users list for one or more users with
the matched term.
@@ -324,15 +308,37 @@ class DataStore(
term: search term
Returns:
- A list of dictionaries or None.
+            A list of tuples of name, password_hash, is_guest, admin and user_type; password_hash and user_type may be None.
"""
- return await self.db_pool.simple_search_list(
- table="users",
- term=term,
- col="name",
- retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
- desc="search_users",
- )
+
+ def search_users(
+ txn: LoggingTransaction,
+ ) -> List[
+ Tuple[str, Optional[str], Union[int, bool], Union[int, bool], Optional[str]]
+ ]:
+ search_term = "%%" + term + "%%"
+
+ sql = """
+ SELECT name, password_hash, is_guest, admin, user_type
+ FROM users
+ WHERE name LIKE ?
+ """
+ txn.execute(sql, (search_term,))
+
+ return cast(
+ List[
+ Tuple[
+ str,
+ Optional[str],
+ Union[int, bool],
+ Union[int, bool],
+ Optional[str],
+ ]
+ ],
+ txn.fetchall(),
+ )
+
+ return await self.db_pool.runInteraction("search_users", search_users)
def check_database_before_upgrade(
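With simple_search_list removed, search_users is the caller-visible change in this file:
consumers now receive plain tuples rather than dicts. A hypothetical consumer (store is
assumed to be a DataStore instance):

    from typing import List

    async def find_admin_names(store, term: str) -> List[str]:
        users = await store.search_users(term)
        # Tuple order follows the SELECT: name, password_hash, is_guest, admin, user_type.
        return [name for name, _hash, _guest, admin, _user_type in users if admin]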
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 80f146dd53..39498d52c6 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -103,6 +103,13 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
"AccountDataAndTagsChangeCache", account_max
)
+ self.db_pool.updates.register_background_index_update(
+ update_name="room_account_data_index_room_id",
+ index_name="room_account_data_room_id",
+ table="room_account_data",
+ columns=("room_id",),
+ )
+
self.db_pool.updates.register_background_update_handler(
"delete_account_data_for_deactivated_users",
self._delete_account_data_for_deactivated_users,
@@ -151,10 +158,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
sql += " AND content != '{}'"
txn.execute(sql, (user_id,))
- rows = self.db_pool.cursor_to_dict(txn)
return {
- row["account_data_type"]: db_to_json(row["content"]) for row in rows
+ account_data_type: db_to_json(content)
+ for account_data_type, content in txn
}
return await self.db_pool.runInteraction(
@@ -196,13 +203,12 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
sql += " AND content != '{}'"
txn.execute(sql, (user_id,))
- rows = self.db_pool.cursor_to_dict(txn)
by_room: Dict[str, Dict[str, JsonDict]] = {}
- for row in rows:
- room_data = by_room.setdefault(row["room_id"], {})
+ for room_id, account_data_type, content in txn:
+ room_data = by_room.setdefault(room_id, {})
- room_data[row["account_data_type"]] = db_to_json(row["content"])
+ room_data[account_data_type] = db_to_json(content)
return by_room
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 0553a0621a..073a99cd84 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -14,17 +14,7 @@
# limitations under the License.
import logging
import re
-from typing import (
- TYPE_CHECKING,
- Any,
- Dict,
- List,
- Optional,
- Pattern,
- Sequence,
- Tuple,
- cast,
-)
+from typing import TYPE_CHECKING, List, Optional, Pattern, Sequence, Tuple, cast
from synapse.appservice import (
ApplicationService,
@@ -353,21 +343,15 @@ class ApplicationServiceTransactionWorkerStore(
def _get_oldest_unsent_txn(
txn: LoggingTransaction,
- ) -> Optional[Dict[str, Any]]:
+ ) -> Optional[Tuple[int, str]]:
# Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
txn.execute(
- "SELECT * FROM application_services_txns WHERE as_id=?"
+ "SELECT txn_id, event_ids FROM application_services_txns WHERE as_id=?"
" ORDER BY txn_id ASC LIMIT 1",
(service.id,),
)
- rows = self.db_pool.cursor_to_dict(txn)
- if not rows:
- return None
-
- entry = rows[0]
-
- return entry
+ return cast(Optional[Tuple[int, str]], txn.fetchone())
entry = await self.db_pool.runInteraction(
"get_oldest_unsent_appservice_txn", _get_oldest_unsent_txn
@@ -376,8 +360,9 @@ class ApplicationServiceTransactionWorkerStore(
if not entry:
return None
- event_ids = db_to_json(entry["event_ids"])
+ txn_id, event_ids_str = entry
+ event_ids = db_to_json(event_ids_str)
events = await self.get_events_as_list(event_ids)
# TODO: to-device messages, one-time key counts, device list summaries and unused
@@ -385,7 +370,7 @@ class ApplicationServiceTransactionWorkerStore(
# We likely want to populate those for reliability.
return AppServiceTransaction(
service=service,
- id=entry["txn_id"],
+ id=txn_id,
events=events,
ephemeral=[],
to_device_messages=[],
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 16170e0436..bf5b8c804b 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -15,6 +15,7 @@
import logging
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union, cast
+import attr
from typing_extensions import TypedDict
from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -42,7 +43,8 @@ logger = logging.getLogger(__name__)
LAST_SEEN_GRANULARITY = 10 * 60 * 1000
-class DeviceLastConnectionInfo(TypedDict):
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class DeviceLastConnectionInfo:
"""Metadata for the last connection seen for a user and device combination"""
# These types must match the columns in the `devices` table
@@ -499,24 +501,29 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
device_id: If None fetches all devices for the user
Returns:
- A dictionary mapping a tuple of (user_id, device_id) to dicts, with
- keys giving the column names from the devices table.
+ A dictionary mapping a tuple of (user_id, device_id) to DeviceLastConnectionInfo.
"""
keyvalues = {"user_id": user_id}
if device_id is not None:
keyvalues["device_id"] = device_id
- res = cast(
- List[DeviceLastConnectionInfo],
- await self.db_pool.simple_select_list(
- table="devices",
- keyvalues=keyvalues,
- retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
- ),
+ res = await self.db_pool.simple_select_list(
+ table="devices",
+ keyvalues=keyvalues,
+ retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
)
- return {(d["user_id"], d["device_id"]): d for d in res}
+ return {
+ (d["user_id"], d["device_id"]): DeviceLastConnectionInfo(
+ user_id=d["user_id"],
+ device_id=d["device_id"],
+ ip=d["ip"],
+ user_agent=d["user_agent"],
+ last_seen=d["last_seen"],
+ )
+ for d in res
+ }
async def _get_user_ip_and_agents_from_database(
self, user: UserID, since_ts: int = 0
@@ -683,8 +690,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
device_id: If None fetches all devices for the user
Returns:
- A dictionary mapping a tuple of (user_id, device_id) to dicts, with
- keys giving the column names from the devices table.
+ A dictionary mapping a tuple of (user_id, device_id) to DeviceLastConnectionInfo.
"""
ret = await self._get_last_client_ip_by_device_from_database(user_id, device_id)
@@ -705,13 +711,13 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
continue
if not device_id or did == device_id:
- ret[(user_id, did)] = {
- "user_id": user_id,
- "ip": ip,
- "user_agent": user_agent,
- "device_id": did,
- "last_seen": last_seen,
- }
+ ret[(user_id, did)] = DeviceLastConnectionInfo(
+ user_id=user_id,
+ ip=ip,
+ user_agent=user_agent,
+ device_id=did,
+ last_seen=last_seen,
+ )
return ret
async def get_user_ip_and_agents(
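DeviceLastConnectionInfo is now a frozen attrs class rather than a TypedDict, so code that
used subscription must switch to attribute access. A minimal sketch, assuming the same five
fields as the old TypedDict (the exact optional-ness of ip/user_agent/last_seen is an
assumption here):

    import attr
    from typing import Optional

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class DeviceLastConnectionInfo:
        user_id: str
        device_id: str
        ip: Optional[str]
        user_agent: Optional[str]
        last_seen: Optional[int]

    info = DeviceLastConnectionInfo(
        user_id="@alice:example.com",
        device_id="ABCDEFGH",
        ip="10.0.0.1",
        user_agent="Element/1.0",
        last_seen=1_700_000_000_000,
    )
    assert info.ip == "10.0.0.1"  # attribute access; info["ip"] now raises TypeError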
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 0be12f0e06..72dc4f54dc 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -344,18 +344,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
# Note that this is more efficient than just dropping `device_id` from the query,
# since device_inbox has an index on `(user_id, device_id, stream_id)`
if not device_ids_to_query:
- user_device_dicts = self.db_pool.simple_select_many_txn(
- txn,
- table="devices",
- column="user_id",
- iterable=user_ids_to_query,
- keyvalues={"hidden": False},
- retcols=("device_id",),
+ user_device_dicts = cast(
+ List[Tuple[str]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="devices",
+ column="user_id",
+ iterable=user_ids_to_query,
+ keyvalues={"hidden": False},
+ retcols=("device_id",),
+ ),
)
- device_ids_to_query.update(
- {row["device_id"] for row in user_device_dicts}
- )
+ device_ids_to_query.update({row[0] for row in user_device_dicts})
if not device_ids_to_query:
# We've ended up with no devices to query.
@@ -449,7 +450,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
user_id: str,
device_id: Optional[str],
up_to_stream_id: int,
- limit: int,
+ limit: Optional[int] = None,
) -> int:
"""
Args:
@@ -480,11 +481,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
ROW_ID_NAME = self.database_engine.row_id_name
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
+ limit_statement = "" if limit is None else f"LIMIT {limit}"
sql = f"""
DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
SELECT {ROW_ID_NAME} FROM device_inbox
WHERE user_id = ? AND device_id = ? AND stream_id <= ?
- LIMIT {limit}
+ {limit_statement}
)
"""
txn.execute(sql, (user_id, device_id, up_to_stream_id))
@@ -849,20 +851,21 @@ class DeviceInboxWorkerStore(SQLBaseStore):
# We exclude hidden devices (such as cross-signing keys) here as they are
# not expected to receive to-device messages.
- rows = self.db_pool.simple_select_many_txn(
- txn,
- table="devices",
- keyvalues={"user_id": user_id, "hidden": False},
- column="device_id",
- iterable=devices,
- retcols=("device_id",),
+ rows = cast(
+ List[Tuple[str]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="devices",
+ keyvalues={"user_id": user_id, "hidden": False},
+ column="device_id",
+ iterable=devices,
+ retcols=("device_id",),
+ ),
)
- for row in rows:
+ for (device_id,) in rows:
# Only insert into the local inbox if the device exists on
# this server
- device_id = row["device_id"]
-
with start_active_span("serialise_to_device_message"):
msg = messages_by_device[device_id]
set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
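delete_messages_for_device now takes an optional limit, and the LIMIT clause is only
interpolated into the SQL when one is supplied (the f-string interpolation stays safe
because the parameter is typed as Optional[int]). Hypothetical call sites, with store
assumed to be a DeviceInboxWorkerStore:

    # Capped deletion, as before: at most 100 matching rows are removed.
    deleted = await store.delete_messages_for_device(
        user_id="@alice:example.com",
        device_id="ABCDEFGH",
        up_to_stream_id=1234,
        limit=100,
    )

    # New default: omit limit to delete everything up to the given stream id.
    deleted_all = await store.delete_messages_for_device(
        user_id="@alice:example.com",
        device_id="ABCDEFGH",
        up_to_stream_id=1234,
    )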
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index a8206c6afe..a07086149c 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1054,16 +1054,19 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
async def get_device_list_last_stream_id_for_remotes(
self, user_ids: Iterable[str]
) -> Mapping[str, Optional[str]]:
- rows = await self.db_pool.simple_select_many_batch(
- table="device_lists_remote_extremeties",
- column="user_id",
- iterable=user_ids,
- retcols=("user_id", "stream_id"),
- desc="get_device_list_last_stream_id_for_remotes",
+ rows = cast(
+ List[Tuple[str, str]],
+ await self.db_pool.simple_select_many_batch(
+ table="device_lists_remote_extremeties",
+ column="user_id",
+ iterable=user_ids,
+ retcols=("user_id", "stream_id"),
+ desc="get_device_list_last_stream_id_for_remotes",
+ ),
)
results: Dict[str, Optional[str]] = {user_id: None for user_id in user_ids}
- results.update({row["user_id"]: row["stream_id"] for row in rows})
+ results.update(rows)
return results
@@ -1079,22 +1082,30 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
The IDs of users whose device lists need resync.
"""
if user_ids:
- rows = await self.db_pool.simple_select_many_batch(
- table="device_lists_remote_resync",
- column="user_id",
- iterable=user_ids,
- retcols=("user_id",),
- desc="get_user_ids_requiring_device_list_resync_with_iterable",
+ row_tuples = cast(
+ List[Tuple[str]],
+ await self.db_pool.simple_select_many_batch(
+ table="device_lists_remote_resync",
+ column="user_id",
+ iterable=user_ids,
+ retcols=("user_id",),
+ desc="get_user_ids_requiring_device_list_resync_with_iterable",
+ ),
)
+
+ return {row[0] for row in row_tuples}
else:
- rows = await self.db_pool.simple_select_list(
- table="device_lists_remote_resync",
- keyvalues=None,
- retcols=("user_id",),
- desc="get_user_ids_requiring_device_list_resync",
+ rows = cast(
+ List[Dict[str, str]],
+ await self.db_pool.simple_select_list(
+ table="device_lists_remote_resync",
+ keyvalues=None,
+ retcols=("user_id",),
+ desc="get_user_ids_requiring_device_list_resync",
+ ),
)
- return {row["user_id"] for row in rows}
+ return {row["user_id"] for row in rows}
async def mark_remote_users_device_caches_as_stale(
self, user_ids: StrCollection
@@ -1415,13 +1426,13 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
def get_devices_not_accessed_since_txn(
txn: LoggingTransaction,
- ) -> List[Dict[str, str]]:
+ ) -> List[Tuple[str, str]]:
sql = """
SELECT user_id, device_id
FROM devices WHERE last_seen < ? AND hidden = FALSE
"""
txn.execute(sql, (since_ms,))
- return self.db_pool.cursor_to_dict(txn)
+ return cast(List[Tuple[str, str]], txn.fetchall())
rows = await self.db_pool.runInteraction(
"get_devices_not_accessed_since",
@@ -1429,11 +1440,11 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
)
devices: Dict[str, List[str]] = {}
- for row in rows:
+ for user_id, device_id in rows:
# Remote devices are never stale from our point of view.
- if self.hs.is_mine_id(row["user_id"]):
- user_devices = devices.setdefault(row["user_id"], [])
- user_devices.append(row["device_id"])
+ if self.hs.is_mine_id(user_id):
+ user_devices = devices.setdefault(user_id, [])
+ user_devices.append(device_id)
return devices
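Why results.update(rows) works in get_device_list_last_stream_id_for_remotes: dict.update
accepts any iterable of key/value pairs, so the (user_id, stream_id) tuples can be merged
directly without building an intermediate dict. A tiny self-contained illustration:

    results = {"@a:hs": None, "@b:hs": None, "@c:hs": None}
    rows = [("@a:hs", "5"), ("@b:hs", "7")]  # (user_id, stream_id) tuples
    results.update(rows)
    assert results == {"@a:hs": "5", "@b:hs": "7", "@c:hs": None}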
diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index d01f28cc80..aac4cfb054 100644
--- a/synapse/storage/databases/main/e2e_room_keys.py
+++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -53,6 +53,13 @@ class EndToEndRoomKeyBackgroundStore(SQLBaseStore):
):
super().__init__(database, db_conn, hs)
+ self.db_pool.updates.register_background_index_update(
+ update_name="e2e_room_keys_index_room_id",
+ index_name="e2e_room_keys_room_id",
+ table="e2e_room_keys",
+ columns=("room_id",),
+ )
+
self.db_pool.updates.register_background_update_handler(
"delete_e2e_backup_keys_for_deactivated_users",
self._delete_e2e_backup_keys_for_deactivated_users,
@@ -208,7 +215,7 @@ class EndToEndRoomKeyStore(EndToEndRoomKeyBackgroundStore):
"message": "Set room key",
"room_id": room_id,
"session_id": session_id,
- StreamKeyType.ROOM: room_key,
+ StreamKeyType.ROOM.value: room_key,
}
)
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 89fac23f93..f13d776b0d 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -493,15 +493,18 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
A map from (algorithm, key_id) to json string for key
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="e2e_one_time_keys_json",
- column="key_id",
- iterable=key_ids,
- retcols=("algorithm", "key_id", "key_json"),
- keyvalues={"user_id": user_id, "device_id": device_id},
- desc="add_e2e_one_time_keys_check",
+ rows = cast(
+ List[Tuple[str, str, str]],
+ await self.db_pool.simple_select_many_batch(
+ table="e2e_one_time_keys_json",
+ column="key_id",
+ iterable=key_ids,
+ retcols=("algorithm", "key_id", "key_json"),
+ keyvalues={"user_id": user_id, "device_id": device_id},
+ desc="add_e2e_one_time_keys_check",
+ ),
)
- result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
+ result = {(algorithm, key_id): key_json for algorithm, key_id, key_json in rows}
log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
return result
@@ -921,14 +924,10 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
}
txn.execute(sql, params)
- rows = self.db_pool.cursor_to_dict(txn)
- for row in rows:
- user_id = row["user_id"]
- key_type = row["keytype"]
- key = db_to_json(row["keydata"])
+ for user_id, key_type, key_data, _ in txn:
user_keys = result.setdefault(user_id, {})
- user_keys[key_type] = key
+ user_keys[key_type] = db_to_json(key_data)
return result
@@ -988,13 +987,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
query_params.extend(item)
txn.execute(sql, query_params)
- rows = self.db_pool.cursor_to_dict(txn)
# and add the signatures to the appropriate keys
- for row in rows:
- key_id: str = row["key_id"]
- target_user_id: str = row["target_user_id"]
- target_device_id: str = row["target_device_id"]
+ for target_user_id, target_device_id, key_id, signature in txn:
key_type = devices[(target_user_id, target_device_id)]
# We need to copy everything, because the result may have come
# from the cache. dict.copy only does a shallow copy, so we
@@ -1012,13 +1007,11 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
].copy()
if from_user_id in signatures:
user_sigs = signatures[from_user_id] = signatures[from_user_id]
- user_sigs[key_id] = row["signature"]
+ user_sigs[key_id] = signature
else:
- signatures[from_user_id] = {key_id: row["signature"]}
+ signatures[from_user_id] = {key_id: signature}
else:
- target_user_key["signatures"] = {
- from_user_id: {key_id: row["signature"]}
- }
+ target_user_key["signatures"] = {from_user_id: {key_id: signature}}
return keys
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index afffa54985..4f80ce75cc 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1049,15 +1049,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
Args:
event_ids: The event IDs to calculate the max depth of.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="events",
- column="event_id",
- iterable=event_ids,
- retcols=(
- "event_id",
- "depth",
+ rows = cast(
+ List[Tuple[str, int]],
+ await self.db_pool.simple_select_many_batch(
+ table="events",
+ column="event_id",
+ iterable=event_ids,
+ retcols=(
+ "event_id",
+ "depth",
+ ),
+ desc="get_max_depth_of",
),
- desc="get_max_depth_of",
)
if not rows:
@@ -1065,10 +1068,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
else:
max_depth_event_id = ""
current_max_depth = 0
- for row in rows:
- if row["depth"] > current_max_depth:
- max_depth_event_id = row["event_id"]
- current_max_depth = row["depth"]
+ for event_id, depth in rows:
+ if depth > current_max_depth:
+ max_depth_event_id = event_id
+ current_max_depth = depth
return max_depth_event_id, current_max_depth
@@ -1078,15 +1081,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
Args:
event_ids: The event IDs to calculate the max depth of.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="events",
- column="event_id",
- iterable=event_ids,
- retcols=(
- "event_id",
- "depth",
+ rows = cast(
+ List[Tuple[str, int]],
+ await self.db_pool.simple_select_many_batch(
+ table="events",
+ column="event_id",
+ iterable=event_ids,
+ retcols=(
+ "event_id",
+ "depth",
+ ),
+ desc="get_min_depth_of",
),
- desc="get_min_depth_of",
)
if not rows:
@@ -1094,10 +1100,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
else:
min_depth_event_id = ""
current_min_depth = MAX_DEPTH
- for row in rows:
- if row["depth"] < current_min_depth:
- min_depth_event_id = row["event_id"]
- current_min_depth = row["depth"]
+ for event_id, depth in rows:
+ if depth < current_min_depth:
+ min_depth_event_id = event_id
+ current_min_depth = depth
return min_depth_event_id, current_min_depth
@@ -1553,19 +1559,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
A filtered down list of `event_ids` that have previous failed pull attempts.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="event_failed_pull_attempts",
- column="event_id",
- iterable=event_ids,
- keyvalues={},
- retcols=("event_id",),
- desc="get_event_ids_with_failed_pull_attempts",
+ rows = cast(
+ List[Tuple[str]],
+ await self.db_pool.simple_select_many_batch(
+ table="event_failed_pull_attempts",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=("event_id",),
+ desc="get_event_ids_with_failed_pull_attempts",
+ ),
)
- event_ids_with_failed_pull_attempts: Set[str] = {
- row["event_id"] for row in rows
- }
-
- return event_ids_with_failed_pull_attempts
+ return {row[0] for row in rows}
@trace
async def get_event_ids_to_not_pull_from_backoff(
@@ -1585,32 +1590,34 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
A dictionary of event_ids that should not be attempted to be pulled and the
next timestamp at which we may try pulling them again.
"""
- event_failed_pull_attempts = await self.db_pool.simple_select_many_batch(
- table="event_failed_pull_attempts",
- column="event_id",
- iterable=event_ids,
- keyvalues={},
- retcols=(
- "event_id",
- "last_attempt_ts",
- "num_attempts",
+ event_failed_pull_attempts = cast(
+ List[Tuple[str, int, int]],
+ await self.db_pool.simple_select_many_batch(
+ table="event_failed_pull_attempts",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=(
+ "event_id",
+ "last_attempt_ts",
+ "num_attempts",
+ ),
+ desc="get_event_ids_to_not_pull_from_backoff",
),
- desc="get_event_ids_to_not_pull_from_backoff",
)
current_time = self._clock.time_msec()
event_ids_with_backoff = {}
- for event_failed_pull_attempt in event_failed_pull_attempts:
- event_id = event_failed_pull_attempt["event_id"]
+ for event_id, last_attempt_ts, num_attempts in event_failed_pull_attempts:
# Exponential back-off (up to the upper bound) so we don't try to
# pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
backoff_end_time = (
- event_failed_pull_attempt["last_attempt_ts"]
+ last_attempt_ts
+ (
2
** min(
- event_failed_pull_attempt["num_attempts"],
+ num_attempts,
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
)
)
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 790d058c43..ef6766b5e0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -27,6 +27,7 @@ from typing import (
Optional,
Set,
Tuple,
+ Union,
cast,
)
@@ -501,16 +502,19 @@ class PersistEventsStore:
# We ignore legacy rooms that we aren't filling the chain cover index
# for.
- rows = self.db_pool.simple_select_many_txn(
- txn,
- table="rooms",
- column="room_id",
- iterable={event.room_id for event in events if event.is_state()},
- keyvalues={},
- retcols=("room_id", "has_auth_chain_index"),
+ rows = cast(
+ List[Tuple[str, Optional[Union[int, bool]]]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="rooms",
+ column="room_id",
+ iterable={event.room_id for event in events if event.is_state()},
+ keyvalues={},
+ retcols=("room_id", "has_auth_chain_index"),
+ ),
)
rooms_using_chain_index = {
- row["room_id"] for row in rows if row["has_auth_chain_index"]
+ room_id for room_id, has_auth_chain_index in rows if has_auth_chain_index
}
state_events = {
@@ -571,19 +575,18 @@ class PersistEventsStore:
# We check if there are any events that need to be handled in the rooms
# we're looking at. These should just be out of band memberships, where
# we didn't have the auth chain when we first persisted.
- rows = db_pool.simple_select_many_txn(
- txn,
- table="event_auth_chain_to_calculate",
- keyvalues={},
- column="room_id",
- iterable=set(event_to_room_id.values()),
- retcols=("event_id", "type", "state_key"),
+ auth_chain_to_calc_rows = cast(
+ List[Tuple[str, str, str]],
+ db_pool.simple_select_many_txn(
+ txn,
+ table="event_auth_chain_to_calculate",
+ keyvalues={},
+ column="room_id",
+ iterable=set(event_to_room_id.values()),
+ retcols=("event_id", "type", "state_key"),
+ ),
)
- for row in rows:
- event_id = row["event_id"]
- event_type = row["type"]
- state_key = row["state_key"]
-
+ for event_id, event_type, state_key in auth_chain_to_calc_rows:
# (We could pull out the auth events for all rows at once using
# simple_select_many, but this case happens rarely and almost always
# with a single row.)
@@ -753,23 +756,31 @@ class PersistEventsStore:
# Step 1, fetch all existing links from all the chains we've seen
# referenced.
chain_links = _LinkMap()
- rows = db_pool.simple_select_many_txn(
- txn,
- table="event_auth_chain_links",
- column="origin_chain_id",
- iterable={chain_id for chain_id, _ in chain_map.values()},
- keyvalues={},
- retcols=(
- "origin_chain_id",
- "origin_sequence_number",
- "target_chain_id",
- "target_sequence_number",
+ auth_chain_rows = cast(
+ List[Tuple[int, int, int, int]],
+ db_pool.simple_select_many_txn(
+ txn,
+ table="event_auth_chain_links",
+ column="origin_chain_id",
+ iterable={chain_id for chain_id, _ in chain_map.values()},
+ keyvalues={},
+ retcols=(
+ "origin_chain_id",
+ "origin_sequence_number",
+ "target_chain_id",
+ "target_sequence_number",
+ ),
),
)
- for row in rows:
+ for (
+ origin_chain_id,
+ origin_sequence_number,
+ target_chain_id,
+ target_sequence_number,
+ ) in auth_chain_rows:
chain_links.add_link(
- (row["origin_chain_id"], row["origin_sequence_number"]),
- (row["target_chain_id"], row["target_sequence_number"]),
+ (origin_chain_id, origin_sequence_number),
+ (target_chain_id, target_sequence_number),
new=False,
)
@@ -1654,8 +1665,6 @@ class PersistEventsStore:
) -> None:
to_prefill = []
- rows = []
-
ev_map = {e.event_id: e for e, _ in events_and_contexts}
if not ev_map:
return
@@ -1676,10 +1685,9 @@ class PersistEventsStore:
)
txn.execute(sql + clause, args)
- rows = self.db_pool.cursor_to_dict(txn)
- for row in rows:
- event = ev_map[row["event_id"]]
- if not row["rejects"] and not row["redacts"]:
+ for event_id, redacts, rejects in txn:
+ event = ev_map[event_id]
+ if not rejects and not redacts:
to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
async def external_prefill() -> None:
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index daef3685b0..c5fce1c82b 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -369,18 +369,20 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
for chunk in chunks:
- ev_rows = self.db_pool.simple_select_many_txn(
- txn,
- table="event_json",
- column="event_id",
- iterable=chunk,
- retcols=["event_id", "json"],
- keyvalues={},
+ ev_rows = cast(
+ List[Tuple[str, str]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="event_json",
+ column="event_id",
+ iterable=chunk,
+ retcols=["event_id", "json"],
+ keyvalues={},
+ ),
)
- for row in ev_rows:
- event_id = row["event_id"]
- event_json = db_to_json(row["json"])
+ for event_id, json in ev_rows:
+ event_json = db_to_json(json)
try:
origin_server_ts = event_json["origin_server_ts"]
except (KeyError, AttributeError):
@@ -563,15 +565,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
if deleted:
# We now need to invalidate the caches of these rooms
- rows = self.db_pool.simple_select_many_txn(
- txn,
- table="events",
- column="event_id",
- iterable=to_delete,
- keyvalues={},
- retcols=("room_id",),
+ rows = cast(
+ List[Tuple[str]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="events",
+ column="event_id",
+ iterable=to_delete,
+ keyvalues={},
+ retcols=("room_id",),
+ ),
)
- room_ids = {row["room_id"] for row in rows}
+ room_ids = {row[0] for row in rows}
for room_id in room_ids:
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, (room_id,) # type: ignore[attr-defined]
@@ -1038,18 +1043,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
count = len(rows)
# We also need to fetch the auth events for them.
- auth_events = self.db_pool.simple_select_many_txn(
- txn,
- table="event_auth",
- column="event_id",
- iterable=event_to_room_id,
- keyvalues={},
- retcols=("event_id", "auth_id"),
+ auth_events = cast(
+ List[Tuple[str, str]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="event_auth",
+ column="event_id",
+ iterable=event_to_room_id,
+ keyvalues={},
+ retcols=("event_id", "auth_id"),
+ ),
)
event_to_auth_chain: Dict[str, List[str]] = {}
- for row in auth_events:
- event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"])
+ for event_id, auth_id in auth_events:
+ event_to_auth_chain.setdefault(event_id, []).append(auth_id)
# Calculate and persist the chain cover index for this set of events.
#
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 8737a1370e..89757eabed 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1584,16 +1584,19 @@ class EventsWorkerStore(SQLBaseStore):
"""Given a list of event ids, check if we have already processed and
stored them as non outliers.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="events",
- retcols=("event_id",),
- column="event_id",
- iterable=list(event_ids),
- keyvalues={"outlier": False},
- desc="have_events_in_timeline",
+ rows = cast(
+ List[Tuple[str]],
+ await self.db_pool.simple_select_many_batch(
+ table="events",
+ retcols=("event_id",),
+ column="event_id",
+ iterable=list(event_ids),
+ keyvalues={"outlier": False},
+ desc="have_events_in_timeline",
+ ),
)
- return {r["event_id"] for r in rows}
+ return {r[0] for r in rows}
@trace
@tag_args
@@ -2340,15 +2343,18 @@ class EventsWorkerStore(SQLBaseStore):
a dict mapping from event id to partial-stateness. We return True for
any of the events which are unknown (or are outliers).
"""
- result = await self.db_pool.simple_select_many_batch(
- table="partial_state_events",
- column="event_id",
- iterable=event_ids,
- retcols=["event_id"],
- desc="get_partial_state_events",
+ result = cast(
+ List[Tuple[str]],
+ await self.db_pool.simple_select_many_batch(
+ table="partial_state_events",
+ column="event_id",
+ iterable=event_ids,
+ retcols=["event_id"],
+ desc="get_partial_state_events",
+ ),
)
# convert the result to a dict, to make @cachedList work
- partial = {r["event_id"] for r in result}
+ partial = {r[0] for r in result}
return {e_id: e_id in partial for e_id in event_ids}
@cached()
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index 889c578b9c..ea797864b9 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -16,7 +16,7 @@
import itertools
import json
import logging
-from typing import Dict, Iterable, Mapping, Optional, Tuple
+from typing import Dict, Iterable, List, Mapping, Optional, Tuple, Union, cast
from canonicaljson import encode_canonical_json
from signedjson.key import decode_verify_key_bytes
@@ -205,35 +205,39 @@ class KeyStore(CacheInvalidationWorkerStore):
If we have multiple entries for a given key ID, returns the most recent.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="server_keys_json",
- column="key_id",
- iterable=key_ids,
- keyvalues={"server_name": server_name},
- retcols=(
- "key_id",
- "from_server",
- "ts_added_ms",
- "ts_valid_until_ms",
- "key_json",
+ rows = cast(
+ List[Tuple[str, str, int, int, Union[bytes, memoryview]]],
+ await self.db_pool.simple_select_many_batch(
+ table="server_keys_json",
+ column="key_id",
+ iterable=key_ids,
+ keyvalues={"server_name": server_name},
+ retcols=(
+ "key_id",
+ "from_server",
+ "ts_added_ms",
+ "ts_valid_until_ms",
+ "key_json",
+ ),
+ desc="get_server_keys_json_for_remote",
),
- desc="get_server_keys_json_for_remote",
)
if not rows:
return {}
- # We sort the rows so that the most recently added entry is picked up.
- rows.sort(key=lambda r: r["ts_added_ms"])
+ # We sort the rows by ts_added_ms so that the most recently added entry
+ # will stomp over older entries in the dictionary.
+ rows.sort(key=lambda r: r[2])
return {
- row["key_id"]: FetchKeyResultForRemote(
+ key_id: FetchKeyResultForRemote(
# Cast to bytes since postgresql returns a memoryview.
- key_json=bytes(row["key_json"]),
- valid_until_ts=row["ts_valid_until_ms"],
- added_ts=row["ts_added_ms"],
+ key_json=bytes(key_json),
+ valid_until_ts=ts_valid_until_ms,
+ added_ts=ts_added_ms,
)
- for row in rows
+ for key_id, from_server, ts_added_ms, ts_valid_until_ms, key_json in rows
}
async def get_all_server_keys_json_for_remote(
@@ -260,6 +264,8 @@ class KeyStore(CacheInvalidationWorkerStore):
if not rows:
return {}
+ # We sort the rows by ts_added_ms so that the most recently added entry
+ # will stomp over older entries in the dictionary.
rows.sort(key=lambda r: r["ts_added_ms"])
return {
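The new comment about sorting by ts_added_ms relies on dict comprehensions keeping the last
value seen for a duplicate key, so sorting ascending leaves the newest entry in the result.
A small illustration with made-up key IDs:

    rows = [("ed25519:a", 200, "new"), ("ed25519:b", 150, "other"), ("ed25519:a", 100, "old")]
    rows.sort(key=lambda r: r[1])  # ascending ts_added_ms
    latest = {key_id: value for key_id, _ts, value in rows}
    assert latest == {"ed25519:a": "new", "ed25519:b": "other"}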
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 8cebeb5189..2e6b176bd2 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -28,6 +28,7 @@ from typing import (
from synapse.api.constants import Direction
from synapse.logging.opentracing import trace
+from synapse.media._base import ThumbnailInfo
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -435,8 +436,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="store_url_cache",
)
- async def get_local_media_thumbnails(self, media_id: str) -> List[Dict[str, Any]]:
- return await self.db_pool.simple_select_list(
+ async def get_local_media_thumbnails(self, media_id: str) -> List[ThumbnailInfo]:
+ rows = await self.db_pool.simple_select_list(
"local_media_repository_thumbnails",
{"media_id": media_id},
(
@@ -448,6 +449,16 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
),
desc="get_local_media_thumbnails",
)
+ return [
+ ThumbnailInfo(
+ width=row["thumbnail_width"],
+ height=row["thumbnail_height"],
+ method=row["thumbnail_method"],
+ type=row["thumbnail_type"],
+ length=row["thumbnail_length"],
+ )
+ for row in rows
+ ]
@trace
async def store_local_thumbnail(
@@ -556,8 +567,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
async def get_remote_media_thumbnails(
self, origin: str, media_id: str
- ) -> List[Dict[str, Any]]:
- return await self.db_pool.simple_select_list(
+ ) -> List[ThumbnailInfo]:
+ rows = await self.db_pool.simple_select_list(
"remote_media_cache_thumbnails",
{"media_origin": origin, "media_id": media_id},
(
@@ -566,10 +577,19 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"thumbnail_method",
"thumbnail_type",
"thumbnail_length",
- "filesystem_id",
),
desc="get_remote_media_thumbnails",
)
+ return [
+ ThumbnailInfo(
+ width=row["thumbnail_width"],
+ height=row["thumbnail_height"],
+ method=row["thumbnail_method"],
+ type=row["thumbnail_type"],
+ length=row["thumbnail_length"],
+ )
+ for row in rows
+ ]
@trace
async def get_remote_media_thumbnail(
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 194b4e031f..3b444d2d07 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -20,6 +20,7 @@ from typing import (
Mapping,
Optional,
Tuple,
+ Union,
cast,
)
@@ -260,27 +261,40 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
async def get_presence_for_users(
self, user_ids: Iterable[str]
) -> Mapping[str, UserPresenceState]:
- rows = await self.db_pool.simple_select_many_batch(
- table="presence_stream",
- column="user_id",
- iterable=user_ids,
- keyvalues={},
- retcols=(
- "user_id",
- "state",
- "last_active_ts",
- "last_federation_update_ts",
- "last_user_sync_ts",
- "status_msg",
- "currently_active",
+ # TODO All these columns are nullable, but we don't expect that:
+ # https://github.com/matrix-org/synapse/issues/16467
+ rows = cast(
+ List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]],
+ await self.db_pool.simple_select_many_batch(
+ table="presence_stream",
+ column="user_id",
+ iterable=user_ids,
+ keyvalues={},
+ retcols=(
+ "user_id",
+ "state",
+ "last_active_ts",
+ "last_federation_update_ts",
+ "last_user_sync_ts",
+ "status_msg",
+ "currently_active",
+ ),
+ desc="get_presence_for_users",
),
- desc="get_presence_for_users",
)
- for row in rows:
- row["currently_active"] = bool(row["currently_active"])
-
- return {row["user_id"]: UserPresenceState(**row) for row in rows}
+ return {
+ user_id: UserPresenceState(
+ user_id=user_id,
+ state=state,
+ last_active_ts=last_active_ts,
+ last_federation_update_ts=last_federation_update_ts,
+ last_user_sync_ts=last_user_sync_ts,
+ status_msg=status_msg,
+ currently_active=bool(currently_active),
+ )
+ for user_id, state, last_active_ts, last_federation_update_ts, last_user_sync_ts, status_msg, currently_active in rows
+ }
async def should_user_receive_full_presence_with_token(
self,
@@ -385,28 +399,49 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
limit = 100
offset = 0
while True:
- rows = await self.db_pool.runInteraction(
- "get_presence_for_all_users",
- self.db_pool.simple_select_list_paginate_txn,
- "presence_stream",
- orderby="stream_id",
- start=offset,
- limit=limit,
- exclude_keyvalues=exclude_keyvalues,
- retcols=(
- "user_id",
- "state",
- "last_active_ts",
- "last_federation_update_ts",
- "last_user_sync_ts",
- "status_msg",
- "currently_active",
+ # TODO All these columns are nullable, but we don't expect that:
+ # https://github.com/matrix-org/synapse/issues/16467
+ rows = cast(
+ List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]],
+ await self.db_pool.runInteraction(
+ "get_presence_for_all_users",
+ self.db_pool.simple_select_list_paginate_txn,
+ "presence_stream",
+ orderby="stream_id",
+ start=offset,
+ limit=limit,
+ exclude_keyvalues=exclude_keyvalues,
+ retcols=(
+ "user_id",
+ "state",
+ "last_active_ts",
+ "last_federation_update_ts",
+ "last_user_sync_ts",
+ "status_msg",
+ "currently_active",
+ ),
+ order_direction="ASC",
),
- order_direction="ASC",
)
- for row in rows:
- users_to_state[row["user_id"]] = UserPresenceState(**row)
+ for (
+ user_id,
+ state,
+ last_active_ts,
+ last_federation_update_ts,
+ last_user_sync_ts,
+ status_msg,
+ currently_active,
+ ) in rows:
+ users_to_state[user_id] = UserPresenceState(
+ user_id=user_id,
+ state=state,
+ last_active_ts=last_active_ts,
+ last_federation_update_ts=last_federation_update_ts,
+ last_user_sync_ts=last_user_sync_ts,
+ status_msg=status_msg,
+ currently_active=bool(currently_active),
+ )
# We've run out of updates to query
if len(rows) < limit:
@@ -434,13 +469,21 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
txn = db_conn.cursor()
txn.execute(sql, (PresenceState.OFFLINE,))
- rows = self.db_pool.cursor_to_dict(txn)
+ rows = txn.fetchall()
txn.close()
- for row in rows:
- row["currently_active"] = bool(row["currently_active"])
-
- return [UserPresenceState(**row) for row in rows]
+ return [
+ UserPresenceState(
+ user_id=user_id,
+ state=state,
+ last_active_ts=last_active_ts,
+ last_federation_update_ts=last_federation_update_ts,
+ last_user_sync_ts=last_user_sync_ts,
+ status_msg=status_msg,
+ currently_active=bool(currently_active),
+ )
+ for user_id, state, last_active_ts, last_federation_update_ts, last_user_sync_ts, status_msg, currently_active in rows
+ ]
def take_presence_startup_info(self) -> List[UserPresenceState]:
active_on_startup = self._presence_on_startup
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index dea0e0458c..1e11bf2706 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -89,6 +89,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# furthermore, we might already have the table from a previous (failed)
# purge attempt, so let's drop the table first.
+ if isinstance(self.database_engine, PostgresEngine):
+ # Disable statement timeouts for this transaction; purging rooms can
+ # take a while!
+ txn.execute("SET LOCAL statement_timeout = 0")
+
txn.execute("DROP TABLE IF EXISTS events_to_purge")
txn.execute(
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 923166974c..f5356e7f80 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -62,20 +62,34 @@ logger = logging.getLogger(__name__)
def _load_rules(
- rawrules: List[JsonDict],
+ rawrules: List[Tuple[str, int, str, str]],
enabled_map: Dict[str, bool],
experimental_config: ExperimentalConfig,
) -> FilteredPushRules:
"""Take the DB rows returned from the DB and convert them into a full
`FilteredPushRules` object.
+
+ Args:
+ rawrules: List of tuples of:
+ * rule ID
+            * Priority class
+ * Conditions (as serialized JSON)
+ * Actions (as serialized JSON)
+ enabled_map: A dictionary of rule ID to a boolean of whether the rule is
+ enabled. This might not include all rule IDs from rawrules.
+ experimental_config: The `experimental_features` section of the Synapse
+ config. (Used to check if various features are enabled.)
+
+ Returns:
+ A new FilteredPushRules object.
"""
ruleslist = [
PushRule.from_db(
- rule_id=rawrule["rule_id"],
- priority_class=rawrule["priority_class"],
- conditions=rawrule["conditions"],
- actions=rawrule["actions"],
+ rule_id=rawrule[0],
+ priority_class=rawrule[1],
+ conditions=rawrule[2],
+ actions=rawrule[3],
)
for rawrule in rawrules
]
@@ -183,7 +197,19 @@ class PushRulesWorkerStore(
enabled_map = await self.get_push_rules_enabled_for_user(user_id)
- return _load_rules(rows, enabled_map, self.hs.config.experimental)
+ return _load_rules(
+ [
+ (
+ row["rule_id"],
+ row["priority_class"],
+ row["conditions"],
+ row["actions"],
+ )
+ for row in rows
+ ],
+ enabled_map,
+ self.hs.config.experimental,
+ )
async def get_push_rules_enabled_for_user(self, user_id: str) -> Dict[str, bool]:
results = await self.db_pool.simple_select_list(
@@ -221,21 +247,36 @@ class PushRulesWorkerStore(
if not user_ids:
return {}
- raw_rules: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids}
+ raw_rules: Dict[str, List[Tuple[str, int, str, str]]] = {
+ user_id: [] for user_id in user_ids
+ }
- rows = await self.db_pool.simple_select_many_batch(
- table="push_rules",
- column="user_name",
- iterable=user_ids,
- retcols=("*",),
- desc="bulk_get_push_rules",
- batch_size=1000,
+ rows = cast(
+ List[Tuple[str, str, int, int, str, str]],
+ await self.db_pool.simple_select_many_batch(
+ table="push_rules",
+ column="user_name",
+ iterable=user_ids,
+ retcols=(
+ "user_name",
+ "rule_id",
+ "priority_class",
+ "priority",
+ "conditions",
+ "actions",
+ ),
+ desc="bulk_get_push_rules",
+ batch_size=1000,
+ ),
)
- rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))
+ # Sort by highest priority_class, then highest priority.
+ rows.sort(key=lambda row: (-int(row[2]), -int(row[3])))
- for row in rows:
- raw_rules.setdefault(row["user_name"], []).append(row)
+ for user_name, rule_id, priority_class, _, conditions, actions in rows:
+ raw_rules.setdefault(user_name, []).append(
+ (rule_id, priority_class, conditions, actions)
+ )
enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids)
@@ -256,17 +297,19 @@ class PushRulesWorkerStore(
results: Dict[str, Dict[str, bool]] = {user_id: {} for user_id in user_ids}
- rows = await self.db_pool.simple_select_many_batch(
- table="push_rules_enable",
- column="user_name",
- iterable=user_ids,
- retcols=("user_name", "rule_id", "enabled"),
- desc="bulk_get_push_rules_enabled",
- batch_size=1000,
+ rows = cast(
+ List[Tuple[str, str, Optional[int]]],
+ await self.db_pool.simple_select_many_batch(
+ table="push_rules_enable",
+ column="user_name",
+ iterable=user_ids,
+ retcols=("user_name", "rule_id", "enabled"),
+ desc="bulk_get_push_rules_enabled",
+ batch_size=1000,
+ ),
)
- for row in rows:
- enabled = bool(row["enabled"])
- results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
+ for user_name, rule_id, enabled in rows:
+ results.setdefault(user_name, {})[rule_id] = bool(enabled)
return results
async def get_all_push_rule_updates(
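For reference, a hypothetical raw row in the tuple order that _load_rules now documents
(rule ID, priority class, conditions as JSON, actions as JSON); the concrete rule ID,
priority-class value and JSON payloads below are illustrative, not taken from the schema:

    rawrule = (
        "global/content/.m.rule.contains_user_name",  # rule_id
        4,                                            # priority_class
        '[{"kind": "event_match", "key": "content.body", "pattern": "alice"}]',  # conditions
        '["notify", {"set_tweak": "highlight", "value": true}]',                 # actions
    )
    # Each tuple is passed to PushRule.from_db(rule_id=..., priority_class=...,
    # conditions=..., actions=...) before being wrapped in a FilteredPushRules object.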
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 87e28e22d3..c7eb7fc478 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -47,6 +47,27 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+# The type of a row in the pushers table.
+PusherRow = Tuple[
+ int, # id
+ str, # user_name
+ Optional[int], # access_token
+ str, # profile_tag
+ str, # kind
+ str, # app_id
+ str, # app_display_name
+ str, # device_display_name
+ str, # pushkey
+ int, # ts
+ str, # lang
+ str, # data
+ int, # last_stream_ordering
+ int, # last_success
+ int, # failing_since
+ bool, # enabled
+ str, # device_id
+]
+
class PusherWorkerStore(SQLBaseStore):
def __init__(
@@ -83,30 +104,66 @@ class PusherWorkerStore(SQLBaseStore):
self._remove_deleted_email_pushers,
)
- def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
+ def _decode_pushers_rows(
+ self,
+ rows: Iterable[PusherRow],
+ ) -> Iterator[PusherConfig]:
"""JSON-decode the data in the rows returned from the `pushers` table
Drops any rows whose data cannot be decoded
"""
- for r in rows:
- data_json = r["data"]
+ for (
+ id,
+ user_name,
+ access_token,
+ profile_tag,
+ kind,
+ app_id,
+ app_display_name,
+ device_display_name,
+ pushkey,
+ ts,
+ lang,
+ data,
+ last_stream_ordering,
+ last_success,
+ failing_since,
+ enabled,
+ device_id,
+ ) in rows:
try:
- r["data"] = db_to_json(data_json)
+ data_json = db_to_json(data)
except Exception as e:
logger.warning(
"Invalid JSON in data for pusher %d: %s, %s",
- r["id"],
- data_json,
+ id,
+ data,
e.args[0],
)
continue
- # If we're using SQLite, then boolean values are integers. This is
- # troublesome since some code using the return value of this method might
- # expect it to be a boolean, or will expose it to clients (in responses).
- r["enabled"] = bool(r["enabled"])
-
- yield PusherConfig(**r)
+ yield PusherConfig(
+ id=id,
+ user_name=user_name,
+ profile_tag=profile_tag,
+ kind=kind,
+ app_id=app_id,
+ app_display_name=app_display_name,
+ device_display_name=device_display_name,
+ pushkey=pushkey,
+ ts=ts,
+ lang=lang,
+ data=data_json,
+ last_stream_ordering=last_stream_ordering,
+ last_success=last_success,
+ failing_since=failing_since,
+ # If we're using SQLite, then boolean values are integers. This is
+ # troublesome since some code using the return value of this method might
+ # expect it to be a boolean, or will expose it to clients (in responses).
+ enabled=bool(enabled),
+ device_id=device_id,
+ access_token=access_token,
+ )
def get_pushers_stream_token(self) -> int:
return self._pushers_id_gen.get_current_token()
@@ -136,7 +193,7 @@ class PusherWorkerStore(SQLBaseStore):
The pushers for which the given columns have the given values.
"""
- def get_pushers_by_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+ def get_pushers_by_txn(txn: LoggingTransaction) -> List[PusherRow]:
# We could technically use simple_select_list here, but we need to call
# COALESCE on the 'enabled' column. While it is technically possible to give
# simple_select_list the whole `COALESCE(...) AS ...` as a column name, it
@@ -154,7 +211,7 @@ class PusherWorkerStore(SQLBaseStore):
txn.execute(sql, list(keyvalues.values()))
- return self.db_pool.cursor_to_dict(txn)
+ return cast(List[PusherRow], txn.fetchall())
ret = await self.db_pool.runInteraction(
desc="get_pushers_by",
@@ -164,14 +221,22 @@ class PusherWorkerStore(SQLBaseStore):
return self._decode_pushers_rows(ret)
async def get_enabled_pushers(self) -> Iterator[PusherConfig]:
- def get_enabled_pushers_txn(txn: LoggingTransaction) -> Iterator[PusherConfig]:
- txn.execute("SELECT * FROM pushers WHERE COALESCE(enabled, TRUE)")
- rows = self.db_pool.cursor_to_dict(txn)
-
- return self._decode_pushers_rows(rows)
+ def get_enabled_pushers_txn(txn: LoggingTransaction) -> List[PusherRow]:
+ txn.execute(
+ """
+ SELECT id, user_name, access_token, profile_tag, kind, app_id,
+ app_display_name, device_display_name, pushkey, ts, lang, data,
+ last_stream_ordering, last_success, failing_since,
+ enabled, device_id
+ FROM pushers WHERE COALESCE(enabled, TRUE)
+ """
+ )
+ return cast(List[PusherRow], txn.fetchall())
- return await self.db_pool.runInteraction(
- "get_enabled_pushers", get_enabled_pushers_txn
+ return self._decode_pushers_rows(
+ await self.db_pool.runInteraction(
+ "get_enabled_pushers", get_enabled_pushers_txn
+ )
)
async def get_all_updated_pushers_rows(
@@ -304,7 +369,7 @@ class PusherWorkerStore(SQLBaseStore):
)
async def get_throttle_params_by_room(
- self, pusher_id: str
+ self, pusher_id: int
) -> Dict[str, ThrottleParams]:
res = await self.db_pool.simple_select_list(
"pusher_throttle",
@@ -323,7 +388,7 @@ class PusherWorkerStore(SQLBaseStore):
return params_by_room
async def set_throttle_params(
- self, pusher_id: str, room_id: str, params: ThrottleParams
+ self, pusher_id: int, room_id: str, params: ThrottleParams
) -> None:
await self.db_pool.simple_upsert(
"pusher_throttle",
@@ -534,7 +599,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
(last_pusher_id, batch_size),
)
- rows = self.db_pool.cursor_to_dict(txn)
+ rows = txn.fetchall()
if len(rows) == 0:
return 0
@@ -550,19 +615,19 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
txn=txn,
table="pushers",
key_names=("id",),
- key_values=[(row["pusher_id"],) for row in rows],
+            key_values=[(row[0],) for row in rows],
value_names=("device_id", "access_token"),
# If there was already a device_id on the pusher, we only want to clear
# the access_token column, so we keep the existing device_id. Otherwise,
# we set the device_id we got from joining the access_tokens table.
value_values=[
- (row["pusher_device_id"] or row["token_device_id"], None)
- for row in rows
+ (pusher_device_id or token_device_id, None)
+ for _, pusher_device_id, token_device_id in rows
],
)
self.db_pool.updates._background_update_progress_txn(
- txn, "set_device_id_for_pushers", {"pusher_id": rows[-1]["pusher_id"]}
+ txn, "set_device_id_for_pushers", {"pusher_id": rows[-1][0]}
)
return len(rows)
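The pusher hunks above replace dict rows with plain tuples that get unpacked into a frozen config object, coercing SQLite's 0/1 booleans along the way. A minimal, self-contained sketch of that pattern, using an invented three-column row shape and a stand-in SimplePusherConfig rather than Synapse's real PusherRow/PusherConfig:

from dataclasses import dataclass
from typing import Iterator, List, Tuple

# Invented, cut-down row shape: (id, app_id, enabled).
SimplePusherRow = Tuple[int, str, int]


@dataclass(frozen=True)
class SimplePusherConfig:
    id: int
    app_id: str
    enabled: bool


def decode_rows(rows: List[SimplePusherRow]) -> Iterator[SimplePusherConfig]:
    for pusher_id, app_id, enabled in rows:
        # SQLite stores booleans as 0/1 integers, so coerce explicitly before
        # the value reaches callers (or client responses).
        yield SimplePusherConfig(id=pusher_id, app_id=app_id, enabled=bool(enabled))


print(list(decode_rows([(1, "http", 1), (2, "email", 0)])))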
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 0231f9407b..b2645ab43c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -313,25 +313,25 @@ class ReceiptsWorkerStore(SQLBaseStore):
) -> Sequence[JsonMapping]:
"""See get_linearized_receipts_for_room"""
- def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str]]:
if from_key:
sql = (
- "SELECT * FROM receipts_linearized WHERE"
+ "SELECT receipt_type, user_id, event_id, data"
+ " FROM receipts_linearized WHERE"
" room_id = ? AND stream_id > ? AND stream_id <= ?"
)
txn.execute(sql, (room_id, from_key, to_key))
else:
sql = (
- "SELECT * FROM receipts_linearized WHERE"
+ "SELECT receipt_type, user_id, event_id, data"
+ " FROM receipts_linearized WHERE"
" room_id = ? AND stream_id <= ?"
)
txn.execute(sql, (room_id, to_key))
- rows = self.db_pool.cursor_to_dict(txn)
-
- return rows
+ return cast(List[Tuple[str, str, str, str]], txn.fetchall())
rows = await self.db_pool.runInteraction("get_linearized_receipts_for_room", f)
@@ -339,10 +339,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
return []
content: JsonDict = {}
- for row in rows:
- content.setdefault(row["event_id"], {}).setdefault(row["receipt_type"], {})[
- row["user_id"]
- ] = db_to_json(row["data"])
+ for receipt_type, user_id, event_id, data in rows:
+ content.setdefault(event_id, {}).setdefault(receipt_type, {})[
+ user_id
+ ] = db_to_json(data)
return [{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}]
@@ -357,10 +357,13 @@ class ReceiptsWorkerStore(SQLBaseStore):
if not room_ids:
return {}
- def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+ def f(
+ txn: LoggingTransaction,
+ ) -> List[Tuple[str, str, str, str, Optional[str], str]]:
if from_key:
sql = """
- SELECT * FROM receipts_linearized WHERE
+ SELECT room_id, receipt_type, user_id, event_id, thread_id, data
+ FROM receipts_linearized WHERE
stream_id > ? AND stream_id <= ? AND
"""
clause, args = make_in_list_sql_clause(
@@ -370,7 +373,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
txn.execute(sql + clause, [from_key, to_key] + list(args))
else:
sql = """
- SELECT * FROM receipts_linearized WHERE
+ SELECT room_id, receipt_type, user_id, event_id, thread_id, data
+ FROM receipts_linearized WHERE
stream_id <= ? AND
"""
@@ -380,29 +384,31 @@ class ReceiptsWorkerStore(SQLBaseStore):
txn.execute(sql + clause, [to_key] + list(args))
- return self.db_pool.cursor_to_dict(txn)
+ return cast(
+ List[Tuple[str, str, str, str, Optional[str], str]], txn.fetchall()
+ )
txn_results = await self.db_pool.runInteraction(
"_get_linearized_receipts_for_rooms", f
)
results: JsonDict = {}
- for row in txn_results:
+ for room_id, receipt_type, user_id, event_id, thread_id, data in txn_results:
# We want a single event per room, since we want to batch the
# receipts by room, event and type.
room_event = results.setdefault(
- row["room_id"],
- {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}},
+ room_id,
+ {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}},
)
# The content is of the form:
# {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
- event_entry = room_event["content"].setdefault(row["event_id"], {})
- receipt_type = event_entry.setdefault(row["receipt_type"], {})
+ event_entry = room_event["content"].setdefault(event_id, {})
+ receipt_type_dict = event_entry.setdefault(receipt_type, {})
- receipt_type[row["user_id"]] = db_to_json(row["data"])
- if row["thread_id"]:
- receipt_type[row["user_id"]]["thread_id"] = row["thread_id"]
+ receipt_type_dict[user_id] = db_to_json(data)
+ if thread_id:
+ receipt_type_dict[user_id]["thread_id"] = thread_id
results = {
room_id: [results[room_id]] if room_id in results else []
@@ -428,10 +434,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
A dictionary of roomids to a list of receipts.
"""
- def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]:
if from_key:
sql = """
- SELECT * FROM receipts_linearized WHERE
+ SELECT room_id, receipt_type, user_id, event_id, data
+ FROM receipts_linearized WHERE
stream_id > ? AND stream_id <= ?
ORDER BY stream_id DESC
LIMIT 100
@@ -439,7 +446,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
txn.execute(sql, [from_key, to_key])
else:
sql = """
- SELECT * FROM receipts_linearized WHERE
+ SELECT room_id, receipt_type, user_id, event_id, data
+ FROM receipts_linearized WHERE
stream_id <= ?
ORDER BY stream_id DESC
LIMIT 100
@@ -447,27 +455,27 @@ class ReceiptsWorkerStore(SQLBaseStore):
txn.execute(sql, [to_key])
- return self.db_pool.cursor_to_dict(txn)
+ return cast(List[Tuple[str, str, str, str, str]], txn.fetchall())
txn_results = await self.db_pool.runInteraction(
"get_linearized_receipts_for_all_rooms", f
)
results: JsonDict = {}
- for row in txn_results:
+ for room_id, receipt_type, user_id, event_id, data in txn_results:
# We want a single event per room, since we want to batch the
# receipts by room, event and type.
room_event = results.setdefault(
- row["room_id"],
- {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}},
+ room_id,
+ {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}},
)
# The content is of the form:
# {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
- event_entry = room_event["content"].setdefault(row["event_id"], {})
- receipt_type = event_entry.setdefault(row["receipt_type"], {})
+ event_entry = room_event["content"].setdefault(event_id, {})
+ receipt_type_dict = event_entry.setdefault(receipt_type, {})
- receipt_type[row["user_id"]] = db_to_json(row["data"])
+ receipt_type_dict[user_id] = db_to_json(data)
return results
@@ -742,7 +750,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
event_ids: List[str],
thread_id: Optional[str],
data: dict,
- ) -> Optional[Tuple[int, int]]:
+ ) -> Optional[int]:
"""Insert a receipt, either from local client or remote server.
Automatically does conversion between linearized and graph
@@ -804,9 +812,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
data,
)
- max_persisted_id = self._receipts_id_gen.get_current_token()
-
- return stream_id, max_persisted_id
+ return stream_id
async def _insert_graph_receipt(
self,
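The receipts hunks above keep the same nested content shape, {event_id: {receipt_type: {user_id: receipt}}}, but now build it from tuple rows instead of dict rows. A standalone sketch of that setdefault-based nesting, with json.loads standing in for db_to_json and an invented cut-down row shape:

import json
from typing import Dict, List, Optional, Tuple

# Invented row shape: (receipt_type, user_id, event_id, thread_id, data).
ReceiptRow = Tuple[str, str, str, Optional[str], str]


def build_content(rows: List[ReceiptRow]) -> Dict[str, dict]:
    content: Dict[str, dict] = {}
    for receipt_type, user_id, event_id, thread_id, data in rows:
        receipt = json.loads(data)
        if thread_id:
            receipt["thread_id"] = thread_id
        # Nest as {event_id: {receipt_type: {user_id: receipt}}}.
        entry = content.setdefault(event_id, {}).setdefault(receipt_type, {})
        entry[user_id] = receipt
    return content


print(build_content([("m.read", "@bob:example.org", "$evt1", None, '{"ts": 1}')]))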
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index cc964604e2..9e8643ae4d 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -143,6 +143,14 @@ class LoginTokenLookupResult:
"""The session ID advertised by the SSO Identity Provider."""
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class ThreepidResult:
+ medium: str
+ address: str
+ validated_at: int
+ added_at: int
+
+
class RegistrationWorkerStore(CacheInvalidationWorkerStore):
def __init__(
self,
@@ -195,7 +203,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]:
"""Returns info about the user account, if it exists."""
- def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]:
+ def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[UserInfo]:
# We could technically use simple_select_one here, but it would not perform
# the COALESCEs (unless hacked into the column names), which could yield
# confusing results.
@@ -213,35 +221,46 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
(user_id,),
)
- rows = self.db_pool.cursor_to_dict(txn)
-
- if len(rows) == 0:
+ row = txn.fetchone()
+ if not row:
return None
- return rows[0]
+ (
+ name,
+ is_guest,
+ admin,
+ consent_version,
+ consent_ts,
+ consent_server_notice_sent,
+ appservice_id,
+ creation_ts,
+ user_type,
+ deactivated,
+ shadow_banned,
+ approved,
+ locked,
+ ) = row
+
+ return UserInfo(
+ appservice_id=appservice_id,
+ consent_server_notice_sent=consent_server_notice_sent,
+ consent_version=consent_version,
+ consent_ts=consent_ts,
+ creation_ts=creation_ts,
+ is_admin=bool(admin),
+ is_deactivated=bool(deactivated),
+ is_guest=bool(is_guest),
+ is_shadow_banned=bool(shadow_banned),
+ user_id=UserID.from_string(name),
+ user_type=user_type,
+ approved=bool(approved),
+ locked=bool(locked),
+ )
- row = await self.db_pool.runInteraction(
+ return await self.db_pool.runInteraction(
desc="get_user_by_id",
func=get_user_by_id_txn,
)
- if row is None:
- return None
-
- return UserInfo(
- appservice_id=row["appservice_id"],
- consent_server_notice_sent=row["consent_server_notice_sent"],
- consent_version=row["consent_version"],
- consent_ts=row["consent_ts"],
- creation_ts=row["creation_ts"],
- is_admin=bool(row["admin"]),
- is_deactivated=bool(row["deactivated"]),
- is_guest=bool(row["is_guest"]),
- is_shadow_banned=bool(row["shadow_banned"]),
- user_id=UserID.from_string(row["name"]),
- user_type=row["user_type"],
- approved=bool(row["approved"]),
- locked=bool(row["locked"]),
- )
async def is_trial_user(self, user_id: str) -> bool:
"""Checks if user is in the "trial" period, i.e. within the first
@@ -579,16 +598,31 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"""
txn.execute(sql, (token,))
- rows = self.db_pool.cursor_to_dict(txn)
-
- if rows:
- row = rows[0]
+ row = txn.fetchone()
- # This field is nullable, ensure it comes out as a boolean
- if row["token_used"] is None:
- row["token_used"] = False
-
- return TokenLookupResult(**row)
+ if row:
+ (
+ user_id,
+ is_guest,
+ shadow_banned,
+ token_id,
+ device_id,
+ valid_until_ms,
+ token_owner,
+ token_used,
+ ) = row
+
+ return TokenLookupResult(
+ user_id=user_id,
+ is_guest=is_guest,
+ shadow_banned=shadow_banned,
+ token_id=token_id,
+ device_id=device_id,
+ valid_until_ms=valid_until_ms,
+ token_owner=token_owner,
+ # This field is nullable, ensure it comes out as a boolean
+ token_used=bool(token_used),
+ )
return None
@@ -833,11 +867,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"""Counts all users registered on the homeserver."""
def _count_users(txn: LoggingTransaction) -> int:
- txn.execute("SELECT COUNT(*) AS users FROM users")
- rows = self.db_pool.cursor_to_dict(txn)
- if rows:
- return rows[0]["users"]
- return 0
+ txn.execute("SELECT COUNT(*) FROM users")
+ row = txn.fetchone()
+ assert row is not None
+ return row[0]
return await self.db_pool.runInteraction("count_users", _count_users)
@@ -891,11 +924,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"""Counts all users without a special user_type registered on the homeserver."""
def _count_users(txn: LoggingTransaction) -> int:
- txn.execute("SELECT COUNT(*) AS users FROM users where user_type is null")
- rows = self.db_pool.cursor_to_dict(txn)
- if rows:
- return rows[0]["users"]
- return 0
+ txn.execute("SELECT COUNT(*) FROM users where user_type is null")
+ row = txn.fetchone()
+ assert row is not None
+ return row[0]
return await self.db_pool.runInteraction("count_real_users", _count_users)
@@ -964,13 +996,14 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
{"user_id": user_id, "validated_at": validated_at, "added_at": added_at},
)
- async def user_get_threepids(self, user_id: str) -> List[Dict[str, Any]]:
- return await self.db_pool.simple_select_list(
+ async def user_get_threepids(self, user_id: str) -> List[ThreepidResult]:
+ results = await self.db_pool.simple_select_list(
"user_threepids",
- {"user_id": user_id},
- ["medium", "address", "validated_at", "added_at"],
- "user_get_threepids",
+ keyvalues={"user_id": user_id},
+ retcols=["medium", "address", "validated_at", "added_at"],
+ desc="user_get_threepids",
)
+ return [ThreepidResult(**r) for r in results]
async def user_delete_threepid(
self, user_id: str, medium: str, address: str
@@ -1252,12 +1285,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
)
txn.execute(sql, [])
- res = self.db_pool.cursor_to_dict(txn)
- if res:
- for user in res:
- self.set_expiration_date_for_user_txn(
- txn, user["name"], use_delta=True
- )
+ for (name,) in txn.fetchall():
+ self.set_expiration_date_for_user_txn(txn, name, use_delta=True)
await self.db_pool.runInteraction(
"get_users_with_no_expiration_date",
@@ -1963,11 +1992,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
(user_id,),
)
- rows = self.db_pool.cursor_to_dict(txn)
+ row = txn.fetchone()
+ assert row is not None
# We cast to bool because the value returned by the database engine might
# be an integer if we're using SQLite.
- return bool(rows[0]["approved"])
+ return bool(row[0])
return await self.db_pool.runInteraction(
desc="is_user_pending_approval",
@@ -2045,22 +2075,22 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
(last_user, batch_size),
)
- rows = self.db_pool.cursor_to_dict(txn)
+ rows = txn.fetchall()
if not rows:
return True, 0
rows_processed_nb = 0
- for user in rows:
- if not user["count_tokens"] and not user["count_threepids"]:
- self.set_user_deactivated_status_txn(txn, user["name"], True)
+ for name, count_tokens, count_threepids in rows:
+ if not count_tokens and not count_threepids:
+ self.set_user_deactivated_status_txn(txn, name, True)
rows_processed_nb += 1
logger.info("Marked %d rows as deactivated", rows_processed_nb)
self.db_pool.updates._background_update_progress_txn(
- txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]}
+ txn, "users_set_deactivated_flag", {"user_id": rows[-1][0]}
)
if batch_size > len(rows):
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 9246b418f5..7f40e2c446 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -349,16 +349,19 @@ class RelationsWorkerStore(SQLBaseStore):
def get_all_relation_ids_for_event_with_types_txn(
txn: LoggingTransaction,
) -> List[str]:
- rows = self.db_pool.simple_select_many_txn(
- txn=txn,
- table="event_relations",
- column="relation_type",
- iterable=relation_types,
- keyvalues={"relates_to_id": event_id},
- retcols=["event_id"],
+ rows = cast(
+ List[Tuple[str]],
+ self.db_pool.simple_select_many_txn(
+ txn=txn,
+ table="event_relations",
+ column="relation_type",
+ iterable=relation_types,
+ keyvalues={"relates_to_id": event_id},
+ retcols=["event_id"],
+ ),
)
- return [row["event_id"] for row in rows]
+ return [row[0] for row in rows]
return await self.db_pool.runInteraction(
desc="get_all_relation_ids_for_event_with_types",
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 719e11aea6..9d24d2c347 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -831,7 +831,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
def get_retention_policy_for_room_txn(
txn: LoggingTransaction,
- ) -> List[Dict[str, Optional[int]]]:
+ ) -> Optional[Tuple[Optional[int], Optional[int]]]:
txn.execute(
"""
SELECT min_lifetime, max_lifetime FROM room_retention
@@ -841,7 +841,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
(room_id,),
)
- return self.db_pool.cursor_to_dict(txn)
+ return cast(Optional[Tuple[Optional[int], Optional[int]]], txn.fetchone())
ret = await self.db_pool.runInteraction(
"get_retention_policy_for_room",
@@ -856,8 +856,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
max_lifetime=self.config.retention.retention_default_max_lifetime,
)
- min_lifetime = ret[0]["min_lifetime"]
- max_lifetime = ret[0]["max_lifetime"]
+ min_lifetime, max_lifetime = ret
# If one of the room's policy's attributes isn't defined, use the matching
# attribute from the default policy.
@@ -1162,14 +1161,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
txn.execute(sql, args)
- rows = self.db_pool.cursor_to_dict(txn)
- rooms_dict = {}
-
- for row in rows:
- rooms_dict[row["room_id"]] = RetentionPolicy(
- min_lifetime=row["min_lifetime"],
- max_lifetime=row["max_lifetime"],
+ rooms_dict = {
+ room_id: RetentionPolicy(
+ min_lifetime=min_lifetime,
+ max_lifetime=max_lifetime,
)
+ for room_id, min_lifetime, max_lifetime in txn
+ }
if include_null:
# If required, do a second query that retrieves all of the rooms we know
@@ -1178,13 +1176,11 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
txn.execute(sql)
- rows = self.db_pool.cursor_to_dict(txn)
-
# If a room isn't already in the dict (i.e. it doesn't have a retention
# policy in its state), add it with a null policy.
- for row in rows:
- if row["room_id"] not in rooms_dict:
- rooms_dict[row["room_id"]] = RetentionPolicy()
+ for (room_id,) in txn:
+ if room_id not in rooms_dict:
+ rooms_dict[room_id] = RetentionPolicy()
return rooms_dict
@@ -1300,14 +1296,17 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
complete.
"""
- rows: List[Dict[str, str]] = await self.db_pool.simple_select_many_batch(
- table="partial_state_rooms",
- column="room_id",
- iterable=room_ids,
- retcols=("room_id",),
- desc="is_partial_state_room_batched",
- )
- partial_state_rooms = {row_dict["room_id"] for row_dict in rows}
+ rows = cast(
+ List[Tuple[str]],
+ await self.db_pool.simple_select_many_batch(
+ table="partial_state_rooms",
+ column="room_id",
+ iterable=room_ids,
+ retcols=("room_id",),
+ desc="is_partial_state_room_batched",
+ ),
+ )
+ partial_state_rooms = {row[0] for row in rows}
return {room_id: room_id in partial_state_rooms for room_id in room_ids}
async def get_join_event_id_and_device_lists_stream_id_for_partial_state(
@@ -1703,24 +1702,24 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
(last_room, batch_size),
)
- rows = self.db_pool.cursor_to_dict(txn)
+ rows = txn.fetchall()
if not rows:
return True
- for row in rows:
- if not row["json"]:
+ for room_id, event_id, json in rows:
+ if not json:
retention_policy = {}
else:
- ev = db_to_json(row["json"])
+ ev = db_to_json(json)
retention_policy = ev["content"]
self.db_pool.simple_insert_txn(
txn=txn,
table="room_retention",
values={
- "room_id": row["room_id"],
- "event_id": row["event_id"],
+ "room_id": room_id,
+ "event_id": event_id,
"min_lifetime": retention_policy.get("min_lifetime"),
"max_lifetime": retention_policy.get("max_lifetime"),
},
@@ -1729,7 +1728,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
logger.info("Inserted %d rows into room_retention", len(rows))
self.db_pool.updates._background_update_progress_txn(
- txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
+ txn, "insert_room_retention", {"room_id": rows[-1][0]}
)
if batch_size > len(rows):
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index e93573f315..3a87eba430 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -27,6 +27,7 @@ from typing import (
Set,
Tuple,
Union,
+ cast,
)
import attr
@@ -683,25 +684,28 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
Map from user_id to set of rooms that is currently in.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="current_state_events",
- column="state_key",
- iterable=user_ids,
- retcols=(
- "state_key",
- "room_id",
+ rows = cast(
+ List[Tuple[str, str]],
+ await self.db_pool.simple_select_many_batch(
+ table="current_state_events",
+ column="state_key",
+ iterable=user_ids,
+ retcols=(
+ "state_key",
+ "room_id",
+ ),
+ keyvalues={
+ "type": EventTypes.Member,
+ "membership": Membership.JOIN,
+ },
+ desc="get_rooms_for_users",
),
- keyvalues={
- "type": EventTypes.Member,
- "membership": Membership.JOIN,
- },
- desc="get_rooms_for_users",
)
user_rooms: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids}
- for row in rows:
- user_rooms[row["state_key"]].add(row["room_id"])
+ for state_key, room_id in rows:
+ user_rooms[state_key].add(room_id)
return {key: frozenset(rooms) for key, rooms in user_rooms.items()}
@@ -892,17 +896,20 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
Map from event ID to `user_id`, or None if event is not a join.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="room_memberships",
- column="event_id",
- iterable=event_ids,
- retcols=("user_id", "event_id"),
- keyvalues={"membership": Membership.JOIN},
- batch_size=1000,
- desc="_get_user_ids_from_membership_event_ids",
+ rows = cast(
+ List[Tuple[str, str]],
+ await self.db_pool.simple_select_many_batch(
+ table="room_memberships",
+ column="event_id",
+ iterable=event_ids,
+ retcols=("event_id", "user_id"),
+ keyvalues={"membership": Membership.JOIN},
+ batch_size=1000,
+ desc="_get_user_ids_from_membership_event_ids",
+ ),
)
- return {row["event_id"]: row["user_id"] for row in rows}
+ return dict(rows)
@cached(max_entries=10000)
async def is_host_joined(self, room_id: str, host: str) -> bool:
@@ -1202,21 +1209,22 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
membership event, otherwise the value is None.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="room_memberships",
- column="event_id",
- iterable=member_event_ids,
- retcols=("user_id", "membership", "event_id"),
- keyvalues={},
- batch_size=500,
- desc="get_membership_from_event_ids",
+ rows = cast(
+ List[Tuple[str, str, str]],
+ await self.db_pool.simple_select_many_batch(
+ table="room_memberships",
+ column="event_id",
+ iterable=member_event_ids,
+ retcols=("user_id", "membership", "event_id"),
+ keyvalues={},
+ batch_size=500,
+ desc="get_membership_from_event_ids",
+ ),
)
return {
- row["event_id"]: EventIdMembership(
- membership=row["membership"], user_id=row["user_id"]
- )
- for row in rows
+ event_id: EventIdMembership(membership=membership, user_id=user_id)
+ for user_id, membership, event_id in rows
}
async def is_local_host_in_room_ignoring_users(
@@ -1349,18 +1357,16 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
- rows = self.db_pool.cursor_to_dict(txn)
+ rows = txn.fetchall()
if not rows:
return 0
- min_stream_id = rows[-1]["stream_ordering"]
+ min_stream_id = rows[-1][0]
to_update = []
- for row in rows:
- event_id = row["event_id"]
- room_id = row["room_id"]
+ for _, event_id, room_id, json in rows:
try:
- event_json = db_to_json(row["json"])
+ event_json = db_to_json(json)
content = event_json["content"]
except Exception:
continue
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index a7aae661d8..1d69c4a5f0 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -179,22 +179,24 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# store_search_entries_txn with a generator function, but that
# would mean having two cursors open on the database at once.
# Instead we just build a list of results.
- rows = self.db_pool.cursor_to_dict(txn)
+ rows = txn.fetchall()
if not rows:
return 0
- min_stream_id = rows[-1]["stream_ordering"]
+ min_stream_id = rows[-1][0]
event_search_rows = []
- for row in rows:
+ for (
+ stream_ordering,
+ event_id,
+ room_id,
+ etype,
+ json,
+ origin_server_ts,
+ ) in rows:
try:
- event_id = row["event_id"]
- room_id = row["room_id"]
- etype = row["type"]
- stream_ordering = row["stream_ordering"]
- origin_server_ts = row["origin_server_ts"]
try:
- event_json = db_to_json(row["json"])
+ event_json = db_to_json(json)
content = event_json["content"]
except Exception:
continue
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 5eaaff5b68..598025dd91 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -20,10 +20,12 @@ from typing import (
Collection,
Dict,
Iterable,
+ List,
Mapping,
Optional,
Set,
Tuple,
+ cast,
)
import attr
@@ -388,16 +390,19 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
Raises:
RuntimeError if the state is unknown at any of the given events
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="event_to_state_groups",
- column="event_id",
- iterable=event_ids,
- keyvalues={},
- retcols=("event_id", "state_group"),
- desc="_get_state_group_for_events",
+ rows = cast(
+ List[Tuple[str, int]],
+ await self.db_pool.simple_select_many_batch(
+ table="event_to_state_groups",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=("event_id", "state_group"),
+ desc="_get_state_group_for_events",
+ ),
)
- res = {row["event_id"]: row["state_group"] for row in rows}
+ res = dict(rows)
for e in event_ids:
if e not in res:
raise RuntimeError("No state group for unknown or outlier event %s" % e)
@@ -415,16 +420,19 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
The subset of state groups that are referenced.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="event_to_state_groups",
- column="state_group",
- iterable=state_groups,
- keyvalues={},
- retcols=("DISTINCT state_group",),
- desc="get_referenced_state_groups",
+ rows = cast(
+ List[Tuple[int]],
+ await self.db_pool.simple_select_many_batch(
+ table="event_to_state_groups",
+ column="state_group",
+ iterable=state_groups,
+ keyvalues={},
+ retcols=("DISTINCT state_group",),
+ desc="get_referenced_state_groups",
+ ),
)
- return {row["state_group"] for row in rows}
+ return {row[0] for row in rows}
async def update_state_for_partial_state_event(
self,
@@ -624,16 +632,22 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
# potentially stale, since there may have been a period where the
# server didn't share a room with the remote user and therefore may
# have missed any device updates.
- rows = self.db_pool.simple_select_many_txn(
- txn,
- table="current_state_events",
- column="room_id",
- iterable=to_delete,
- keyvalues={"type": EventTypes.Member, "membership": Membership.JOIN},
- retcols=("state_key",),
+ rows = cast(
+ List[Tuple[str]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="current_state_events",
+ column="room_id",
+ iterable=to_delete,
+ keyvalues={
+ "type": EventTypes.Member,
+ "membership": Membership.JOIN,
+ },
+ retcols=("state_key",),
+ ),
)
- potentially_left_users = {row["state_key"] for row in rows}
+ potentially_left_users = {row[0] for row in rows}
# Now lets actually delete the rooms from the DB.
self.db_pool.simple_delete_many_txn(
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 445213e12a..3151186e0c 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -13,7 +13,9 @@
# limitations under the License.
import logging
-from typing import Any, Dict, List, Tuple
+from typing import List, Optional, Tuple
+
+import attr
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction
@@ -22,6 +24,20 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
logger = logging.getLogger(__name__)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class StateDelta:
+ stream_id: int
+ room_id: str
+ event_type: str
+ state_key: str
+
+ event_id: Optional[str]
+ """new event_id for this state key. None if the state has been deleted."""
+
+ prev_event_id: Optional[str]
+ """previous event_id for this state key. None if it's new state."""
+
+
class StateDeltasStore(SQLBaseStore):
# This class must be mixed in with a child class which provides the following
# attribute. TODO: can we get static analysis to enforce this?
@@ -29,31 +45,21 @@ class StateDeltasStore(SQLBaseStore):
async def get_partial_current_state_deltas(
self, prev_stream_id: int, max_stream_id: int
- ) -> Tuple[int, List[Dict[str, Any]]]:
+ ) -> Tuple[int, List[StateDelta]]:
"""Fetch a list of room state changes since the given stream id
- Each entry in the result contains the following fields:
- - stream_id (int)
- - room_id (str)
- - type (str): event type
- - state_key (str):
- - event_id (str|None): new event_id for this state key. None if the
- state has been deleted.
- - prev_event_id (str|None): previous event_id for this state key. None
- if it's new state.
-
This may be the partial state if we're lazy joining the room.
Args:
prev_stream_id: point to get changes since (exclusive)
max_stream_id: the point that we know has been correctly persisted
- - ie, an upper limit to return changes from.
+ - ie, an upper limit to return changes from.
Returns:
A tuple consisting of:
- - the stream id which these results go up to
- - list of current_state_delta_stream rows. If it is empty, we are
- up to date.
+ - the stream id which these results go up to
+ - list of current_state_delta_stream rows. If it is empty, we are
+ up to date.
"""
prev_stream_id = int(prev_stream_id)
@@ -72,7 +78,7 @@ class StateDeltasStore(SQLBaseStore):
def get_current_state_deltas_txn(
txn: LoggingTransaction,
- ) -> Tuple[int, List[Dict[str, Any]]]:
+ ) -> Tuple[int, List[StateDelta]]:
# First we calculate the max stream id that will give us less than
# N results.
# We arbitrarily limit to 100 stream_id entries to ensure we don't
@@ -112,7 +118,17 @@ class StateDeltasStore(SQLBaseStore):
ORDER BY stream_id ASC
"""
txn.execute(sql, (prev_stream_id, clipped_stream_id))
- return clipped_stream_id, self.db_pool.cursor_to_dict(txn)
+ return clipped_stream_id, [
+ StateDelta(
+ stream_id=row[0],
+ room_id=row[1],
+ event_type=row[2],
+ state_key=row[3],
+ event_id=row[4],
+ prev_event_id=row[5],
+ )
+ for row in txn.fetchall()
+ ]
return await self.db_pool.runInteraction(
"get_current_state_deltas", get_current_state_deltas_txn
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 9d403919e4..5b2d0ba870 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -506,25 +506,28 @@ class StatsStore(StateDeltasStore):
) -> Tuple[List[str], Dict[str, int], int, List[str], int]:
pos = self.get_room_max_stream_ordering() # type: ignore[attr-defined]
- rows = self.db_pool.simple_select_many_txn(
- txn,
- table="current_state_events",
- column="type",
- iterable=[
- EventTypes.Create,
- EventTypes.JoinRules,
- EventTypes.RoomHistoryVisibility,
- EventTypes.RoomEncryption,
- EventTypes.Name,
- EventTypes.Topic,
- EventTypes.RoomAvatar,
- EventTypes.CanonicalAlias,
- ],
- keyvalues={"room_id": room_id, "state_key": ""},
- retcols=["event_id"],
+ rows = cast(
+ List[Tuple[str]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="current_state_events",
+ column="type",
+ iterable=[
+ EventTypes.Create,
+ EventTypes.JoinRules,
+ EventTypes.RoomHistoryVisibility,
+ EventTypes.RoomEncryption,
+ EventTypes.Name,
+ EventTypes.Topic,
+ EventTypes.RoomAvatar,
+ EventTypes.CanonicalAlias,
+ ],
+ keyvalues={"room_id": room_id, "state_key": ""},
+ retcols=["event_id"],
+ ),
)
- event_ids = cast(List[str], [row["event_id"] for row in rows])
+ event_ids = [row[0] for row in rows]
txn.execute(
"""
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 5a3611c415..ea06e4eee0 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -266,7 +266,7 @@ def generate_next_token(
# when we are going backwards so we subtract one from the
# stream part.
last_stream_ordering -= 1
- return RoomStreamToken(last_topo_ordering, last_stream_ordering)
+ return RoomStreamToken(topological=last_topo_ordering, stream=last_stream_ordering)
def _make_generic_sql_bound(
@@ -558,7 +558,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if p > min_pos
}
- return RoomStreamToken(None, min_pos, immutabledict(positions))
+ return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions))
async def get_room_events_stream_for_rooms(
self,
@@ -708,7 +708,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
ret.reverse()
if rows:
- key = RoomStreamToken(None, min(r.stream_ordering for r in rows))
+ key = RoomStreamToken(stream=min(r.stream_ordering for r in rows))
else:
# Assume we didn't get anything because there was nothing to
# get.
@@ -969,7 +969,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
topo = await self.db_pool.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn, room_id
)
- return RoomStreamToken(topo, stream_ordering)
+ return RoomStreamToken(topological=topo, stream=stream_ordering)
@overload
def get_stream_id_for_event_txn(
@@ -1033,7 +1033,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
retcols=("stream_ordering", "topological_ordering"),
desc="get_topological_token_for_event",
)
- return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])
+ return RoomStreamToken(
+ topological=row["topological_ordering"], stream=row["stream_ordering"]
+ )
async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
"""Gets the topological token in a room after or at the given stream
@@ -1114,8 +1116,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
else:
topo = None
internal = event.internal_metadata
- internal.before = RoomStreamToken(topo, stream - 1)
- internal.after = RoomStreamToken(topo, stream)
+ internal.before = RoomStreamToken(topological=topo, stream=stream - 1)
+ internal.after = RoomStreamToken(topological=topo, stream=stream)
internal.order = (int(topo) if topo else 0, int(stream))
async def get_events_around(
@@ -1191,11 +1193,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# Paginating backwards includes the event at the token, but paginating
# forward doesn't.
before_token = RoomStreamToken(
- results["topological_ordering"] - 1, results["stream_ordering"]
+ topological=results["topological_ordering"] - 1,
+ stream=results["stream_ordering"],
)
after_token = RoomStreamToken(
- results["topological_ordering"], results["stream_ordering"]
+ topological=results["topological_ordering"],
+ stream=results["stream_ordering"],
)
rows, start_token = self._paginate_room_events_txn(
diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py
index 5c5372a825..5555b53575 100644
--- a/synapse/storage/databases/main/task_scheduler.py
+++ b/synapse/storage/databases/main/task_scheduler.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import TYPE_CHECKING, Any, Dict, List, Optional
+from typing import TYPE_CHECKING, Any, List, Optional, Tuple, cast
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@@ -27,6 +27,8 @@ from synapse.util import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer
+ScheduledTaskRow = Tuple[str, str, str, int, str, str, str, str]
+
class TaskSchedulerWorkerStore(SQLBaseStore):
def __init__(
@@ -38,13 +40,18 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
super().__init__(database, db_conn, hs)
@staticmethod
- def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask:
- row["status"] = TaskStatus(row["status"])
- if row["params"] is not None:
- row["params"] = db_to_json(row["params"])
- if row["result"] is not None:
- row["result"] = db_to_json(row["result"])
- return ScheduledTask(**row)
+ def _convert_row_to_task(row: ScheduledTaskRow) -> ScheduledTask:
+ task_id, action, status, timestamp, resource_id, params, result, error = row
+ return ScheduledTask(
+ id=task_id,
+ action=action,
+ status=TaskStatus(status),
+ timestamp=timestamp,
+ resource_id=resource_id,
+ params=db_to_json(params) if params is not None else None,
+ result=db_to_json(result) if result is not None else None,
+ error=error,
+ )
async def get_scheduled_tasks(
self,
@@ -68,7 +75,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
Returns: a list of `ScheduledTask`, ordered by increasing timestamps
"""
- def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[ScheduledTaskRow]:
clauses: List[str] = []
args: List[Any] = []
if resource_id:
@@ -101,7 +108,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
args.append(limit)
txn.execute(sql, args)
- return self.db_pool.cursor_to_dict(txn)
+ return cast(List[ScheduledTaskRow], txn.fetchall())
rows = await self.db_pool.runInteraction(
"get_scheduled_tasks", get_scheduled_tasks_txn
@@ -193,7 +200,22 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
desc="get_scheduled_task",
)
- return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None
+ return (
+ TaskSchedulerWorkerStore._convert_row_to_task(
+ (
+ row["id"],
+ row["action"],
+ row["status"],
+ row["timestamp"],
+ row["resource_id"],
+ row["params"],
+ row["result"],
+ row["error"],
+ )
+ )
+ if row
+ else None
+ )
async def delete_scheduled_task(self, id: str) -> None:
"""Delete a specific task from its id.
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 8f70eff809..c4a6475060 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -211,18 +211,28 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
async def get_destination_retry_timings_batch(
self, destinations: StrCollection
) -> Mapping[str, Optional[DestinationRetryTimings]]:
- rows = await self.db_pool.simple_select_many_batch(
- table="destinations",
- iterable=destinations,
- column="destination",
- retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
- desc="get_destination_retry_timings_batch",
+ rows = cast(
+ List[Tuple[str, Optional[int], Optional[int], Optional[int]]],
+ await self.db_pool.simple_select_many_batch(
+ table="destinations",
+ iterable=destinations,
+ column="destination",
+ retcols=(
+ "destination",
+ "failure_ts",
+ "retry_last_ts",
+ "retry_interval",
+ ),
+ desc="get_destination_retry_timings_batch",
+ ),
)
return {
- row.pop("destination"): DestinationRetryTimings(**row)
- for row in rows
- if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"]
+ destination: DestinationRetryTimings(
+ failure_ts, retry_last_ts, retry_interval
+ )
+ for destination, failure_ts, retry_last_ts, retry_interval in rows
+ if retry_last_ts and failure_ts and retry_interval
}
async def set_destination_retry_timings(
@@ -526,7 +536,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
start: int,
limit: int,
direction: Direction = Direction.FORWARDS,
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[List[Tuple[str, int]], int]:
"""Function to retrieve a paginated list of destination's rooms.
This will return a json list of rooms and the
total number of rooms.
@@ -537,12 +547,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
limit: number of rows to retrieve
direction: sort ascending or descending by room_id
Returns:
- A tuple of a dict of rooms and a count of total rooms.
+ A tuple of a list of room tuples and a count of total rooms.
+
+            Each room tuple is (room_id, stream_ordering).
"""
def get_destination_rooms_paginate_txn(
txn: LoggingTransaction,
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[List[Tuple[str, int]], int]:
if direction == Direction.BACKWARDS:
order = "DESC"
else:
@@ -556,14 +568,17 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
txn.execute(sql, [destination])
count = cast(Tuple[int], txn.fetchone())[0]
- rooms = self.db_pool.simple_select_list_paginate_txn(
- txn=txn,
- table="destination_rooms",
- orderby="room_id",
- start=start,
- limit=limit,
- retcols=("room_id", "stream_ordering"),
- order_direction=order,
+ rooms = cast(
+ List[Tuple[str, int]],
+ self.db_pool.simple_select_list_paginate_txn(
+ txn=txn,
+ table="destination_rooms",
+ orderby="room_id",
+ start=start,
+ limit=limit,
+ retcols=("room_id", "stream_ordering"),
+ order_direction=order,
+ ),
)
return rooms, count
diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py
index f38bedbbcd..919c66f553 100644
--- a/synapse/storage/databases/main/ui_auth.py
+++ b/synapse/storage/databases/main/ui_auth.py
@@ -337,13 +337,16 @@ class UIAuthWorkerStore(SQLBaseStore):
# If a registration token was used, decrement the pending counter
# before deleting the session.
- rows = self.db_pool.simple_select_many_txn(
- txn,
- table="ui_auth_sessions_credentials",
- column="session_id",
- iterable=session_ids,
- keyvalues={"stage_type": LoginType.REGISTRATION_TOKEN},
- retcols=["result"],
+ rows = cast(
+ List[Tuple[str]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="ui_auth_sessions_credentials",
+ column="session_id",
+ iterable=session_ids,
+ keyvalues={"stage_type": LoginType.REGISTRATION_TOKEN},
+ retcols=["result"],
+ ),
)
# Get the tokens used and how much pending needs to be decremented by.
@@ -353,23 +356,25 @@ class UIAuthWorkerStore(SQLBaseStore):
# registration token stage for that session will be True.
# If a token was used to authenticate, but registration was
# never completed, the result will be the token used.
- token = db_to_json(r["result"])
+ token = db_to_json(r[0])
if isinstance(token, str):
token_counts[token] = token_counts.get(token, 0) + 1
# Update the `pending` counters.
if len(token_counts) > 0:
- token_rows = self.db_pool.simple_select_many_txn(
- txn,
- table="registration_tokens",
- column="token",
- iterable=list(token_counts.keys()),
- keyvalues={},
- retcols=["token", "pending"],
+ token_rows = cast(
+ List[Tuple[str, int]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="registration_tokens",
+ column="token",
+ iterable=list(token_counts.keys()),
+ keyvalues={},
+ retcols=["token", "pending"],
+ ),
)
- for token_row in token_rows:
- token = token_row["token"]
- new_pending = token_row["pending"] - token_counts[token]
+ for token, pending in token_rows:
+ new_pending = pending - token_counts[token]
self.db_pool.simple_update_one_txn(
txn,
table="registration_tokens",
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index f0dc31fee6..23eb92c514 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -410,25 +410,24 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
)
# Next fetch their profiles. Note that not all users have profiles.
- profile_rows = self.db_pool.simple_select_many_txn(
- txn,
- table="profiles",
- column="full_user_id",
- iterable=list(users_to_insert),
- retcols=(
- "full_user_id",
- "displayname",
- "avatar_url",
+ profile_rows = cast(
+ List[Tuple[str, Optional[str], Optional[str]]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="profiles",
+ column="full_user_id",
+ iterable=list(users_to_insert),
+ retcols=(
+ "full_user_id",
+ "displayname",
+ "avatar_url",
+ ),
+ keyvalues={},
),
- keyvalues={},
)
profiles = {
- row["full_user_id"]: _UserDirProfile(
- row["full_user_id"],
- row["displayname"],
- row["avatar_url"],
- )
- for row in profile_rows
+ full_user_id: _UserDirProfile(full_user_id, displayname, avatar_url)
+ for full_user_id, displayname, avatar_url in profile_rows
}
profiles_to_insert = [
@@ -517,18 +516,21 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
and not self.get_if_app_services_interested_in_user(user) # type: ignore[attr-defined]
]
- rows = self.db_pool.simple_select_many_txn(
- txn,
- table="users",
- column="name",
- iterable=users,
- keyvalues={
- "deactivated": 0,
- },
- retcols=("name", "user_type"),
+ rows = cast(
+ List[Tuple[str, Optional[str]]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="users",
+ column="name",
+ iterable=users,
+ keyvalues={
+ "deactivated": 0,
+ },
+ retcols=("name", "user_type"),
+ ),
)
- return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT]
+ return [name for name, user_type in rows if user_type != UserTypes.SUPPORT]
async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
"""Check if the room is either world_readable or publically joinable"""
diff --git a/synapse/storage/databases/main/user_erasure_store.py b/synapse/storage/databases/main/user_erasure_store.py
index 06fcbe5e54..8bd58c6e3d 100644
--- a/synapse/storage/databases/main/user_erasure_store.py
+++ b/synapse/storage/databases/main/user_erasure_store.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Iterable, Mapping
+from typing import Iterable, List, Mapping, Tuple, cast
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main import CacheInvalidationWorkerStore
@@ -50,14 +50,17 @@ class UserErasureWorkerStore(CacheInvalidationWorkerStore):
Returns:
for each user, whether the user has requested erasure.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="erased_users",
- column="user_id",
- iterable=user_ids,
- retcols=("user_id",),
- desc="are_users_erased",
+ rows = cast(
+ List[Tuple[str]],
+ await self.db_pool.simple_select_many_batch(
+ table="erased_users",
+ column="user_id",
+ iterable=user_ids,
+ retcols=("user_id",),
+ desc="are_users_erased",
+ ),
)
- erased_users = {row["user_id"] for row in rows}
+ erased_users = {row[0] for row in rows}
return {u: u in erased_users for u in user_ids}
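Many of the hunks above wrap simple_select_many_batch / simple_select_many_txn in typing.cast because those helpers are now annotated as returning generic List[Tuple[Any, ...]]. The cast is purely a hint for mypy and has no runtime effect, as in this sketch (fetch_user_id_rows is an invented stand-in for the real helper):

from typing import Any, List, Tuple, cast


def fetch_user_id_rows() -> List[Tuple[Any, ...]]:
    # Stand-in for simple_select_many_batch(..., retcols=("user_id",)), which
    # only promises generic tuples in its return type.
    return [("@alice:example.org",), ("@bob:example.org",)]


# cast() does nothing at runtime; it narrows the type so the 1-tuple
# unpacking below satisfies mypy.
rows = cast(List[Tuple[str]], fetch_user_id_rows())

erased_users = {user_id for (user_id,) in rows}
print(erased_users)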
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 6984d11352..09d2a8c5b3 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -13,7 +13,17 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple
+from typing import (
+ TYPE_CHECKING,
+ Collection,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ cast,
+)
import attr
@@ -730,19 +740,22 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
- rows = self.db_pool.simple_select_many_txn(
- txn,
- table="state_group_edges",
- column="prev_state_group",
- iterable=state_groups_to_delete,
- keyvalues={},
- retcols=("state_group",),
+ rows = cast(
+ List[Tuple[int]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="state_group_edges",
+ column="prev_state_group",
+ iterable=state_groups_to_delete,
+ keyvalues={},
+ retcols=("state_group",),
+ ),
)
remaining_state_groups = {
- row["state_group"]
- for row in rows
- if row["state_group"] not in state_groups_to_delete
+ state_group
+ for state_group, in rows
+ if state_group not in state_groups_to_delete
}
logger.info(
@@ -799,16 +812,19 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
A mapping from state group to previous state group.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="state_group_edges",
- column="prev_state_group",
- iterable=state_groups,
- keyvalues={},
- retcols=("prev_state_group", "state_group"),
- desc="get_previous_state_groups",
+ rows = cast(
+ List[Tuple[int, int]],
+ await self.db_pool.simple_select_many_batch(
+ table="state_group_edges",
+ column="prev_state_group",
+ iterable=state_groups,
+ keyvalues={},
+ retcols=("state_group", "prev_state_group"),
+ desc="get_previous_state_groups",
+ ),
)
- return {row["state_group"]: row["prev_state_group"] for row in rows}
+ return dict(rows)
async def purge_room_state(
self, room_id: str, state_groups_to_delete: Collection[int]
diff --git a/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql b/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql
new file mode 100644
index 0000000000..fc948166e6
--- /dev/null
+++ b/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql
@@ -0,0 +1,20 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (8204, 'e2e_room_keys_index_room_id', '{}');
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (8204, 'room_account_data_index_room_id', '{}');
|