summary refs log tree commit diff
path: root/synapse/replication/http/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/http/_base.py')
-rw-r--r--synapse/replication/http/_base.py18
1 files changed, 7 insertions, 11 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index fb0dd04f88..6a28c2db9d 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -20,8 +20,6 @@ import urllib
 from inspect import signature
 from typing import Dict, List, Tuple
 
-from twisted.internet import defer
-
 from synapse.api.errors import (
     CodeMessageException,
     HttpResponseException,
@@ -101,7 +99,7 @@ class ReplicationEndpoint(object):
         assert self.METHOD in ("PUT", "POST", "GET")
 
     @abc.abstractmethod
-    def _serialize_payload(**kwargs):
+    async def _serialize_payload(**kwargs):
         """Static method that is called when creating a request.
 
         Concrete implementations should have explicit parameters (rather than
@@ -110,9 +108,8 @@ class ReplicationEndpoint(object):
         argument list.
 
         Returns:
-            Deferred[dict]|dict: If POST/PUT request then dictionary must be
-            JSON serialisable, otherwise must be appropriate for adding as
-            query args.
+            dict: If POST/PUT request then dictionary must be JSON serialisable,
+            otherwise must be appropriate for adding as query args.
         """
         return {}
 
@@ -144,8 +141,7 @@ class ReplicationEndpoint(object):
         instance_map = hs.config.worker.instance_map
 
         @trace(opname="outgoing_replication_request")
-        @defer.inlineCallbacks
-        def send_request(instance_name="master", **kwargs):
+        async def send_request(instance_name="master", **kwargs):
             if instance_name == local_instance_name:
                 raise Exception("Trying to send HTTP request to self")
             if instance_name == "master":
@@ -159,7 +155,7 @@ class ReplicationEndpoint(object):
                     "Instance %r not in 'instance_map' config" % (instance_name,)
                 )
 
-            data = yield cls._serialize_payload(**kwargs)
+            data = await cls._serialize_payload(**kwargs)
 
             url_args = [
                 urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
@@ -197,7 +193,7 @@ class ReplicationEndpoint(object):
                     headers = {}  # type: Dict[bytes, List[bytes]]
                     inject_active_span_byte_dict(headers, None, check_destination=False)
                     try:
-                        result = yield request_func(uri, data, headers=headers)
+                        result = await request_func(uri, data, headers=headers)
                         break
                     except CodeMessageException as e:
                         if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
@@ -207,7 +203,7 @@ class ReplicationEndpoint(object):
 
                     # If we timed out we probably don't need to worry about backing
                     # off too much, but lets just wait a little anyway.
-                    yield clock.sleep(1)
+                    await clock.sleep(1)
             except HttpResponseException as e:
                 # We convert to SynapseError as we know that it was a SynapseError
                 # on the master process that we should send to the client. (And