summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2018-07-10 10:00:24 +0100
committerGitHub <noreply@github.com>2018-07-10 10:00:24 +0100
commitb1fe697b3c962b70c4a7587b35a319acca7df615 (patch)
tree18e2e125a3b42286c3b282f47cb43385f012801f
parentMerge pull request #3464 from matrix-org/hawkowl/isort-run (diff)
parentchangelog (diff)
downloadsynapse-b1fe697b3c962b70c4a7587b35a319acca7df615.tar.xz
Merge pull request #3497 from matrix-org/rav/measure_fetch_event_loop
Add CPU metrics for _fetch_event_list
-rw-r--r--changelog.d/3497.feature1
-rw-r--r--synapse/storage/events_worker.py51
2 files changed, 33 insertions, 19 deletions
diff --git a/changelog.d/3497.feature b/changelog.d/3497.feature
new file mode 100644
index 0000000000..4e333155b6
--- /dev/null
+++ b/changelog.d/3497.feature
@@ -0,0 +1 @@
+Add CPU metrics for _fetch_event_list
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 5fe1fd13e5..fa2659403d 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -222,25 +222,39 @@ class EventsWorkerStore(SQLBaseStore):
         """Takes a database connection and waits for requests for events from
         the _event_fetch_list queue.
         """
-        event_list = []
         i = 0
         while True:
-            try:
-                with self._event_fetch_lock:
-                    event_list = self._event_fetch_list
-                    self._event_fetch_list = []
-
-                    if not event_list:
-                        single_threaded = self.database_engine.single_threaded
-                        if single_threaded or i > EVENT_QUEUE_ITERATIONS:
-                            self._event_fetch_ongoing -= 1
-                            return
-                        else:
-                            self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
-                            i += 1
-                            continue
-                    i = 0
+            with self._event_fetch_lock:
+                event_list = self._event_fetch_list
+                self._event_fetch_list = []
+
+                if not event_list:
+                    single_threaded = self.database_engine.single_threaded
+                    if single_threaded or i > EVENT_QUEUE_ITERATIONS:
+                        self._event_fetch_ongoing -= 1
+                        return
+                    else:
+                        self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+                        i += 1
+                        continue
+                i = 0
+
+            self._fetch_event_list(conn, event_list)
+
+    def _fetch_event_list(self, conn, event_list):
+        """Handle a load of requests from the _event_fetch_list queue
+
+        Args:
+            conn (twisted.enterprise.adbapi.Connection): database connection
+
+            event_list (list[Tuple[list[str], Deferred]]):
+                The fetch requests. Each entry consists of a list of event
+                ids to be fetched, and a deferred to be completed once the
+                events have been fetched.
 
+        """
+        with Measure(self._clock, "_fetch_event_list"):
+            try:
                 event_id_lists = zip(*event_list)[0]
                 event_ids = [
                     item for sublist in event_id_lists for item in sublist
@@ -280,9 +294,8 @@ class EventsWorkerStore(SQLBaseStore):
                             with PreserveLoggingContext():
                                 d.errback(e)
 
-                if event_list:
-                    with PreserveLoggingContext():
-                        self.hs.get_reactor().callFromThread(fire, event_list)
+                with PreserveLoggingContext():
+                    self.hs.get_reactor().callFromThread(fire, event_list)
 
     @defer.inlineCallbacks
     def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):