summary refs log tree commit diff
path: root/synapse/metrics/_reactor_metrics.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2023-11-06 08:31:22 -0500
committerGitHub <noreply@github.com>2023-11-06 08:31:22 -0500
commitcc4fe68adff0fb5660b94f92bd40978e7e292098 (patch)
treec85bb3026c4ac8b5d1ac59c963fc563233e69936 /synapse/metrics/_reactor_metrics.py
parentBump setuptools-rust from 1.8.0 to 1.8.1 (#16601) (diff)
downloadsynapse-cc4fe68adff0fb5660b94f92bd40978e7e292098.tar.xz
Support reactor timing metric on more reactors. (#16532)
Previously only Twisted's EPollReactor was compatible with the
reactor timing metric, notably not working when asyncio was used.

After this change, the following configurations support the reactor
timing metric:

* poll, epoll, or select reactors
* asyncio reactor with a poll, epoll, select, /dev/poll, or kqueue event loop.
Diffstat (limited to 'synapse/metrics/_reactor_metrics.py')
-rw-r--r--synapse/metrics/_reactor_metrics.py130
1 files changed, 103 insertions, 27 deletions
diff --git a/synapse/metrics/_reactor_metrics.py b/synapse/metrics/_reactor_metrics.py
index a2c6e6842d..dd486dd3e2 100644
--- a/synapse/metrics/_reactor_metrics.py
+++ b/synapse/metrics/_reactor_metrics.py
@@ -12,17 +12,45 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import select
+import logging
 import time
-from typing import Any, Iterable, List, Tuple
+from selectors import SelectSelector, _PollLikeSelector  # type: ignore[attr-defined]
+from typing import Any, Callable, Iterable
 
 from prometheus_client import Histogram, Metric
 from prometheus_client.core import REGISTRY, GaugeMetricFamily
 
-from twisted.internet import reactor
+from twisted.internet import reactor, selectreactor
+from twisted.internet.asyncioreactor import AsyncioSelectorReactor
 
 from synapse.metrics._types import Collector
 
+try:
+    from selectors import KqueueSelector
+except ImportError:
+
+    class KqueueSelector:  # type: ignore[no-redef]
+        pass
+
+
+try:
+    from twisted.internet.epollreactor import EPollReactor
+except ImportError:
+
+    class EPollReactor:  # type: ignore[no-redef]
+        pass
+
+
+try:
+    from twisted.internet.pollreactor import PollReactor
+except ImportError:
+
+    class PollReactor:  # type: ignore[no-redef]
+        pass
+
+
+logger = logging.getLogger(__name__)
+
 #
 # Twisted reactor metrics
 #
@@ -34,52 +62,100 @@ tick_time = Histogram(
 )
 
 
-class EpollWrapper:
-    """a wrapper for an epoll object which records the time between polls"""
+class CallWrapper:
+    """A wrapper for a callable which records the time between calls"""
 
-    def __init__(self, poller: "select.epoll"):  # type: ignore[name-defined]
+    def __init__(self, wrapped: Callable[..., Any]):
         self.last_polled = time.time()
-        self._poller = poller
+        self._wrapped = wrapped
 
-    def poll(self, *args, **kwargs) -> List[Tuple[int, int]]:  # type: ignore[no-untyped-def]
-        # record the time since poll() was last called. This gives a good proxy for
+    def __call__(self, *args, **kwargs) -> Any:  # type: ignore[no-untyped-def]
+        # record the time since this was last called. This gives a good proxy for
         # how long it takes to run everything in the reactor - ie, how long anything
         # waiting for the next tick will have to wait.
         tick_time.observe(time.time() - self.last_polled)
 
-        ret = self._poller.poll(*args, **kwargs)
+        ret = self._wrapped(*args, **kwargs)
 
         self.last_polled = time.time()
         return ret
 
+
+class ObjWrapper:
+    """A wrapper for an object which wraps a specified method in CallWrapper.
+
+    Other methods/attributes are passed to the original object.
+
+    This is necessary when the wrapped object does not allow the attribute to be
+    overwritten.
+    """
+
+    def __init__(self, wrapped: Any, method_name: str):
+        self._wrapped = wrapped
+        self._method_name = method_name
+        self._wrapped_method = CallWrapper(getattr(wrapped, method_name))
+
     def __getattr__(self, item: str) -> Any:
-        return getattr(self._poller, item)
+        if item == self._method_name:
+            return self._wrapped_method
+
+        return getattr(self._wrapped, item)
 
 
 class ReactorLastSeenMetric(Collector):
-    def __init__(self, epoll_wrapper: EpollWrapper):
-        self._epoll_wrapper = epoll_wrapper
+    def __init__(self, call_wrapper: CallWrapper):
+        self._call_wrapper = call_wrapper
 
     def collect(self) -> Iterable[Metric]:
         cm = GaugeMetricFamily(
             "python_twisted_reactor_last_seen",
             "Seconds since the Twisted reactor was last seen",
         )
-        cm.add_metric([], time.time() - self._epoll_wrapper.last_polled)
+        cm.add_metric([], time.time() - self._call_wrapper.last_polled)
         yield cm
 
 
+# Twisted has already select a reasonable reactor for us, so assumptions can be
+# made about the shape.
+wrapper = None
 try:
-    # if the reactor has a `_poller` attribute, which is an `epoll` object
-    # (ie, it's an EPollReactor), we wrap the `epoll` with a thing that will
-    # measure the time between ticks
-    from select import epoll  # type: ignore[attr-defined]
-
-    poller = reactor._poller  # type: ignore[attr-defined]
-except (AttributeError, ImportError):
-    pass
-else:
-    if isinstance(poller, epoll):
-        poller = EpollWrapper(poller)
-        reactor._poller = poller  # type: ignore[attr-defined]
-        REGISTRY.register(ReactorLastSeenMetric(poller))
+    if isinstance(reactor, (PollReactor, EPollReactor)):
+        reactor._poller = ObjWrapper(reactor._poller, "poll")  # type: ignore[attr-defined]
+        wrapper = reactor._poller._wrapped_method  # type: ignore[attr-defined]
+
+    elif isinstance(reactor, selectreactor.SelectReactor):
+        # Twisted uses a module-level _select function.
+        wrapper = selectreactor._select = CallWrapper(selectreactor._select)
+
+    elif isinstance(reactor, AsyncioSelectorReactor):
+        # For asyncio look at the underlying asyncio event loop.
+        asyncio_loop = reactor._asyncioEventloop  # A sub-class of BaseEventLoop,
+
+        # A sub-class of BaseSelector.
+        selector = asyncio_loop._selector  # type: ignore[attr-defined]
+
+        if isinstance(selector, SelectSelector):
+            wrapper = selector._select = CallWrapper(selector._select)  # type: ignore[attr-defined]
+
+        # poll, epoll, and /dev/poll.
+        elif isinstance(selector, _PollLikeSelector):
+            selector._selector = ObjWrapper(selector._selector, "poll")  # type: ignore[attr-defined]
+            wrapper = selector._selector._wrapped_method  # type: ignore[attr-defined]
+
+        elif isinstance(selector, KqueueSelector):
+            selector._selector = ObjWrapper(selector._selector, "control")  # type: ignore[attr-defined]
+            wrapper = selector._selector._wrapped_method  # type: ignore[attr-defined]
+
+        else:
+            # E.g. this does not support the (Windows-only) ProactorEventLoop.
+            logger.warning(
+                "Skipping configuring ReactorLastSeenMetric: unexpected asyncio loop selector: %r via %r",
+                selector,
+                asyncio_loop,
+            )
+except Exception as e:
+    logger.warning("Configuring ReactorLastSeenMetric failed: %r", e)
+
+
+if wrapper:
+    REGISTRY.register(ReactorLastSeenMetric(wrapper))