diff options
Diffstat (limited to 'synapse/replication/http/_base.py')
-rw-r--r-- | synapse/replication/http/_base.py | 18 |
1 files changed, 7 insertions, 11 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index fb0dd04f88..6a28c2db9d 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -20,8 +20,6 @@ import urllib from inspect import signature from typing import Dict, List, Tuple -from twisted.internet import defer - from synapse.api.errors import ( CodeMessageException, HttpResponseException, @@ -101,7 +99,7 @@ class ReplicationEndpoint(object): assert self.METHOD in ("PUT", "POST", "GET") @abc.abstractmethod - def _serialize_payload(**kwargs): + async def _serialize_payload(**kwargs): """Static method that is called when creating a request. Concrete implementations should have explicit parameters (rather than @@ -110,9 +108,8 @@ class ReplicationEndpoint(object): argument list. Returns: - Deferred[dict]|dict: If POST/PUT request then dictionary must be - JSON serialisable, otherwise must be appropriate for adding as - query args. + dict: If POST/PUT request then dictionary must be JSON serialisable, + otherwise must be appropriate for adding as query args. """ return {} @@ -144,8 +141,7 @@ class ReplicationEndpoint(object): instance_map = hs.config.worker.instance_map @trace(opname="outgoing_replication_request") - @defer.inlineCallbacks - def send_request(instance_name="master", **kwargs): + async def send_request(instance_name="master", **kwargs): if instance_name == local_instance_name: raise Exception("Trying to send HTTP request to self") if instance_name == "master": @@ -159,7 +155,7 @@ class ReplicationEndpoint(object): "Instance %r not in 'instance_map' config" % (instance_name,) ) - data = yield cls._serialize_payload(**kwargs) + data = await cls._serialize_payload(**kwargs) url_args = [ urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS @@ -197,7 +193,7 @@ class ReplicationEndpoint(object): headers = {} # type: Dict[bytes, List[bytes]] inject_active_span_byte_dict(headers, None, check_destination=False) try: - result = yield request_func(uri, data, headers=headers) + result = await request_func(uri, data, headers=headers) break except CodeMessageException as e: if e.code != 504 or not cls.RETRY_ON_TIMEOUT: @@ -207,7 +203,7 @@ class ReplicationEndpoint(object): # If we timed out we probably don't need to worry about backing # off too much, but lets just wait a little anyway. - yield clock.sleep(1) + await clock.sleep(1) except HttpResponseException as e: # We convert to SynapseError as we know that it was a SynapseError # on the master process that we should send to the client. (And |