diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 25f70fee84..0be12f0e06 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -349,7 +349,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
table="devices",
column="user_id",
iterable=user_ids_to_query,
- keyvalues={"user_id": user_id, "hidden": False},
+ keyvalues={"hidden": False},
retcols=("device_id",),
)
@@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def delete_messages_for_device(
- self, user_id: str, device_id: Optional[str], up_to_stream_id: int
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ up_to_stream_id: int,
+ limit: int,
) -> int:
"""
Args:
user_id: The recipient user_id.
device_id: The recipient device_id.
up_to_stream_id: Where to delete messages up to.
+ limit: Maximum number of messages to delete.
Returns:
The number of messages deleted.
@@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
+ ROW_ID_NAME = self.database_engine.row_id_name
+
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
- sql = (
- "DELETE FROM device_inbox"
- " WHERE user_id = ? AND device_id = ?"
- " AND stream_id <= ?"
- )
+ sql = f"""
+ DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
+ SELECT {ROW_ID_NAME} FROM device_inbox
+ WHERE user_id = ? AND device_id = ? AND stream_id <= ?
+ LIMIT {limit}
+ )
+ """
txn.execute(sql, (user_id, device_id, up_to_stream_id))
return txn.rowcount
@@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": f"deleted {count} messages for device", "count": count})
+ # In this case we don't know whether we hit the limit or the delete is complete,
+ # so let's not update the cache.
+ if count == limit:
+ return count
+
# Update the cache, ensuring that we only ever increase the value
updated_last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
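
The new `limit` argument turns this into a bounded, batched delete: when a call removes exactly `limit` rows the caller cannot tell whether more messages remain, so the deletion cache is left untouched and another call is expected. A minimal sketch of that draining loop, assuming a hypothetical helper name and batch size that are not part of this diff:

    # Illustrative only: drain a device's inbox in bounded batches using the
    # new `limit` parameter. The constant and function name are assumptions.
    DELETE_BATCH_SIZE = 1000

    async def prune_device_inbox(
        store, user_id: str, device_id: str, up_to_stream_id: int
    ) -> int:
        total = 0
        while True:
            deleted = await store.delete_messages_for_device(
                user_id, device_id, up_to_stream_id, limit=DELETE_BATCH_SIZE
            )
            total += deleted
            # A short batch means the delete completed; a full batch means
            # there may be more rows, so go around again.
            if deleted < DELETE_BATCH_SIZE:
                return total
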
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index e4162f846b..70faf4b1ec 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -759,18 +759,10 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
mapping of user_id -> device_id -> device_info.
"""
unique_user_ids = user_ids | {user_id for user_id, _ in user_and_device_ids}
- user_map = await self.get_device_list_last_stream_id_for_remotes(
- list(unique_user_ids)
- )
- # We go and check if any of the users need to have their device lists
- # resynced. If they do then we remove them from the cached list.
- users_needing_resync = await self.get_user_ids_requiring_device_list_resync(
+ user_ids_in_cache = await self.get_users_whose_devices_are_cached(
unique_user_ids
)
- user_ids_in_cache = {
- user_id for user_id, stream_id in user_map.items() if stream_id
- } - users_needing_resync
user_ids_not_in_cache = unique_user_ids - user_ids_in_cache
# First fetch all the users for whom all devices are to be returned.
@@ -792,6 +784,22 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
return user_ids_not_in_cache, results
+ async def get_users_whose_devices_are_cached(
+ self, user_ids: StrCollection
+ ) -> Set[str]:
+ """Checks which of the given users we have cached the devices for."""
+ user_map = await self.get_device_list_last_stream_id_for_remotes(user_ids)
+
+ # We go and check if any of the users need to have their device lists
+ # resynced. If they do then we remove them from the cached list.
+ users_needing_resync = await self.get_user_ids_requiring_device_list_resync(
+ user_ids
+ )
+ user_ids_in_cache = {
+ user_id for user_id, stream_id in user_map.items() if stream_id
+ } - users_needing_resync
+ return user_ids_in_cache
+
@cached(num_args=2, tree=True)
async def _get_cached_user_device(self, user_id: str, device_id: str) -> JsonDict:
content = await self.db_pool.simple_select_one_onecol(
@@ -1766,14 +1774,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
self.db_pool.simple_delete_many_txn(
txn,
- table="device_inbox",
- column="device_id",
- values=device_ids,
- keyvalues={"user_id": user_id},
- )
-
- self.db_pool.simple_delete_many_txn(
- txn,
table="device_auth_providers",
column="device_id",
values=device_ids,
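
The device-list refactor above extracts the cache check into `get_users_whose_devices_are_cached`, leaving the call site as a simple set partition. A hedged sketch of that call-site pattern (only the helper name comes from this diff; the wrapper is illustrative):

    # Illustrative wrapper: split remote users into those whose device lists
    # are already cached and those that must be fetched over federation.
    from typing import Set, Tuple

    async def partition_users_by_device_cache(
        store, unique_user_ids: Set[str]
    ) -> Tuple[Set[str], Set[str]]:
        cached = await store.get_users_whose_devices_are_cached(unique_user_ids)
        uncached = unique_user_ids - cached
        return cached, uncached
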
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 07bda7d6be..b958a39aeb 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -1740,42 +1740,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# We sleep to ensure that we don't overwhelm the DB.
await self._clock.sleep(1.0)
-
-class EventPushActionsStore(EventPushActionsWorkerStore):
- EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
-
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
-
- self.db_pool.updates.register_background_index_update(
- self.EPA_HIGHLIGHT_INDEX,
- index_name="event_push_actions_u_highlight",
- table="event_push_actions",
- columns=["user_id", "stream_ordering"],
- )
-
- self.db_pool.updates.register_background_index_update(
- "event_push_actions_highlights_index",
- index_name="event_push_actions_highlights_index",
- table="event_push_actions",
- columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
- where_clause="highlight=1",
- )
-
- # Add index to make deleting old push actions faster.
- self.db_pool.updates.register_background_index_update(
- "event_push_actions_stream_highlight_index",
- index_name="event_push_actions_stream_highlight_index",
- table="event_push_actions",
- columns=["highlight", "stream_ordering"],
- where_clause="highlight=0",
- )
-
async def get_push_actions_for_user(
self,
user_id: str,
@@ -1834,6 +1798,42 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
]
+class EventPushActionsStore(EventPushActionsWorkerStore):
+ EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
+
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
+ super().__init__(database, db_conn, hs)
+
+ self.db_pool.updates.register_background_index_update(
+ self.EPA_HIGHLIGHT_INDEX,
+ index_name="event_push_actions_u_highlight",
+ table="event_push_actions",
+ columns=["user_id", "stream_ordering"],
+ )
+
+ self.db_pool.updates.register_background_index_update(
+ "event_push_actions_highlights_index",
+ index_name="event_push_actions_highlights_index",
+ table="event_push_actions",
+ columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
+ where_clause="highlight=1",
+ )
+
+ # Add index to make deleting old push actions faster.
+ self.db_pool.updates.register_background_index_update(
+ "event_push_actions_stream_highlight_index",
+ index_name="event_push_actions_stream_highlight_index",
+ table="event_push_actions",
+ columns=["highlight", "stream_ordering"],
+ where_clause="highlight=0",
+ )
+
+
def _action_has_highlight(actions: Collection[Union[Mapping, str]]) -> bool:
for action in actions:
if not isinstance(action, dict):
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index a3b4744855..41563371dc 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -16,14 +16,17 @@
import itertools
import json
import logging
-from typing import Dict, Iterable, Mapping, Optional, Tuple
+from typing import Dict, Iterable, Optional, Tuple
+from canonicaljson import encode_canonical_json
from signedjson.key import decode_verify_key_bytes
from unpaddedbase64 import decode_base64
+from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.keys import FetchKeyResult, FetchKeyResultForRemote
from synapse.storage.types import Cursor
+from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.iterutils import batch_iter
@@ -36,162 +39,84 @@ db_binary_type = memoryview
class KeyStore(CacheInvalidationWorkerStore):
"""Persistence for signature verification keys"""
- @cached()
- def _get_server_signature_key(
- self, server_name_and_key_id: Tuple[str, str]
- ) -> FetchKeyResult:
- raise NotImplementedError()
-
- @cachedList(
- cached_method_name="_get_server_signature_key",
- list_name="server_name_and_key_ids",
- )
- async def get_server_signature_keys(
- self, server_name_and_key_ids: Iterable[Tuple[str, str]]
- ) -> Dict[Tuple[str, str], FetchKeyResult]:
- """
- Args:
- server_name_and_key_ids:
- iterable of (server_name, key-id) tuples to fetch keys for
-
- Returns:
- A map from (server_name, key_id) -> FetchKeyResult, or None if the
- key is unknown
- """
- keys = {}
-
- def _get_keys(txn: Cursor, batch: Tuple[Tuple[str, str], ...]) -> None:
- """Processes a batch of keys to fetch, and adds the result to `keys`."""
-
- # batch_iter always returns tuples so it's safe to do len(batch)
- sql = """
- SELECT server_name, key_id, verify_key, ts_valid_until_ms
- FROM server_signature_keys WHERE 1=0
- """ + " OR (server_name=? AND key_id=?)" * len(
- batch
- )
-
- txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
-
- for row in txn:
- server_name, key_id, key_bytes, ts_valid_until_ms = row
-
- if ts_valid_until_ms is None:
- # Old keys may be stored with a ts_valid_until_ms of null,
- # in which case we treat this as if it was set to `0`, i.e.
- # it won't match key requests that define a minimum
- # `ts_valid_until_ms`.
- ts_valid_until_ms = 0
-
- keys[(server_name, key_id)] = FetchKeyResult(
- verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)),
- valid_until_ts=ts_valid_until_ms,
- )
-
- def _txn(txn: Cursor) -> Dict[Tuple[str, str], FetchKeyResult]:
- for batch in batch_iter(server_name_and_key_ids, 50):
- _get_keys(txn, batch)
- return keys
-
- return await self.db_pool.runInteraction("get_server_signature_keys", _txn)
-
- async def store_server_signature_keys(
+ async def store_server_keys_response(
self,
+ server_name: str,
from_server: str,
ts_added_ms: int,
- verify_keys: Mapping[Tuple[str, str], FetchKeyResult],
+ verify_keys: Dict[str, FetchKeyResult],
+ response_json: JsonDict,
) -> None:
- """Stores NACL verification keys for remote servers.
+ """Stores the keys for the given server that we got from `from_server`.
+
Args:
- from_server: Where the verification keys were looked up
- ts_added_ms: The time to record that the key was added
- verify_keys:
- keys to be stored. Each entry is a triplet of
- (server_name, key_id, key).
+ server_name: The owner of the keys
+ from_server: Which server we got the keys from
+ ts_added_ms: The time (in ms) at which we are adding the keys
+ verify_keys: The decoded keys
+ response_json: The full *signed* response JSON that contains the keys.
"""
- key_values = []
- value_values = []
- invalidations = []
- for (server_name, key_id), fetch_result in verify_keys.items():
- key_values.append((server_name, key_id))
- value_values.append(
- (
- from_server,
- ts_added_ms,
- fetch_result.valid_until_ts,
- db_binary_type(fetch_result.verify_key.encode()),
- )
- )
- # invalidate takes a tuple corresponding to the params of
- # _get_server_signature_key. _get_server_signature_key only takes one
- # param, which is itself the 2-tuple (server_name, key_id).
- invalidations.append((server_name, key_id))
- await self.db_pool.simple_upsert_many(
- table="server_signature_keys",
- key_names=("server_name", "key_id"),
- key_values=key_values,
- value_names=(
- "from_server",
- "ts_added_ms",
- "ts_valid_until_ms",
- "verify_key",
- ),
- value_values=value_values,
- desc="store_server_signature_keys",
- )
+ key_json_bytes = encode_canonical_json(response_json)
+
+ def store_server_keys_response_txn(txn: LoggingTransaction) -> None:
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="server_signature_keys",
+ key_names=("server_name", "key_id"),
+ key_values=[(server_name, key_id) for key_id in verify_keys],
+ value_names=(
+ "from_server",
+ "ts_added_ms",
+ "ts_valid_until_ms",
+ "verify_key",
+ ),
+ value_values=[
+ (
+ from_server,
+ ts_added_ms,
+ fetch_result.valid_until_ts,
+ db_binary_type(fetch_result.verify_key.encode()),
+ )
+ for fetch_result in verify_keys.values()
+ ],
+ )
- invalidate = self._get_server_signature_key.invalidate
- for i in invalidations:
- invalidate((i,))
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="server_keys_json",
+ key_names=("server_name", "key_id", "from_server"),
+ key_values=[
+ (server_name, key_id, from_server) for key_id in verify_keys
+ ],
+ value_names=(
+ "ts_added_ms",
+ "ts_valid_until_ms",
+ "key_json",
+ ),
+ value_values=[
+ (
+ ts_added_ms,
+ fetch_result.valid_until_ts,
+ db_binary_type(key_json_bytes),
+ )
+ for fetch_result in verify_keys.values()
+ ],
+ )
- async def store_server_keys_json(
- self,
- server_name: str,
- key_id: str,
- from_server: str,
- ts_now_ms: int,
- ts_expires_ms: int,
- key_json_bytes: bytes,
- ) -> None:
- """Stores the JSON bytes for a set of keys from a server
- The JSON should be signed by the originating server, the intermediate
- server, and by this server. Updates the value for the
- (server_name, key_id, from_server) triplet if one already existed.
- Args:
- server_name: The name of the server.
- key_id: The identifier of the key this JSON is for.
- from_server: The server this JSON was fetched from.
- ts_now_ms: The time now in milliseconds.
- ts_valid_until_ms: The time when this json stops being valid.
- key_json_bytes: The encoded JSON.
- """
- await self.db_pool.simple_upsert(
- table="server_keys_json",
- keyvalues={
- "server_name": server_name,
- "key_id": key_id,
- "from_server": from_server,
- },
- values={
- "server_name": server_name,
- "key_id": key_id,
- "from_server": from_server,
- "ts_added_ms": ts_now_ms,
- "ts_valid_until_ms": ts_expires_ms,
- "key_json": db_binary_type(key_json_bytes),
- },
- desc="store_server_keys_json",
- )
+ # invalidate takes a tuple corresponding to the params of
+ # _get_server_keys_json. _get_server_keys_json only takes one
+ # param, which is itself the 2-tuple (server_name, key_id).
+ for key_id in verify_keys:
+ self._invalidate_cache_and_stream(
+ txn, self._get_server_keys_json, ((server_name, key_id),)
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_server_key_json_for_remote, (server_name, key_id)
+ )
- # invalidate takes a tuple corresponding to the params of
- # _get_server_keys_json. _get_server_keys_json only takes one
- # param, which is itself the 2-tuple (server_name, key_id).
- await self.invalidate_cache_and_stream(
- "_get_server_keys_json", ((server_name, key_id),)
- )
- await self.invalidate_cache_and_stream(
- "get_server_key_json_for_remote", (server_name, key_id)
+ await self.db_pool.runInteraction(
+ "store_server_keys_response", store_server_keys_response_txn
)
@cached()
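
`store_server_keys_response` folds the old `store_server_signature_keys` and `store_server_keys_json` writes, plus the cache invalidations, into a single transaction. A hedged sketch of a caller, assuming a Synapse-style clock and illustrative server names that are not part of this diff:

    # Illustrative caller: persist a remote server's signed key response in
    # one transaction. Server names and the key id are placeholders.
    from synapse.storage.keys import FetchKeyResult

    async def persist_key_response(
        store, clock, fetch_result: FetchKeyResult, signed_response: dict
    ) -> None:
        await store.store_server_keys_response(
            server_name="remote.example.org",   # owner of the keys
            from_server="notary.example.org",   # server we fetched them from
            ts_added_ms=clock.time_msec(),      # when we are adding them
            verify_keys={"ed25519:abc123": fetch_result},
            response_json=signed_response,      # full signed JSON, canonicalised on write
        )
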
@@ -221,12 +146,17 @@ class KeyStore(CacheInvalidationWorkerStore):
"""Processes a batch of keys to fetch, and adds the result to `keys`."""
# batch_iter always returns tuples so it's safe to do len(batch)
- sql = """
- SELECT server_name, key_id, key_json, ts_valid_until_ms
- FROM server_keys_json WHERE 1=0
- """ + " OR (server_name=? AND key_id=?)" * len(
- batch
- )
+ where_clause = " OR (server_name=? AND key_id=?)" * len(batch)
+
+ # `server_keys_json` can have multiple entries per server (one per
+ # remote server we fetched from, if using perspectives). Order by
+ # `ts_added_ms` so the most recently fetched one always wins.
+ sql = f"""
+ SELECT server_name, key_id, key_json, ts_valid_until_ms
+ FROM server_keys_json WHERE 1=0
+ {where_clause}
+ ORDER BY ts_added_ms
+ """
txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
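
The `ORDER BY ts_added_ms` only has its intended effect because the rows are folded into a dict keyed by `(server_name, key_id)` (as in the accumulation loop shown in the removed code earlier in this file), so the newest row is processed last and overwrites older entries. A small self-contained illustration of that "latest wins" behaviour, with simplified row data:

    # Why an ascending ORDER BY ts_added_ms means the most recent key wins:
    # later rows overwrite earlier ones in a dict keyed by (server, key_id).
    rows_sorted_by_ts_added = [
        ("matrix.org", "ed25519:a1", b'{"stale": true}'),   # older fetch
        ("matrix.org", "ed25519:a1", b'{"stale": false}'),  # newer fetch
    ]
    keys = {}
    for server_name, key_id, key_json in rows_sorted_by_ts_added:
        keys[(server_name, key_id)] = key_json
    assert keys[("matrix.org", "ed25519:a1")] == b'{"stale": false}'
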
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index b52f48cf04..dea0e0458c 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -450,10 +450,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
"e2e_room_keys",
"event_push_summary",
"pusher_throttle",
- "insertion_events",
- "insertion_event_extremities",
- "insertion_event_edges",
- "batch_events",
"room_account_data",
"room_tags",
# "rooms" happens last, to keep the foreign keys in the other tables
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 5ee5c7ad9f..e4d10ff250 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -939,11 +939,7 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
receipts."""
def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
- if isinstance(self.database_engine, PostgresEngine):
- ROW_ID_NAME = "ctid"
- else:
- ROW_ID_NAME = "rowid"
-
+ ROW_ID_NAME = self.database_engine.row_id_name
# Identify any duplicate receipts arising from
# https://github.com/matrix-org/synapse/issues/14406.
# The following query takes less than a minute on matrix.org.
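
Both rewritten call sites (here and in deviceinbox.py above) now take the physical row-id column name from the database engine instead of branching inline. Based on the conditional removed above, a minimal sketch of what such a `row_id_name` property presumably looks like; the class layout is an assumption, only the returned values are taken from this diff:

    # Hedged sketch: engine-specific name of the physical row identifier.
    class PostgresEngine:
        @property
        def row_id_name(self) -> str:
            # Postgres exposes the physical row location as the system
            # column "ctid".
            return "ctid"

    class Sqlite3Engine:
        @property
        def row_id_name(self) -> str:
            # Ordinary SQLite tables expose an implicit "rowid" column.
            return "rowid"
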
|