summary refs log tree commit diff
path: root/synapse/util/batching_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-05-27 14:32:31 +0100
committerGitHub <noreply@github.com>2021-05-27 14:32:31 +0100
commit78b5102ae71f828deb851eca8e677381710bf716 (patch)
treead51fcc15d4996ff2466aca48f35a7b9e64d2971 /synapse/util/batching_queue.py
parentClarify security note regarding the domain Synapse is hosted on. (#9221) (diff)
downloadsynapse-78b5102ae71f828deb851eca8e677381710bf716.tar.xz
Fix up `BatchingQueue` (#10078)
Fixes #10068
Diffstat (limited to 'synapse/util/batching_queue.py')
-rw-r--r--synapse/util/batching_queue.py70
1 files changed, 48 insertions, 22 deletions
diff --git a/synapse/util/batching_queue.py b/synapse/util/batching_queue.py
index 44bbb7b1a8..8fd5bfb69b 100644
--- a/synapse/util/batching_queue.py
+++ b/synapse/util/batching_queue.py
@@ -25,10 +25,11 @@ from typing import (
     TypeVar,
 )
 
+from prometheus_client import Gauge
+
 from twisted.internet import defer
 
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
-from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util import Clock
 
@@ -38,6 +39,24 @@ logger = logging.getLogger(__name__)
 V = TypeVar("V")
 R = TypeVar("R")
 
+number_queued = Gauge(
+    "synapse_util_batching_queue_number_queued",
+    "The number of items waiting in the queue across all keys",
+    labelnames=("name",),
+)
+
+number_in_flight = Gauge(
+    "synapse_util_batching_queue_number_pending",
+    "The number of items across all keys either being processed or waiting in a queue",
+    labelnames=("name",),
+)
+
+number_of_keys = Gauge(
+    "synapse_util_batching_queue_number_of_keys",
+    "The number of distinct keys that have items queued",
+    labelnames=("name",),
+)
+
 
 class BatchingQueue(Generic[V, R]):
     """A queue that batches up work, calling the provided processing function
@@ -48,10 +67,20 @@ class BatchingQueue(Generic[V, R]):
     called, and will keep being called until the queue has been drained (for the
     given key).
 
+    If the processing function raises an exception then the exception is proxied
+    through to the callers waiting on that batch of work.
+
     Note that the return value of `add_to_queue` will be the return value of the
     processing function that processed the given item. This means that the
     returned value will likely include data for other items that were in the
     batch.
+
+    Args:
+        name: A name for the queue, used for logging contexts and metrics.
+            This must be unique, otherwise the metrics will be wrong.
+        clock: The clock to use to schedule work.
+        process_batch_callback: The callback to to be run to process a batch of
+            work.
     """
 
     def __init__(
@@ -73,19 +102,15 @@ class BatchingQueue(Generic[V, R]):
         # The function to call with batches of values.
         self._process_batch_callback = process_batch_callback
 
-        LaterGauge(
-            "synapse_util_batching_queue_number_queued",
-            "The number of items waiting in the queue across all keys",
-            labels=("name",),
-            caller=lambda: sum(len(v) for v in self._next_values.values()),
+        number_queued.labels(self._name).set_function(
+            lambda: sum(len(q) for q in self._next_values.values())
         )
 
-        LaterGauge(
-            "synapse_util_batching_queue_number_of_keys",
-            "The number of distinct keys that have items queued",
-            labels=("name",),
-            caller=lambda: len(self._next_values),
-        )
+        number_of_keys.labels(self._name).set_function(lambda: len(self._next_values))
+
+        self._number_in_flight_metric = number_in_flight.labels(
+            self._name
+        )  # type: Gauge
 
     async def add_to_queue(self, value: V, key: Hashable = ()) -> R:
         """Adds the value to the queue with the given key, returning the result
@@ -107,17 +132,18 @@ class BatchingQueue(Generic[V, R]):
         if key not in self._processing_keys:
             run_as_background_process(self._name, self._process_queue, key)
 
-        return await make_deferred_yieldable(d)
+        with self._number_in_flight_metric.track_inprogress():
+            return await make_deferred_yieldable(d)
 
     async def _process_queue(self, key: Hashable) -> None:
         """A background task to repeatedly pull things off the queue for the
         given key and call the `self._process_batch_callback` with the values.
         """
 
-        try:
-            if key in self._processing_keys:
-                return
+        if key in self._processing_keys:
+            return
 
+        try:
             self._processing_keys.add(key)
 
             while True:
@@ -137,16 +163,16 @@ class BatchingQueue(Generic[V, R]):
                     values = [value for value, _ in next_values]
                     results = await self._process_batch_callback(values)
 
-                    for _, deferred in next_values:
-                        with PreserveLoggingContext():
+                    with PreserveLoggingContext():
+                        for _, deferred in next_values:
                             deferred.callback(results)
 
                 except Exception as e:
-                    for _, deferred in next_values:
-                        if deferred.called:
-                            continue
+                    with PreserveLoggingContext():
+                        for _, deferred in next_values:
+                            if deferred.called:
+                                continue
 
-                        with PreserveLoggingContext():
                             deferred.errback(e)
 
         finally: