summary refs log tree commit diff
diff options
context:
space:
mode:
author: Eric Eastwood <erice@element.io> 2022-09-21 15:45:08 -0500
committer: Eric Eastwood <erice@element.io> 2022-09-21 15:45:08 -0500
commit: dd4be2453f59926af005598b2d168654d13cadd0 (patch)
tree: d2c66daa283e65eeeb4da6ce70cd05c2f2903361
parent: Scratch changes for fix have_seen_event not being invalidated (diff)
download: synapse-dd4be2453f59926af005598b2d168654d13cadd0.tar.xz
Fix have_seen_event cache not being invalidated when we persist the event
Fix for
https://github.com/matrix-org/synapse/issues/13856

Fixed by calling `_invalidate_caches_for_event`
when we persist an event.

This also fixes `_invalidate_caches_for_event` so that it
invalidates `have_seen_event` with the correct cache key.
Passing a key of the wrong shape seems like an easy
foot-gun for any `tree=True` cache.

Wrong:
```py
self.have_seen_event.invalidate((room_id, event_id))
```

Correct:
```py
self.have_seen_event.invalidate(((room_id, event_id),))
```
-rw-r--r-- synapse/handlers/message.py 1
-rw-r--r-- synapse/storage/databases/main/cache.py 11
-rw-r--r-- synapse/storage/databases/main/events.py 18
-rw-r--r-- synapse/storage/databases/main/events_worker.py 4
-rw-r--r-- synapse/util/caches/deferred_cache.py 6
-rw-r--r-- synapse/util/caches/lrucache.py 6
-rw-r--r-- tests/storage/databases/main/test_events_worker.py 135
7 files changed, 98 insertions, 83 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e391338406..10b5dad030 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1078,7 +1078,6 @@ class EventCreationHandler:
         else:
             prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
 
-        logger.info("allow_no_prev_events=%s", allow_no_prev_events)
         # Do a quick sanity check here, rather than waiting until we've created the
         # event and then try to auth it (which fails with a somewhat confusing "No
         # create event in auth events")
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index aabf3dbba0..53646b978a 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -223,9 +223,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         # This invalidates any local in-memory cached event objects, the original
         # process triggering the invalidation is responsible for clearing any external
         # cached objects.
-        logger.info("_invalidate_caches_for_event event_id=%s", event_id)
+        logger.info(
+            "CacheInvalidationWorkerStore _invalidate_caches_for_event room_id=%s event_id=%s",
+            room_id,
+            event_id,
+        )
+        logger.info(
+            "CacheInvalidationWorkerStore self.have_seen_event=%s", self.have_seen_event
+        )
         self._invalidate_local_get_event_cache(event_id)
-        self.have_seen_event.invalidate((room_id, event_id))
+        self.have_seen_event.invalidate(((room_id, event_id),))
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 5932668f2f..368e9b47e9 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -434,6 +434,24 @@ class PersistEventsStore:
 
         self._store_event_txn(txn, events_and_contexts=events_and_contexts)
 
+        for event, _ in events_and_contexts:
+            # We expect events to be persisted by this point
+            assert event.internal_metadata.stream_ordering
+
+            relation = relation_from_event(event)
+            self.store._invalidate_caches_for_event(
+                stream_ordering=event.internal_metadata.stream_ordering,
+                event_id=event.event_id,
+                room_id=event.room_id,
+                etype=event.type,
+                state_key=None,  # event.state_key,
+                # TODO
+                redacts=None,
+                relates_to=relation.parent_id if relation else None,
+                # TODO
+                backfilled=False,
+            )
+
         self._persist_transaction_ids_txn(txn, events_and_contexts)
 
         # Insert into event_to_state_groups.
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 9f6b1fcef1..debe8e5f3f 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1453,7 +1453,7 @@ class EventsWorkerStore(SQLBaseStore):
     @trace
     @tag_args
     async def have_seen_events(
-        self, room_id: str, event_ids: Iterable[str]
+        self, room_id: str, event_ids: Collection[str]
     ) -> Set[str]:
         """Given a list of event ids, check if we have already processed them.
 
@@ -1468,6 +1468,7 @@ class EventsWorkerStore(SQLBaseStore):
         Returns:
             The set of events we have already seen.
         """
