summary refs log tree commit diff
path: root/synapse/storage/events_worker.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-07-09 18:06:03 +0100
committerRichard van der Hoff <richard@matrix.org>2018-07-09 18:15:54 +0100
commite31e5dee38156c9075a66399a5f55ba3f888869a (patch)
treeb708c62845ce52c9cc428ef02346ed03a602d7b9 /synapse/storage/events_worker.py
parentMerge pull request #3464 from matrix-org/hawkowl/isort-run (diff)
downloadsynapse-e31e5dee38156c9075a66399a5f55ba3f888869a.tar.xz
Add CPU metrics for _fetch_event_list
add a Measure block on _fetch_event_list, in the hope that we can better
measure CPU usage here.
Diffstat (limited to 'synapse/storage/events_worker.py')
-rw-r--r--synapse/storage/events_worker.py51
1 files changed, 32 insertions, 19 deletions
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 5fe1fd13e5..fa2659403d 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -222,25 +222,39 @@ class EventsWorkerStore(SQLBaseStore):
         """Takes a database connection and waits for requests for events from
         the _event_fetch_list queue.
         """
-        event_list = []
         i = 0
         while True:
-            try:
-                with self._event_fetch_lock:
-                    event_list = self._event_fetch_list
-                    self._event_fetch_list = []
-
-                    if not event_list:
-                        single_threaded = self.database_engine.single_threaded
-                        if single_threaded or i > EVENT_QUEUE_ITERATIONS:
-                            self._event_fetch_ongoing -= 1
-                            return
-                        else:
-                            self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
-                            i += 1
-                            continue
-                    i = 0
+            with self._event_fetch_lock:
+                event_list = self._event_fetch_list
+                self._event_fetch_list = []
+
+                if not event_list:
+                    single_threaded = self.database_engine.single_threaded
+                    if single_threaded or i > EVENT_QUEUE_ITERATIONS:
+                        self._event_fetch_ongoing -= 1
+                        return
+                    else:
+                        self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+                        i += 1
+                        continue
+                i = 0
+
+            self._fetch_event_list(conn, event_list)
+
+    def _fetch_event_list(self, conn, event_list):
+        """Handle a load of requests from the _event_fetch_list queue
+
+        Args:
+            conn (twisted.enterprise.adbapi.Connection): database connection
+
+            event_list (list[Tuple[list[str], Deferred]]):
+                The fetch requests. Each entry consists of a list of event
+                ids to be fetched, and a deferred to be completed once the
+                events have been fetched.
 
+        """
+        with Measure(self._clock, "_fetch_event_list"):
+            try:
                 event_id_lists = zip(*event_list)[0]
                 event_ids = [
                     item for sublist in event_id_lists for item in sublist
@@ -280,9 +294,8 @@ class EventsWorkerStore(SQLBaseStore):
                             with PreserveLoggingContext():
                                 d.errback(e)
 
-                if event_list:
-                    with PreserveLoggingContext():
-                        self.hs.get_reactor().callFromThread(fire, event_list)
+                with PreserveLoggingContext():
+                    self.hs.get_reactor().callFromThread(fire, event_list)
 
     @defer.inlineCallbacks
     def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):