summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2022-08-19 01:30:23 -0500
committerEric Eastwood <erice@element.io>2022-08-19 01:30:23 -0500
commit2fdbca62e4d69621be61963d7dece90fbc1adbcd (patch)
treedf94b97283fe43c9925f342bbf068c3dd2167c10
parentStart of optimizing (diff)
downloadsynapse-2fdbca62e4d69621be61963d7dece90fbc1adbcd.tar.xz
Add benchmark
-rw-r--r--synapse/storage/databases/main/events_worker.py12
-rw-r--r--tests/storage/databases/main/test_events_worker.py60
2 files changed, 58 insertions, 14 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 051810cab1..2d6bc53f20 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1455,7 +1455,7 @@ class EventsWorkerStore(SQLBaseStore):
     @trace
     @tag_args
     async def have_seen_events(
-        self, room_id: str, event_ids: Iterable[str]
+        self, room_id: str, event_ids: Collection[str]
     ) -> Set[str]:
         """Given a list of event ids, check if we have already processed them.
 
@@ -1480,14 +1480,14 @@ class EventsWorkerStore(SQLBaseStore):
         # the batches as big as possible.
 
         remaining_event_ids: Set[str] = set()
-        for chunk in batch_iter(event_ids, 1000):
+        for chunk in batch_iter(event_ids, 500):
             remaining_event_ids_from_chunk = await self._have_seen_events_dict(chunk)
             remaining_event_ids.update(remaining_event_ids_from_chunk)
 
         return remaining_event_ids
 
-    @cachedList(cached_method_name="have_seen_event", list_name="event_ids")
-    async def _have_seen_events_dict(self, event_ids: Iterable[str]) -> Set[str]:
+    # @cachedList(cached_method_name="have_seen_event", list_name="event_ids")
+    async def _have_seen_events_dict(self, event_ids: Collection[str]) -> set[str]:
         """Helper for have_seen_events
 
         Returns:
@@ -1501,10 +1501,10 @@ class EventsWorkerStore(SQLBaseStore):
             event_id for event_id in event_ids if event_id not in event_ids_in_cache
         }
         if not remaining_event_ids:
-            return []
+            return set()
 
         def have_seen_events_txn(txn: LoggingTransaction) -> None:
-            global remaining_event_ids
+            nonlocal remaining_event_ids
             # we deliberately do *not* query the database for room_id, to make the
             # query an index-only lookup on `events_event_id_key`.
             #
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
index f35b2ef0bb..ca65e77d3e 100644
--- a/tests/storage/databases/main/test_events_worker.py
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 import json
 from contextlib import contextmanager
 from typing import Generator, List, Tuple
@@ -36,6 +37,8 @@ from synapse.util.async_helpers import yieldable_gather_results
 
 from tests import unittest
 
+logger = logging.getLogger(__name__)
+
 
 class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
     def prepare(self, reactor, clock, hs):
@@ -91,15 +94,56 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
             )
             self.event_ids.append(event_id)
 
-    # def test_benchmark(self):
-    #     with LoggingContext(name="test") as ctx:
-    #         res = self.get_success(
-    #             self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
-    #         )
-    #         self.assertEqual(res, {self.event_ids[0]})
+    def test_benchmark(self):
+        import time
+
+        room_id = "room123"
+        event_ids = []
+        setup_start_time = time.time()
+        with LoggingContext(name="test-setup") as ctx:
+            for i in range(50000):
+                event_json = {"type": f"test {i}", "room_id": room_id}
+                event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
+                event_id = event.event_id
+
+                event_ids.append(event_id)
+
+                self.get_success(
+                    self.store.db_pool.simple_insert(
+                        "events",
+                        {
+                            "event_id": event_id,
+                            "room_id": room_id,
+                            "topological_ordering": i,
+                            "stream_ordering": 123 + i,
+                            "type": event.type,
+                            "processed": True,
+                            "outlier": False,
+                        },
+                    )
+                )
 
-    #         # that should result in a single db query
-    #         self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
+        setup_end_time = time.time()
+        logger.info(
+            "Setup time: %s",
+            (setup_end_time - setup_start_time),
+        )
+
+        with LoggingContext(name="test") as ctx:
+            benchmark_start_time = time.time()
+            remaining_event_ids = self.get_success(
+                self.store.have_seen_events(room_id, event_ids)
+            )
+            benchmark_end_time = time.time()
+            logger.info("Benchmark start: %s, end: %s", benchmark_start_time, benchmark_end_time)
+            logger.info(
+                "Benchmark time: %s",
+                (benchmark_end_time - benchmark_start_time),
+            )
+            # self.assertEqual(remaining_event_ids, set())
+
+            # that should result in many db queries
+            self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
 
     def test_simple(self):
         with LoggingContext(name="test") as ctx: