diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 793cef6c26..ba16f22c91 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -16,32 +16,24 @@
import abc
import logging
import re
+import urllib
from inspect import signature
from typing import Dict, List, Tuple
-from six import raise_from
-from six.moves import urllib
-
-from twisted.internet import defer
-
from synapse.api.errors import (
CodeMessageException,
HttpResponseException,
RequestSendFailed,
SynapseError,
)
-from synapse.logging.opentracing import (
- inject_active_span_byte_dict,
- trace,
- trace_servlet,
-)
+from synapse.logging.opentracing import inject_active_span_byte_dict, trace
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
-class ReplicationEndpoint(object):
+class ReplicationEndpoint:
"""Helper base class for defining new replication HTTP endpoints.
This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
@@ -98,16 +90,16 @@ class ReplicationEndpoint(object):
# assert here that sub classes don't try and use the name.
assert (
"instance_name" not in self.PATH_ARGS
- ), "`instance_name` is a reserved paramater name"
+ ), "`instance_name` is a reserved parameter name"
assert (
"instance_name"
not in signature(self.__class__._serialize_payload).parameters
- ), "`instance_name` is a reserved paramater name"
+ ), "`instance_name` is a reserved parameter name"
assert self.METHOD in ("PUT", "POST", "GET")
@abc.abstractmethod
- def _serialize_payload(**kwargs):
+ async def _serialize_payload(**kwargs):
"""Static method that is called when creating a request.
Concrete implementations should have explicit parameters (rather than
@@ -116,9 +108,8 @@ class ReplicationEndpoint(object):
argument list.
Returns:
- Deferred[dict]|dict: If POST/PUT request then dictionary must be
- JSON serialisable, otherwise must be appropriate for adding as
- query args.
+ dict: If POST/PUT request then dictionary must be JSON serialisable,
+ otherwise must be appropriate for adding as query args.
"""
return {}
@@ -150,8 +141,7 @@ class ReplicationEndpoint(object):
instance_map = hs.config.worker.instance_map
@trace(opname="outgoing_replication_request")
- @defer.inlineCallbacks
- def send_request(instance_name="master", **kwargs):
+ async def send_request(instance_name="master", **kwargs):
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
@@ -165,7 +155,7 @@ class ReplicationEndpoint(object):
"Instance %r not in 'instance_map' config" % (instance_name,)
)
- data = yield cls._serialize_payload(**kwargs)
+ data = await cls._serialize_payload(**kwargs)
url_args = [
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
@@ -203,7 +193,7 @@ class ReplicationEndpoint(object):
headers = {} # type: Dict[bytes, List[bytes]]
inject_active_span_byte_dict(headers, None, check_destination=False)
try:
- result = yield request_func(uri, data, headers=headers)
+ result = await request_func(uri, data, headers=headers)
break
except CodeMessageException as e:
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
@@ -213,14 +203,14 @@ class ReplicationEndpoint(object):
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
- yield clock.sleep(1)
+ await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
except RequestSendFailed as e:
- raise_from(SynapseError(502, "Failed to talk to master"), e)
+ raise SynapseError(502, "Failed to talk to master") from e
return result
@@ -242,11 +232,8 @@ class ReplicationEndpoint(object):
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
- handler = trace_servlet(self.__class__.__name__, extract_context=True)(handler)
- # We don't let register paths trace this servlet using the default tracing
- # options because we wish to extract the context explicitly.
http_server.register_paths(
- method, [pattern], handler, self.__class__.__name__, trace=False
+ method, [pattern], handler, self.__class__.__name__,
)
def _cached_handler(self, request, txn_id, **kwargs):
|