summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/events_worker.py56
1 files changed, 34 insertions, 22 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index ae37901be9..c6bf316d5b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -28,6 +28,7 @@ from typing import (
 
 import attr
 from constantly import NamedConstant, Names
+from prometheus_client import Gauge
 from typing_extensions import Literal
 
 from twisted.internet import defer
@@ -81,6 +82,12 @@ EVENT_QUEUE_ITERATIONS = 3  # No. times we block waiting for requests for events
 EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
+event_fetch_ongoing_gauge = Gauge(
+    "synapse_event_fetch_ongoing",
+    "The number of event fetchers that are running",
+)
+
+
 @attr.s(slots=True, auto_attribs=True)
 class _EventCacheEntry:
     event: EventBase
@@ -222,6 +229,7 @@ class EventsWorkerStore(SQLBaseStore):
         self._event_fetch_lock = threading.Condition()
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0
+        event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
 
         # We define this sequence here so that it can be referenced from both
         # the DataStore and PersistEventStore.
@@ -732,28 +740,31 @@ class EventsWorkerStore(SQLBaseStore):
         """Takes a database connection and waits for requests for events from
         the _event_fetch_list queue.
         """
-        i = 0
-        while True:
-            with self._event_fetch_lock:
-                event_list = self._event_fetch_list
-                self._event_fetch_list = []
-
-                if not event_list:
-                    single_threaded = self.database_engine.single_threaded
-                    if (
-                        not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
-                        or single_threaded
-                        or i > EVENT_QUEUE_ITERATIONS
-                    ):
-                        self._event_fetch_ongoing -= 1
-                        return
-                    else:
-                        self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
-                        i += 1
-                        continue
-                i = 0
-
-            self._fetch_event_list(conn, event_list)
+        try:
+            i = 0
+            while True:
+                with self._event_fetch_lock:
+                    event_list = self._event_fetch_list
+                    self._event_fetch_list = []
+
+                    if not event_list:
+                        single_threaded = self.database_engine.single_threaded
+                        if (
+                            not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
+                            or single_threaded
+                            or i > EVENT_QUEUE_ITERATIONS
+                        ):
+                            break
+                        else:
+                            self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+                            i += 1
+                            continue
+                    i = 0
+
+                self._fetch_event_list(conn, event_list)
+        finally:
+            self._event_fetch_ongoing -= 1
+            event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
 
     def _fetch_event_list(
         self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]]
@@ -977,6 +988,7 @@ class EventsWorkerStore(SQLBaseStore):
 
             if self._event_fetch_ongoing < EVENT_QUEUE_THREADS:
                 self._event_fetch_ongoing += 1
+                event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
                 should_start = True
             else:
                 should_start = False