author     Eric Eastwood <erice@element.io>  2022-08-18 22:14:05 -0500
committer  Eric Eastwood <erice@element.io>  2022-08-18 22:14:05 -0500
commit     7cb07d3a0338408a921120ac38d2971c312f316f (patch)
tree       b538f8332a3242a00445691799e46f2c9866ba6a
parent     Add metrics to track `/messages` response time by room size (#13545) (diff)
download   synapse-7cb07d3a0338408a921120ac38d2971c312f316f.tar.xz
Start of optimizing `have_seen_events`

Rework `have_seen_events`/`_have_seen_events_dict` to return the set of
events we *haven't* seen (keyed on `event_id` alone), batch the cache
lookups in chunks of 1000 instead of 500, and tag the argument size for
tracing.
-rw-r--r--  synapse/handlers/federation_event.py                 6
-rw-r--r--  synapse/storage/databases/main/events_worker.py     70
-rw-r--r--  tests/storage/databases/main/test_events_worker.py  10
3 files changed, 54 insertions, 32 deletions
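
As an overview before the per-file hunks: the commit flips the contract of
`have_seen_events` from "which of these events have we seen" to "which of
these events are still unseen". A minimal sketch of the two shapes (invented
names and data, not code from this commit):

    # Hypothetical store contents, standing in for the `events` table.
    known_events = {"$seen1", "$seen2"}

    def have_seen_events_before(event_ids):
        # Old shape: one boolean per queried ID (the real old code keyed the
        # dict on (room_id, event_id); event_id alone is used here for brevity).
        return {event_id: event_id in known_events for event_id in event_ids}

    def have_seen_events_after(event_ids):
        # New shape: only the unseen remainder, typically far smaller.
        return {event_id for event_id in event_ids if event_id not in known_events}

    assert have_seen_events_after(["$seen1", "$new"]) == {"$new"}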
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index f40b071a74..d58096f447 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1031,6 +1031,12 @@ class FederationEventHandler:
             InvalidResponseError: if the remote homeserver's response contains fields
                 of the wrong type.
         """
+
+        # It would be better if we could query the difference from our known
+        # state to the given `event_id` so the sending server doesn't have to
+        # send as much and we don't have to process as many events. For example,
+        # in a room like #matrixhq, this returns 200k events (77k state_events,
+        # 122k auth_events), and the `have_seen_events` call alone takes 20s.
         (
             state_event_ids,
             auth_event_ids,
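
The comment added above wishes for a delta-based query instead of fetching a
room's full state. As a toy illustration of why a delta is cheaper (purely
hypothetical; no such federation API is used in this commit), recall that
Matrix state maps key events on (event_type, state_key):

    # Invented state maps for the sketch: (event_type, state_key) -> event_id.
    known_state = {("m.room.member", "@a:hs"): "$m1", ("m.room.name", ""): "$n1"}
    remote_state = {("m.room.member", "@a:hs"): "$m1", ("m.room.name", ""): "$n2"}

    # Only the entries that differ would need to be sent and processed; for a
    # room like #matrixhq that is far fewer than the ~200k full-state events
    # mentioned in the comment above.
    delta = {key: eid for key, eid in remote_state.items() if known_state.get(key) != eid}
    assert delta == {("m.room.name", ""): "$n2"}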
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 8a7cdb024d..051810cab1 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -54,7 +54,13 @@ from synapse.logging.context import (
     current_context,
     make_deferred_yieldable,
 )
-from synapse.logging.opentracing import start_active_span, tag_args, trace
+from synapse.logging.opentracing import (
+    SynapseTags,
+    set_tag,
+    start_active_span,
+    tag_args,
+    trace,
+)
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -1462,44 +1468,43 @@ class EventsWorkerStore(SQLBaseStore):
             event_ids: events we are looking for
 
         Returns:
-            The set of events we have already seen.
+            The remaining set of events we haven't seen.
         """
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(event_ids)),
+        )
 
         # @cachedList chomps lots of memory if you call it with a big list, so
         # we break it down. However, each batch requires its own index scan, so we make
         # the batches as big as possible.
 
-        results: Set[str] = set()
-        for chunk in batch_iter(event_ids, 500):
-            r = await self._have_seen_events_dict(
-                [(room_id, event_id) for event_id in chunk]
-            )
-            results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)
+        remaining_event_ids: Set[str] = set()
+        for chunk in batch_iter(event_ids, 1000):
+            remaining_event_ids_from_chunk = await self._have_seen_events_dict(chunk)
+            remaining_event_ids.update(remaining_event_ids_from_chunk)
 
-        return results
+        return remaining_event_ids
 
-    @cachedList(cached_method_name="have_seen_event", list_name="keys")
-    async def _have_seen_events_dict(
-        self, keys: Collection[Tuple[str, str]]
-    ) -> Dict[Tuple[str, str], bool]:
+    @cachedList(cached_method_name="have_seen_event", list_name="event_ids")
+    async def _have_seen_events_dict(self, event_ids: Iterable[str]) -> Set[str]:
         """Helper for have_seen_events
 
         Returns:
-             a dict {(room_id, event_id)-> bool}
+            The remaining set of events we haven't seen.
         """
-        # if the event cache contains the event, obviously we've seen it.
 
-        cache_results = {
-            (rid, eid)
-            for (rid, eid) in keys
-            if await self._get_event_cache.contains((eid,))
+        # if the event cache contains the event, obviously we've seen it.
+        event_entry_map = self._get_events_from_local_cache(event_ids)
+        event_ids_in_cache = event_entry_map.keys()
+        remaining_event_ids = {
+            event_id for event_id in event_ids if event_id not in event_ids_in_cache
         }
-        results = dict.fromkeys(cache_results, True)
-        remaining = [k for k in keys if k not in cache_results]
-        if not remaining:
-            return results
+        if not remaining_event_ids:
+            return remaining_event_ids
 
         def have_seen_events_txn(txn: LoggingTransaction) -> None:
+            nonlocal remaining_event_ids  # rebind the enclosing function's local, not a module global
             # we deliberately do *not* query the database for room_id, to make the
             # query an index-only lookup on `events_event_id_key`.
             #
@@ -1507,23 +1512,24 @@ class EventsWorkerStore(SQLBaseStore):
 
             sql = "SELECT event_id FROM events AS e WHERE "
             clause, args = make_in_list_sql_clause(
-                txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
+                txn.database_engine, "e.event_id", remaining_event_ids
             )
             txn.execute(sql + clause, args)
-            found_events = {eid for eid, in txn}
+            found_event_ids = {eid for eid, in txn}
 
-            # ... and then we can update the results for each key
-            results.update(
-                {(rid, eid): (eid in found_events) for (rid, eid) in remaining}
-            )
+            remaining_event_ids = {
+                event_id
+                for event_id in remaining_event_ids
+                if event_id not in found_event_ids
+            }
 
         await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
-        return results
+        return remaining_event_ids
 
     @cached(max_entries=100000, tree=True)
     async def have_seen_event(self, room_id: str, event_id: str) -> bool:
-        res = await self._have_seen_events_dict(((room_id, event_id),))
-        return res[(room_id, event_id)]
+        remaining_event_ids = await self._have_seen_events_dict({event_id})
+        return event_id not in remaining_event_ids
 
     def _get_current_state_event_counts_txn(
         self, txn: LoggingTransaction, room_id: str
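
The hunks above combine three patterns worth seeing in isolation: chunking a
large key list (Synapse's real helper is `synapse.util.iterutils.batch_iter`;
the stand-in below only shows its shape), checking an in-memory cache before
touching the database, and a transaction callback that rebinds an enclosing
local via `nonlocal` (which is why the `global` in the original WIP hunk was
a bug). Everything below is an invented sketch, not Synapse code:

    from itertools import islice
    from typing import Iterable, Iterator, Set, Tuple, TypeVar

    T = TypeVar("T")

    def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
        """Yield tuples of at most `size` items: small enough to keep
        @cachedList's per-call memory bounded, large enough to amortise
        the per-batch index scan."""
        it = iter(iterable)
        while batch := tuple(islice(it, size)):
            yield batch

    def find_unseen(event_ids: Iterable[str], cached: Set[str], in_db: Set[str]) -> Set[str]:
        # If the local event cache holds an ID, we have obviously seen it.
        remaining = {e for e in event_ids if e not in cached}
        if not remaining:
            return remaining

        # The "transaction" rebinds `remaining`, so it must declare it
        # nonlocal; `global` would look for a module-level name instead.
        def txn() -> None:
            nonlocal remaining
            remaining = {e for e in remaining if e not in in_db}

        txn()
        return remaining

    unseen: Set[str] = set()
    for chunk in batch_iter(["$a", "$b", "$c"], 1000):
        unseen |= find_unseen(chunk, cached={"$a"}, in_db={"$b"})
    assert unseen == {"$c"}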
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
index 46d829b062..f35b2ef0bb 100644
--- a/tests/storage/databases/main/test_events_worker.py
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -91,6 +91,16 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
             )
             self.event_ids.append(event_id)
 
+    # def test_benchmark(self):
+    #     with LoggingContext(name="test") as ctx:
+    #         res = self.get_success(
+    #             self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+    #         )
+    #         self.assertEqual(res, {"event19"})
+
+    #         # that should result in a single db query
+    #         self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
+
     def test_simple(self):
         with LoggingContext(name="test") as ctx:
             res = self.get_success(