diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 7aa24ccf21..b57e260fe0 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -45,7 +45,7 @@ class Databases(Generic[DataStoreT]):
"""
databases: List[DatabasePool]
- main: "DataStore" # FIXME: #11165: actually an instance of `main_store_class`
+ main: "DataStore" # FIXME: https://github.com/matrix-org/synapse/issues/11165: actually an instance of `main_store_class`
state: StateGroupDataStore
persist_events: Optional[PersistEventsStore]
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index d7482a1f4e..07f9b65af3 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -747,8 +747,16 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
)
# Invalidate the cache for any ignored users which were added or removed.
- for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
- self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.ignored_by,
+ [
+ (ignored_user_id,)
+ for ignored_user_id in (
+ previously_ignored_users ^ currently_ignored_users
+ )
+ ],
+ )
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
async def remove_account_data_for_user(
@@ -824,10 +832,14 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
)
# Invalidate the cache for ignored users which were removed.
- for ignored_user_id in previously_ignored_users:
- self._invalidate_cache_and_stream(
- txn, self.ignored_by, (ignored_user_id,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.ignored_by,
+ [
+ (ignored_user_id,)
+ for ignored_user_id in previously_ignored_users
+ ],
+ )
# Invalidate for this user the cache tracking ignored users.
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 71eefe6b7c..659631ab65 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -450,14 +450,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
user_id: str,
device_id: Optional[str],
up_to_stream_id: int,
- limit: Optional[int] = None,
) -> int:
"""
Args:
user_id: The recipient user_id.
device_id: The recipient device_id.
up_to_stream_id: Where to delete messages up to.
- limit: maximum number of messages to delete
Returns:
The number of messages deleted.
@@ -478,32 +476,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
- def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
- limit_statement = "" if limit is None else f"LIMIT {limit}"
- sql = f"""
- DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
- SELECT MAX(stream_id) FROM (
- SELECT stream_id FROM device_inbox
- WHERE user_id = ? AND device_id = ? AND stream_id <= ?
- ORDER BY stream_id
- {limit_statement}
- ) AS q1
- )
- """
- txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
- return txn.rowcount
-
- count = await self.db_pool.runInteraction(
- "delete_messages_for_device", delete_messages_for_device_txn
- )
+ from_stream_id = None
+ count = 0
+ while True:
+ from_stream_id, loop_count = await self.delete_messages_for_device_between(
+ user_id,
+ device_id,
+ from_stream_id=from_stream_id,
+ to_stream_id=up_to_stream_id,
+ limit=1000,
+ )
+ count += loop_count
+ if from_stream_id is None:
+ break
log_kv({"message": f"deleted {count} messages for device", "count": count})
- # In this case we don't know if we hit the limit or the delete is complete
- # so let's not update the cache.
- if count == limit:
- return count
-
# Update the cache, ensuring that we only ever increase the value
updated_last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
@@ -515,6 +503,74 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return count
@trace
+ async def delete_messages_for_device_between(
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ from_stream_id: Optional[int],
+ to_stream_id: int,
+ limit: int,
+ ) -> Tuple[Optional[int], int]:
+ """Delete N device messages between the stream IDs, returning the
+ highest stream ID deleted (or None if all messages in the range have
+ been deleted) and the number of messages deleted.
+
+ This is more efficient than `delete_messages_for_device` when calling in
+ a loop to batch delete messages.
+ """
+
+ # Keeping track of a lower bound of stream ID where we've deleted
+ # everything below makes the queries much faster. Otherwise, every time
+ # we scan for rows to delete we'd re-scan across all the rows that have
+ # previously deleted (until the next table VACUUM).
+
+ if from_stream_id is None:
+ # Minimum device stream ID is 1.
+ from_stream_id = 0
+
+ def delete_messages_for_device_between_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[Optional[int], int]:
+ txn.execute(
+ """
+ SELECT MAX(stream_id) FROM (
+ SELECT stream_id FROM device_inbox
+ WHERE user_id = ? AND device_id = ?
+ AND ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id
+ LIMIT ?
+ ) AS d
+ """,
+ (user_id, device_id, from_stream_id, to_stream_id, limit),
+ )
+ row = txn.fetchone()
+ if row is None or row[0] is None:
+ return None, 0
+
+ (max_stream_id,) = row
+
+ txn.execute(
+ """
+ DELETE FROM device_inbox
+ WHERE user_id = ? AND device_id = ?
+ AND ? < stream_id AND stream_id <= ?
+ """,
+ (user_id, device_id, from_stream_id, max_stream_id),
+ )
+
+ num_deleted = txn.rowcount
+ if num_deleted < limit:
+ return None, num_deleted
+
+ return max_stream_id, num_deleted
+
+ return await self.db_pool.runInteraction(
+ "delete_messages_for_device_between",
+ delete_messages_for_device_between_txn,
+ db_autocommit=True, # We don't need to run in a transaction
+ )
+
+ @trace
async def get_new_device_msgs_for_remote(
self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
) -> Tuple[List[JsonDict], int]:
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 8cb61eaee3..9e98729330 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1383,6 +1383,51 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
return otk_rows
+ async def get_master_cross_signing_key_updatable_before(
+ self, user_id: str
+ ) -> Tuple[bool, Optional[int]]:
+ """Get time before which a master cross-signing key may be replaced without UIA.
+
+ (UIA means "User-Interactive Auth".)
+
+ There are three cases to distinguish:
+ (1) No master cross-signing key.
+ (2) The key exists, but there is no replace-without-UI timestamp in the DB.
+ (3) The key exists, and has such a timestamp recorded.
+
+ Returns: a 2-tuple of:
+ - a boolean: is there a master cross-signing key already?
+ - an optional timestamp, directly taken from the DB.
+
+ In terms of the cases above, these are:
+ (1) (False, None).
+ (2) (True, None).
+ (3) (True, <timestamp in ms>).
+
+ """
+
+ def impl(txn: LoggingTransaction) -> Tuple[bool, Optional[int]]:
+ # We want to distinguish between three cases:
+ txn.execute(
+ """
+ SELECT updatable_without_uia_before_ms
+ FROM e2e_cross_signing_keys
+ WHERE user_id = ? AND keytype = 'master'
+ ORDER BY stream_id DESC
+ LIMIT 1
+ """,
+ (user_id,),
+ )
+ row = cast(Optional[Tuple[Optional[int]]], txn.fetchone())
+ if row is None:
+ return False, None
+ return True, row[0]
+
+ return await self.db_pool.runInteraction(
+ "e2e_cross_signing_keys",
+ impl,
+ )
+
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def __init__(
@@ -1630,3 +1675,42 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
],
desc="add_e2e_signing_key",
)
+
+ async def allow_master_cross_signing_key_replacement_without_uia(
+ self, user_id: str, duration_ms: int
+ ) -> Optional[int]:
+ """Mark this user's latest master key as being replaceable without UIA.
+
+ Said replacement will only be permitted for a short time after calling this
+ function. That time period is controlled by the duration argument.
+
+ Returns:
+ None, if there is no such key.
+ Otherwise, the timestamp before which replacement is allowed without UIA.
+ """
+ timestamp = self._clock.time_msec() + duration_ms
+
+ def impl(txn: LoggingTransaction) -> Optional[int]:
+ txn.execute(
+ """
+ UPDATE e2e_cross_signing_keys
+ SET updatable_without_uia_before_ms = ?
+ WHERE stream_id = (
+ SELECT stream_id
+ FROM e2e_cross_signing_keys
+ WHERE user_id = ? AND keytype = 'master'
+ ORDER BY stream_id DESC
+ LIMIT 1
+ )
+ """,
+ (timestamp, user_id),
+ )
+ if txn.rowcount == 0:
+ return None
+
+ return timestamp
+
+ return await self.db_pool.runInteraction(
+ "allow_master_cross_signing_key_replacement_without_uia",
+ impl,
+ )
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 0061805150..0c91f19c8e 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -425,7 +425,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
"""Background update to clean out extremities that should have been
deleted previously.
- Mainly used to deal with the aftermath of #5269.
+ Mainly used to deal with the aftermath of https://github.com/matrix-org/synapse/issues/5269.
"""
# This works by first copying all existing forward extremities into the
@@ -558,7 +558,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
logger.info(
- "Deleted %d forward extremities of %d checked, to clean up #5269",
+ "Deleted %d forward extremities of %d checked, to clean up matrix-org/synapse#5269",
deleted,
len(original_set),
)
@@ -1222,14 +1222,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
# Iterate the parent IDs and invalidate caches.
- for parent_id in {r[1] for r in relations_to_insert}:
- cache_tuple = (parent_id,)
- self._invalidate_cache_and_stream( # type: ignore[attr-defined]
- txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
- )
- self._invalidate_cache_and_stream( # type: ignore[attr-defined]
- txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined]
- )
+ cache_tuples = {(r[1],) for r in relations_to_insert}
+ self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
+ txn, self.get_relations_for_event, cache_tuples # type: ignore[attr-defined]
+ )
+ self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
+ txn, self.get_thread_summary, cache_tuples # type: ignore[attr-defined]
+ )
if results:
latest_event_id = results[-1][0]
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e831146cca..a880788f00 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1312,7 +1312,8 @@ class EventsWorkerStore(SQLBaseStore):
room_version: Optional[RoomVersion]
if not room_version_id:
# this should only happen for out-of-band membership events which
- # arrived before #6983 landed. For all other events, we should have
+ # arrived before https://github.com/matrix-org/synapse/issues/6983
+ # landed. For all other events, we should have
# an entry in the 'rooms' table.
#
# However, the 'out_of_band_membership' flag is unreliable for older
@@ -1323,7 +1324,8 @@ class EventsWorkerStore(SQLBaseStore):
"Room %s for event %s is unknown" % (d["room_id"], event_id)
)
- # so, assuming this is an out-of-band-invite that arrived before #6983
+ # so, assuming this is an out-of-band-invite that arrived before
+ # https://github.com/matrix-org/synapse/issues/6983
# landed, we know that the room version must be v5 or earlier (because
# v6 hadn't been invented at that point, so invites from such rooms
# would have been rejected.)
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ce88772f9e..c700872fdc 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -107,13 +107,16 @@ class KeyStore(CacheInvalidationWorkerStore):
# invalidate takes a tuple corresponding to the params of
# _get_server_keys_json. _get_server_keys_json only takes one
# param, which is itself the 2-tuple (server_name, key_id).
- for key_id in verify_keys:
- self._invalidate_cache_and_stream(
- txn, self._get_server_keys_json, ((server_name, key_id),)
- )
- self._invalidate_cache_and_stream(
- txn, self.get_server_key_json_for_remote, (server_name, key_id)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self._get_server_keys_json,
+ [((server_name, key_id),) for key_id in verify_keys],
+ )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.get_server_key_json_for_remote,
+ [(server_name, key_id) for key_id in verify_keys],
+ )
await self.db_pool.runInteraction(
"store_server_keys_response", store_server_keys_response_txn
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 3f80a64dc5..149135b8b5 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -49,13 +49,14 @@ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = (
class LocalMedia:
media_id: str
media_type: str
- media_length: int
+ media_length: Optional[int]
upload_name: str
created_ts: int
url_cache: Optional[str]
last_access_ts: int
quarantined_by: Optional[str]
safe_from_quarantine: bool
+ user_id: Optional[str]
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -149,6 +150,13 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
self._drop_media_index_without_method,
)
+ if hs.config.media.can_load_media_repo:
+ self.unused_expiration_time: Optional[
+ int
+ ] = hs.config.media.unused_expiration_time
+ else:
+ self.unused_expiration_time = None
+
async def _drop_media_index_without_method(
self, progress: JsonDict, batch_size: int
) -> int:
@@ -202,6 +210,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"url_cache",
"last_access_ts",
"safe_from_quarantine",
+ "user_id",
),
allow_none=True,
desc="get_local_media",
@@ -218,6 +227,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
url_cache=row[5],
last_access_ts=row[6],
safe_from_quarantine=row[7],
+ user_id=row[8],
)
async def get_local_media_by_user_paginate(
@@ -272,7 +282,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
url_cache,
last_access_ts,
quarantined_by,
- safe_from_quarantine
+ safe_from_quarantine,
+ user_id
FROM local_media_repository
WHERE user_id = ?
ORDER BY {order_by_column} {order}, media_id ASC
@@ -295,6 +306,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
last_access_ts=row[6],
quarantined_by=row[7],
safe_from_quarantine=bool(row[8]),
+ user_id=row[9],
)
for row in txn
]
@@ -392,6 +404,23 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
)
@trace
+ async def store_local_media_id(
+ self,
+ media_id: str,
+ time_now_ms: int,
+ user_id: UserID,
+ ) -> None:
+ await self.db_pool.simple_insert(
+ "local_media_repository",
+ {
+ "media_id": media_id,
+ "created_ts": time_now_ms,
+ "user_id": user_id.to_string(),
+ },
+ desc="store_local_media_id",
+ )
+
+ @trace
async def store_local_media(
self,
media_id: str,
@@ -416,6 +445,30 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="store_local_media",
)
+ async def update_local_media(
+ self,
+ media_id: str,
+ media_type: str,
+ upload_name: Optional[str],
+ media_length: int,
+ user_id: UserID,
+ url_cache: Optional[str] = None,
+ ) -> None:
+ await self.db_pool.simple_update_one(
+ "local_media_repository",
+ keyvalues={
+ "user_id": user_id.to_string(),
+ "media_id": media_id,
+ },
+ updatevalues={
+ "media_type": media_type,
+ "upload_name": upload_name,
+ "media_length": media_length,
+ "url_cache": url_cache,
+ },
+ desc="update_local_media",
+ )
+
async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None:
"""Mark a local media as safe or unsafe from quarantining."""
await self.db_pool.simple_update_one(
@@ -425,6 +478,39 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="mark_local_media_as_safe",
)
+ async def count_pending_media(self, user_id: UserID) -> Tuple[int, int]:
+ """Count the number of pending media for a user.
+
+ Returns:
+ A tuple of two integers: the total pending media requests and the earliest
+ expiration timestamp.
+ """
+
+ def get_pending_media_txn(txn: LoggingTransaction) -> Tuple[int, int]:
+ sql = """
+ SELECT COUNT(*), MIN(created_ts)
+ FROM local_media_repository
+ WHERE user_id = ?
+ AND created_ts > ?
+ AND media_length IS NULL
+ """
+ assert self.unused_expiration_time is not None
+ txn.execute(
+ sql,
+ (
+ user_id.to_string(),
+ self._clock.time_msec() - self.unused_expiration_time,
+ ),
+ )
+ row = txn.fetchone()
+ if not row:
+ return 0, 0
+ return row[0], (row[1] + self.unused_expiration_time if row[1] else 0)
+
+ return await self.db_pool.runInteraction(
+ "get_pending_media", get_pending_media_txn
+ )
+
async def get_url_cache(self, url: str, ts: int) -> Optional[UrlCache]:
"""Get the media_id and ts for a cached URL as of the given timestamp
Returns:
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 4b1061e6d7..2911e53310 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -317,7 +317,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
if user_id:
is_support = self.is_support_user_txn(txn, user_id)
if not is_support:
- # We do this manually here to avoid hitting #6791
+ # We do this manually here to avoid hitting https://github.com/matrix-org/synapse/issues/6791
self.db_pool.simple_upsert_txn(
txn,
table="monthly_active_users",
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 3b444d2d07..0198bb09d2 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -363,10 +363,11 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
# for their user ID.
value_values=[(presence_stream_id,) for _ in user_ids],
)
- for user_id in user_ids:
- self._invalidate_cache_and_stream(
- txn, self._get_full_presence_stream_token_for_user, (user_id,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self._get_full_presence_stream_token_for_user,
+ [(user_id,) for user_id in user_ids],
+ )
return await self.db_pool.runInteraction(
"add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 1e11bf2706..1a5b5731bb 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -295,19 +295,28 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# so make sure to keep this actually last.
txn.execute("DROP TABLE events_to_purge")
- for event_id, should_delete in event_rows:
- self._invalidate_cache_and_stream(
- txn, self._get_state_group_for_event, (event_id,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self._get_state_group_for_event,
+ [(event_id,) for event_id, _ in event_rows],
+ )
- # XXX: This is racy, since have_seen_events could be called between the
- # transaction completing and the invalidation running. On the other hand,
- # that's no different to calling `have_seen_events` just before the
- # event is deleted from the database.
+ # XXX: This is racy, since have_seen_events could be called between the
+ # transaction completing and the invalidation running. On the other hand,
+ # that's no different to calling `have_seen_events` just before the
+ # event is deleted from the database.
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.have_seen_event,
+ [
+ (room_id, event_id)
+ for event_id, should_delete in event_rows
+ if should_delete
+ ],
+ )
+
+ for event_id, should_delete in event_rows:
if should_delete:
- self._invalidate_cache_and_stream(
- txn, self.have_seen_event, (room_id, event_id)
- )
self.invalidate_get_event_cache_after_txn(txn, event_id)
logger.info("[purge] done")
@@ -485,7 +494,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# - room_tags_revisions
# The problem with these is that they are largeish and there is no room_id
# index on them. In any case we should be clearing out 'stream' tables
- # periodically anyway (#5888)
+ # periodically anyway (https://github.com/matrix-org/synapse/issues/5888)
self._invalidate_caches_for_room_and_stream(txn, room_id)
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index f72a23c584..cf622e195c 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -449,26 +449,28 @@ class PushRuleStore(PushRulesWorkerStore):
before: str,
after: str,
) -> None:
- # Lock the table since otherwise we'll have annoying races between the
- # SELECT here and the UPSERT below.
- self.database_engine.lock_table(txn, "push_rules")
-
relative_to_rule = before or after
- res = self.db_pool.simple_select_one_txn(
- txn,
- table="push_rules",
- keyvalues={"user_name": user_id, "rule_id": relative_to_rule},
- retcols=["priority_class", "priority"],
- allow_none=True,
- )
+ sql = """
+ SELECT priority, priority_class FROM push_rules
+ WHERE user_name = ? AND rule_id = ?
+ """
+
+ if isinstance(self.database_engine, PostgresEngine):
+ sql += " FOR UPDATE"
+ else:
+ # Annoyingly SQLite doesn't support row level locking, so lock the whole table
+ self.database_engine.lock_table(txn, "push_rules")
+
+ txn.execute(sql, (user_id, relative_to_rule))
+ row = txn.fetchone()
- if not res:
+ if row is None:
raise RuleNotFoundException(
"before/after rule not found: %s" % (relative_to_rule,)
)
- base_priority_class, base_rule_priority = res
+ base_rule_priority, base_priority_class = row
if base_priority_class != priority_class:
raise InconsistentRuleException(
@@ -516,9 +518,18 @@ class PushRuleStore(PushRulesWorkerStore):
conditions_json: str,
actions_json: str,
) -> None:
- # Lock the table since otherwise we'll have annoying races between the
- # SELECT here and the UPSERT below.
- self.database_engine.lock_table(txn, "push_rules")
+ if isinstance(self.database_engine, PostgresEngine):
+ # Postgres doesn't do FOR UPDATE on aggregate functions, so select the rows first
+ # then re-select the count/max below.
+ sql = """
+ SELECT * FROM push_rules
+ WHERE user_name = ? and priority_class = ?
+ FOR UPDATE
+ """
+ txn.execute(sql, (user_id, priority_class))
+ else:
+ # Annoyingly SQLite doesn't support row level locking, so lock the whole table
+ self.database_engine.lock_table(txn, "push_rules")
# find the highest priority rule in that class
sql = (
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index dec9858575..2c3f30e2eb 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -561,16 +561,15 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
updatevalues={"shadow_banned": shadow_banned},
)
# In order for this to apply immediately, clear the cache for this user.
- tokens = self.db_pool.simple_select_onecol_txn(
+ tokens = self.db_pool.simple_select_list_txn(
txn,
table="access_tokens",
keyvalues={"user_id": user_id},
- retcol="token",
+ retcols=("token",),
+ )
+ self._invalidate_cache_and_stream_bulk(
+ txn, self.get_user_by_access_token, tokens
)
- for token in tokens:
- self._invalidate_cache_and_stream(
- txn, self.get_user_by_access_token, (token,)
- )
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
@@ -2683,10 +2682,11 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
)
tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
- for token, _, _ in tokens_and_devices:
- self._invalidate_cache_and_stream(
- txn, self.get_user_by_access_token, (token,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.get_user_by_access_token,
+ [(token,) for token, _, _ in tokens_and_devices],
+ )
txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index f4bef4c99b..e25d86818b 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -275,7 +275,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# we have to set autocommit, because postgres refuses to
# CREATE INDEX CONCURRENTLY without it.
- conn.set_session(autocommit=True)
+ conn.engine.attempt_to_set_autocommit(conn.conn, True)
try:
c = conn.cursor()
@@ -301,7 +301,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# we should now be able to delete the GIST index.
c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
finally:
- conn.set_session(autocommit=False)
+ conn.engine.attempt_to_set_autocommit(conn.conn, False)
if isinstance(self.database_engine, PostgresEngine):
await self.db_pool.runWithConnection(create_index)
@@ -323,7 +323,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
def create_index(conn: LoggingDatabaseConnection) -> None:
conn.rollback()
- conn.set_session(autocommit=True)
+ conn.engine.attempt_to_set_autocommit(conn.conn, True)
c = conn.cursor()
# We create with NULLS FIRST so that when we search *backwards*
@@ -340,7 +340,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
"""
)
- conn.set_session(autocommit=False)
+ conn.engine.attempt_to_set_autocommit(conn.conn, False)
await self.db_pool.runWithConnection(create_index)
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 0f9c550b27..2c3151526d 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -492,7 +492,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
conn.rollback()
if isinstance(self.database_engine, PostgresEngine):
# postgres insists on autocommit for the index
- conn.set_session(autocommit=True)
+ conn.engine.attempt_to_set_autocommit(conn.conn, True)
try:
txn = conn.cursor()
txn.execute(
@@ -501,7 +501,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
)
txn.execute("DROP INDEX IF EXISTS state_groups_state_id")
finally:
- conn.set_session(autocommit=False)
+ conn.engine.attempt_to_set_autocommit(conn.conn, False)
else:
txn = conn.cursor()
txn.execute(
|