summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/controllers/persist_events.py27
-rw-r--r--synapse/storage/databases/main/cache.py130
-rw-r--r--synapse/storage/databases/main/event_federation.py3
-rw-r--r--synapse/storage/databases/main/events.py27
-rw-r--r--synapse/storage/databases/main/lock.py190
-rw-r--r--synapse/storage/databases/main/purge_events.py9
-rw-r--r--synapse/storage/databases/main/push_rule.py6
-rw-r--r--synapse/storage/databases/main/registration.py4
-rw-r--r--synapse/storage/databases/main/room.py10
-rw-r--r--synapse/storage/databases/main/stream.py4
-rw-r--r--synapse/storage/schema/__init__.py5
11 files changed, 305 insertions, 110 deletions
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py

index 35c0680365..35cd1089d6 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py
@@ -45,6 +45,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.opentracing import ( SynapseTags, @@ -338,6 +339,7 @@ class EventsPersistenceStorageController: ) self._state_resolution_handler = hs.get_state_resolution_handler() self._state_controller = state_controller + self.hs = hs async def _process_event_persist_queue_task( self, @@ -350,15 +352,22 @@ class EventsPersistenceStorageController: A dictionary of event ID to event ID we didn't persist as we already had another event persisted with the same TXN ID. """ - if isinstance(task, _PersistEventsTask): - return await self._persist_event_batch(room_id, task) - elif isinstance(task, _UpdateCurrentStateTask): - await self._update_current_state(room_id, task) - return {} - else: - raise AssertionError( - f"Found an unexpected task type in event persistence queue: {task}" - ) + + # Ensure that the room can't be deleted while we're persisting events to + # it. We might already have taken out the lock, but since this is just a + # "read" lock its inherently reentrant. + async with self.hs.get_worker_locks_handler().acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + if isinstance(task, _PersistEventsTask): + return await self._persist_event_batch(room_id, task) + elif isinstance(task, _UpdateCurrentStateTask): + await self._update_current_state(room_id, task) + return {} + else: + raise AssertionError( + f"Found an unexpected task type in event persistence queue: {task}" + ) @trace async def persist_events( diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index c940f864d1..2fbd389c71 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py
@@ -18,6 +18,8 @@ import logging from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple from synapse.api.constants import EventTypes +from synapse.config._base import Config +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.tcp.streams import BackfillStream, CachesStream from synapse.replication.tcp.streams.events import ( EventsStream, @@ -52,6 +54,21 @@ PURGE_HISTORY_CACHE_NAME = "ph_cache_fake" # As above, but for invalidating room caches on room deletion DELETE_ROOM_CACHE_NAME = "dr_cache_fake" +# How long between cache invalidation table cleanups, once we have caught up +# with the backlog. +REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h") + +# How long between cache invalidation table cleanups, before we have caught +# up with the backlog. +CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m") + +# Maximum number of cache invalidation rows to delete at once. +CLEAN_UP_MAX_BATCH_SIZE = 20_000 + +# Keep cache invalidations for 7 days +# (This is likely to be quite excessive.) +RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS = Config.parse_duration("7d") + class CacheInvalidationWorkerStore(SQLBaseStore): def __init__( @@ -98,6 +115,18 @@ class CacheInvalidationWorkerStore(SQLBaseStore): else: self._cache_id_gen = None + # Occasionally clean up the cache invalidations stream table by deleting + # old rows. + # This is only applicable when Postgres is in use; this table is unused + # and not populated at all when SQLite is the active database engine. + if hs.config.worker.run_background_tasks and isinstance( + self.database_engine, PostgresEngine + ): + self.hs.get_clock().call_later( + CATCH_UP_CLEANUP_INTERVAL_MS / 1000, + self._clean_up_cache_invalidation_wrapper, + ) + async def get_all_updated_caches( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> Tuple[List[Tuple[int, tuple]], int, bool]: @@ -554,3 +583,104 @@ class CacheInvalidationWorkerStore(SQLBaseStore): return self._cache_id_gen.get_current_token_for_writer(instance_name) else: return 0 + + @wrap_as_background_process("clean_up_old_cache_invalidations") + async def _clean_up_cache_invalidation_wrapper(self) -> None: + """ + Clean up cache invalidation stream table entries occasionally. + If we are behind (i.e. there are entries old enough to + be deleted but too many of them to be deleted in one go), + then we run slightly more frequently. + """ + delete_up_to: int = ( + self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS + ) + + in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to) + + # Vary how long we wait before calling again depending on whether we + # are still sifting through backlog or we have caught up. + if in_backlog: + next_interval = CATCH_UP_CLEANUP_INTERVAL_MS + else: + next_interval = REGULAR_CLEANUP_INTERVAL_MS + + self.hs.get_clock().call_later( + next_interval / 1000, self._clean_up_cache_invalidation_wrapper + ) + + async def _clean_up_batch_of_old_cache_invalidations( + self, delete_up_to_millisec: int + ) -> bool: + """ + Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite). + + Up to `CLEAN_UP_BATCH_SIZE` rows will be deleted at once. + + Returns true if and only if we were limited by batch size (i.e. we are in backlog: + there are more things to clean up). + """ + + def _clean_up_batch_of_old_cache_invalidations_txn( + txn: LoggingTransaction, + ) -> bool: + # First get the earliest stream ID + txn.execute( + """ + SELECT stream_id FROM cache_invalidation_stream_by_instance + ORDER BY stream_id ASC + LIMIT 1 + """ + ) + row = txn.fetchone() + if row is None: + return False + earliest_stream_id: int = row[0] + + # Then find the last stream ID of the range we will delete + txn.execute( + """ + SELECT stream_id FROM cache_invalidation_stream_by_instance + WHERE stream_id <= ? AND invalidation_ts <= ? + ORDER BY stream_id DESC + LIMIT 1 + """, + (earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_millisec), + ) + row = txn.fetchone() + if row is None: + return False + cutoff_stream_id: int = row[0] + + # Determine whether we are caught up or still catching up + txn.execute( + """ + SELECT invalidation_ts FROM cache_invalidation_stream_by_instance + WHERE stream_id > ? + ORDER BY stream_id ASC + LIMIT 1 + """, + (cutoff_stream_id,), + ) + row = txn.fetchone() + if row is None: + in_backlog = False + else: + # We are in backlog if the next row could have been deleted + # if we didn't have such a small batch size + in_backlog = row[0] <= delete_up_to_millisec + + txn.execute( + """ + DELETE FROM cache_invalidation_stream_by_instance + WHERE ? <= stream_id AND stream_id <= ? + """, + (earliest_stream_id, cutoff_stream_id), + ) + + return in_backlog + + return await self.db_pool.runInteraction( + "clean_up_old_cache_invalidations", + _clean_up_batch_of_old_cache_invalidations_txn, + ) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index b2cda52ce5..534dc32413 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -843,7 +843,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas * because the schema change is in a background update, it's not * necessarily safe to assume that it will have been completed. */ - AND edge.is_state is ? /* False */ + AND edge.is_state is FALSE /** * We only want backwards extremities that are older than or at * the same position of the given `current_depth` (where older @@ -886,7 +886,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas sql, ( room_id, - False, current_depth, self._clock.time_msec(), BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 2b83a69426..c1353b18c1 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -127,8 +127,6 @@ class PersistEventsStore: self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen - self._msc3970_enabled = hs.config.experimental.msc3970_enabled - @trace async def _persist_events_and_state_updates( self, @@ -1012,9 +1010,11 @@ class PersistEventsStore: ) ) - # Pre-MSC3970, we rely on the access_token_id to scope the txn_id for events. - # Since this is an experimental flag, we still store the mapping even if the - # flag is disabled. + # Synapse usually relies on the device_id to scope transactions for events, + # except for users without device IDs (appservice, guests, and access + # tokens minted with the admin API) which use the access token ID instead. + # + # TODO https://github.com/matrix-org/synapse/issues/16042 if to_insert_token_id: self.db_pool.simple_insert_many_txn( txn, @@ -1030,10 +1030,7 @@ class PersistEventsStore: values=to_insert_token_id, ) - # With MSC3970, we rely on the device_id instead to scope the txn_id for events. - # We're only inserting if MSC3970 is *enabled*, because else the pre-MSC3970 - # behaviour would allow for a UNIQUE constraint violation on this table - if to_insert_device_id and self._msc3970_enabled: + if to_insert_device_id: self.db_pool.simple_insert_many_txn( txn, table="event_txn_id_device_id", @@ -1455,8 +1452,8 @@ class PersistEventsStore: }, ) - sql = "UPDATE events SET outlier = ? WHERE event_id = ?" - txn.execute(sql, (False, event.event_id)) + sql = "UPDATE events SET outlier = FALSE WHERE event_id = ?" + txn.execute(sql, (event.event_id,)) # Update the event_backward_extremities table now that this # event isn't an outlier any more. @@ -1549,13 +1546,13 @@ class PersistEventsStore: for event, _ in events_and_contexts if not event.internal_metadata.is_redacted() ] - sql = "UPDATE redactions SET have_censored = ? WHERE " + sql = "UPDATE redactions SET have_censored = FALSE WHERE " clause, args = make_in_list_sql_clause( self.database_engine, "redacts", unredacted_events, ) - txn.execute(sql + clause, [False] + args) + txn.execute(sql + clause, args) self.db_pool.simple_insert_many_txn( txn, @@ -2318,14 +2315,14 @@ class PersistEventsStore: " SELECT 1 FROM events" " LEFT JOIN event_edges edge" " ON edge.event_id = events.event_id" - " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)" + " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)" " )" ) txn.execute_batch( query, [ - (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) + (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id) for ev in events for e_id in ev.prev_event_ids() if not ev.internal_metadata.is_outlier() diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index c89b4f7919..1680bf6168 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py
@@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from contextlib import AsyncExitStack from types import TracebackType -from typing import TYPE_CHECKING, Optional, Set, Tuple, Type +from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type from weakref import WeakValueDictionary from twisted.internet.interfaces import IReactorCore @@ -208,76 +209,85 @@ class LockStore(SQLBaseStore): used (otherwise the lock will leak). """ + try: + lock = await self.db_pool.runInteraction( + "try_acquire_read_write_lock", + self._try_acquire_read_write_lock_txn, + lock_name, + lock_key, + write, + ) + except self.database_engine.module.IntegrityError: + return None + + return lock + + def _try_acquire_read_write_lock_txn( + self, + txn: LoggingTransaction, + lock_name: str, + lock_key: str, + write: bool, + ) -> "Lock": + # We attempt to acquire the lock by inserting into + # `worker_read_write_locks` and seeing if that fails any + # constraints. If it doesn't then we have acquired the lock, + # otherwise we haven't. + # + # Before that though we clear the table of any stale locks. + now = self._clock.time_msec() token = random_string(6) - def _try_acquire_read_write_lock_txn(txn: LoggingTransaction) -> None: - # We attempt to acquire the lock by inserting into - # `worker_read_write_locks` and seeing if that fails any - # constraints. If it doesn't then we have acquired the lock, - # otherwise we haven't. - # - # Before that though we clear the table of any stale locks. - - delete_sql = """ - DELETE FROM worker_read_write_locks - WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?; - """ - - insert_sql = """ - INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts) - VALUES (?, ?, ?, ?, ?, ?) - """ - - if isinstance(self.database_engine, PostgresEngine): - # For Postgres we can send these queries at the same time. - txn.execute( - delete_sql + ";" + insert_sql, - ( - # DELETE args - now - _LOCK_TIMEOUT_MS, - lock_name, - lock_key, - # UPSERT args - lock_name, - lock_key, - write, - self._instance_name, - token, - now, - ), - ) - else: - # For SQLite these need to be two queries. - txn.execute( - delete_sql, - ( - now - _LOCK_TIMEOUT_MS, - lock_name, - lock_key, - ), - ) - txn.execute( - insert_sql, - ( - lock_name, - lock_key, - write, - self._instance_name, - token, - now, - ), - ) + delete_sql = """ + DELETE FROM worker_read_write_locks + WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?; + """ - return + insert_sql = """ + INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts) + VALUES (?, ?, ?, ?, ?, ?) + """ - try: - await self.db_pool.runInteraction( - "try_acquire_read_write_lock", - _try_acquire_read_write_lock_txn, + if isinstance(self.database_engine, PostgresEngine): + # For Postgres we can send these queries at the same time. + txn.execute( + delete_sql + ";" + insert_sql, + ( + # DELETE args + now - _LOCK_TIMEOUT_MS, + lock_name, + lock_key, + # UPSERT args + lock_name, + lock_key, + write, + self._instance_name, + token, + now, + ), + ) + else: + # For SQLite these need to be two queries. + txn.execute( + delete_sql, + ( + now - _LOCK_TIMEOUT_MS, + lock_name, + lock_key, + ), + ) + txn.execute( + insert_sql, + ( + lock_name, + lock_key, + write, + self._instance_name, + token, + now, + ), ) - except self.database_engine.module.IntegrityError: - return None lock = Lock( self._reactor, @@ -289,10 +299,58 @@ class LockStore(SQLBaseStore): token=token, ) - self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock + def set_lock() -> None: + self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock + + txn.call_after(set_lock) return lock + async def try_acquire_multi_read_write_lock( + self, + lock_names: Collection[Tuple[str, str]], + write: bool, + ) -> Optional[AsyncExitStack]: + """Try to acquire multiple locks for the given names/keys. Will return + an async context manager if the locks are successfully acquired, which + *must* be used (otherwise the lock will leak). + + If only a subset of the locks can be acquired then it will immediately + drop them and return `None`. + """ + try: + locks = await self.db_pool.runInteraction( + "try_acquire_multi_read_write_lock", + self._try_acquire_multi_read_write_lock_txn, + lock_names, + write, + ) + except self.database_engine.module.IntegrityError: + return None + + stack = AsyncExitStack() + + for lock in locks: + await stack.enter_async_context(lock) + + return stack + + def _try_acquire_multi_read_write_lock_txn( + self, + txn: LoggingTransaction, + lock_names: Collection[Tuple[str, str]], + write: bool, + ) -> Collection["Lock"]: + locks = [] + + for lock_name, lock_key in lock_names: + lock = self._try_acquire_read_write_lock_txn( + txn, lock_name, lock_key, write + ) + locks.append(lock) + + return locks + class Lock: """An async context manager that manages an acquired lock, ensuring it is diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 9773c1fcd2..b52f48cf04 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py
@@ -249,12 +249,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # Mark all state and own events as outliers logger.info("[purge] marking remaining events as outliers") txn.execute( - "UPDATE events SET outlier = ?" + "UPDATE events SET outlier = TRUE" " WHERE event_id IN (" - " SELECT event_id FROM events_to_purge " - " WHERE NOT should_delete" - ")", - (True,), + " SELECT event_id FROM events_to_purge " + " WHERE NOT should_delete" + ")" ) # synapse tries to take out an exclusive lock on room_depth whenever it diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index e098ceea3c..c13c0bc7d7 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py
@@ -560,19 +560,19 @@ class PushRuleStore(PushRulesWorkerStore): if isinstance(self.database_engine, PostgresEngine): sql = """ INSERT INTO push_rules_enable (id, user_name, rule_id, enabled) - VALUES (?, ?, ?, ?) + VALUES (?, ?, ?, 1) ON CONFLICT DO NOTHING """ elif isinstance(self.database_engine, Sqlite3Engine): sql = """ INSERT OR IGNORE INTO push_rules_enable (id, user_name, rule_id, enabled) - VALUES (?, ?, ?, ?) + VALUES (?, ?, ?, 1) """ else: raise RuntimeError("Unknown database engine") new_enable_id = self._push_rules_enable_id_gen.get_next() - txn.execute(sql, (new_enable_id, user_id, rule_id, 1)) + txn.execute(sql, (new_enable_id, user_id, rule_id)) async def delete_push_rule(self, user_id: str, rule_id: str) -> None: """ diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 676d03bb7e..c582cf0573 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -454,9 +454,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): ) -> List[Tuple[str, int]]: sql = ( "SELECT user_id, expiration_ts_ms FROM account_validity" - " WHERE email_sent = ? AND (expiration_ts_ms - ?) <= ?" + " WHERE email_sent = FALSE AND (expiration_ts_ms - ?) <= ?" ) - values = [False, now_ms, renew_at] + values = [now_ms, renew_at] txn.execute(sql, values) return cast(List[Tuple[str, int]], txn.fetchall()) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 830658f328..719e11aea6 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py
@@ -936,11 +936,11 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): JOIN event_json USING (room_id, event_id) WHERE room_id = ? %(where_clause)s - AND contains_url = ? AND outlier = ? + AND contains_url = TRUE AND outlier = FALSE ORDER BY stream_ordering DESC LIMIT ? """ - txn.execute(sql % {"where_clause": ""}, (room_id, True, False, 100)) + txn.execute(sql % {"where_clause": ""}, (room_id, 100)) local_media_mxcs = [] remote_media_mxcs = [] @@ -976,7 +976,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): txn.execute( sql % {"where_clause": "AND stream_ordering < ?"}, - (room_id, next_token, True, False, 100), + (room_id, next_token, 100), ) return local_media_mxcs, remote_media_mxcs @@ -1086,9 +1086,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): # set quarantine if quarantined_by is not None: - sql += "AND safe_from_quarantine = ?" + sql += "AND safe_from_quarantine = FALSE" txn.executemany( - sql, [(quarantined_by, media_id, False) for media_id in local_mxcs] + sql, [(quarantined_by, media_id) for media_id in local_mxcs] ) # remove from quarantine else: diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 92cbe262a6..5a3611c415 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -1401,7 +1401,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): `to_token`), or `limit` is zero. """ - args = [False, room_id] + args: List[Any] = [room_id] order, from_bound, to_bound = generate_pagination_bounds( direction, from_token, to_token @@ -1475,7 +1475,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): event.topological_ordering, event.stream_ordering FROM events AS event %(join_clause)s - WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s + WHERE event.outlier = FALSE AND event.room_id = ? AND %(bounds)s ORDER BY event.topological_ordering %(order)s, event.stream_ordering %(order)s LIMIT ? """ % { diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index d3ec648f6d..7de9949a5b 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 79 # remember to update the list below when updating +SCHEMA_VERSION = 80 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -110,6 +110,9 @@ Changes in SCHEMA_VERSION = 78 Changes in SCHEMA_VERSION = 79 - Add tables to handle in DB read-write locks. - Add some mitigations for a painful race between foreground and background updates, cf #15677. + +Changes in SCHEMA_VERSION = 80 + - The event_txn_id_device_id is always written to for new events. """