diff options
author | Eric Eastwood <erice@element.io> | 2022-08-19 01:30:23 -0500 |
---|---|---|
committer | Eric Eastwood <erice@element.io> | 2022-08-19 01:30:23 -0500 |
commit | 2fdbca62e4d69621be61963d7dece90fbc1adbcd (patch) | |
tree | df94b97283fe43c9925f342bbf068c3dd2167c10 | |
parent | Start of optimizing (diff) | |
download | synapse-2fdbca62e4d69621be61963d7dece90fbc1adbcd.tar.xz |
Add benchmark
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 12 | ||||
-rw-r--r-- | tests/storage/databases/main/test_events_worker.py | 60 |
2 files changed, 58 insertions, 14 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 051810cab1..2d6bc53f20 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1455,7 +1455,7 @@ class EventsWorkerStore(SQLBaseStore):
     @trace
     @tag_args
     async def have_seen_events(
-        self, room_id: str, event_ids: Iterable[str]
+        self, room_id: str, event_ids: Collection[str]
     ) -> Set[str]:
         """Given a list of event ids, check if we have already processed them.

@@ -1480,14 +1480,14 @@ class EventsWorkerStore(SQLBaseStore):
        # the batches as big as possible.

         remaining_event_ids: Set[str] = set()
-        for chunk in batch_iter(event_ids, 1000):
+        for chunk in batch_iter(event_ids, 500):
             remaining_event_ids_from_chunk = await self._have_seen_events_dict(chunk)
             remaining_event_ids.update(remaining_event_ids_from_chunk)

         return remaining_event_ids

-    @cachedList(cached_method_name="have_seen_event", list_name="event_ids")
-    async def _have_seen_events_dict(self, event_ids: Iterable[str]) -> Set[str]:
+    # @cachedList(cached_method_name="have_seen_event", list_name="event_ids")
+    async def _have_seen_events_dict(self, event_ids: Collection[str]) -> set[str]:
         """Helper for have_seen_events

         Returns:
@@ -1501,10 +1501,10 @@ class EventsWorkerStore(SQLBaseStore):
             event_id for event_id in event_ids if event_id not in event_ids_in_cache
         }
         if not remaining_event_ids:
-            return []
+            return set()

         def have_seen_events_txn(txn: LoggingTransaction) -> None:
-            global remaining_event_ids
+            nonlocal remaining_event_ids

             # we deliberately do *not* query the database for room_id, to make the
             # query an index-only lookup on `events_event_id_key`.
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
index f35b2ef0bb..ca65e77d3e 100644
--- a/tests/storage/databases/main/test_events_worker.py
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 import json
 from contextlib import contextmanager
 from typing import Generator, List, Tuple
@@ -36,6 +37,8 @@ from synapse.util.async_helpers import yieldable_gather_results

 from tests import unittest

+logger = logging.getLogger(__name__)
+

 class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
     def prepare(self, reactor, clock, hs):
@@ -91,15 +94,56 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
             )
             self.event_ids.append(event_id)

-    # def test_benchmark(self):
-    #     with LoggingContext(name="test") as ctx:
-    #         res = self.get_success(
-    #             self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
-    #         )
-    #         self.assertEqual(res, {self.event_ids[0]})
+    def test_benchmark(self):
+        import time
+
+        room_id = "room123"
+        event_ids = []
+        setup_start_time = time.time()
+        with LoggingContext(name="test-setup") as ctx:
+            for i in range(50000):
+                event_json = {"type": f"test {i}", "room_id": room_id}
+                event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
+                event_id = event.event_id
+
+                event_ids.append(event_id)
+
+                self.get_success(
+                    self.store.db_pool.simple_insert(
+                        "events",
+                        {
+                            "event_id": event_id,
+                            "room_id": room_id,
+                            "topological_ordering": i,
+                            "stream_ordering": 123 + i,
+                            "type": event.type,
+                            "processed": True,
+                            "outlier": False,
+                        },
+                    )
+                )

-        # # that should result in a single db query
-        # self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
+        setup_end_time = time.time()
+        logger.info(
+            "Setup time: %s",
+            (setup_end_time - setup_start_time),
+        )
+
+        with LoggingContext(name="test") as ctx:
+            benchmark_start_time = time.time()
+            remaining_event_ids = self.get_success(
+                self.store.have_seen_events(room_id, event_ids)
+            )
+            benchmark_end_time = time.time()
+            logger.info("afewewf %s %s", benchmark_start_time, benchmark_end_time)
+            logger.info(
+                "Benchmark time: %s",
+                (benchmark_end_time - benchmark_start_time),
+            )
+            # self.assertEqual(remaining_event_ids, set())
+
+        # that should result in a many db queries
+        self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)

     def test_simple(self):
         with LoggingContext(name="test") as ctx: