diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index fb0dd04f88..64edadb624 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -20,22 +20,30 @@ import urllib
from inspect import signature
from typing import Dict, List, Tuple
-from twisted.internet import defer
+from prometheus_client import Counter, Gauge
-from synapse.api.errors import (
- CodeMessageException,
- HttpResponseException,
- RequestSendFailed,
- SynapseError,
-)
+from synapse.api.errors import HttpResponseException, SynapseError
+from synapse.http import RequestTimedOutError
from synapse.logging.opentracing import inject_active_span_byte_dict, trace
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
+_pending_outgoing_requests = Gauge(
+ "synapse_pending_outgoing_replication_requests",
+ "Number of active outgoing replication requests, by replication method name",
+ ["name"],
+)
+
+_outgoing_request_counter = Counter(
+ "synapse_outgoing_replication_requests",
+ "Number of outgoing replication requests, by replication method name and result",
+ ["name", "code"],
+)
-class ReplicationEndpoint(object):
+
+class ReplicationEndpoint(metaclass=abc.ABCMeta):
"""Helper base class for defining new replication HTTP endpoints.
This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
@@ -74,8 +82,6 @@ class ReplicationEndpoint(object):
is received.
"""
- __metaclass__ = abc.ABCMeta
-
NAME = abc.abstractproperty() # type: str # type: ignore
PATH_ARGS = abc.abstractproperty() # type: Tuple[str, ...] # type: ignore
METHOD = "POST"
@@ -101,7 +107,7 @@ class ReplicationEndpoint(object):
assert self.METHOD in ("PUT", "POST", "GET")
@abc.abstractmethod
- def _serialize_payload(**kwargs):
+ async def _serialize_payload(**kwargs):
"""Static method that is called when creating a request.
Concrete implementations should have explicit parameters (rather than
@@ -110,9 +116,8 @@ class ReplicationEndpoint(object):
argument list.
Returns:
- Deferred[dict]|dict: If POST/PUT request then dictionary must be
- JSON serialisable, otherwise must be appropriate for adding as
- query args.
+ dict: For a POST/PUT request the dictionary must be JSON serialisable;
+ otherwise it must be appropriate for adding as query args.
"""
return {}
@@ -143,9 +148,11 @@ class ReplicationEndpoint(object):
instance_map = hs.config.worker.instance_map
+ outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
+
@trace(opname="outgoing_replication_request")
- @defer.inlineCallbacks
- def send_request(instance_name="master", **kwargs):
+ @outgoing_gauge.track_inprogress()
+ async def send_request(instance_name="master", **kwargs):
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
@@ -159,7 +166,7 @@ class ReplicationEndpoint(object):
"Instance %r not in 'instance_map' config" % (instance_name,)
)
- data = yield cls._serialize_payload(**kwargs)
+ data = await cls._serialize_payload(**kwargs)
url_args = [
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
@@ -197,25 +204,28 @@ class ReplicationEndpoint(object):
headers = {} # type: Dict[bytes, List[bytes]]
inject_active_span_byte_dict(headers, None, check_destination=False)
try:
- result = yield request_func(uri, data, headers=headers)
+ result = await request_func(uri, data, headers=headers)
break
- except CodeMessageException as e:
- if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
+ except RequestTimedOutError:
+ if not cls.RETRY_ON_TIMEOUT:
raise
- logger.warning("%s request timed out", cls.NAME)
+ logger.warning("%s request timed out; retrying", cls.NAME)
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
- yield clock.sleep(1)
+ await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
+ # on the main process that we should send to the client. (And
# importantly, not stack traces everywhere)
+ _outgoing_request_counter.labels(cls.NAME, e.code).inc()
raise e.to_synapse_error()
- except RequestSendFailed as e:
- raise SynapseError(502, "Failed to talk to master") from e
+ except Exception as e:
+ _outgoing_request_counter.labels(cls.NAME, "ERR").inc()
+ raise SynapseError(502, "Failed to talk to main process") from e
+ _outgoing_request_counter.labels(cls.NAME, 200).inc()
return result
return send_request
|