diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index c940f864d1..2fbd389c71 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -18,6 +18,8 @@ import logging
from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple
from synapse.api.constants import EventTypes
+from synapse.config._base import Config
+from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.tcp.streams import BackfillStream, CachesStream
from synapse.replication.tcp.streams.events import (
EventsStream,
@@ -52,6 +54,21 @@ PURGE_HISTORY_CACHE_NAME = "ph_cache_fake"
# As above, but for invalidating room caches on room deletion
DELETE_ROOM_CACHE_NAME = "dr_cache_fake"
+# How long between cache invalidation table cleanups, once we have caught up
+# with the backlog.
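+# (`Config.parse_duration` returns the duration in milliseconds.)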
+REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h")
+
+# How long between cache invalidation table cleanups, before we have caught
+# up with the backlog.
+CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m")
+
+# Maximum number of cache invalidation rows to delete at once.
+CLEAN_UP_MAX_BATCH_SIZE = 20_000
+
+# Keep cache invalidations for 7 days
+# (This is likely to be quite excessive.)
+RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS = Config.parse_duration("7d")
+
class CacheInvalidationWorkerStore(SQLBaseStore):
def __init__(
@@ -98,6 +115,18 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
else:
self._cache_id_gen = None
+        # Occasionally clean up the cache invalidation stream table by deleting
+        # old rows.
+        # This only applies when Postgres is in use: the table is not populated
+        # at all when SQLite is the active database engine.
+ if hs.config.worker.run_background_tasks and isinstance(
+ self.database_engine, PostgresEngine
+ ):
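+            # Schedule the first cleanup after the (short) catch-up interval.
+            # The intervals are in milliseconds, but `call_later` takes seconds,
+            # hence the division by 1000.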
+ self.hs.get_clock().call_later(
+ CATCH_UP_CLEANUP_INTERVAL_MS / 1000,
+ self._clean_up_cache_invalidation_wrapper,
+ )
+
async def get_all_updated_caches(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
@@ -554,3 +583,104 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
return self._cache_id_gen.get_current_token_for_writer(instance_name)
else:
return 0
+
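+    # Run as a named background process so that the periodic cleanup is
+    # tracked in Synapse's background process metrics.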
+ @wrap_as_background_process("clean_up_old_cache_invalidations")
+ async def _clean_up_cache_invalidation_wrapper(self) -> None:
+ """
+        Clean up cache invalidation stream table entries occasionally.
+        If we are behind (i.e. there are entries old enough to be deleted
+        but too many of them to delete in one go), then we run more
+        frequently until we have caught up with the backlog.
+ """
+ delete_up_to: int = (
+ self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS
+ )
+
+ in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to)
+
+        # Vary how long we wait before calling again depending on whether we
+        # are still working through the backlog or have caught up.
+ if in_backlog:
+ next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
+ else:
+ next_interval = REGULAR_CLEANUP_INTERVAL_MS
+
+ self.hs.get_clock().call_later(
+ next_interval / 1000, self._clean_up_cache_invalidation_wrapper
+ )
+
+ async def _clean_up_batch_of_old_cache_invalidations(
+ self, delete_up_to_millisec: int
+ ) -> bool:
+ """
+        Remove old rows from the `cache_invalidation_stream_by_instance` table
+        automatically (this table is unused in SQLite).
+
+        Up to `CLEAN_UP_MAX_BATCH_SIZE` rows will be deleted at once.
+
+ Returns true if and only if we were limited by batch size (i.e. we are in backlog:
+ there are more things to clean up).
+ """
+
+ def _clean_up_batch_of_old_cache_invalidations_txn(
+ txn: LoggingTransaction,
+ ) -> bool:
+ # First get the earliest stream ID
+ txn.execute(
+ """
+ SELECT stream_id FROM cache_invalidation_stream_by_instance
+ ORDER BY stream_id ASC
+ LIMIT 1
+ """
+ )
+ row = txn.fetchone()
+ if row is None:
+ return False
+ earliest_stream_id: int = row[0]
+
+ # Then find the last stream ID of the range we will delete
+ txn.execute(
+ """
+ SELECT stream_id FROM cache_invalidation_stream_by_instance
+ WHERE stream_id <= ? AND invalidation_ts <= ?
+ ORDER BY stream_id DESC
+ LIMIT 1
+ """,
+ (earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_millisec),
+ )
+ row = txn.fetchone()
+ if row is None:
+ return False
+ cutoff_stream_id: int = row[0]
+
+ # Determine whether we are caught up or still catching up
+ txn.execute(
+ """
+ SELECT invalidation_ts FROM cache_invalidation_stream_by_instance
+ WHERE stream_id > ?
+ ORDER BY stream_id ASC
+ LIMIT 1
+ """,
+ (cutoff_stream_id,),
+ )
+ row = txn.fetchone()
+ if row is None:
+ in_backlog = False
+ else:
+                # We are still in backlog if the next remaining row would also
+                # have been deleted, had the batch size limit not stopped us.
+ in_backlog = row[0] <= delete_up_to_millisec
+
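+            # Delete the identified range in one go; the cutoff above was chosen
+            # so that this is at most roughly CLEAN_UP_MAX_BATCH_SIZE rows.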
+ txn.execute(
+ """
+ DELETE FROM cache_invalidation_stream_by_instance
+ WHERE ? <= stream_id AND stream_id <= ?
+ """,
+ (earliest_stream_id, cutoff_stream_id),
+ )
+
+ return in_backlog
+
+ return await self.db_pool.runInteraction(
+ "clean_up_old_cache_invalidations",
+ _clean_up_batch_of_old_cache_invalidations_txn,
+ )
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index b2cda52ce5..534dc32413 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -843,7 +843,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
* because the schema change is in a background update, it's not
* necessarily safe to assume that it will have been completed.
*/
- AND edge.is_state is ? /* False */
+ AND edge.is_state is FALSE
/**
* We only want backwards extremities that are older than or at
* the same position of the given `current_depth` (where older
@@ -886,7 +886,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
sql,
(
room_id,
- False,
current_depth,
self._clock.time_msec(),
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 2b83a69426..c1353b18c1 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -127,8 +127,6 @@ class PersistEventsStore:
self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen
- self._msc3970_enabled = hs.config.experimental.msc3970_enabled
-
@trace
async def _persist_events_and_state_updates(
self,
@@ -1012,9 +1010,11 @@ class PersistEventsStore:
)
)
- # Pre-MSC3970, we rely on the access_token_id to scope the txn_id for events.
- # Since this is an experimental flag, we still store the mapping even if the
- # flag is disabled.
+        # Synapse usually relies on the device_id to scope transactions for events,
+        # except for users without device IDs (appservice users, guests, and access
+        # tokens minted with the admin API), which use the access token ID instead.
+ #
+ # TODO https://github.com/matrix-org/synapse/issues/16042
if to_insert_token_id:
self.db_pool.simple_insert_many_txn(
txn,
@@ -1030,10 +1030,7 @@ class PersistEventsStore:
values=to_insert_token_id,
)
- # With MSC3970, we rely on the device_id instead to scope the txn_id for events.
- # We're only inserting if MSC3970 is *enabled*, because else the pre-MSC3970
- # behaviour would allow for a UNIQUE constraint violation on this table
- if to_insert_device_id and self._msc3970_enabled:
+ if to_insert_device_id:
self.db_pool.simple_insert_many_txn(
txn,
table="event_txn_id_device_id",
@@ -1455,8 +1452,8 @@ class PersistEventsStore:
},
)
- sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
- txn.execute(sql, (False, event.event_id))
+ sql = "UPDATE events SET outlier = FALSE WHERE event_id = ?"
+ txn.execute(sql, (event.event_id,))
# Update the event_backward_extremities table now that this
# event isn't an outlier any more.
@@ -1549,13 +1546,13 @@ class PersistEventsStore:
for event, _ in events_and_contexts
if not event.internal_metadata.is_redacted()
]
- sql = "UPDATE redactions SET have_censored = ? WHERE "
+ sql = "UPDATE redactions SET have_censored = FALSE WHERE "
clause, args = make_in_list_sql_clause(
self.database_engine,
"redacts",
unredacted_events,
)
- txn.execute(sql + clause, [False] + args)
+ txn.execute(sql + clause, args)
self.db_pool.simple_insert_many_txn(
txn,
@@ -2318,14 +2315,14 @@ class PersistEventsStore:
" SELECT 1 FROM events"
" LEFT JOIN event_edges edge"
" ON edge.event_id = events.event_id"
- " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)"
+ " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
" )"
)
txn.execute_batch(
query,
[
- (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
for ev in events
for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index c89b4f7919..1680bf6168 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from contextlib import AsyncExitStack
from types import TracebackType
-from typing import TYPE_CHECKING, Optional, Set, Tuple, Type
+from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type
from weakref import WeakValueDictionary
from twisted.internet.interfaces import IReactorCore
@@ -208,76 +209,85 @@ class LockStore(SQLBaseStore):
used (otherwise the lock will leak).
"""
+ try:
+ lock = await self.db_pool.runInteraction(
+ "try_acquire_read_write_lock",
+ self._try_acquire_read_write_lock_txn,
+ lock_name,
+ lock_key,
+ write,
+ )
+ except self.database_engine.module.IntegrityError:
+ return None
+
+ return lock
+
+ def _try_acquire_read_write_lock_txn(
+ self,
+ txn: LoggingTransaction,
+ lock_name: str,
+ lock_key: str,
+ write: bool,
+ ) -> "Lock":
+ # We attempt to acquire the lock by inserting into
+ # `worker_read_write_locks` and seeing if that fails any
+ # constraints. If it doesn't then we have acquired the lock,
+ # otherwise we haven't.
+ #
+ # Before that though we clear the table of any stale locks.
+
now = self._clock.time_msec()
token = random_string(6)
- def _try_acquire_read_write_lock_txn(txn: LoggingTransaction) -> None:
- # We attempt to acquire the lock by inserting into
- # `worker_read_write_locks` and seeing if that fails any
- # constraints. If it doesn't then we have acquired the lock,
- # otherwise we haven't.
- #
- # Before that though we clear the table of any stale locks.
-
- delete_sql = """
- DELETE FROM worker_read_write_locks
- WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
- """
-
- insert_sql = """
- INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
- VALUES (?, ?, ?, ?, ?, ?)
- """
-
- if isinstance(self.database_engine, PostgresEngine):
- # For Postgres we can send these queries at the same time.
- txn.execute(
- delete_sql + ";" + insert_sql,
- (
- # DELETE args
- now - _LOCK_TIMEOUT_MS,
- lock_name,
- lock_key,
- # UPSERT args
- lock_name,
- lock_key,
- write,
- self._instance_name,
- token,
- now,
- ),
- )
- else:
- # For SQLite these need to be two queries.
- txn.execute(
- delete_sql,
- (
- now - _LOCK_TIMEOUT_MS,
- lock_name,
- lock_key,
- ),
- )
- txn.execute(
- insert_sql,
- (
- lock_name,
- lock_key,
- write,
- self._instance_name,
- token,
- now,
- ),
- )
+ delete_sql = """
+ DELETE FROM worker_read_write_locks
+ WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
+ """
- return
+ insert_sql = """
+ INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
+ VALUES (?, ?, ?, ?, ?, ?)
+ """
- try:
- await self.db_pool.runInteraction(
- "try_acquire_read_write_lock",
- _try_acquire_read_write_lock_txn,
+ if isinstance(self.database_engine, PostgresEngine):
+ # For Postgres we can send these queries at the same time.
+ txn.execute(
+ delete_sql + ";" + insert_sql,
+ (
+ # DELETE args
+ now - _LOCK_TIMEOUT_MS,
+ lock_name,
+ lock_key,
+ # UPSERT args
+ lock_name,
+ lock_key,
+ write,
+ self._instance_name,
+ token,
+ now,
+ ),
+ )
+ else:
+ # For SQLite these need to be two queries.
+ txn.execute(
+ delete_sql,
+ (
+ now - _LOCK_TIMEOUT_MS,
+ lock_name,
+ lock_key,
+ ),
+ )
+ txn.execute(
+ insert_sql,
+ (
+ lock_name,
+ lock_key,
+ write,
+ self._instance_name,
+ token,
+ now,
+ ),
)
- except self.database_engine.module.IntegrityError:
- return None
lock = Lock(
self._reactor,
@@ -289,10 +299,58 @@ class LockStore(SQLBaseStore):
token=token,
)
- self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock
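+        # Only record the lock as live once the transaction has committed;
+        # otherwise a rolled-back acquisition would be tracked as if it were held.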
+ def set_lock() -> None:
+ self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock
+
+ txn.call_after(set_lock)
return lock
+ async def try_acquire_multi_read_write_lock(
+ self,
+ lock_names: Collection[Tuple[str, str]],
+ write: bool,
+ ) -> Optional[AsyncExitStack]:
+ """Try to acquire multiple locks for the given names/keys. Will return
+ an async context manager if the locks are successfully acquired, which
+        *must* be used (otherwise the locks will leak).
+
+ If only a subset of the locks can be acquired then it will immediately
+ drop them and return `None`.
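+
+        A minimal usage sketch (illustrative only: `store` stands for this
+        `LockStore`, and the lock name/keys are made-up placeholders):
+
+            locks = await store.try_acquire_multi_read_write_lock(
+                [("some_lock", "key_1"), ("some_lock", "key_2")], write=True
+            )
+            if locks is None:
+                return  # at least one of the locks is already held
+            async with locks:
+                ...  # do work while holding all of the locks; released on exit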
+ """
+ try:
+ locks = await self.db_pool.runInteraction(
+ "try_acquire_multi_read_write_lock",
+ self._try_acquire_multi_read_write_lock_txn,
+ lock_names,
+ write,
+ )
+ except self.database_engine.module.IntegrityError:
+ return None
+
+ stack = AsyncExitStack()
+
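+        # Enter every lock into a single exit stack so that leaving the stack
+        # releases all of the locks together.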
+ for lock in locks:
+ await stack.enter_async_context(lock)
+
+ return stack
+
+ def _try_acquire_multi_read_write_lock_txn(
+ self,
+ txn: LoggingTransaction,
+ lock_names: Collection[Tuple[str, str]],
+ write: bool,
+ ) -> Collection["Lock"]:
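+        # All of the inserts run inside one transaction, so if any of them hits
+        # a constraint the resulting IntegrityError rolls back every lock
+        # acquired so far and nothing is left behind.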
+ locks = []
+
+ for lock_name, lock_key in lock_names:
+ lock = self._try_acquire_read_write_lock_txn(
+ txn, lock_name, lock_key, write
+ )
+ locks.append(lock)
+
+ return locks
+
class Lock:
"""An async context manager that manages an acquired lock, ensuring it is
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 9773c1fcd2..b52f48cf04 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -249,12 +249,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# Mark all state and own events as outliers
logger.info("[purge] marking remaining events as outliers")
txn.execute(
- "UPDATE events SET outlier = ?"
+ "UPDATE events SET outlier = TRUE"
" WHERE event_id IN ("
- " SELECT event_id FROM events_to_purge "
- " WHERE NOT should_delete"
- ")",
- (True,),
+ " SELECT event_id FROM events_to_purge "
+ " WHERE NOT should_delete"
+ ")"
)
# synapse tries to take out an exclusive lock on room_depth whenever it
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index e098ceea3c..c13c0bc7d7 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -560,19 +560,19 @@ class PushRuleStore(PushRulesWorkerStore):
if isinstance(self.database_engine, PostgresEngine):
sql = """
INSERT INTO push_rules_enable (id, user_name, rule_id, enabled)
- VALUES (?, ?, ?, ?)
+ VALUES (?, ?, ?, 1)
ON CONFLICT DO NOTHING
"""
elif isinstance(self.database_engine, Sqlite3Engine):
sql = """
INSERT OR IGNORE INTO push_rules_enable (id, user_name, rule_id, enabled)
- VALUES (?, ?, ?, ?)
+ VALUES (?, ?, ?, 1)
"""
else:
raise RuntimeError("Unknown database engine")
new_enable_id = self._push_rules_enable_id_gen.get_next()
- txn.execute(sql, (new_enable_id, user_id, rule_id, 1))
+ txn.execute(sql, (new_enable_id, user_id, rule_id))
async def delete_push_rule(self, user_id: str, rule_id: str) -> None:
"""
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 676d03bb7e..c582cf0573 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -454,9 +454,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
) -> List[Tuple[str, int]]:
sql = (
"SELECT user_id, expiration_ts_ms FROM account_validity"
- " WHERE email_sent = ? AND (expiration_ts_ms - ?) <= ?"
+ " WHERE email_sent = FALSE AND (expiration_ts_ms - ?) <= ?"
)
- values = [False, now_ms, renew_at]
+ values = [now_ms, renew_at]
txn.execute(sql, values)
return cast(List[Tuple[str, int]], txn.fetchall())
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 830658f328..719e11aea6 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -936,11 +936,11 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
JOIN event_json USING (room_id, event_id)
WHERE room_id = ?
%(where_clause)s
- AND contains_url = ? AND outlier = ?
+ AND contains_url = TRUE AND outlier = FALSE
ORDER BY stream_ordering DESC
LIMIT ?
"""
- txn.execute(sql % {"where_clause": ""}, (room_id, True, False, 100))
+ txn.execute(sql % {"where_clause": ""}, (room_id, 100))
local_media_mxcs = []
remote_media_mxcs = []
@@ -976,7 +976,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
txn.execute(
sql % {"where_clause": "AND stream_ordering < ?"},
- (room_id, next_token, True, False, 100),
+ (room_id, next_token, 100),
)
return local_media_mxcs, remote_media_mxcs
@@ -1086,9 +1086,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
# set quarantine
if quarantined_by is not None:
- sql += "AND safe_from_quarantine = ?"
+ sql += "AND safe_from_quarantine = FALSE"
txn.executemany(
- sql, [(quarantined_by, media_id, False) for media_id in local_mxcs]
+ sql, [(quarantined_by, media_id) for media_id in local_mxcs]
)
# remove from quarantine
else:
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 92cbe262a6..5a3611c415 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1401,7 +1401,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
`to_token`), or `limit` is zero.
"""
- args = [False, room_id]
+ args: List[Any] = [room_id]
order, from_bound, to_bound = generate_pagination_bounds(
direction, from_token, to_token
@@ -1475,7 +1475,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
event.topological_ordering, event.stream_ordering
FROM events AS event
%(join_clause)s
- WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s
+ WHERE event.outlier = FALSE AND event.room_id = ? AND %(bounds)s
ORDER BY event.topological_ordering %(order)s,
event.stream_ordering %(order)s LIMIT ?
""" % {
|