diff --git a/changelog.d/10996.misc b/changelog.d/10996.misc
new file mode 100644
index 0000000000..c830d7ec2c
--- /dev/null
+++ b/changelog.d/10996.misc
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.21.0 that causes opentracing and Prometheus metrics for replication requests to be measured incorrectly.
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 5276c4bfcc..20d23a4260 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -807,6 +807,14 @@ def trace(func=None, opname=None):
result.addCallbacks(call_back, err_back)
else:
+ if inspect.isawaitable(result):
+ logger.error(
+ "@trace may not have wrapped %s correctly! "
+ "The function is not async but returned a %s.",
+ func.__qualname__,
+ type(result).__name__,
+ )
+
scope.__exit__(None, None, None)
return result
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index f1b78d09f9..e047ec74d8 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -182,85 +182,87 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
)
@trace(opname="outgoing_replication_request")
- @outgoing_gauge.track_inprogress()
async def send_request(*, instance_name="master", **kwargs):
- if instance_name == local_instance_name:
- raise Exception("Trying to send HTTP request to self")
- if instance_name == "master":
- host = master_host
- port = master_port
- elif instance_name in instance_map:
- host = instance_map[instance_name].host
- port = instance_map[instance_name].port
- else:
- raise Exception(
- "Instance %r not in 'instance_map' config" % (instance_name,)
+ with outgoing_gauge.track_inprogress():
+ if instance_name == local_instance_name:
+ raise Exception("Trying to send HTTP request to self")
+ if instance_name == "master":
+ host = master_host
+ port = master_port
+ elif instance_name in instance_map:
+ host = instance_map[instance_name].host
+ port = instance_map[instance_name].port
+ else:
+ raise Exception(
+ "Instance %r not in 'instance_map' config" % (instance_name,)
+ )
+
+ data = await cls._serialize_payload(**kwargs)
+
+ url_args = [
+ urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
+ ]
+
+ if cls.CACHE:
+ txn_id = random_string(10)
+ url_args.append(txn_id)
+
+ if cls.METHOD == "POST":
+ request_func = client.post_json_get_json
+ elif cls.METHOD == "PUT":
+ request_func = client.put_json
+ elif cls.METHOD == "GET":
+ request_func = client.get_json
+ else:
+ # We have already asserted in the constructor that a
+ # compatible method was picked, but let's be paranoid.
+ raise Exception(
+ "Unknown METHOD on %s replication endpoint" % (cls.NAME,)
+ )
+
+ uri = "http://%s:%s/_synapse/replication/%s/%s" % (
+ host,
+ port,
+ cls.NAME,
+ "/".join(url_args),
)
- data = await cls._serialize_payload(**kwargs)
-
- url_args = [
- urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
- ]
-
- if cls.CACHE:
- txn_id = random_string(10)
- url_args.append(txn_id)
-
- if cls.METHOD == "POST":
- request_func = client.post_json_get_json
- elif cls.METHOD == "PUT":
- request_func = client.put_json
- elif cls.METHOD == "GET":
- request_func = client.get_json
- else:
- # We have already asserted in the constructor that a
- # compatible was picked, but lets be paranoid.
- raise Exception(
- "Unknown METHOD on %s replication endpoint" % (cls.NAME,)
- )
-
- uri = "http://%s:%s/_synapse/replication/%s/%s" % (
- host,
- port,
- cls.NAME,
- "/".join(url_args),
- )
-
- try:
- # We keep retrying the same request for timeouts. This is so that we
- # have a good idea that the request has either succeeded or failed on
- # the master, and so whether we should clean up or not.
- while True:
- headers: Dict[bytes, List[bytes]] = {}
- # Add an authorization header, if configured.
- if replication_secret:
- headers[b"Authorization"] = [b"Bearer " + replication_secret]
- opentracing.inject_header_dict(headers, check_destination=False)
- try:
- result = await request_func(uri, data, headers=headers)
- break
- except RequestTimedOutError:
- if not cls.RETRY_ON_TIMEOUT:
- raise
-
- logger.warning("%s request timed out; retrying", cls.NAME)
-
- # If we timed out we probably don't need to worry about backing
- # off too much, but lets just wait a little anyway.
- await clock.sleep(1)
- except HttpResponseException as e:
- # We convert to SynapseError as we know that it was a SynapseError
- # on the main process that we should send to the client. (And
- # importantly, not stack traces everywhere)
- _outgoing_request_counter.labels(cls.NAME, e.code).inc()
- raise e.to_synapse_error()
- except Exception as e:
- _outgoing_request_counter.labels(cls.NAME, "ERR").inc()
- raise SynapseError(502, "Failed to talk to main process") from e
-
- _outgoing_request_counter.labels(cls.NAME, 200).inc()
- return result
+ try:
+ # We keep retrying the same request for timeouts. This is so that we
+ # have a good idea that the request has either succeeded or failed
+ # on the master, and so whether we should clean up or not.
+ while True:
+ headers: Dict[bytes, List[bytes]] = {}
+ # Add an authorization header, if configured.
+ if replication_secret:
+ headers[b"Authorization"] = [
+ b"Bearer " + replication_secret
+ ]
+ opentracing.inject_header_dict(headers, check_destination=False)
+ try:
+ result = await request_func(uri, data, headers=headers)
+ break
+ except RequestTimedOutError:
+ if not cls.RETRY_ON_TIMEOUT:
+ raise
+
+ logger.warning("%s request timed out; retrying", cls.NAME)
+
+ # If we timed out we probably don't need to worry about backing
+ # off too much, but let's just wait a little anyway.
+ await clock.sleep(1)
+ except HttpResponseException as e:
+ # We convert to SynapseError as we know that it was a SynapseError
+ # on the main process that we should send to the client. (And
+ # importantly, not stack traces everywhere)
+ _outgoing_request_counter.labels(cls.NAME, e.code).inc()
+ raise e.to_synapse_error()
+ except Exception as e:
+ _outgoing_request_counter.labels(cls.NAME, "ERR").inc()
+ raise SynapseError(502, "Failed to talk to main process") from e
+
+ _outgoing_request_counter.labels(cls.NAME, 200).inc()
+ return result
return send_request
|