diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index ecc1f935e2..c57ae5ef15 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -168,10 +168,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
backfilled,
):
self._invalidate_get_event_cache(event_id)
+ self.have_seen_event.invalidate((room_id, event_id))
self.get_latest_event_ids_in_room.invalidate((room_id,))
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
+ self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))
if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
@@ -184,8 +185,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self.get_invited_rooms_for_local_user.invalidate((state_key,))
if relates_to:
- self.get_relations_for_event.invalidate_many((relates_to,))
- self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
+ self.get_relations_for_event.invalidate((relates_to,))
+ self.get_aggregation_groups_for_event.invalidate((relates_to,))
self.get_applicable_edit.invalidate((relates_to,))
async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
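Note on the `invalidate_many` → `invalidate` rename that recurs throughout this patch: for caches declared with `tree=True`, `invalidate` accepts a *prefix* of the full key tuple and drops every entry beneath it, which is what the separate `invalidate_many` method used to do. A toy sketch of those semantics (an illustrative stand-in, not Synapse's actual `TreeCache`):

```python
from typing import Any, Dict, Tuple


class PrefixCache:
    """Toy tree cache: one dict level per element of the key tuple."""

    def __init__(self) -> None:
        self._root: Dict[Any, Any] = {}

    def set(self, key: Tuple[Any, ...], value: Any) -> None:
        node = self._root
        for part in key[:-1]:
            node = node.setdefault(part, {})
        node[key[-1]] = value

    def invalidate(self, key_prefix: Tuple[Any, ...]) -> None:
        # Accepts a full key *or* any prefix of one: deleting the subtree
        # rooted at the prefix drops every entry stored beneath it.
        node = self._root
        for part in key_prefix[:-1]:
            node = node.get(part)
            if node is None:
                return
        node.pop(key_prefix[-1], None)


cache = PrefixCache()
cache.set(("!room:hs", "$event1"), True)
cache.set(("!room:hs", "$event2"), True)
cache.invalidate(("!room:hs",))  # drops both entries, like the old invalidate_many
```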
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index fd87ba71ab..18f07d96dc 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1282,7 +1282,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
)
txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
- txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
+ txn.call_after(self._get_cached_user_device.invalidate, (user_id,))
txn.call_after(
self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
)
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 5845322118..d1237c65cc 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -860,7 +860,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
not be deleted.
"""
txn.call_after(
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ self.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index fd25c8112d..897fa06639 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1748,9 +1748,9 @@ class PersistEventsStore:
},
)
- txn.call_after(self.store.get_relations_for_event.invalidate_many, (parent_id,))
+ txn.call_after(self.store.get_relations_for_event.invalidate, (parent_id,))
txn.call_after(
- self.store.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
+ self.store.get_aggregation_groups_for_event.invalidate, (parent_id,)
)
if rel_type == RelationTypes.REPLACE:
@@ -1903,7 +1903,7 @@ class PersistEventsStore:
for user_id in user_ids:
txn.call_after(
- self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)
@@ -1917,7 +1917,7 @@ class PersistEventsStore:
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
- self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id,),
)
txn.execute(
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 6963bbf7f4..403a5ddaba 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -22,6 +22,7 @@ from typing import (
Iterable,
List,
Optional,
+ Set,
Tuple,
overload,
)
@@ -55,7 +56,7 @@ from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
@@ -1045,32 +1046,74 @@ class EventsWorkerStore(SQLBaseStore):
return {r["event_id"] for r in rows}
- async def have_seen_events(self, event_ids):
+ async def have_seen_events(
+ self, room_id: str, event_ids: Iterable[str]
+ ) -> Set[str]:
"""Given a list of event ids, check if we have already processed them.
+ The room_id is only used to structure the cache (so that it can later be
+ invalidated by room_id) - there is no guarantee that the events are actually
+ in the room in question.
+
Args:
- event_ids (iterable[str]):
+ room_id: the room we are querying
+ event_ids: the events we are looking for
Returns:
set[str]: The events we have already seen.
"""
+ res = await self._have_seen_events_dict(
+ (room_id, event_id) for event_id in event_ids
+ )
+ return {eid for ((_rid, eid), have_event) in res.items() if have_event}
+
+ @cachedList("have_seen_event", "keys")
+ async def _have_seen_events_dict(
+ self, keys: Iterable[Tuple[str, str]]
+ ) -> Dict[Tuple[str, str], bool]:
+ """Helper for have_seen_events
+
+ Returns:
+ a dict {(room_id, event_id) -> bool}
+ """
# if the event cache contains the event, obviously we've seen it.
- results = {x for x in event_ids if self._get_event_cache.contains(x)}
- def have_seen_events_txn(txn, chunk):
- sql = "SELECT event_id FROM events as e WHERE "
+ cache_results = {
+ (rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
+ }
+ results = {x: True for x in cache_results}
+
+ def have_seen_events_txn(txn, chunk: Tuple[Tuple[str, str], ...]):
+ # we deliberately do *not* query the database for room_id, to make the
+ # query an index-only lookup on `events_event_id_key`.
+ #
+ # We therefore pull the events from the database into a set...
+
+ sql = "SELECT event_id FROM events AS e WHERE "
clause, args = make_in_list_sql_clause(
- txn.database_engine, "e.event_id", chunk
+ txn.database_engine, "e.event_id", [eid for (_rid, eid) in chunk]
)
txn.execute(sql + clause, args)
- results.update(row[0] for row in txn)
+ found_events = {eid for eid, in txn}
- for chunk in batch_iter((x for x in event_ids if x not in results), 100):
+ # ... and then we can update the results for each row in the batch
+ results.update({(rid, eid): (eid in found_events) for (rid, eid) in chunk})
+
+ # each batch requires its own index scan, so we make the batches as big as
+ # possible.
+ for chunk in batch_iter((k for k in keys if k not in cache_results), 500):
await self.db_pool.runInteraction(
"have_seen_events", have_seen_events_txn, chunk
)
+
return results
+ @cached(max_entries=100000, tree=True)
+ async def have_seen_event(self, room_id: str, event_id: str):
+ # this only exists for the benefit of the @cachedList descriptor on
+ # _have_seen_events_dict
+ raise NotImplementedError()
+
def _get_current_state_event_counts_txn(self, txn, room_id):
"""
See get_current_state_event_counts.
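The `have_seen_event` stub above looks odd in isolation: it raises `NotImplementedError` and exists only so that the `@cachedList` descriptor on `_have_seen_events_dict` has a per-key cache to serve from and backfill. A rough, self-contained sketch of that division of labour (hand-rolled stand-ins, not the real descriptors):

```python
from typing import Dict, Iterable, List, Set, Tuple

# Stands in for the cache that @cached(tree=True) attaches to have_seen_event.
_have_seen_cache: Dict[Tuple[str, str], bool] = {}


def _query_events_table(event_ids: List[str]) -> Set[str]:
    # Stand-in for the batched "SELECT event_id FROM events" lookup.
    known = {"$event1"}  # pretend only $event1 is in the events table
    return {eid for eid in event_ids if eid in known}


def have_seen_events_dict(
    keys: Iterable[Tuple[str, str]]
) -> Dict[Tuple[str, str], bool]:
    key_list = list(keys)
    # Serve what we can from the per-key cache...
    results = {k: _have_seen_cache[k] for k in key_list if k in _have_seen_cache}
    # ...then batch-query the misses and backfill the cache, so that later
    # single-key reads and (room_id,)-prefix invalidations all go through
    # one shared structure.
    misses = [k for k in key_list if k not in results]
    found = _query_events_table([eid for _rid, eid in misses])
    for rid, eid in misses:
        results[(rid, eid)] = _have_seen_cache[(rid, eid)] = eid in found
    return results


print(have_seen_events_dict([("!room:hs", "$event1"), ("!room:hs", "$event2")]))
# {('!room:hs', '$event1'): True, ('!room:hs', '$event2'): False}
```

The real descriptors add more plumbing on top, but the shape is the same: `have_seen_event` owns the cache, `_have_seen_events_dict` fills it in bulk.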
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index c584868188..2fa945d171 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -143,6 +143,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"created_ts",
"quarantined_by",
"url_cache",
+ "safe_from_quarantine",
),
allow_none=True,
desc="get_local_media",
@@ -296,12 +297,12 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="store_local_media",
)
- async def mark_local_media_as_safe(self, media_id: str) -> None:
- """Mark a local media as safe from quarantining."""
+ async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None:
+ """Mark a local media as safe or unsafe from quarantining."""
await self.db_pool.simple_update_one(
table="local_media_repository",
keyvalues={"media_id": media_id},
- updatevalues={"safe_from_quarantine": True},
+ updatevalues={"safe_from_quarantine": safe},
desc="mark_local_media_as_safe",
)
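For illustration, the new `safe` parameter keeps existing one-argument call sites behaving as before, while letting callers clear the flag again (a hypothetical call site, not part of this patch):

```python
async def set_media_protection(store, media_id: str, protect: bool) -> None:
    # safe=True (the default) matches the old behaviour; safe=False
    # removes the protection so the media can be quarantined again.
    await store.mark_local_media_as_safe(media_id, safe=protect)
```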
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 8f83748b5e..7fb7780d0f 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -16,14 +16,14 @@ import logging
from typing import Any, List, Set, Tuple
from synapse.api.errors import SynapseError
-from synapse.storage._base import SQLBaseStore
+from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken
logger = logging.getLogger(__name__)
-class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
+class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
) -> Set[int]:
@@ -203,8 +203,6 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
"DELETE FROM event_to_state_groups "
"WHERE event_id IN (SELECT event_id from events_to_purge)"
)
- for event_id, _ in event_rows:
- txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
# Delete all remote non-state events
for table in (
@@ -283,6 +281,20 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
# so make sure to keep this actually last.
txn.execute("DROP TABLE events_to_purge")
+ for event_id, should_delete in event_rows:
+ self._invalidate_cache_and_stream(
+ txn, self._get_state_group_for_event, (event_id,)
+ )
+
+ # XXX: This is racy, since have_seen_events could be called between the
+ # transaction completing and the invalidation running. On the other hand,
+ # that's no different to calling `have_seen_events` just before the
+ # event is deleted from the database.
+ if should_delete:
+ self._invalidate_cache_and_stream(
+ txn, self.have_seen_event, (room_id, event_id)
+ )
+
logger.info("[purge] done")
return referenced_state_groups
@@ -422,7 +434,11 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888)
- # TODO: we could probably usefully do a bunch of cache invalidation here
+ # TODO: we could probably usefully do a bunch more cache invalidation here
+
+ # XXX: as with purge_history, this is racy, but no worse than other races
+ # that already exist.
+ self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
logger.info("[purge] done")
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3647276acb..edeaacd7a6 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -460,7 +460,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_user.invalidate((user_id, receipt_type))
- self._get_linearized_receipts_for_room.invalidate_many((room_id,))
+ self._get_linearized_receipts_for_room.invalidate((room_id,))
self.get_last_receipt_event_id_for_user.invalidate(
(user_id, room_id, receipt_type)
)
@@ -659,9 +659,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
)
txn.call_after(self.get_receipts_for_user.invalidate, (user_id, receipt_type))
# FIXME: This shouldn't invalidate the whole cache
- txn.call_after(
- self._get_linearized_receipts_for_room.invalidate_many, (room_id,)
- )
+ txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
self.db_pool.simple_delete_txn(
txn,
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 5f38634f48..0cf450f81d 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1498,7 +1498,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
room_id: str,
event_id: str,
user_id: str,
- reason: str,
+ reason: Optional[str],
content: JsonDict,
received_ts: int,
) -> None:
|