+        logger.info("have_seen_events room_id=%s event_ids=%s", room_id, event_ids)
 
         # @cachedList chomps lots of memory if you call it with a big list, so
         # we break it down. However, each batch requires its own index scan, so we make
@@ -1491,6 +1492,7 @@ class EventsWorkerStore(SQLBaseStore):
         Returns:
              a dict {(room_id, event_id)-> bool}
         """
+        logger.info("_have_seen_events_dict keys=%s", keys)
         # if the event cache contains the event, obviously we've seen it.
 
         cache_results = {
diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py
index 6425f851ea..36b05fc344 100644
--- a/synapse/util/caches/deferred_cache.py
+++ b/synapse/util/caches/deferred_cache.py
@@ -383,8 +383,14 @@ class DeferredCache(Generic[KT, VT]):
         may be of lower cardinality than the TreeCache - in which case the whole
         subtree is deleted.
         """
+        import logging
+
+        logger = logging.getLogger(__name__)
+        logger.info("DeferredCache before=%s", self.cache.len())
+        logger.info("DeferredCache invalidate key=%s", key)
         self.check_thread()
         self.cache.del_multi(key)
+        logger.info("DeferredCache after=%s", self.cache.len())
 
         # if we have a pending lookup for this key, remove it from the
         # _pending_deferred_cache, which will (a) stop it being returned for
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index aa93109d13..5a745eb8c5 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -511,6 +511,7 @@ class LruCache(Generic[KT, VT]):
                 callbacks,
                 prune_unread_entries,
             )
+            logger.info("LruCache add_node key=%s value=%s", key, value)
             cache[key] = node
 
             if size_callback:
