summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/9953.feature1
-rw-r--r--changelog.d/9973.feature1
-rw-r--r--changelog.d/9973.misc1
-rw-r--r--synapse/handlers/federation.py12
-rw-r--r--synapse/storage/databases/main/cache.py1
-rw-r--r--synapse/storage/databases/main/events_worker.py61
-rw-r--r--synapse/storage/databases/main/purge_events.py26
-rw-r--r--tests/storage/databases/__init__.py13
-rw-r--r--tests/storage/databases/main/__init__.py13
-rw-r--r--tests/storage/databases/main/test_events_worker.py96
10 files changed, 205 insertions, 20 deletions
diff --git a/changelog.d/9953.feature b/changelog.d/9953.feature
new file mode 100644
index 0000000000..6b3d1adc70
--- /dev/null
+++ b/changelog.d/9953.feature
@@ -0,0 +1 @@
+Improve performance of incoming federation transactions in large rooms.
diff --git a/changelog.d/9973.feature b/changelog.d/9973.feature
new file mode 100644
index 0000000000..6b3d1adc70
--- /dev/null
+++ b/changelog.d/9973.feature
@@ -0,0 +1 @@
+Improve performance of incoming federation transactions in large rooms.
diff --git a/changelog.d/9973.misc b/changelog.d/9973.misc
deleted file mode 100644
index 7f22d42291..0000000000
--- a/changelog.d/9973.misc
+++ /dev/null
@@ -1 +0,0 @@
-Make `LruCache.invalidate` support tree invalidation, and remove `invalidate_many`.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index bf11315251..49ed7cabcc 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -577,7 +577,9 @@ class FederationHandler(BaseHandler):
 
         # Fetch the state events from the DB, and check we have the auth events.
         event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
-        auth_events_in_store = await self.store.have_seen_events(auth_event_ids)
+        auth_events_in_store = await self.store.have_seen_events(
+            room_id, auth_event_ids
+        )
 
         # Check for missing events. We handle state and auth event seperately,
         # as we want to pull the state from the DB, but we don't for the auth
@@ -610,7 +612,7 @@ class FederationHandler(BaseHandler):
 
             if missing_auth_events:
                 auth_events_in_store = await self.store.have_seen_events(
-                    missing_auth_events
+                    room_id, missing_auth_events
                 )
                 missing_auth_events.difference_update(auth_events_in_store)
 
@@ -710,7 +712,7 @@ class FederationHandler(BaseHandler):
 
         missing_auth_events = set(auth_event_ids) - fetched_events.keys()
         missing_auth_events.difference_update(
-            await self.store.have_seen_events(missing_auth_events)
+            await self.store.have_seen_events(room_id, missing_auth_events)
         )
         logger.debug("We are also missing %i auth events", len(missing_auth_events))
 
@@ -2475,7 +2477,7 @@ class FederationHandler(BaseHandler):
         #
         # we start by checking if they are in the store, and then try calling /event_auth/.
         if missing_auth:
-            have_events = await self.store.have_seen_events(missing_auth)
+            have_events = await self.store.have_seen_events(event.room_id, missing_auth)
             logger.debug("Events %s are in the store", have_events)
             missing_auth.difference_update(have_events)
 
@@ -2494,7 +2496,7 @@ class FederationHandler(BaseHandler):
                     return context
 
                 seen_remotes = await self.store.have_seen_events(
-                    [e.event_id for e in remote_auth_chain]
+                    event.room_id, [e.event_id for e in remote_auth_chain]
                 )
 
                 for e in remote_auth_chain:
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index f7872501a0..c57ae5ef15 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -168,6 +168,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         backfilled,
     ):
         self._invalidate_get_event_cache(event_id)
+        self.have_seen_event.invalidate((room_id, event_id))
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
 
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 6963bbf7f4..403a5ddaba 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -22,6 +22,7 @@ from typing import (
     Iterable,
     List,
     Optional,
+    Set,
     Tuple,
     overload,
 )
@@ -55,7 +56,7 @@ from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import JsonDict, get_domain_from_id
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
@@ -1045,32 +1046,74 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {r["event_id"] for r in rows}
 
