Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--  synapse/storage/databases/main/cache.py              |  7
-rw-r--r--  synapse/storage/databases/main/devices.py            |  2
-rw-r--r--  synapse/storage/databases/main/event_push_actions.py |  2
-rw-r--r--  synapse/storage/databases/main/events.py             |  8
-rw-r--r--  synapse/storage/databases/main/events_worker.py      | 61
-rw-r--r--  synapse/storage/databases/main/media_repository.py   |  7
-rw-r--r--  synapse/storage/databases/main/purge_events.py       | 26
-rw-r--r--  synapse/storage/databases/main/receipts.py           |  6
-rw-r--r--  synapse/storage/databases/main/room.py               |  2
9 files changed, 90 insertions(+), 31 deletions(-)
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index ecc1f935e2..c57ae5ef15 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -168,10 +168,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         backfilled,
     ):
         self._invalidate_get_event_cache(event_id)
+        self.have_seen_event.invalidate((room_id, event_id))
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
 
-        self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
+        self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
@@ -184,8 +185,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             self.get_invited_rooms_for_local_user.invalidate((state_key,))
 
         if relates_to:
-            self.get_relations_for_event.invalidate_many((relates_to,))
-            self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
+            self.get_relations_for_event.invalidate((relates_to,))
+            self.get_aggregation_groups_for_event.invalidate((relates_to,))
             self.get_applicable_edit.invalidate((relates_to,))
 
     async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index fd87ba71ab..18f07d96dc 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1282,7 +1282,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         )
 
         txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
-        txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
+        txn.call_after(self._get_cached_user_device.invalidate, (user_id,))
         txn.call_after(
             self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
         )
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 5845322118..d1237c65cc 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -860,7 +860,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 not be deleted.
         """
         txn.call_after(
-            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+            self.get_unread_event_push_actions_by_room_for_user.invalidate,
             (room_id, user_id),
         )
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index fd25c8112d..897fa06639 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1748,9 +1748,9 @@ class PersistEventsStore:
             },
         )
 
-        txn.call_after(self.store.get_relations_for_event.invalidate_many, (parent_id,))
+        txn.call_after(self.store.get_relations_for_event.invalidate, (parent_id,))
         txn.call_after(
-            self.store.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
+            self.store.get_aggregation_groups_for_event.invalidate, (parent_id,)
         )
 
         if rel_type == RelationTypes.REPLACE:
@@ -1903,7 +1903,7 @@ class PersistEventsStore:
 
         for user_id in user_ids:
             txn.call_after(
-                self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+                self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
                 (room_id, user_id),
             )
 
@@ -1917,7 +1917,7 @@ class PersistEventsStore:
     def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
         # Sad that we have to blow away the cache for the whole room here
         txn.call_after(
-            self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+            self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
             (room_id,),
         )
         txn.execute(
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 6963bbf7f4..403a5ddaba 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -22,6 +22,7 @@ from typing import (
     Iterable,
     List,
     Optional,
+    Set,
     Tuple,
     overload,
 )
@@ -55,7 +56,7 @@ from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import JsonDict, get_domain_from_id
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
@@ -1045,32 +1046,74 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {r["event_id"] for r in rows}
 
-    async def have_seen_events(self, event_ids):
+    async def have_seen_events(
+        self, room_id: str, event_ids: Iterable[str]
+    ) -> Set[str]:
         """Given a list of event ids, check if we have already processed them.
 
+        The room_id is only used to structure the cache (so that it can later be
+        invalidated by room_id) - there is no guarantee that the events are actually
+        in the room in question.
+
         Args:
-            event_ids (iterable[str]):
+            room_id: Room we are polling
+            event_ids: events we are looking for
 
         Returns:
             set[str]: The events we have already seen.
         """
+        res = await self._have_seen_events_dict(
+            (room_id, event_id) for event_id in event_ids
+        )
+        return {eid for ((_rid, eid), have_event) in res.items() if have_event}
+
+    @cachedList("have_seen_event", "keys")
+    async def _have_seen_events_dict(
+        self, keys: Iterable[Tuple[str, str]]
+    ) -> Dict[Tuple[str, str], bool]:
+        """Helper for have_seen_events
+
+        Returns:
+             a dict {(room_id, event_id)-> bool}
+        """
         # if the event cache contains the event, obviously we've seen it.
-        results = {x for x in event_ids if self._get_event_cache.contains(x)}
 
-        def have_seen_events_txn(txn, chunk):
-            sql = "SELECT event_id FROM events as e WHERE "
+        cache_results = {
+            (rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
+        }
+        results = {x: True for x in cache_results}
+
+        def have_seen_events_txn(txn, chunk: Tuple[Tuple[str, str], ...]):
+            # we deliberately do *not* query the database for room_id, to make the
+            # query an index-only lookup on `events_event_id_key`.
+            #
+            # We therefore pull the events from the database into a set...
+            sql = "SELECT event_id FROM events AS e WHERE "
             clause, args = make_in_list_sql_clause(
-                txn.database_engine, "e.event_id", chunk
+                txn.database_engine, "e.event_id", [eid for (_rid, eid) in chunk]
             )
             txn.execute(sql + clause, args)
-            results.update(row[0] for row in txn)
+            found_events = {eid for eid, in txn}
 
-        for chunk in batch_iter((x for x in event_ids if x not in results), 100):
+            # ... and then we can update the results for each row in the batch
+            results.update({(rid, eid): (eid in found_events) for (rid, eid) in chunk})
+
+        # each batch requires its own index scan, so we make the batches as big as
+        # possible.
+        for chunk in batch_iter((k for k in keys if k not in cache_results), 500):
             await self.db_pool.runInteraction(
                 "have_seen_events", have_seen_events_txn, chunk
             )
+
         return results
 
+    @cached(max_entries=100000, tree=True)
+    async def have_seen_event(self, room_id: str, event_id: str):
+        # this only exists for the benefit of the @cachedList descriptor on
+        # _have_seen_events_dict
+        raise NotImplementedError()
+
     def _get_current_state_event_counts_txn(self, txn, room_id):
         """
         See get_current_state_event_counts.
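The rewritten `have_seen_events` above follows a two-step shape: serve whatever the per-key cache already holds, then batch the misses into as few index scans as possible, letting `@cachedList` populate the `have_seen_event` cache from the batch result. A rough standalone sketch of that pattern, with a plain dict standing in for the caching machinery (the names and the fake database lookup are illustrative, not Synapse APIs):

from typing import Dict, Iterable, List, Set, Tuple

Key = Tuple[str, str]  # (room_id, event_id)

_have_seen_cache: Dict[Key, bool] = {}  # stand-in for the have_seen_event cache

def _db_lookup(event_ids: Set[str]) -> Set[str]:
    # Stand-in for the index-only "SELECT event_id FROM events ..." query.
    known_events = {"$ev1", "$ev2"}
    return event_ids & known_events

def have_seen_events_dict(keys: Iterable[Key]) -> Dict[Key, bool]:
    keys = list(keys)
    # 1. serve whatever the per-key cache already holds...
    results: Dict[Key, bool] = {
        k: _have_seen_cache[k] for k in keys if k in _have_seen_cache
    }

    # 2. ...then batch the misses, making the batches large because each
    #    one costs a separate index scan (the real code uses 500).
    misses: List[Key] = [k for k in keys if k not in results]
    for i in range(0, len(misses), 500):
        chunk = misses[i : i + 500]
        found = _db_lookup({eid for _rid, eid in chunk})
        for rid, eid in chunk:
            results[(rid, eid)] = eid in found
            _have_seen_cache[(rid, eid)] = eid in found
    return results

print(have_seen_events_dict([("!room:a", "$ev1"), ("!room:a", "$unknown")]))
# {('!room:a', '$ev1'): True, ('!room:a', '$unknown'): False}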
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index c584868188..2fa945d171 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -143,6 +143,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "created_ts",
                 "quarantined_by",
                 "url_cache",
+                "safe_from_quarantine",
             ),
             allow_none=True,
             desc="get_local_media",
@@ -296,12 +297,12 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="store_local_media",
         )
 
-    async def mark_local_media_as_safe(self, media_id: str) -> None:
-        """Mark a local media as safe from quarantining."""
+    async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None:
+        """Mark a local media as safe or unsafe from quarantining."""
         await self.db_pool.simple_update_one(
             table="local_media_repository",
             keyvalues={"media_id": media_id},
-            updatevalues={"safe_from_quarantine": True},
+            updatevalues={"safe_from_quarantine": safe},
             desc="mark_local_media_as_safe",
         )
 
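The new `safe` parameter turns `mark_local_media_as_safe` from a one-way operation into a toggle; under the hood it is a single-row UPDATE via `simple_update_one`. A self-contained sqlite sketch of the equivalent statement (the table is trimmed to the relevant column and is not the actual Synapse schema):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE local_media_repository ("
    " media_id TEXT PRIMARY KEY,"
    " safe_from_quarantine BOOLEAN NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO local_media_repository (media_id) VALUES ('abc123')")

def mark_local_media_as_safe(media_id: str, safe: bool = True) -> None:
    # Mirrors simple_update_one: one row, one column, keyed on media_id.
    conn.execute(
        "UPDATE local_media_repository SET safe_from_quarantine = ?"
        " WHERE media_id = ?",
        (safe, media_id),
    )

mark_local_media_as_safe("abc123")              # protect from quarantine
mark_local_media_as_safe("abc123", safe=False)  # the bit can now be cleared again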
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 8f83748b5e..7fb7780d0f 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -16,14 +16,14 @@
 import logging
 from typing import Any, List, Set, Tuple
 
 from synapse.api.errors import SynapseError
-from synapse.storage._base import SQLBaseStore
+from synapse.storage.databases.main import CacheInvalidationWorkerStore
 from synapse.storage.databases.main.state import StateGroupWorkerStore
 from synapse.types import RoomStreamToken
 
 logger = logging.getLogger(__name__)
 
-class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
+class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
     async def purge_history(
         self, room_id: str, token: str, delete_local_events: bool
     ) -> Set[int]:
@@ -203,8 +203,6 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
             "DELETE FROM event_to_state_groups "
             "WHERE event_id IN (SELECT event_id from events_to_purge)"
         )
-        for event_id, _ in event_rows:
-            txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 
         # Delete all remote non-state events
         for table in (
@@ -283,6 +281,20 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         # so make sure to keep this actually last.
         txn.execute("DROP TABLE events_to_purge")
 
+        for event_id, should_delete in event_rows:
+            self._invalidate_cache_and_stream(
+                txn, self._get_state_group_for_event, (event_id,)
+            )
+
+            # XXX: This is racy, since have_seen_events could be called between the
+            #    transaction completing and the invalidation running. On the other hand,
+            #    that's no different to calling `have_seen_events` just before the
+            #    event is deleted from the database.
+            if should_delete:
+                self._invalidate_cache_and_stream(
+                    txn, self.have_seen_event, (room_id, event_id)
+                )
+
         logger.info("[purge] done")
 
         return referenced_state_groups
@@ -422,7 +434,11 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         # index on them. In any case we should be clearing out 'stream' tables
         # periodically anyway (#5888)
 
-        # TODO: we could probably usefully do a bunch of cache invalidation here
+        # TODO: we could probably usefully do a bunch more cache invalidation here
+
+        # XXX: as with purge_history, this is racy, but no worse than other races
+        #   that already exist.
+        self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
 
         logger.info("[purge] done")
 
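Both purge hunks above replace in-transaction `txn.call_after` invalidation with `_invalidate_cache_and_stream`, which besides invalidating the local cache writes to the cache-invalidation stream so other workers drop their copies too. The XXX comments concede a small race: the cache entry disappears only after the transaction commits. A toy sketch of that after-commit ordering (the classes here are stand-ins, not Synapse's real transaction API):

from typing import Callable, Dict, List, Tuple

Key = Tuple[str, str]  # (room_id, event_id)

class FakeTxn:
    """Collects invalidations and runs them only once the txn commits."""

    def __init__(self) -> None:
        self._after: List[Callable[[], None]] = []

    def call_after(self, fn: Callable[[], None]) -> None:
        # Running fn immediately would be wrong: a rollback would leave the
        # cache empty for rows that were never deleted, and a concurrent
        # reader could re-fill it with stale data mid-transaction.
        self._after.append(fn)

    def commit(self) -> None:
        for fn in self._after:
            fn()

have_seen: Dict[Key, bool] = {("!room:a", "$ev1"): True}

txn = FakeTxn()
# ... the DELETE of $ev1 from the events table would run here ...
txn.call_after(lambda: have_seen.pop(("!room:a", "$ev1"), None))

# The race noted in the XXX comment: between the commit landing and the
# deferred invalidation running, the cache still answers True for a purged
# event -- no worse than a read made just before the DELETE committed.
txn.commit()
assert ("!room:a", "$ev1") not in have_seen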
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3647276acb..edeaacd7a6 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -460,7 +460,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
     def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
         self.get_receipts_for_user.invalidate((user_id, receipt_type))
-        self._get_linearized_receipts_for_room.invalidate_many((room_id,))
+        self._get_linearized_receipts_for_room.invalidate((room_id,))
         self.get_last_receipt_event_id_for_user.invalidate(
             (user_id, room_id, receipt_type)
         )
 
@@ -659,9 +659,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
         )
         txn.call_after(self.get_receipts_for_user.invalidate, (user_id, receipt_type))
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(
-            self._get_linearized_receipts_for_room.invalidate_many, (room_id,)
-        )
+        txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
 
         self.db_pool.simple_delete_txn(
             txn,
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 5f38634f48..0cf450f81d 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1498,7 +1498,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
         room_id: str,
         event_id: str,
         user_id: str,
-        reason: str,
+        reason: Optional[str],
         content: JsonDict,
         received_ts: int,
     ) -> None: