summary refs log tree commit diff
path: root/synapse/replication/http/_base.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/replication/http/_base.py44
1 files changed, 29 insertions, 15 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index ba16f22c91..64edadb624 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -20,20 +20,30 @@ import urllib
 from inspect import signature
 from typing import Dict, List, Tuple
 
-from synapse.api.errors import (
-    CodeMessageException,
-    HttpResponseException,
-    RequestSendFailed,
-    SynapseError,
-)
+from prometheus_client import Counter, Gauge
+
+from synapse.api.errors import HttpResponseException, SynapseError
+from synapse.http import RequestTimedOutError
 from synapse.logging.opentracing import inject_active_span_byte_dict, trace
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.stringutils import random_string
 
 logger = logging.getLogger(__name__)
 
+_pending_outgoing_requests = Gauge(
+    "synapse_pending_outgoing_replication_requests",
+    "Number of active outgoing replication requests, by replication method name",
+    ["name"],
+)
+
+_outgoing_request_counter = Counter(
+    "synapse_outgoing_replication_requests",
+    "Number of outgoing replication requests, by replication method name and result",
+    ["name", "code"],
+)
+
 
-class ReplicationEndpoint:
+class ReplicationEndpoint(metaclass=abc.ABCMeta):
     """Helper base class for defining new replication HTTP endpoints.
 
     This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
@@ -72,8 +82,6 @@ class ReplicationEndpoint:
             is received.
     """
 
-    __metaclass__ = abc.ABCMeta
-
     NAME = abc.abstractproperty()  # type: str  # type: ignore
     PATH_ARGS = abc.abstractproperty()  # type: Tuple[str, ...]  # type: ignore
     METHOD = "POST"
@@ -140,7 +148,10 @@ class ReplicationEndpoint:
 
         instance_map = hs.config.worker.instance_map
 
+        outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
+
         @trace(opname="outgoing_replication_request")
+        @outgoing_gauge.track_inprogress()
         async def send_request(instance_name="master", **kwargs):
             if instance_name == local_instance_name:
                 raise Exception("Trying to send HTTP request to self")
@@ -195,23 +206,26 @@ class ReplicationEndpoint:
                     try:
                         result = await request_func(uri, data, headers=headers)
                         break
-                    except CodeMessageException as e:
-                        if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
+                    except RequestTimedOutError:
+                        if not cls.RETRY_ON_TIMEOUT:
                             raise
 
-                    logger.warning("%s request timed out", cls.NAME)
+                    logger.warning("%s request timed out; retrying", cls.NAME)
 
                     # If we timed out we probably don't need to worry about backing
                     # off too much, but lets just wait a little anyway.
                     await clock.sleep(1)
             except HttpResponseException as e:
                 # We convert to SynapseError as we know that it was a SynapseError
-                # on the master process that we should send to the client. (And
+                # on the main process that we should send to the client. (And
                 # importantly, not stack traces everywhere)
+                _outgoing_request_counter.labels(cls.NAME, e.code).inc()
                 raise e.to_synapse_error()
-            except RequestSendFailed as e:
-                raise SynapseError(502, "Failed to talk to master") from e
+            except Exception as e:
+                _outgoing_request_counter.labels(cls.NAME, "ERR").inc()
+                raise SynapseError(502, "Failed to talk to main process") from e
 
+            _outgoing_request_counter.labels(cls.NAME, 200).inc()
             return result
 
         return send_request