-    async def have_seen_events(self, event_ids):
+    async def have_seen_events(
+        self, room_id: str, event_ids: Iterable[str]
+    ) -> Set[str]:
         """Given a list of event ids, check if we have already processed them.
 
+        The room_id is only used to structure the cache (so that it can later be
+        invalidated by room_id) - there is no guarantee that the events are actually
+        in the room in question.
+
         Args:
-            event_ids (iterable[str]):
+            room_id: Room we are polling
+            event_ids: events we are looking for
 
         Returns:
             set[str]: The events we have already seen.
         """
+        res = await self._have_seen_events_dict(
+            (room_id, event_id) for event_id in event_ids
+        )
+        return {eid for ((_rid, eid), have_event) in res.items() if have_event}
+
+    @cachedList("have_seen_event", "keys")
+    async def _have_seen_events_dict(
+        self, keys: Iterable[Tuple[str, str]]
+    ) -> Dict[Tuple[str, str], bool]:
+        """Helper for have_seen_events
+
+        Returns:
+             a dict {(room_id, event_id)-> bool}
+        """
         # if the event cache contains the event, obviously we've seen it.
-        results = {x for x in event_ids if self._get_event_cache.contains(x)}
 
-        def have_seen_events_txn(txn, chunk):
-            sql = "SELECT event_id FROM events as e WHERE "
+        cache_results = {
+            (rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
+        }
+        results = {x: True for x in cache_results}
+
+        def have_seen_events_txn(txn, chunk: Tuple[Tuple[str, str], ...]):
+            # we deliberately do *not* query the database for room_id, to make the
+            # query an index-only lookup on `events_event_id_key`.
+            #
+            # We therefore pull the events from the database into a set...
+
+            sql = "SELECT event_id FROM events AS e WHERE "
             clause, args = make_in_list_sql_clause(
-                txn.database_engine, "e.event_id", chunk
+                txn.database_engine, "e.event_id", [eid for (_rid, eid) in chunk]
             )
             txn.execute(sql + clause, args)
-            results.update(row[0] for row in txn)
+            found_events = {eid for eid, in txn}
 
-        for chunk in batch_iter((x for x in event_ids if x not in results), 100):
+            # ... and then we can update the results for each row in the batch
+            results.update({(rid, eid): (eid in found_events) for (rid, eid) in chunk})
+
+        # each batch requires its own index scan, so we make the batches as big as
+        # possible.
+        for chunk in batch_iter((k for k in keys if k not in cache_results), 500):
             await self.db_pool.runInteraction(
                 "have_seen_events", have_seen_events_txn, chunk
             )
+
         return results
 
+    @cached(max_entries=100000, tree=True)
+    async def have_seen_event(self, room_id: str, event_id: str):
+        # this only exists for the benefit of the @cachedList descriptor on
+        # _have_seen_events_dict
+        raise NotImplementedError()
+
     def _get_current_state_event_counts_txn(self, txn, room_id):
         """
         See get_current_state_event_counts.
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 8f83748b5e..7fb7780d0f 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -16,14 +16,14 @@ import logging
 from typing import Any, List, Set, Tuple
 
 from synapse.api.errors import SynapseError
-from synapse.storage._base import SQLBaseStore
+from synapse.storage.databases.main import CacheInvalidationWorkerStore
 from synapse.storage.databases.main.state import StateGroupWorkerStore
 from synapse.types import RoomStreamToken
 
 logger = logging.getLogger(__name__)
 
 
-class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
+class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
     async def purge_history(
         self, room_id: str, token: str, delete_local_events: bool
     ) -> Set[int]:
@@ -203,8 +203,6 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
             "DELETE FROM event_to_state_groups "
             "WHERE event_id IN (SELECT event_id from events_to_purge)"
         )
-        for event_id, _ in event_rows:
-            txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 
         # Delete all remote non-state events
         for table in (
@@ -283,6 +281,20 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         # so make sure to keep this actually last.
         txn.execute("DROP TABLE events_to_purge")
 
+        for event_id, should_delete in event_rows:
+            self._invalidate_cache_and_stream(
+                txn, self._get_state_group_for_event, (event_id,)
+            )
+
+            # XXX: This is racy, since have_seen_events could be called between the
+            #    transaction completing and the invalidation running. On the other hand,
+            #    that's no different to calling `have_seen_events` just before the
+            #    event is deleted from the database.
+            if should_delete:
+                self._invalidate_cache_and_stream(
+                    txn, self.have_seen_event, (room_id, event_id)
+                )
+
         logger.info("[purge] done")
 
         return referenced_state_groups
@@ -422,7 +434,11 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         #       index on them. In any case we should be clearing out 'stream' tables
         #       periodically anyway (#5888)
 
-        # TODO: we could probably usefully do a bunch of cache invalidation here
+        # TODO: we could probably usefully do a bunch more cache invalidation here
+
+        # XXX: as with purge_history, this is racy, but no worse than other races
+        #   that already exist.
+        self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
 
         logger.info("[purge] done")
 
diff --git a/tests/storage/databases/__init__.py b/tests/storage/databases/__init__.py
new file mode 100644
index 0000000000..c24c7ecd92
--- /dev/null
+++ b/tests/storage/databases/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/storage/databases/main/__init__.py b/tests/storage/databases/main/__init__.py
new file mode 100644
index 0000000000..c24c7ecd92
--- /dev/null
+++ b/tests/storage/databases/main/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
new file mode 100644
index 0000000000..932970fd9a
--- /dev/null
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -0,0 +1,96 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+
+from synapse.logging.context import LoggingContext
+from synapse.storage.databases.main.events_worker import EventsWorkerStore
+
+from tests import unittest
+
+
+class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
+    def prepare(self, reactor, clock, hs):
+        self.store: EventsWorkerStore = hs.get_datastore()
+
+        # insert some test data
+        for rid in ("room1", "room2"):
+            self.get_success(
+                self.store.db_pool.simple_insert(
+                    "rooms",
+                    {"room_id": rid, "room_version": 4},
+                )
+            )
+
+        for idx, (rid, eid) in enumerate(
+            (
+                ("room1", "event10"),
+                ("room1", "event11"),
+                ("room1", "event12"),
+                ("room2", "event20"),
+            )
+        ):
+            self.get_success(
+                self.store.db_pool.simple_insert(
+                    "events",
+                    {
+                        "event_id": eid,
+                        "room_id": rid,
+                        "topological_ordering": idx,
+                        "stream_ordering": idx,
+                        "type": "test",
+                        "processed": True,
+                        "outlier": False,
+                    },
+                )
+            )
+            self.get_success(
+                self.store.db_pool.simple_insert(
+                    "event_json",
+                    {
+                        "event_id": eid,
+                        "room_id": rid,
+                        "json": json.dumps({"type": "test", "room_id": rid}),
+                        "internal_metadata": "{}",
+                        "format_version": 3,
+                    },
+                )
+            )
+
+    def test_simple(self):
+        with LoggingContext(name="test") as ctx:
+            res = self.get_success(
+                self.store.have_seen_events("room1", ["event10", "event19"])
+            )
+            self.assertEquals(res, {"event10"})
+
+            # that should result in a single db query
+            self.assertEquals(ctx.get_resource_usage().db_txn_count, 1)
+
+        # a second lookup of the same events should cause no queries
+        with LoggingContext(name="test") as ctx:
+            res = self.get_success(
+                self.store.have_seen_events("room1", ["event10", "event19"])
+            )
+            self.assertEquals(res, {"event10"})
+            self.assertEquals(ctx.get_resource_usage().db_txn_count, 0)
+
+    def test_query_via_event_cache(self):
+        # fetch an event into the event cache
+        self.get_success(self.store.get_event("event10"))
+
+        # looking it up should now cause no db hits
+        with LoggingContext(name="test") as ctx:
+            res = self.get_success(self.store.have_seen_events("room1", ["event10"]))
+            self.assertEquals(res, {"event10"})
+            self.assertEquals(ctx.get_resource_usage().db_txn_count, 0)