diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2018-07-10 10:00:24 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-07-10 10:00:24 +0100 |
commit | b1fe697b3c962b70c4a7587b35a319acca7df615 (patch) | |
tree | 18e2e125a3b42286c3b282f47cb43385f012801f /synapse | |
parent | Merge pull request #3464 from matrix-org/hawkowl/isort-run (diff) | |
parent | changelog (diff) | |
download | synapse-b1fe697b3c962b70c4a7587b35a319acca7df615.tar.xz |
Merge pull request #3497 from matrix-org/rav/measure_fetch_event_loop
Add CPU metrics for _fetch_event_list
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/events_worker.py | 51 |
1 files changed, 32 insertions, 19 deletions
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 5fe1fd13e5..fa2659403d 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -222,25 +222,39 @@ class EventsWorkerStore(SQLBaseStore): """Takes a database connection and waits for requests for events from the _event_fetch_list queue. """ - event_list = [] i = 0 while True: - try: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: - single_threaded = self.database_engine.single_threaded - if single_threaded or i > EVENT_QUEUE_ITERATIONS: - self._event_fetch_ongoing -= 1 - return - else: - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) - i += 1 - continue - i = 0 + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + single_threaded = self.database_engine.single_threaded + if single_threaded or i > EVENT_QUEUE_ITERATIONS: + self._event_fetch_ongoing -= 1 + return + else: + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) + i += 1 + continue + i = 0 + + self._fetch_event_list(conn, event_list) + + def _fetch_event_list(self, conn, event_list): + """Handle a load of requests from the _event_fetch_list queue + + Args: + conn (twisted.enterprise.adbapi.Connection): database connection + + event_list (list[Tuple[list[str], Deferred]]): + The fetch requests. Each entry consists of a list of event + ids to be fetched, and a deferred to be completed once the + events have been fetched. + """ + with Measure(self._clock, "_fetch_event_list"): + try: event_id_lists = zip(*event_list)[0] event_ids = [ item for sublist in event_id_lists for item in sublist @@ -280,9 +294,8 @@ class EventsWorkerStore(SQLBaseStore): with PreserveLoggingContext(): d.errback(e) - if event_list: - with PreserveLoggingContext(): - self.hs.get_reactor().callFromThread(fire, event_list) + with PreserveLoggingContext(): + self.hs.get_reactor().callFromThread(fire, event_list) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): |