@@ -722,7 +723,12 @@ class LruCache(Generic[KT, VT]):
             may be of lower cardinality than the TreeCache - in which case the whole
             subtree is deleted.
             """
+            logger.info(
+                "LruCache cache values before pop %s",
+                {node.key: node.value for node in cache.values()},
+            )
             popped = cache.pop(key, None)
+            logger.info("LruCache cache_del_multi key=%s popped=%s", key, popped)
             if popped is None:
                 return
             # for each deleted node, we now need to remove it from the linked list
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
index 6602dffea0..158ad1f439 100644
--- a/tests/storage/databases/main/test_events_worker.py
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -20,9 +20,6 @@ from twisted.enterprise.adbapi import ConnectionPool
 from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
 from twisted.test.proto_helpers import MemoryReactor
 
-import synapse.rest.admin
-import synapse.rest.client.login
-import synapse.rest.client.room
 from synapse.api.room_versions import EventFormatVersions, RoomVersions
 from synapse.events import make_event_from_dict
 from synapse.logging.context import LoggingContext
@@ -36,76 +33,47 @@ from synapse.storage.databases.main.events_worker import (
 from synapse.storage.types import Connection
 from synapse.util import Clock
 from synapse.util.async_helpers import yieldable_gather_results
-from tests.test_utils.event_injection import create_event
 
 from tests import unittest
+from tests.test_utils.event_injection import create_event, inject_event
 
 
 class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
     servlets = [
-        synapse.rest.admin.register_servlets,
-        synapse.rest.client.login.register_servlets,
-        synapse.rest.client.room.register_servlets,
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
     ]
 
     def prepare(self, reactor, clock, hs):
         self.hs = hs
         self.store: EventsWorkerStore = hs.get_datastores().main
 
-        # insert some test data
-        for rid in ("room1", "room2"):
-            self.get_success(
-                self.store.db_pool.simple_insert(
-                    "rooms",
-                    {"room_id": rid, "room_version": 4},
-                )
-            )
+        self.user = self.register_user("user", "pass")
+        self.token = self.login(self.user, "pass")
+        self.room_id = self.helper.create_room_as(self.user, tok=self.token)
 
         self.event_ids: List[str] = []
-        for idx, rid in enumerate(
-            (
-                "room1",
-                "room1",
-                "room1",
-                "room2",
-            )
-        ):
-            event_json = {"type": f"test {idx}", "room_id": rid}
-            event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
-            event_id = event.event_id
-
-            self.get_success(
-                self.store.db_pool.simple_insert(
-                    "events",
-                    {
-                        "event_id": event_id,
-                        "room_id": rid,
-                        "topological_ordering": idx,
-                        "stream_ordering": idx,
-                        "type": event.type,
-                        "processed": True,
-                        "outlier": False,
-                    },
-                )
-            )
-            self.get_success(
-                self.store.db_pool.simple_insert(
-                    "event_json",
-                    {
-                        "event_id": event_id,
-                        "room_id": rid,
-                        "json": json.dumps(event_json),
-                        "internal_metadata": "{}",
-                        "format_version": 3,
-                    },
+        for i in range(3):
+            event = self.get_success(
+                inject_event(
+                    hs,
+                    room_version=RoomVersions.V7.identifier,
+                    room_id=self.room_id,
+                    sender=self.user,
+                    type="test_event_type",
+                    content={"body": f"foobarbaz{i}"},
                 )
             )
-            self.event_ids.append(event_id)
+
+            self.event_ids.append(event.event_id)
 
     def test_simple(self):
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+                self.store.have_seen_events(
+                    self.room_id, [self.event_ids[0], "eventdoesnotexist"]
+                )
             )
             self.assertEqual(res, {self.event_ids[0]})
 
@@ -115,7 +83,9 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
         # a second lookup of the same events should cause no queries
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+                self.store.have_seen_events(
+                    self.room_id, [self.event_ids[0], "eventdoesnotexist"]
+                )
             )
             self.assertEqual(res, {self.event_ids[0]})
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
@@ -127,46 +97,53 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
         # looking it up should now cause no db hits
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0]])
+                self.store.have_seen_events(self.room_id, [self.event_ids[0]])
             )
             self.assertEqual(res, {self.event_ids[0]})
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
 
     def test_persisting_event_invalidates_cache(self):
-        with LoggingContext(name="test") as ctx:
-            alice = self.register_user("alice", "pass")
-            alice_token = self.login("alice", "pass")
-            room_id = self.helper.create_room_as(alice, tok=alice_token)
-
-            event, event_context = self.get_success(
-                create_event(
-                    self.hs,
-                    room_id=room_id,
-                    room_version="6",
-                    sender=alice,
-                    type="test_event_type",
-                    content={"body": "foobarbaz"},
-                )
+        event, event_context = self.get_success(
+            create_event(
+                self.hs,
+                room_id=self.room_id,
+                sender=self.user,
+                type="test_event_type",
+                content={"body": "garply"},
             )
+        )
 
-            # Check first `have_seen_events` for an event we have not seen yet
-            # to prime the cache with a `false`.
+        with LoggingContext(name="test") as ctx:
+            # First, check `have_seen_event` for an event we have not seen yet
+            # to prime the cache with a `false` value.
             res = self.get_success(
                 self.store.have_seen_events(event.room_id, [event.event_id])
             )
             self.assertEqual(res, set())
 
-            # that should result in a single db query to lookup if we have the
-            # event that we have not persisted yet.
+            # That should result in a single db query to lookup
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
 
-            persistence = self.hs.get_storage_controllers().persistence
-            self.get_success(
-                persistence.persist_event(
-                    event,
-                    event_context,
-                )
+        # Persist the event which should invalidate or prefill the
+        # `have_seen_event` cache so we don't return stale values.
+        persistence = self.hs.get_storage_controllers().persistence
+        self.get_success(
+            persistence.persist_event(
+                event,
+                event_context,
             )
+        )
+
+        with LoggingContext(name="test") as ctx:
+            # Check `have_seen_event` again and we should see the updated fact
+            # that we have now seen the event after persisting it.
+            res = self.get_success(
+                self.store.have_seen_events(event.room_id, [event.event_id])
+            )
+            self.assertEqual(res, {event.event_id})
+
+            # That should result in a single db query to lookup
+            self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
 
 
 class EventCacheTestCase(unittest.HomeserverTestCase):