diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 7426dbcad6..62fbd05534 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -49,7 +49,11 @@ else:
if TYPE_CHECKING:
from synapse.server import HomeServer
- from synapse.storage.database import DatabasePool, LoggingTransaction
+ from synapse.storage.database import (
+ DatabasePool,
+ LoggingDatabaseConnection,
+ LoggingTransaction,
+ )
logger = logging.getLogger(__name__)
@@ -746,10 +750,10 @@ class BackgroundUpdater:
The named index will be dropped upon completion of the new index.
"""
- def create_index_psql(conn: Connection) -> None:
+ def create_index_psql(conn: "LoggingDatabaseConnection") -> None:
conn.rollback()
# postgres insists on autocommit for the index
- conn.set_session(autocommit=True) # type: ignore
+ conn.engine.attempt_to_set_autocommit(conn.conn, True)
try:
c = conn.cursor()
@@ -793,9 +797,9 @@ class BackgroundUpdater:
undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
conn.cursor().execute(undo_timeout_sql)
- conn.set_session(autocommit=False) # type: ignore
+ conn.engine.attempt_to_set_autocommit(conn.conn, False)
- def create_index_sqlite(conn: Connection) -> None:
+ def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
# Sqlite doesn't support concurrent creation of indexes.
#
# We assume that sqlite doesn't give us invalid indices; however
@@ -825,7 +829,9 @@ class BackgroundUpdater:
c.execute(sql)
if isinstance(self.db_pool.engine, engines.PostgresEngine):
- runner: Optional[Callable[[Connection], None]] = create_index_psql
+ runner: Optional[
+ Callable[[LoggingDatabaseConnection], None]
+ ] = create_index_psql
elif psql_only:
runner = None
else:
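
The hunks above swap psycopg2's connection-level `set_session(autocommit=True)` for the engine helper `attempt_to_set_autocommit`, which keeps the call site database-agnostic. The underlying requirement is a Postgres one: `CREATE INDEX CONCURRENTLY` refuses to run inside a transaction block. A minimal psycopg2 sketch of that requirement follows; the DSN, table and index names are placeholders, not Synapse's.

# Sketch only: demonstrates why autocommit must be enabled around
# CREATE INDEX CONCURRENTLY. The DSN, table and index are placeholders.
import psycopg2

conn = psycopg2.connect("dbname=example")  # placeholder DSN
conn.autocommit = True  # roughly what attempt_to_set_autocommit does on Postgres
try:
    with conn.cursor() as cur:
        # Fails with "CREATE INDEX CONCURRENTLY cannot run inside a
        # transaction block" unless autocommit is on.
        cur.execute(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS example_idx"
            " ON example_table (example_col)"
        )
finally:
    conn.autocommit = False
    conn.close()
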
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 7aa24ccf21..b57e260fe0 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -45,7 +45,7 @@ class Databases(Generic[DataStoreT]):
"""
databases: List[DatabasePool]
- main: "DataStore" # FIXME: #11165: actually an instance of `main_store_class`
+ main: "DataStore" # FIXME: https://github.com/matrix-org/synapse/issues/11165: actually an instance of `main_store_class`
state: StateGroupDataStore
persist_events: Optional[PersistEventsStore]
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index d7482a1f4e..07f9b65af3 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -747,8 +747,16 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
)
# Invalidate the cache for any ignored users which were added or removed.
- for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
- self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.ignored_by,
+ [
+ (ignored_user_id,)
+ for ignored_user_id in (
+ previously_ignored_users ^ currently_ignored_users
+ )
+ ],
+ )
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
async def remove_account_data_for_user(
@@ -824,10 +832,14 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
)
# Invalidate the cache for ignored users which were removed.
- for ignored_user_id in previously_ignored_users:
- self._invalidate_cache_and_stream(
- txn, self.ignored_by, (ignored_user_id,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.ignored_by,
+ [
+ (ignored_user_id,)
+ for ignored_user_id in previously_ignored_users
+ ],
+ )
# Invalidate for this user the cache tracking ignored users.
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
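
This hunk, and several like it below, replaces a per-key `_invalidate_cache_and_stream` loop with a single `_invalidate_cache_and_stream_bulk` call, so the keys are invalidated and replicated as one batch rather than one stream row per key. A toy, self-contained sketch of that pattern follows; the cache and replication-stream stand-ins are invented for illustration and are not Synapse's implementation.

# Toy illustration of the bulk-invalidation pattern: drop every local cache
# entry, but record all keys for replication in a single batch instead of
# appending one row per key as the per-key loop used to.
from typing import Any, Dict, Iterable, List, Tuple

class ToyCache:
    def __init__(self) -> None:
        self._entries: Dict[Tuple[Any, ...], Any] = {}

    def invalidate(self, key: Tuple[Any, ...]) -> None:
        self._entries.pop(key, None)

def invalidate_bulk(
    cache: ToyCache,
    replication_rows: List[Tuple[str, List[Any]]],
    cache_name: str,
    keys: Iterable[Tuple[Any, ...]],
) -> None:
    key_list = list(keys)
    for key in key_list:
        cache.invalidate(key)
    if key_list:
        # One batched replication entry for the whole set of keys.
        replication_rows.append((cache_name, [list(k) for k in key_list]))

rows: List[Tuple[str, List[Any]]] = []
invalidate_bulk(
    ToyCache(), rows, "ignored_by", [("@a:example.org",), ("@b:example.org",)]
)
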
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 71eefe6b7c..659631ab65 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -450,14 +450,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
user_id: str,
device_id: Optional[str],
up_to_stream_id: int,
- limit: Optional[int] = None,
) -> int:
"""
Args:
user_id: The recipient user_id.
device_id: The recipient device_id.
up_to_stream_id: Where to delete messages up to.
- limit: maximum number of messages to delete
Returns:
The number of messages deleted.
@@ -478,32 +476,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
- def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
- limit_statement = "" if limit is None else f"LIMIT {limit}"
- sql = f"""
- DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
- SELECT MAX(stream_id) FROM (
- SELECT stream_id FROM device_inbox
- WHERE user_id = ? AND device_id = ? AND stream_id <= ?
- ORDER BY stream_id
- {limit_statement}
- ) AS q1
- )
- """
- txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
- return txn.rowcount
-
- count = await self.db_pool.runInteraction(
- "delete_messages_for_device", delete_messages_for_device_txn
- )
+ from_stream_id = None
+ count = 0
+ while True:
+ from_stream_id, loop_count = await self.delete_messages_for_device_between(
+ user_id,
+ device_id,
+ from_stream_id=from_stream_id,
+ to_stream_id=up_to_stream_id,
+ limit=1000,
+ )
+ count += loop_count
+ if from_stream_id is None:
+ break
log_kv({"message": f"deleted {count} messages for device", "count": count})
- # In this case we don't know if we hit the limit or the delete is complete
- # so let's not update the cache.
- if count == limit:
- return count
-
# Update the cache, ensuring that we only ever increase the value
updated_last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
@@ -515,6 +503,74 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return count
@trace
+ async def delete_messages_for_device_between(
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ from_stream_id: Optional[int],
+ to_stream_id: int,
+ limit: int,
+ ) -> Tuple[Optional[int], int]:
+ """Delete N device messages between the stream IDs, returning the
+ highest stream ID deleted (or None if all messages in the range have
+ been deleted) and the number of messages deleted.
+
+ This is more efficient than `delete_messages_for_device` when called in
+ a loop to batch-delete messages.
+ """
+
+ # Keeping track of a lower bound of stream ID, below which we've already
+ # deleted everything, makes the queries much faster. Otherwise, every time
+ # we scan for rows to delete we'd re-scan across all the rows that have
+ # previously been deleted (until the next table VACUUM).
+
+ if from_stream_id is None:
+ # Minimum device stream ID is 1.
+ from_stream_id = 0
+
+ def delete_messages_for_device_between_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[Optional[int], int]:
+ txn.execute(
+ """
+ SELECT MAX(stream_id) FROM (
+ SELECT stream_id FROM device_inbox
+ WHERE user_id = ? AND device_id = ?
+ AND ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id
+ LIMIT ?
+ ) AS d
+ """,
+ (user_id, device_id, from_stream_id, to_stream_id, limit),
+ )
+ row = txn.fetchone()
+ if row is None or row[0] is None:
+ return None, 0
+
+ (max_stream_id,) = row
+
+ txn.execute(
+ """
+ DELETE FROM device_inbox
+ WHERE user_id = ? AND device_id = ?
+ AND ? < stream_id AND stream_id <= ?
+ """,
+ (user_id, device_id, from_stream_id, max_stream_id),
+ )
+
+ num_deleted = txn.rowcount
+ if num_deleted < limit:
+ return None, num_deleted
+
+ return max_stream_id, num_deleted
+
+ return await self.db_pool.runInteraction(
+ "delete_messages_for_device_between",
+ delete_messages_for_device_between_txn,
+ db_autocommit=True, # We don't need to run in a transaction
+ )
+
+ @trace
async def get_new_device_msgs_for_remote(
self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
) -> Tuple[List[JsonDict], int]:
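
The batching technique used by delete_messages_for_device_between can be shown in isolation: each pass finds the highest ID within the next `limit` rows, deletes only above the previous lower bound, and hands that bound back to the caller so later passes never re-scan already-deleted rows. The sqlite3 example below is self-contained; the table and column names are made up for the illustration.

import sqlite3
from typing import Optional, Tuple

def delete_batch(
    conn: sqlite3.Connection, from_id: int, to_id: int, limit: int
) -> Tuple[Optional[int], int]:
    cur = conn.execute(
        """
        SELECT MAX(id) FROM (
            SELECT id FROM messages
            WHERE ? < id AND id <= ?
            ORDER BY id
            LIMIT ?
        ) AS batch
        """,
        (from_id, to_id, limit),
    )
    (max_id,) = cur.fetchone()
    if max_id is None:
        return None, 0
    deleted = conn.execute(
        "DELETE FROM messages WHERE ? < id AND id <= ?", (from_id, max_id)
    ).rowcount
    # A short batch means the range is exhausted; otherwise resume above max_id.
    return (None, deleted) if deleted < limit else (max_id, deleted)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO messages (id) VALUES (?)", [(i,) for i in range(1, 5001)])

lower: Optional[int] = 0
total = 0
while lower is not None:
    lower, deleted = delete_batch(conn, lower, 4000, limit=1000)
    total += deleted
print(total)  # 4000 rows removed, in batches of at most 1000
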
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 8cb61eaee3..9e98729330 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1383,6 +1383,51 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
return otk_rows
+ async def get_master_cross_signing_key_updatable_before(
+ self, user_id: str
+ ) -> Tuple[bool, Optional[int]]:
+ """Get time before which a master cross-signing key may be replaced without UIA.
+
+ (UIA means "User-Interactive Auth".)
+
+ There are three cases to distinguish:
+ (1) No master cross-signing key.
+ (2) The key exists, but there is no replace-without-UIA timestamp in the DB.
+ (3) The key exists, and has such a timestamp recorded.
+
+ Returns: a 2-tuple of:
+ - a boolean: is there a master cross-signing key already?
+ - an optional timestamp, directly taken from the DB.
+
+ In terms of the cases above, these are:
+ (1) (False, None).
+ (2) (True, None).
+ (3) (True, <timestamp in ms>).
+
+ """
+
+ def impl(txn: LoggingTransaction) -> Tuple[bool, Optional[int]]:
+ # Distinguish the three cases described in the docstring above.
+ txn.execute(
+ """
+ SELECT updatable_without_uia_before_ms
+ FROM e2e_cross_signing_keys
+ WHERE user_id = ? AND keytype = 'master'
+ ORDER BY stream_id DESC
+ LIMIT 1
+ """,
+ (user_id,),
+ )
+ row = cast(Optional[Tuple[Optional[int]]], txn.fetchone())
+ if row is None:
+ return False, None
+ return True, row[0]
+
+ return await self.db_pool.runInteraction(
+ "e2e_cross_signing_keys",
+ impl,
+ )
+
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def __init__(
@@ -1630,3 +1675,42 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
],
desc="add_e2e_signing_key",
)
+
+ async def allow_master_cross_signing_key_replacement_without_uia(
+ self, user_id: str, duration_ms: int
+ ) -> Optional[int]:
+ """Mark this user's latest master key as being replaceable without UIA.
+
+ Said replacement will only be permitted for a short time after calling this
+ function. That time period is controlled by the duration argument.
+
+ Returns:
+ None, if there is no such key.
+ Otherwise, the timestamp before which replacement is allowed without UIA.
+ """
+ timestamp = self._clock.time_msec() + duration_ms
+
+ def impl(txn: LoggingTransaction) -> Optional[int]:
+ txn.execute(
+ """
+ UPDATE e2e_cross_signing_keys
+ SET updatable_without_uia_before_ms = ?
+ WHERE stream_id = (
+ SELECT stream_id
+ FROM e2e_cross_signing_keys
+ WHERE user_id = ? AND keytype = 'master'
+ ORDER BY stream_id DESC
+ LIMIT 1
+ )
+ """,
+ (timestamp, user_id),
+ )
+ if txn.rowcount == 0:
+ return None
+
+ return timestamp
+
+ return await self.db_pool.runInteraction(
+ "allow_master_cross_signing_key_replacement_without_uia",
+ impl,
+ )
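
A hedged sketch of how a caller might combine the two new store methods: open a window with allow_master_cross_signing_key_replacement_without_uia, then later consult get_master_cross_signing_key_updatable_before to decide whether UIA can be skipped. The handler shape, the ten-minute window and the "first upload needs no UIA" policy are assumptions made for the example, not taken from Synapse.

from typing import Optional

TEN_MINUTES_MS = 10 * 60 * 1000  # assumed window length

async def open_replacement_window(store, user_id: str) -> Optional[int]:
    # Returns the timestamp (ms) until which the master key may be replaced
    # without User-Interactive Auth, or None if the user has no master key.
    return await store.allow_master_cross_signing_key_replacement_without_uia(
        user_id, TEN_MINUTES_MS
    )

async def can_skip_uia(store, clock, user_id: str) -> bool:
    exists, updatable_before = await store.get_master_cross_signing_key_updatable_before(
        user_id
    )
    if not exists:
        return True  # assumed policy: the very first upload needs no UIA
    if updatable_before is None:
        return False  # key exists and no replacement window was ever opened
    return clock.time_msec() < updatable_before
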
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 0061805150..0c91f19c8e 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -425,7 +425,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
"""Background update to clean out extremities that should have been
deleted previously.
- Mainly used to deal with the aftermath of #5269.
+ Mainly used to deal with the aftermath of https://github.com/matrix-org/synapse/issues/5269.
"""
# This works by first copying all existing forward extremities into the
@@ -558,7 +558,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
logger.info(
- "Deleted %d forward extremities of %d checked, to clean up #5269",
+ "Deleted %d forward extremities of %d checked, to clean up matrix-org/synapse#5269",
deleted,
len(original_set),
)
@@ -1222,14 +1222,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
# Iterate the parent IDs and invalidate caches.
- for parent_id in {r[1] for r in relations_to_insert}:
- cache_tuple = (parent_id,)
- self._invalidate_cache_and_stream( # type: ignore[attr-defined]
- txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
- )
- self._invalidate_cache_and_stream( # type: ignore[attr-defined]
- txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined]
- )
+ cache_tuples = {(r[1],) for r in relations_to_insert}
+ self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
+ txn, self.get_relations_for_event, cache_tuples # type: ignore[attr-defined]
+ )
+ self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
+ txn, self.get_thread_summary, cache_tuples # type: ignore[attr-defined]
+ )
if results:
latest_event_id = results[-1][0]
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e831146cca..a880788f00 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1312,7 +1312,8 @@ class EventsWorkerStore(SQLBaseStore):
room_version: Optional[RoomVersion]
if not room_version_id:
# this should only happen for out-of-band membership events which
- # arrived before #6983 landed. For all other events, we should have
+ # arrived before https://github.com/matrix-org/synapse/issues/6983
+ # landed. For all other events, we should have
# an entry in the 'rooms' table.
#
# However, the 'out_of_band_membership' flag is unreliable for older
@@ -1323,7 +1324,8 @@ class EventsWorkerStore(SQLBaseStore):
"Room %s for event %s is unknown" % (d["room_id"], event_id)
)
- # so, assuming this is an out-of-band-invite that arrived before #6983
+ # so, assuming this is an out-of-band-invite that arrived before
+ # https://github.com/matrix-org/synapse/issues/6983
# landed, we know that the room version must be v5 or earlier (because
# v6 hadn't been invented at that point, so invites from such rooms
# would have been rejected.)
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ce88772f9e..c700872fdc 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -107,13 +107,16 @@ class KeyStore(CacheInvalidationWorkerStore):
# invalidate takes a tuple corresponding to the params of
# _get_server_keys_json. _get_server_keys_json only takes one
# param, which is itself the 2-tuple (server_name, key_id).
- for key_id in verify_keys:
- self._invalidate_cache_and_stream(
- txn, self._get_server_keys_json, ((server_name, key_id),)
- )
- self._invalidate_cache_and_stream(
- txn, self.get_server_key_json_for_remote, (server_name, key_id)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self._get_server_keys_json,
+ [((server_name, key_id),) for key_id in verify_keys],
+ )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.get_server_key_json_for_remote,
+ [(server_name, key_id) for key_id in verify_keys],
+ )
await self.db_pool.runInteraction(
"store_server_keys_response", store_server_keys_response_txn
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 3f80a64dc5..149135b8b5 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -49,13 +49,14 @@ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = (
class LocalMedia:
media_id: str
media_type: str
- media_length: int
+ media_length: Optional[int]
upload_name: str
created_ts: int
url_cache: Optional[str]
last_access_ts: int
quarantined_by: Optional[str]
safe_from_quarantine: bool
+ user_id: Optional[str]
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -149,6 +150,13 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
self._drop_media_index_without_method,
)
+ if hs.config.media.can_load_media_repo:
+ self.unused_expiration_time: Optional[
+ int
+ ] = hs.config.media.unused_expiration_time
+ else:
+ self.unused_expiration_time = None
+
async def _drop_media_index_without_method(
self, progress: JsonDict, batch_size: int
) -> int:
@@ -202,6 +210,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"url_cache",
"last_access_ts",
"safe_from_quarantine",
+ "user_id",
),
allow_none=True,
desc="get_local_media",
@@ -218,6 +227,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
url_cache=row[5],
last_access_ts=row[6],
safe_from_quarantine=row[7],
+ user_id=row[8],
)
async def get_local_media_by_user_paginate(
@@ -272,7 +282,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
url_cache,
last_access_ts,
quarantined_by,
- safe_from_quarantine
+ safe_from_quarantine,
+ user_id
FROM local_media_repository
WHERE user_id = ?
ORDER BY {order_by_column} {order}, media_id ASC
@@ -295,6 +306,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
last_access_ts=row[6],
quarantined_by=row[7],
safe_from_quarantine=bool(row[8]),
+ user_id=row[9],
)
for row in txn
]
@@ -392,6 +404,23 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
)
@trace
+ async def store_local_media_id(
+ self,
+ media_id: str,
+ time_now_ms: int,
+ user_id: UserID,
+ ) -> None:
+ await self.db_pool.simple_insert(
+ "local_media_repository",
+ {
+ "media_id": media_id,
+ "created_ts": time_now_ms,
+ "user_id": user_id.to_string(),
+ },
+ desc="store_local_media_id",
+ )
+
+ @trace
async def store_local_media(
self,
media_id: str,
@@ -416,6 +445,30 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="store_local_media",
)
+ async def update_local_media(
+ self,
+ media_id: str,
+ media_type: str,
+ upload_name: Optional[str],
+ media_length: int,
+ user_id: UserID,
+ url_cache: Optional[str] = None,
+ ) -> None:
+ await self.db_pool.simple_update_one(
+ "local_media_repository",
+ keyvalues={
+ "user_id": user_id.to_string(),
+ "media_id": media_id,
+ },
+ updatevalues={
+ "media_type": media_type,
+ "upload_name": upload_name,
+ "media_length": media_length,
+ "url_cache": url_cache,
+ },
+ desc="update_local_media",
+ )
+
async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None:
"""Mark a local media as safe or unsafe from quarantining."""
await self.db_pool.simple_update_one(
@@ -425,6 +478,39 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="mark_local_media_as_safe",
)
+ async def count_pending_media(self, user_id: UserID) -> Tuple[int, int]:
+ """Count the number of pending media for a user.
+
+ Returns:
+ A tuple of two integers: the number of pending media uploads and the
+ expiration timestamp (in ms) of the earliest one, or 0 if there are none.
+ """
+
+ def get_pending_media_txn(txn: LoggingTransaction) -> Tuple[int, int]:
+ sql = """
+ SELECT COUNT(*), MIN(created_ts)
+ FROM local_media_repository
+ WHERE user_id = ?
+ AND created_ts > ?
+ AND media_length IS NULL
+ """
+ assert self.unused_expiration_time is not None
+ txn.execute(
+ sql,
+ (
+ user_id.to_string(),
+ self._clock.time_msec() - self.unused_expiration_time,
+ ),
+ )
+ row = txn.fetchone()
+ if not row:
+ return 0, 0
+ return row[0], (row[1] + self.unused_expiration_time if row[1] else 0)
+
+ return await self.db_pool.runInteraction(
+ "get_pending_media", get_pending_media_txn
+ )
+
async def get_url_cache(self, url: str, ts: int) -> Optional[UrlCache]:
"""Get the media_id and ts for a cached URL as of the given timestamp
Returns:
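
The new store_local_media_id / update_local_media pair supports a create-then-upload flow: a placeholder row is inserted first (with media_length NULL), and the row is completed once the bytes arrive, while count_pending_media lets the caller cap how many placeholder rows a user can hold open. A hedged sketch of that flow follows; the pending-upload cap, error type and ID generator are placeholders invented for the example, only the store methods come from the diff above.

import secrets

MAX_PENDING_MEDIA = 10  # assumed cap, not a Synapse config value

async def create_upload_slot(store, clock, user_id) -> str:
    pending, first_expiry_ts = await store.count_pending_media(user_id)
    if pending >= MAX_PENDING_MEDIA:
        raise RuntimeError(f"too many pending uploads; retry after {first_expiry_ts}")
    media_id = secrets.token_urlsafe(24)  # illustrative; Synapse's ID format may differ
    # Insert a placeholder row; media_length stays NULL until the upload lands.
    await store.store_local_media_id(
        media_id=media_id, time_now_ms=clock.time_msec(), user_id=user_id
    )
    return media_id

async def finish_upload(
    store, media_id: str, user_id, content_type: str, filename, length: int
) -> None:
    # Fill in the columns that store_local_media_id left NULL.
    await store.update_local_media(
        media_id=media_id,
        media_type=content_type,
        upload_name=filename,
        media_length=length,
        user_id=user_id,
    )
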
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 4b1061e6d7..2911e53310 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -317,7 +317,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
if user_id:
is_support = self.is_support_user_txn(txn, user_id)
if not is_support:
- # We do this manually here to avoid hitting #6791
+ # We do this manually here to avoid hitting https://github.com/matrix-org/synapse/issues/6791
self.db_pool.simple_upsert_txn(
txn,
table="monthly_active_users",
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 3b444d2d07..0198bb09d2 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -363,10 +363,11 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
# for their user ID.
value_values=[(presence_stream_id,) for _ in user_ids],
)
- for user_id in user_ids:
- self._invalidate_cache_and_stream(
- txn, self._get_full_presence_stream_token_for_user, (user_id,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self._get_full_presence_stream_token_for_user,
+ [(user_id,) for user_id in user_ids],
+ )
return await self.db_pool.runInteraction(
"add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 1e11bf2706..1a5b5731bb 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -295,19 +295,28 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# so make sure to keep this actually last.
txn.execute("DROP TABLE events_to_purge")
- for event_id, should_delete in event_rows:
- self._invalidate_cache_and_stream(
- txn, self._get_state_group_for_event, (event_id,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self._get_state_group_for_event,
+ [(event_id,) for event_id, _ in event_rows],
+ )
- # XXX: This is racy, since have_seen_events could be called between the
- # transaction completing and the invalidation running. On the other hand,
- # that's no different to calling `have_seen_events` just before the
- # event is deleted from the database.
+ # XXX: This is racy, since have_seen_events could be called between the
+ # transaction completing and the invalidation running. On the other hand,
+ # that's no different to calling `have_seen_events` just before the
+ # event is deleted from the database.
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.have_seen_event,
+ [
+ (room_id, event_id)
+ for event_id, should_delete in event_rows
+ if should_delete
+ ],
+ )
+
+ for event_id, should_delete in event_rows:
if should_delete:
- self._invalidate_cache_and_stream(
- txn, self.have_seen_event, (room_id, event_id)
- )
self.invalidate_get_event_cache_after_txn(txn, event_id)
logger.info("[purge] done")
@@ -485,7 +494,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# - room_tags_revisions
# The problem with these is that they are largeish and there is no room_id
# index on them. In any case we should be clearing out 'stream' tables
- # periodically anyway (#5888)
+ # periodically anyway (https://github.com/matrix-org/synapse/issues/5888)
self._invalidate_caches_for_room_and_stream(txn, room_id)
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index f72a23c584..cf622e195c 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -449,26 +449,28 @@ class PushRuleStore(PushRulesWorkerStore):
before: str,
after: str,
) -> None:
- # Lock the table since otherwise we'll have annoying races between the
- # SELECT here and the UPSERT below.
- self.database_engine.lock_table(txn, "push_rules")
-
relative_to_rule = before or after
- res = self.db_pool.simple_select_one_txn(
- txn,
- table="push_rules",
- keyvalues={"user_name": user_id, "rule_id": relative_to_rule},
- retcols=["priority_class", "priority"],
- allow_none=True,
- )
+ sql = """
+ SELECT priority, priority_class FROM push_rules
+ WHERE user_name = ? AND rule_id = ?
+ """
+
+ if isinstance(self.database_engine, PostgresEngine):
+ sql += " FOR UPDATE"
+ else:
+ # Annoyingly, SQLite doesn't support row-level locking, so lock the whole table
+ self.database_engine.lock_table(txn, "push_rules")
+
+ txn.execute(sql, (user_id, relative_to_rule))
+ row = txn.fetchone()
- if not res:
+ if row is None:
raise RuleNotFoundException(
"before/after rule not found: %s" % (relative_to_rule,)
)
- base_priority_class, base_rule_priority = res
+ base_rule_priority, base_priority_class = row
if base_priority_class != priority_class:
raise InconsistentRuleException(
@@ -516,9 +518,18 @@ class PushRuleStore(PushRulesWorkerStore):
conditions_json: str,
actions_json: str,
) -> None:
- # Lock the table since otherwise we'll have annoying races between the
- # SELECT here and the UPSERT below.
- self.database_engine.lock_table(txn, "push_rules")
+ if isinstance(self.database_engine, PostgresEngine):
+ # Postgres doesn't allow FOR UPDATE with aggregate functions, so select the
+ # rows first, then re-select the count/max below.
+ sql = """
+ SELECT * FROM push_rules
+ WHERE user_name = ? and priority_class = ?
+ FOR UPDATE
+ """
+ txn.execute(sql, (user_id, priority_class))
+ else:
+ # Annoyingly, SQLite doesn't support row-level locking, so lock the whole table
+ self.database_engine.lock_table(txn, "push_rules")
# find the highest priority rule in that class
sql = (
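
Both hunks above replace a whole-table lock_table with SELECT ... FOR UPDATE on Postgres, locking only the rows the subsequent UPSERT depends on; SQLite keeps the table lock because it has no row-level locking. A minimal psycopg2 sketch of the row-lock side follows; the DSN and values are placeholders.

# Illustrative only: SELECT ... FOR UPDATE takes a row lock inside a
# transaction, so a concurrent writer blocks on the same rule row rather
# than on the whole push_rules table.
import psycopg2

conn = psycopg2.connect("dbname=example")  # placeholder DSN
with conn, conn.cursor() as cur:
    cur.execute(
        "SELECT priority, priority_class FROM push_rules "
        "WHERE user_name = %s AND rule_id = %s FOR UPDATE",
        ("@alice:example.org", ".m.rule.example"),
    )
    row = cur.fetchone()
    # ... compute the new priorities based on `row`, then UPSERT while the
    # row lock is still held; the lock is released when the transaction
    # commits at the end of the `with conn` block.
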
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index dec9858575..2c3f30e2eb 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -561,16 +561,15 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
updatevalues={"shadow_banned": shadow_banned},
)
# In order for this to apply immediately, clear the cache for this user.
- tokens = self.db_pool.simple_select_onecol_txn(
+ tokens = self.db_pool.simple_select_list_txn(
txn,
table="access_tokens",
keyvalues={"user_id": user_id},
- retcol="token",
+ retcols=("token",),
+ )
+ self._invalidate_cache_and_stream_bulk(
+ txn, self.get_user_by_access_token, tokens
)
- for token in tokens:
- self._invalidate_cache_and_stream(
- txn, self.get_user_by_access_token, (token,)
- )
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
@@ -2683,10 +2682,11 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
)
tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
- for token, _, _ in tokens_and_devices:
- self._invalidate_cache_and_stream(
- txn, self.get_user_by_access_token, (token,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.get_user_by_access_token,
+ [(token,) for token, _, _ in tokens_and_devices],
+ )
txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index f4bef4c99b..e25d86818b 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -275,7 +275,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# we have to set autocommit, because postgres refuses to
# CREATE INDEX CONCURRENTLY without it.
- conn.set_session(autocommit=True)
+ conn.engine.attempt_to_set_autocommit(conn.conn, True)
try:
c = conn.cursor()
@@ -301,7 +301,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# we should now be able to delete the GIST index.
c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
finally:
- conn.set_session(autocommit=False)
+ conn.engine.attempt_to_set_autocommit(conn.conn, False)
if isinstance(self.database_engine, PostgresEngine):
await self.db_pool.runWithConnection(create_index)
@@ -323,7 +323,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
def create_index(conn: LoggingDatabaseConnection) -> None:
conn.rollback()
- conn.set_session(autocommit=True)
+ conn.engine.attempt_to_set_autocommit(conn.conn, True)
c = conn.cursor()
# We create with NULLS FIRST so that when we search *backwards*
@@ -340,7 +340,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
"""
)
- conn.set_session(autocommit=False)
+ conn.engine.attempt_to_set_autocommit(conn.conn, False)
await self.db_pool.runWithConnection(create_index)
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 0f9c550b27..2c3151526d 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -492,7 +492,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
conn.rollback()
if isinstance(self.database_engine, PostgresEngine):
# postgres insists on autocommit for the index
- conn.set_session(autocommit=True)
+ conn.engine.attempt_to_set_autocommit(conn.conn, True)
try:
txn = conn.cursor()
txn.execute(
@@ -501,7 +501,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
)
txn.execute("DROP INDEX IF EXISTS state_groups_state_id")
finally:
- conn.set_session(autocommit=False)
+ conn.engine.attempt_to_set_autocommit(conn.conn, False)
else:
txn = conn.cursor()
txn.execute(
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 6309363217..ec4c4041b7 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -38,7 +38,8 @@ class PostgresEngine(
super().__init__(psycopg2, database_config)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
- # Disables passing `bytes` to txn.execute, c.f. #6186. If you do
+ # Disables passing `bytes` to txn.execute, c.f.
+ # https://github.com/matrix-org/synapse/issues/6186. If you do
# actually want to use bytes then wrap it in `bytearray`.
def _disable_bytes_adapter(_: bytes) -> NoReturn:
raise Exception("Passing bytes to DB is disabled.")
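
For context, the comment above refers to the pattern of registering an adapter for `bytes` that raises, so accidental bytes parameters fail loudly at adaptation time, while `bytearray` keeps its normal binary adapter and still works. A standalone sketch of that mechanism, assuming psycopg2's public register_adapter API:

from typing import NoReturn

import psycopg2.extensions

def _disable_bytes_adapter(_: bytes) -> NoReturn:
    raise Exception("Passing bytes to DB is disabled.")

psycopg2.extensions.register_adapter(bytes, _disable_bytes_adapter)

# After this, e.g. cur.execute("SELECT %s", (b"raw",)) raises during parameter
# adaptation, whereas cur.execute("SELECT %s", (bytearray(b"raw"),)) is still
# sent as a binary parameter.
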
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 158b528dce..03e5a0f55d 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -109,7 +109,8 @@ Changes in SCHEMA_VERSION = 78
Changes in SCHEMA_VERSION = 79
- Add tables to handle in DB read-write locks.
- - Add some mitigations for a painful race between foreground and background updates, cf #15677.
+ - Add some mitigations for a painful race between foreground and background updates, cf
+ https://github.com/matrix-org/synapse/issues/15677.
Changes in SCHEMA_VERSION = 80
- The event_txn_id_device_id is always written to for new events.
diff --git a/synapse/storage/schema/common/delta/83/07_common_replica_identities.sql.postgres b/synapse/storage/schema/common/delta/83/07_common_replica_identities.sql.postgres
new file mode 100644
index 0000000000..6bdd1f9569
--- /dev/null
+++ b/synapse/storage/schema/common/delta/83/07_common_replica_identities.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+
+
+-- Re-use unique indices already defined on tables as a replica identity.
+ALTER TABLE applied_module_schemas REPLICA IDENTITY USING INDEX applied_module_schemas_module_name_file_key;
+ALTER TABLE applied_schema_deltas REPLICA IDENTITY USING INDEX applied_schema_deltas_version_file_key;
+ALTER TABLE background_updates REPLICA IDENTITY USING INDEX background_updates_uniqueness;
+ALTER TABLE schema_compat_version REPLICA IDENTITY USING INDEX schema_compat_version_lock_key;
+ALTER TABLE schema_version REPLICA IDENTITY USING INDEX schema_version_lock_key;
+
+
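
One way to confirm that a delta like this has taken effect is to read pg_class.relreplident, which records the replica identity per table ('d' default/primary key, 'i' using an index, 'f' full row, 'n' nothing). A small psycopg2 sketch with a placeholder DSN:

# Sketch: inspect the replica identity chosen for the tables altered above.
# 'i' = USING INDEX, 'f' = FULL, 'd' = default (primary key), 'n' = nothing.
import psycopg2

conn = psycopg2.connect("dbname=synapse")  # placeholder DSN
with conn, conn.cursor() as cur:
    cur.execute(
        "SELECT relname, relreplident FROM pg_class WHERE relname = ANY(%s)",
        (["background_updates", "schema_version", "schema_compat_version"],),
    )
    for name, ident in cur.fetchall():
        print(name, ident)  # expect 'i' once this delta has been applied
conn.close()
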
diff --git a/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql b/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql
index b062ec840c..f713e42aa0 100644
--- a/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql
+++ b/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql
@@ -14,7 +14,7 @@
*/
-- Start a background job to cleanup extremities that were incorrectly added
--- by bug #5269.
+-- by bug https://github.com/matrix-org/synapse/issues/5269.
INSERT INTO background_updates (update_name, progress_json) VALUES
('delete_soft_failed_extremities', '{}');
diff --git a/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql b/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql
index aeb17813d3..246c3359f7 100644
--- a/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql
+++ b/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql
@@ -13,6 +13,7 @@
* limitations under the License.
*/
--- Now that #6232 is a thing, we can remove old rooms from the directory.
+-- Now that https://github.com/matrix-org/synapse/pull/6232 is a thing, we can
+-- remove old rooms from the directory.
INSERT INTO background_updates (update_name, progress_json) VALUES
('remove_tombstoned_rooms_from_directory', '{}');
diff --git a/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql b/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql
index aed79635b2..31a61defa7 100644
--- a/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql
+++ b/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql
@@ -13,7 +13,8 @@
* limitations under the License.
*/
--- Clean up left over rows from bug #11833, which was fixed in #12770.
+-- Clean up left over rows from bug https://github.com/matrix-org/synapse/issues/11833,
+-- which was fixed in https://github.com/matrix-org/synapse/pull/12770.
DELETE FROM federation_inbound_events_staging WHERE room_id not in (
SELECT room_id FROM rooms
);
diff --git a/synapse/storage/schema/main/delta/83/04_replica_identities.sql.postgres b/synapse/storage/schema/main/delta/83/04_replica_identities.sql.postgres
new file mode 100644
index 0000000000..8296d56e56
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/04_replica_identities.sql.postgres
@@ -0,0 +1,88 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+
+
+-- Where possible, re-use unique indices already defined on tables as a replica
+-- identity.
+ALTER TABLE appservice_room_list REPLICA IDENTITY USING INDEX appservice_room_list_idx;
+ALTER TABLE batch_events REPLICA IDENTITY USING INDEX chunk_events_event_id;
+ALTER TABLE blocked_rooms REPLICA IDENTITY USING INDEX blocked_rooms_idx;
+ALTER TABLE cache_invalidation_stream_by_instance REPLICA IDENTITY USING INDEX cache_invalidation_stream_by_instance_id;
+ALTER TABLE device_lists_changes_in_room REPLICA IDENTITY USING INDEX device_lists_changes_in_stream_id;
+ALTER TABLE device_lists_outbound_last_success REPLICA IDENTITY USING INDEX device_lists_outbound_last_success_unique_idx;
+ALTER TABLE device_lists_remote_cache REPLICA IDENTITY USING INDEX device_lists_remote_cache_unique_id;
+ALTER TABLE device_lists_remote_extremeties REPLICA IDENTITY USING INDEX device_lists_remote_extremeties_unique_idx;
+ALTER TABLE device_lists_remote_resync REPLICA IDENTITY USING INDEX device_lists_remote_resync_idx;
+ALTER TABLE e2e_cross_signing_keys REPLICA IDENTITY USING INDEX e2e_cross_signing_keys_stream_idx;
+ALTER TABLE e2e_room_keys REPLICA IDENTITY USING INDEX e2e_room_keys_with_version_idx;
+ALTER TABLE e2e_room_keys_versions REPLICA IDENTITY USING INDEX e2e_room_keys_versions_idx;
+ALTER TABLE erased_users REPLICA IDENTITY USING INDEX erased_users_user;
+ALTER TABLE event_relations REPLICA IDENTITY USING INDEX event_relations_id;
+ALTER TABLE federation_inbound_events_staging REPLICA IDENTITY USING INDEX federation_inbound_events_staging_instance_event;
+ALTER TABLE federation_stream_position REPLICA IDENTITY USING INDEX federation_stream_position_instance;
+ALTER TABLE ignored_users REPLICA IDENTITY USING INDEX ignored_users_uniqueness;
+ALTER TABLE insertion_events REPLICA IDENTITY USING INDEX insertion_events_event_id;
+ALTER TABLE insertion_event_extremities REPLICA IDENTITY USING INDEX insertion_event_extremities_event_id;
+ALTER TABLE monthly_active_users REPLICA IDENTITY USING INDEX monthly_active_users_users;
+ALTER TABLE ratelimit_override REPLICA IDENTITY USING INDEX ratelimit_override_idx;
+ALTER TABLE room_stats_earliest_token REPLICA IDENTITY USING INDEX room_stats_earliest_token_idx;
+ALTER TABLE room_stats_state REPLICA IDENTITY USING INDEX room_stats_state_room;
+ALTER TABLE stream_positions REPLICA IDENTITY USING INDEX stream_positions_idx;
+ALTER TABLE user_directory REPLICA IDENTITY USING INDEX user_directory_user_idx;
+ALTER TABLE user_directory_search REPLICA IDENTITY USING INDEX user_directory_search_user_idx;
+ALTER TABLE user_ips REPLICA IDENTITY USING INDEX user_ips_user_token_ip_unique_index;
+ALTER TABLE user_signature_stream REPLICA IDENTITY USING INDEX user_signature_stream_idx;
+ALTER TABLE users_in_public_rooms REPLICA IDENTITY USING INDEX users_in_public_rooms_u_idx;
+ALTER TABLE users_who_share_private_rooms REPLICA IDENTITY USING INDEX users_who_share_private_rooms_u_idx;
+ALTER TABLE user_threepid_id_server REPLICA IDENTITY USING INDEX user_threepid_id_server_idx;
+ALTER TABLE worker_locks REPLICA IDENTITY USING INDEX worker_locks_key;
+
+
+-- Where there are no unique indices, use the entire rows as replica identities.
+ALTER TABLE current_state_delta_stream REPLICA IDENTITY FULL;
+ALTER TABLE deleted_pushers REPLICA IDENTITY FULL;
+ALTER TABLE device_auth_providers REPLICA IDENTITY FULL;
+ALTER TABLE device_federation_inbox REPLICA IDENTITY FULL;
+ALTER TABLE device_federation_outbox REPLICA IDENTITY FULL;
+ALTER TABLE device_inbox REPLICA IDENTITY FULL;
+ALTER TABLE device_lists_outbound_pokes REPLICA IDENTITY FULL;
+ALTER TABLE device_lists_stream REPLICA IDENTITY FULL;
+ALTER TABLE e2e_cross_signing_signatures REPLICA IDENTITY FULL;
+ALTER TABLE event_auth_chain_links REPLICA IDENTITY FULL;
+ALTER TABLE event_auth REPLICA IDENTITY FULL;
+ALTER TABLE event_push_actions_staging REPLICA IDENTITY FULL;
+ALTER TABLE insertion_event_edges REPLICA IDENTITY FULL;
+ALTER TABLE local_media_repository_url_cache REPLICA IDENTITY FULL;
+ALTER TABLE presence_stream REPLICA IDENTITY FULL;
+ALTER TABLE push_rules_stream REPLICA IDENTITY FULL;
+ALTER TABLE room_alias_servers REPLICA IDENTITY FULL;
+ALTER TABLE stream_ordering_to_exterm REPLICA IDENTITY FULL;
+ALTER TABLE timeline_gaps REPLICA IDENTITY FULL;
+ALTER TABLE user_daily_visits REPLICA IDENTITY FULL;
+ALTER TABLE users_pending_deactivation REPLICA IDENTITY FULL;
+
+-- special cases: unique indices on nullable columns can't be used
+ALTER TABLE event_push_summary REPLICA IDENTITY FULL;
+ALTER TABLE event_search REPLICA IDENTITY FULL;
+ALTER TABLE local_media_repository_thumbnails REPLICA IDENTITY FULL;
+ALTER TABLE remote_media_cache_thumbnails REPLICA IDENTITY FULL;
+ALTER TABLE threepid_guest_access_tokens REPLICA IDENTITY FULL;
+ALTER TABLE user_filters REPLICA IDENTITY FULL; -- sadly the `CHECK` constraint is not enough here
diff --git a/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
new file mode 100644
index 0000000000..b74bdd71fa
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
@@ -0,0 +1,15 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ALTER TABLE e2e_cross_signing_keys ADD COLUMN updatable_without_uia_before_ms bigint DEFAULT NULL;
\ No newline at end of file
diff --git a/synapse/storage/schema/main/delta/83/06_more_replica_identities.sql.postgres b/synapse/storage/schema/main/delta/83/06_more_replica_identities.sql.postgres
new file mode 100644
index 0000000000..a592af814e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/06_more_replica_identities.sql.postgres
@@ -0,0 +1,80 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+
+
+-- Where possible, re-use unique indices already defined on tables as a replica
+-- identity.
+ALTER TABLE account_data REPLICA IDENTITY USING INDEX account_data_uniqueness;
+ALTER TABLE application_services_txns REPLICA IDENTITY USING INDEX application_services_txns_as_id_txn_id_key;
+ALTER TABLE appservice_stream_position REPLICA IDENTITY USING INDEX appservice_stream_position_lock_key;
+ALTER TABLE current_state_events REPLICA IDENTITY USING INDEX current_state_events_event_id_key;
+ALTER TABLE device_lists_changes_converted_stream_position REPLICA IDENTITY USING INDEX device_lists_changes_converted_stream_position_lock_key;
+ALTER TABLE devices REPLICA IDENTITY USING INDEX device_uniqueness;
+ALTER TABLE e2e_device_keys_json REPLICA IDENTITY USING INDEX e2e_device_keys_json_uniqueness;
+ALTER TABLE e2e_fallback_keys_json REPLICA IDENTITY USING INDEX e2e_fallback_keys_json_uniqueness;
+ALTER TABLE e2e_one_time_keys_json REPLICA IDENTITY USING INDEX e2e_one_time_keys_json_uniqueness;
+ALTER TABLE event_backward_extremities REPLICA IDENTITY USING INDEX event_backward_extremities_event_id_room_id_key;
+ALTER TABLE event_edges REPLICA IDENTITY USING INDEX event_edges_event_id_prev_event_id_idx;
+ALTER TABLE event_forward_extremities REPLICA IDENTITY USING INDEX event_forward_extremities_event_id_room_id_key;
+ALTER TABLE event_json REPLICA IDENTITY USING INDEX event_json_event_id_key;
+ALTER TABLE event_push_summary_last_receipt_stream_id REPLICA IDENTITY USING INDEX event_push_summary_last_receipt_stream_id_lock_key;
+ALTER TABLE event_push_summary_stream_ordering REPLICA IDENTITY USING INDEX event_push_summary_stream_ordering_lock_key;
+ALTER TABLE events REPLICA IDENTITY USING INDEX events_event_id_key;
+ALTER TABLE event_to_state_groups REPLICA IDENTITY USING INDEX event_to_state_groups_event_id_key;
+ALTER TABLE event_txn_id_device_id REPLICA IDENTITY USING INDEX event_txn_id_device_id_event_id;
+ALTER TABLE event_txn_id REPLICA IDENTITY USING INDEX event_txn_id_event_id;
+ALTER TABLE local_current_membership REPLICA IDENTITY USING INDEX local_current_membership_idx;
+ALTER TABLE partial_state_events REPLICA IDENTITY USING INDEX partial_state_events_event_id_key;
+ALTER TABLE partial_state_rooms_servers REPLICA IDENTITY USING INDEX partial_state_rooms_servers_room_id_server_name_key;
+ALTER TABLE profiles REPLICA IDENTITY USING INDEX profiles_user_id_key;
+ALTER TABLE redactions REPLICA IDENTITY USING INDEX redactions_event_id_key;
+ALTER TABLE registration_tokens REPLICA IDENTITY USING INDEX registration_tokens_token_key;
+ALTER TABLE rejections REPLICA IDENTITY USING INDEX rejections_event_id_key;
+ALTER TABLE room_account_data REPLICA IDENTITY USING INDEX room_account_data_uniqueness;
+ALTER TABLE room_aliases REPLICA IDENTITY USING INDEX room_aliases_room_alias_key;
+ALTER TABLE room_depth REPLICA IDENTITY USING INDEX room_depth_room_id_key;
+ALTER TABLE room_forgetter_stream_pos REPLICA IDENTITY USING INDEX room_forgetter_stream_pos_lock_key;
+ALTER TABLE room_memberships REPLICA IDENTITY USING INDEX room_memberships_event_id_key;
+ALTER TABLE room_tags REPLICA IDENTITY USING INDEX room_tag_uniqueness;
+ALTER TABLE room_tags_revisions REPLICA IDENTITY USING INDEX room_tag_revisions_uniqueness;
+ALTER TABLE server_keys_json REPLICA IDENTITY USING INDEX server_keys_json_uniqueness;
+ALTER TABLE sessions REPLICA IDENTITY USING INDEX sessions_session_type_session_id_key;
+ALTER TABLE state_events REPLICA IDENTITY USING INDEX state_events_event_id_key;
+ALTER TABLE stats_incremental_position REPLICA IDENTITY USING INDEX stats_incremental_position_lock_key;
+ALTER TABLE threads REPLICA IDENTITY USING INDEX threads_uniqueness;
+ALTER TABLE ui_auth_sessions_credentials REPLICA IDENTITY USING INDEX ui_auth_sessions_credentials_session_id_stage_type_key;
+ALTER TABLE ui_auth_sessions_ips REPLICA IDENTITY USING INDEX ui_auth_sessions_ips_session_id_ip_user_agent_key;
+ALTER TABLE ui_auth_sessions REPLICA IDENTITY USING INDEX ui_auth_sessions_session_id_key;
+ALTER TABLE user_directory_stream_pos REPLICA IDENTITY USING INDEX user_directory_stream_pos_lock_key;
+ALTER TABLE user_external_ids REPLICA IDENTITY USING INDEX user_external_ids_auth_provider_external_id_key;
+ALTER TABLE user_threepids REPLICA IDENTITY USING INDEX medium_address;
+ALTER TABLE worker_read_write_locks_mode REPLICA IDENTITY USING INDEX worker_read_write_locks_mode_key;
+ALTER TABLE worker_read_write_locks REPLICA IDENTITY USING INDEX worker_read_write_locks_key;
+
+-- special cases: unique indices on nullable columns can't be used
+ALTER TABLE event_push_actions REPLICA IDENTITY FULL;
+ALTER TABLE local_media_repository REPLICA IDENTITY FULL;
+ALTER TABLE receipts_graph REPLICA IDENTITY FULL;
+ALTER TABLE receipts_linearized REPLICA IDENTITY FULL;
+ALTER TABLE received_transactions REPLICA IDENTITY FULL;
+ALTER TABLE remote_media_cache REPLICA IDENTITY FULL;
+ALTER TABLE server_signature_keys REPLICA IDENTITY FULL;
+ALTER TABLE users REPLICA IDENTITY FULL;
diff --git a/synapse/storage/schema/state/delta/83/05_replica_identities_in_state_db.sql.postgres b/synapse/storage/schema/state/delta/83/05_replica_identities_in_state_db.sql.postgres
new file mode 100644
index 0000000000..9b792a39e2
--- /dev/null
+++ b/synapse/storage/schema/state/delta/83/05_replica_identities_in_state_db.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+-- See also: 83/04_replica_identities.sql.postgres on the main database
+
+
+-- Where possible, re-use unique indices already defined on tables as a replica
+-- identity.
+ALTER TABLE state_group_edges REPLICA IDENTITY USING INDEX state_group_edges_unique_idx;
+
+
+-- Where there are no unique indices, use the entire rows as replica identities.
+ALTER TABLE state_groups_state REPLICA IDENTITY FULL;
|