summary refs log tree commit diff
path: root/synapse/replication/http/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/http/_base.py')
-rw-r--r--synapse/replication/http/_base.py41
1 files changed, 14 insertions, 27 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py

index 793cef6c26..ba16f22c91 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py
@@ -16,32 +16,24 @@ import abc import logging import re +import urllib from inspect import signature from typing import Dict, List, Tuple -from six import raise_from -from six.moves import urllib - -from twisted.internet import defer - from synapse.api.errors import ( CodeMessageException, HttpResponseException, RequestSendFailed, SynapseError, ) -from synapse.logging.opentracing import ( - inject_active_span_byte_dict, - trace, - trace_servlet, -) +from synapse.logging.opentracing import inject_active_span_byte_dict, trace from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import random_string logger = logging.getLogger(__name__) -class ReplicationEndpoint(object): +class ReplicationEndpoint: """Helper base class for defining new replication HTTP endpoints. This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..` @@ -98,16 +90,16 @@ class ReplicationEndpoint(object): # assert here that sub classes don't try and use the name. assert ( "instance_name" not in self.PATH_ARGS - ), "`instance_name` is a reserved paramater name" + ), "`instance_name` is a reserved parameter name" assert ( "instance_name" not in signature(self.__class__._serialize_payload).parameters - ), "`instance_name` is a reserved paramater name" + ), "`instance_name` is a reserved parameter name" assert self.METHOD in ("PUT", "POST", "GET") @abc.abstractmethod - def _serialize_payload(**kwargs): + async def _serialize_payload(**kwargs): """Static method that is called when creating a request. Concrete implementations should have explicit parameters (rather than @@ -116,9 +108,8 @@ class ReplicationEndpoint(object): argument list. Returns: - Deferred[dict]|dict: If POST/PUT request then dictionary must be - JSON serialisable, otherwise must be appropriate for adding as - query args. + dict: If POST/PUT request then dictionary must be JSON serialisable, + otherwise must be appropriate for adding as query args. """ return {} @@ -150,8 +141,7 @@ class ReplicationEndpoint(object): instance_map = hs.config.worker.instance_map @trace(opname="outgoing_replication_request") - @defer.inlineCallbacks - def send_request(instance_name="master", **kwargs): + async def send_request(instance_name="master", **kwargs): if instance_name == local_instance_name: raise Exception("Trying to send HTTP request to self") if instance_name == "master": @@ -165,7 +155,7 @@ class ReplicationEndpoint(object): "Instance %r not in 'instance_map' config" % (instance_name,) ) - data = yield cls._serialize_payload(**kwargs) + data = await cls._serialize_payload(**kwargs) url_args = [ urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS @@ -203,7 +193,7 @@ class ReplicationEndpoint(object): headers = {} # type: Dict[bytes, List[bytes]] inject_active_span_byte_dict(headers, None, check_destination=False) try: - result = yield request_func(uri, data, headers=headers) + result = await request_func(uri, data, headers=headers) break except CodeMessageException as e: if e.code != 504 or not cls.RETRY_ON_TIMEOUT: @@ -213,14 +203,14 @@ class ReplicationEndpoint(object): # If we timed out we probably don't need to worry about backing # off too much, but lets just wait a little anyway. - yield clock.sleep(1) + await clock.sleep(1) except HttpResponseException as e: # We convert to SynapseError as we know that it was a SynapseError # on the master process that we should send to the client. (And # importantly, not stack traces everywhere) raise e.to_synapse_error() except RequestSendFailed as e: - raise_from(SynapseError(502, "Failed to talk to master"), e) + raise SynapseError(502, "Failed to talk to master") from e return result @@ -242,11 +232,8 @@ class ReplicationEndpoint(object): args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args) pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args)) - handler = trace_servlet(self.__class__.__name__, extract_context=True)(handler) - # We don't let register paths trace this servlet using the default tracing - # options because we wish to extract the context explicitly. http_server.register_paths( - method, [pattern], handler, self.__class__.__name__, trace=False + method, [pattern], handler, self.__class__.__name__, ) def _cached_handler(self, request, txn_id, **kwargs):