summary refs log tree commit diff
path: root/synapse/replication/http/_base.py
diff options
context:
space:
mode:
authorSean Quah <8349537+squahtx@users.noreply.github.com>2021-10-12 11:23:46 +0100
committerGitHub <noreply@github.com>2021-10-12 11:23:46 +0100
commit6b18eb443054087c4a8153b19b3cc4d3b731d324 (patch)
treea1fdaf5c1283026131d8c13160b4f4b861af69ab /synapse/replication/http/_base.py
parentAdd an approximate difference method to StateFilters (#10825) (diff)
downloadsynapse-6b18eb443054087c4a8153b19b3cc4d3b731d324.tar.xz
Fix opentracing and Prometheus metrics for replication requests (#10996)
This commit fixes two bugs to do with decorators not instrumenting
`ReplicationEndpoint`'s `send_request` correctly. There are two
decorators on `send_request`: Prometheus' `Gauge.track_inprogress()`
and Synapse's `opentracing.trace`.

`Gauge.track_inprogress()` does not have any support for async
functions when used as a decorator. Since async functions behave like
regular functions that return coroutines, only the creation of the
coroutine was covered by the metric and none of the actual body of
`send_request`.

`Gauge.track_inprogress()` returns a regular, non-async function
wrapping `send_request`, which is the source of the next bug.
The `opentracing.trace` decorator would normally handle async functions
correctly, but since the wrapped `send_request` is a non-async function,
the decorator ends up suffering from the same issue as
`Gauge.track_inprogress()`: the opentracing span only measures the
creation of the coroutine and none of the actual function body.

Using `Gauge.track_inprogress()` as a context manager instead of a
decorator resolves both bugs.
Diffstat (limited to 'synapse/replication/http/_base.py')
-rw-r--r--synapse/replication/http/_base.py154
1 files changed, 78 insertions, 76 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index f1b78d09f9..e047ec74d8 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -182,85 +182,87 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
             )
 
         @trace(opname="outgoing_replication_request")
-        @outgoing_gauge.track_inprogress()
         async def send_request(*, instance_name="master", **kwargs):
-            if instance_name == local_instance_name:
-                raise Exception("Trying to send HTTP request to self")
-            if instance_name == "master":
-                host = master_host
-                port = master_port
-            elif instance_name in instance_map:
-                host = instance_map[instance_name].host
-                port = instance_map[instance_name].port
-            else:
-                raise Exception(
-                    "Instance %r not in 'instance_map' config" % (instance_name,)
+            with outgoing_gauge.track_inprogress():
+                if instance_name == local_instance_name:
+                    raise Exception("Trying to send HTTP request to self")
+                if instance_name == "master":
+                    host = master_host
+                    port = master_port
+                elif instance_name in instance_map:
+                    host = instance_map[instance_name].host
+                    port = instance_map[instance_name].port
+                else:
+                    raise Exception(
+                        "Instance %r not in 'instance_map' config" % (instance_name,)
+                    )
+
+                data = await cls._serialize_payload(**kwargs)
+
+                url_args = [
+                    urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
+                ]
+
+                if cls.CACHE:
+                    txn_id = random_string(10)
+                    url_args.append(txn_id)
+
+                if cls.METHOD == "POST":
+                    request_func = client.post_json_get_json
+                elif cls.METHOD == "PUT":
+                    request_func = client.put_json
+                elif cls.METHOD == "GET":
+                    request_func = client.get_json
+                else:
+                    # We have already asserted in the constructor that a
+                    # compatible was picked, but lets be paranoid.
+                    raise Exception(
+                        "Unknown METHOD on %s replication endpoint" % (cls.NAME,)
+                    )
+
+                uri = "http://%s:%s/_synapse/replication/%s/%s" % (
+                    host,
+                    port,
+                    cls.NAME,
+                    "/".join(url_args),
                 )
 
-            data = await cls._serialize_payload(**kwargs)
-
-            url_args = [
-                urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
-            ]
-
-            if cls.CACHE:
-                txn_id = random_string(10)
-                url_args.append(txn_id)
-
-            if cls.METHOD == "POST":
-                request_func = client.post_json_get_json
-            elif cls.METHOD == "PUT":
-                request_func = client.put_json
-            elif cls.METHOD == "GET":
-                request_func = client.get_json
-            else:
-                # We have already asserted in the constructor that a
-                # compatible was picked, but lets be paranoid.
-                raise Exception(
-                    "Unknown METHOD on %s replication endpoint" % (cls.NAME,)
-                )
-
-            uri = "http://%s:%s/_synapse/replication/%s/%s" % (
-                host,
-                port,
-                cls.NAME,
-                "/".join(url_args),
-            )
-
-            try:
-                # We keep retrying the same request for timeouts. This is so that we
-                # have a good idea that the request has either succeeded or failed on
-                # the master, and so whether we should clean up or not.
-                while True:
-                    headers: Dict[bytes, List[bytes]] = {}
-                    # Add an authorization header, if configured.
-                    if replication_secret:
-                        headers[b"Authorization"] = [b"Bearer " + replication_secret]
-                    opentracing.inject_header_dict(headers, check_destination=False)
-                    try:
-                        result = await request_func(uri, data, headers=headers)
-                        break
-                    except RequestTimedOutError:
-                        if not cls.RETRY_ON_TIMEOUT:
-                            raise
-
-                    logger.warning("%s request timed out; retrying", cls.NAME)
-
-                    # If we timed out we probably don't need to worry about backing
-                    # off too much, but lets just wait a little anyway.
-                    await clock.sleep(1)
-            except HttpResponseException as e:
-                # We convert to SynapseError as we know that it was a SynapseError
-                # on the main process that we should send to the client. (And
-                # importantly, not stack traces everywhere)
-                _outgoing_request_counter.labels(cls.NAME, e.code).inc()
-                raise e.to_synapse_error()
-            except Exception as e:
-                _outgoing_request_counter.labels(cls.NAME, "ERR").inc()
-                raise SynapseError(502, "Failed to talk to main process") from e
-
-            _outgoing_request_counter.labels(cls.NAME, 200).inc()
-            return result
+                try:
+                    # We keep retrying the same request for timeouts. This is so that we
+                    # have a good idea that the request has either succeeded or failed
+                    # on the master, and so whether we should clean up or not.
+                    while True:
+                        headers: Dict[bytes, List[bytes]] = {}
+                        # Add an authorization header, if configured.
+                        if replication_secret:
+                            headers[b"Authorization"] = [
+                                b"Bearer " + replication_secret
+                            ]
+                        opentracing.inject_header_dict(headers, check_destination=False)
+                        try:
+                            result = await request_func(uri, data, headers=headers)
+                            break
+                        except RequestTimedOutError:
+                            if not cls.RETRY_ON_TIMEOUT:
+                                raise
+
+                        logger.warning("%s request timed out; retrying", cls.NAME)
+
+                        # If we timed out we probably don't need to worry about backing
+                        # off too much, but lets just wait a little anyway.
+                        await clock.sleep(1)
+                except HttpResponseException as e:
+                    # We convert to SynapseError as we know that it was a SynapseError
+                    # on the main process that we should send to the client. (And
+                    # importantly, not stack traces everywhere)
+                    _outgoing_request_counter.labels(cls.NAME, e.code).inc()
+                    raise e.to_synapse_error()
+                except Exception as e:
+                    _outgoing_request_counter.labels(cls.NAME, "ERR").inc()
+                    raise SynapseError(502, "Failed to talk to main process") from e
+
+                _outgoing_request_counter.labels(cls.NAME, 200).inc()
+                return result
 
         return send_request