author     Eric Eastwood <erice@element.io>  2022-08-18 22:14:05 -0500
committer  Eric Eastwood <erice@element.io>  2022-08-18 22:14:05 -0500
commit     7cb07d3a0338408a921120ac38d2971c312f316f
tree       b538f8332a3242a00445691799e46f2c9866ba6a
parent     Add metrics to track `/messages` response time by room size (#13545)
download   synapse-7cb07d3a0338408a921120ac38d2971c312f316f.tar.xz
Start of optimizing
-rw-r--r--  synapse/handlers/federation_event.py                |  6
-rw-r--r--  synapse/storage/databases/main/events_worker.py     | 70
-rw-r--r--  tests/storage/databases/main/test_events_worker.py  | 10
3 files changed, 54 insertions, 32 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index f40b071a74..d58096f447 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1031,6 +1031,12 @@ class FederationEventHandler:
             InvalidResponseError: if the remote homeserver's response contains
                 fields of the wrong type.
         """
+
+        # It would be better if we could query the difference from our known
+        # state to the given `event_id` so the sending server doesn't have to
+        # send as much and we don't have to process so many events. For example
+        # in a room like #matrixhq, we get 200k events (77k state_events, 122k
+        # auth_events) from this and just the `have_seen_events` takes 20s.
         (
             state_event_ids,
             auth_event_ids,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 8a7cdb024d..051810cab1 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -54,7 +54,13 @@ from synapse.logging.context import (
     current_context,
     make_deferred_yieldable,
 )
-from synapse.logging.opentracing import start_active_span, tag_args, trace
+from synapse.logging.opentracing import (
+    SynapseTags,
+    set_tag,
+    start_active_span,
+    tag_args,
+    trace,
+)
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -1462,44 +1468,43 @@ class EventsWorkerStore(SQLBaseStore):
             event_ids: events we are looking for
 
         Returns:
-            The set of events we have already seen.
+            The remaining set of events we haven't seen.
         """
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(event_ids)),
+        )
 
         # @cachedList chomps lots of memory if you call it with a big list, so
         # we break it down. However, each batch requires its own index scan, so we make
         # the batches as big as possible.
-        results: Set[str] = set()
-        for chunk in batch_iter(event_ids, 500):
-            r = await self._have_seen_events_dict(
-                [(room_id, event_id) for event_id in chunk]
-            )
-            results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)
+        remaining_event_ids: Set[str] = set()
+        for chunk in batch_iter(event_ids, 1000):
+            remaining_event_ids_from_chunk = await self._have_seen_events_dict(chunk)
+            remaining_event_ids.update(remaining_event_ids_from_chunk)
 
-        return results
+        return remaining_event_ids
 
-    @cachedList(cached_method_name="have_seen_event", list_name="keys")
-    async def _have_seen_events_dict(
-        self, keys: Collection[Tuple[str, str]]
-    ) -> Dict[Tuple[str, str], bool]:
+    @cachedList(cached_method_name="have_seen_event", list_name="event_ids")
+    async def _have_seen_events_dict(self, event_ids: Iterable[str]) -> Set[str]:
         """Helper for have_seen_events
 
         Returns:
-            a dict {(room_id, event_id)-> bool}
+            The remaining set of events we haven't seen.
         """
-        # if the event cache contains the event, obviously we've seen it.
-        cache_results = {
-            (rid, eid)
-            for (rid, eid) in keys
-            if await self._get_event_cache.contains((eid,))
-        }
-        results = dict.fromkeys(cache_results, True)
-        remaining = [k for k in keys if k not in cache_results]
-        if not remaining:
-            return results
+        # if the event cache contains the event, obviously we've seen it.
+        event_entry_map = self._get_events_from_local_cache(event_ids)
+        event_ids_in_cache = event_entry_map.keys()
+        remaining_event_ids = {
+            event_id for event_id in event_ids if event_id not in event_ids_in_cache
+        }
+        if not remaining_event_ids:
+            return set()
 
         def have_seen_events_txn(txn: LoggingTransaction) -> None:
+            nonlocal remaining_event_ids
             # we deliberately do *not* query the database for room_id, to make the
             # query an index-only lookup on `events_event_id_key`.
             #
@@ -1507,23 +1512,24 @@ class EventsWorkerStore(SQLBaseStore):
             sql = "SELECT event_id FROM events AS e WHERE "
 
             clause, args = make_in_list_sql_clause(
-                txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
+                txn.database_engine, "e.event_id", remaining_event_ids
             )
             txn.execute(sql + clause, args)
-            found_events = {eid for eid, in txn}
+            found_event_ids = {eid for eid, in txn}
 
-            # ... and then we can update the results for each key
-            results.update(
-                {(rid, eid): (eid in found_events) for (rid, eid) in remaining}
-            )
+            remaining_event_ids = {
+                event_id
+                for event_id in remaining_event_ids
+                if event_id not in found_event_ids
+            }
 
         await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
 
-        return results
+        return remaining_event_ids
 
     @cached(max_entries=100000, tree=True)
     async def have_seen_event(self, room_id: str, event_id: str) -> bool:
-        res = await self._have_seen_events_dict(((room_id, event_id),))
-        return res[(room_id, event_id)]
+        remaining_event_ids = await self._have_seen_events_dict({event_id})
+        return event_id not in remaining_event_ids
 
     def _get_current_state_event_counts_txn(
         self, txn: LoggingTransaction, room_id: str
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
index 46d829b062..f35b2ef0bb 100644
--- a/tests/storage/databases/main/test_events_worker.py
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -91,6 +91,16 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
             )
             self.event_ids.append(event_id)
 
+    # def test_benchmark(self):
+    #     with LoggingContext(name="test") as ctx:
+    #         res = self.get_success(
+    #             self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+    #         )
+    #         self.assertEqual(res, {self.event_ids[0]})
+
+    #     # that should result in a single db query
+    #     self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
+
     def test_simple(self):
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
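
The heart of the change is easier to see outside Synapse's storage machinery: instead of building a {(room_id, event_id) -> bool} dict, the rewritten helper whittles down a set of remaining (unseen) event IDs, first against the local event cache and then with one batched IN (...) lookup per chunk, returning only what is still unseen. A minimal, self-contained sketch of that flow follows; the names (fake_event_cache, events_not_seen, the sqlite stand-in for the events table) are illustrative, not Synapse APIs.

import sqlite3
from typing import Iterable, Iterator, List, Set


def batch_iter(iterable: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield lists of at most `size` items (stands in for synapse.util.batch_iter)."""
    batch: List[str] = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


# Illustrative stand-ins -- not Synapse's real cache or schema.
fake_event_cache: Set[str] = {"$event1", "$event2"}

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY)")
db.executemany(
    "INSERT INTO events VALUES (?)", [("$event1",), ("$event2",), ("$event3",)]
)


def unseen_in_chunk(event_ids: List[str]) -> Set[str]:
    """One chunk: consult the cache first, then a single IN (...) query."""
    # Anything already cached has definitely been seen, so only the rest
    # needs to hit the database.
    remaining = {eid for eid in event_ids if eid not in fake_event_cache}
    if not remaining:
        return set()

    # Selecting and filtering only event_id keeps this answerable from the
    # index alone (the diff's reason for deliberately not querying room_id).
    placeholders = ",".join("?" * len(remaining))
    rows = db.execute(
        f"SELECT event_id FROM events WHERE event_id IN ({placeholders})",
        list(remaining),
    )
    found = {event_id for (event_id,) in rows}
    return remaining - found


def events_not_seen(event_ids: Iterable[str], chunk_size: int = 1000) -> Set[str]:
    """Chunked driver, mirroring the shape of the rewritten have_seen_events."""
    remaining: Set[str] = set()
    for chunk in batch_iter(event_ids, chunk_size):
        remaining |= unseen_in_chunk(chunk)
    return remaining


print(events_not_seen(["$event1", "$event3", "$unknown"]))  # {'$unknown'}

Returning the (typically much smaller) unseen set rather than a bool per key is what matters at the scale described in the commit comment: if most of the ~200k backfilled events in a room like #matrixhq have already been seen, the helper materialises a handful of IDs instead of 200k dict entries keyed by (room_id, event_id).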