diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e391338406..10b5dad030 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1078,7 +1078,6 @@ class EventCreationHandler:
else:
prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
- logger.info("allow_no_prev_events=%s", allow_no_prev_events)
# Do a quick sanity check here, rather than waiting until we've created the
# event and then try to auth it (which fails with a somewhat confusing "No
# create event in auth events")
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index aabf3dbba0..53646b978a 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -223,9 +223,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
# This invalidates any local in-memory cached event objects, the original
# process triggering the invalidation is responsible for clearing any external
# cached objects.
- logger.info("_invalidate_caches_for_event event_id=%s", event_id)
+ logger.info(
+ "CacheInvalidationWorkerStore _invalidate_caches_for_event room_id=%s event_id=%s",
+ room_id,
+ event_id,
+ )
+ logger.info(
+ "CacheInvalidationWorkerStore self.have_seen_event=%s", self.have_seen_event
+ )
self._invalidate_local_get_event_cache(event_id)
- self.have_seen_event.invalidate((room_id, event_id))
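+        # NB: entries in this cache are prefilled via the `@cachedList` on
+        # `_have_seen_events_dict`, where each element of `keys` appears to be
+        # used as the cache key itself - hence invalidating with the
+        # `((room_id, event_id),)` shape rather than the plain pair.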
+ self.have_seen_event.invalidate(((room_id, event_id),))
self.get_latest_event_ids_in_room.invalidate((room_id,))
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 5932668f2f..368e9b47e9 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -434,6 +434,24 @@ class PersistEventsStore:
self._store_event_txn(txn, events_and_contexts=events_and_contexts)
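+
+        # Invalidate the relevant caches for each event while we persist it, so
+        # that e.g. `have_seen_event` stops returning a stale `False` for an
+        # event we have just written.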
+ for event, _ in events_and_contexts:
+ # We expect events to be persisted by this point
+ assert event.internal_metadata.stream_ordering
+
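+            # Pull out the relation (if any) so that the caches keyed on the
+            # related event can be invalidated as well.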
+ relation = relation_from_event(event)
+ self.store._invalidate_caches_for_event(
+ stream_ordering=event.internal_metadata.stream_ordering,
+ event_id=event.event_id,
+ room_id=event.room_id,
+ etype=event.type,
+ state_key=None, # event.state_key,
+ # TODO
+ redacts=None,
+ relates_to=relation.parent_id if relation else None,
+ # TODO
+ backfilled=False,
+ )
+
self._persist_transaction_ids_txn(txn, events_and_contexts)
# Insert into event_to_state_groups.
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 9f6b1fcef1..debe8e5f3f 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1453,7 +1453,7 @@ class EventsWorkerStore(SQLBaseStore):
@trace
@tag_args
async def have_seen_events(
- self, room_id: str, event_ids: Iterable[str]
+ self, room_id: str, event_ids: Collection[str]
) -> Set[str]:
"""Given a list of event ids, check if we have already processed them.
@@ -1468,6 +1468,7 @@ class EventsWorkerStore(SQLBaseStore):
Returns:
The set of events we have already seen.
"""
+ logger.info("have_seen_events room_id=%s event_ids=%s", room_id, event_ids)
# @cachedList chomps lots of memory if you call it with a big list, so
# we break it down. However, each batch requires its own index scan, so we make
@@ -1491,6 +1492,7 @@ class EventsWorkerStore(SQLBaseStore):
Returns:
a dict {(room_id, event_id)-> bool}
"""
+ logger.info("_have_seen_events_dict keys=%s", keys)
# if the event cache contains the event, obviously we've seen it.
cache_results = {
diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py
index 6425f851ea..36b05fc344 100644
--- a/synapse/util/caches/deferred_cache.py
+++ b/synapse/util/caches/deferred_cache.py
@@ -383,8 +383,14 @@ class DeferredCache(Generic[KT, VT]):
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
+ import logging
+
+ logger = logging.getLogger(__name__)
+ logger.info("DeferredCache before=%s", self.cache.len())
+ logger.info("DeferredCache invalidate key=%s", key)
self.check_thread()
self.cache.del_multi(key)
+ logger.info("DeferredCache after=%s", self.cache.len())
# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, which will (a) stop it being returned for
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index aa93109d13..5a745eb8c5 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -511,6 +511,7 @@ class LruCache(Generic[KT, VT]):
callbacks,
prune_unread_entries,
)
+ logger.info("LruCache add_node key=%s value=%s", key, value)
cache[key] = node
if size_callback:
@@ -722,7 +723,12 @@ class LruCache(Generic[KT, VT]):
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
+ logger.info(
+ "LruCache cache values before pop %s",
+ {node.key: node.value for node in cache.values()},
+ )
popped = cache.pop(key, None)
+ logger.info("LruCache cache_del_multi key=%s popped=%s", key, popped)
if popped is None:
return
# for each deleted node, we now need to remove it from the linked list
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
index 6602dffea0..158ad1f439 100644
--- a/tests/storage/databases/main/test_events_worker.py
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -20,9 +20,6 @@ from twisted.enterprise.adbapi import ConnectionPool
from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
from twisted.test.proto_helpers import MemoryReactor
-import synapse.rest.admin
-import synapse.rest.client.login
-import synapse.rest.client.room
+from synapse.rest import admin
+from synapse.rest.client import login, room
from synapse.api.room_versions import EventFormatVersions, RoomVersions
from synapse.events import make_event_from_dict
from synapse.logging.context import LoggingContext
@@ -36,76 +33,47 @@ from synapse.storage.databases.main.events_worker import (
from synapse.storage.types import Connection
from synapse.util import Clock
from synapse.util.async_helpers import yieldable_gather_results
-from tests.test_utils.event_injection import create_event
from tests import unittest
+from tests.test_utils.event_injection import create_event, inject_event
class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
servlets = [
- synapse.rest.admin.register_servlets,
- synapse.rest.client.login.register_servlets,
- synapse.rest.client.room.register_servlets,
+ admin.register_servlets,
+ room.register_servlets,
+ login.register_servlets,
]
def prepare(self, reactor, clock, hs):
self.hs = hs
self.store: EventsWorkerStore = hs.get_datastores().main
- # insert some test data
- for rid in ("room1", "room2"):
- self.get_success(
- self.store.db_pool.simple_insert(
- "rooms",
- {"room_id": rid, "room_version": 4},
- )
- )
+ self.user = self.register_user("user", "pass")
+ self.token = self.login(self.user, "pass")
+ self.room_id = self.helper.create_room_as(self.user, tok=self.token)
self.event_ids: List[str] = []
- for idx, rid in enumerate(
- (
- "room1",
- "room1",
- "room1",
- "room2",
- )
- ):
- event_json = {"type": f"test {idx}", "room_id": rid}
- event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
- event_id = event.event_id
-
- self.get_success(
- self.store.db_pool.simple_insert(
- "events",
- {
- "event_id": event_id,
- "room_id": rid,
- "topological_ordering": idx,
- "stream_ordering": idx,
- "type": event.type,
- "processed": True,
- "outlier": False,
- },
- )
- )
- self.get_success(
- self.store.db_pool.simple_insert(
- "event_json",
- {
- "event_id": event_id,
- "room_id": rid,
- "json": json.dumps(event_json),
- "internal_metadata": "{}",
- "format_version": 3,
- },
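+        # Create the events via the normal event creation/persistence code path
+        # (rather than inserting directly into the database tables) so that all
+        # of the usual supporting tables and caches are populated consistently.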
+ for i in range(3):
+ event = self.get_success(
+ inject_event(
+ hs,
+ room_version=RoomVersions.V7.identifier,
+ room_id=self.room_id,
+ sender=self.user,
+ type="test_event_type",
+ content={"body": f"foobarbaz{i}"},
)
)
- self.event_ids.append(event_id)
+
+ self.event_ids.append(event.event_id)
def test_simple(self):
with LoggingContext(name="test") as ctx:
res = self.get_success(
- self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+ self.store.have_seen_events(
+ self.room_id, [self.event_ids[0], "eventdoesnotexist"]
+ )
)
self.assertEqual(res, {self.event_ids[0]})
@@ -115,7 +83,9 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
# a second lookup of the same events should cause no queries
with LoggingContext(name="test") as ctx:
res = self.get_success(
- self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+ self.store.have_seen_events(
+ self.room_id, [self.event_ids[0], "eventdoesnotexist"]
+ )
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
@@ -127,46 +97,53 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
# looking it up should now cause no db hits
with LoggingContext(name="test") as ctx:
res = self.get_success(
- self.store.have_seen_events("room1", [self.event_ids[0]])
+ self.store.have_seen_events(self.room_id, [self.event_ids[0]])
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
def test_persisting_event_invalidates_cache(self):
- with LoggingContext(name="test") as ctx:
- alice = self.register_user("alice", "pass")
- alice_token = self.login("alice", "pass")
- room_id = self.helper.create_room_as(alice, tok=alice_token)
-
- event, event_context = self.get_success(
- create_event(
- self.hs,
- room_id=room_id,
- room_version="6",
- sender=alice,
- type="test_event_type",
- content={"body": "foobarbaz"},
- )
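+        # Create an event, but don't persist it yet.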
+ event, event_context = self.get_success(
+ create_event(
+ self.hs,
+ room_id=self.room_id,
+ sender=self.user,
+ type="test_event_type",
+ content={"body": "garply"},
)
+ )
- # Check first `have_seen_events` for an event we have not seen yet
- # to prime the cache with a `false`.
+ with LoggingContext(name="test") as ctx:
+            # First, check `have_seen_events` for an event we have not seen yet,
+            # to prime the cache with a `False` entry.
res = self.get_success(
self.store.have_seen_events(event.room_id, [event.event_id])
)
self.assertEqual(res, set())
- # that should result in a single db query to lookup if we have the
- # event that we have not persisted yet.
+            # That should result in a single db query to look up the event.
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
- persistence = self.hs.get_storage_controllers().persistence
- self.get_success(
- persistence.persist_event(
- event,
- event_context,
- )
+        # Persist the event, which should invalidate or prefill the
+        # `have_seen_event` cache so that we don't return stale values.
+ persistence = self.hs.get_storage_controllers().persistence
+ self.get_success(
+ persistence.persist_event(
+ event,
+ event_context,
)
+ )
+
+ with LoggingContext(name="test") as ctx:
+            # Check `have_seen_events` again; now that the event has been
+            # persisted, it should be reported as seen.
+ res = self.get_success(
+ self.store.have_seen_events(event.room_id, [event.event_id])
+ )
+ self.assertEqual(res, {event.event_id})
+
+            # That should result in a single db query to look up the event.
+ self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
class EventCacheTestCase(unittest.HomeserverTestCase):
|