diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 5685cf2121..f13a7c23b4 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -23,7 +23,8 @@ from prometheus_client import Counter, Gauge
from synapse.api.errors import HttpResponseException, SynapseError
from synapse.http import RequestTimedOutError
-from synapse.logging.opentracing import inject_active_span_byte_dict, trace
+from synapse.logging import opentracing
+from synapse.logging.opentracing import trace
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
@@ -235,7 +236,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [b"Bearer " + replication_secret]
- inject_active_span_byte_dict(headers, None, check_destination=False)
+ opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
@@ -284,7 +285,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
self.__class__.__name__,
)
- def _check_auth_and_handle(self, request, **kwargs):
+ async def _check_auth_and_handle(self, request, **kwargs):
"""Called on new incoming requests when caching is enabled. Checks
if there is a cached response for the request and returns that,
otherwise calls `_handle_request` and caches its response.
@@ -299,8 +300,8 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
if self.CACHE:
txn_id = kwargs.pop("txn_id")
- return self.response_cache.wrap(
+ return await self.response_cache.wrap(
txn_id, self._handle_request, request, **kwargs
)
- return self._handle_request(request, **kwargs)
+ return await self._handle_request(request, **kwargs)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index bd03030b4b..34206c5060 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -114,7 +114,7 @@ class ReplicationRemoteKnockRestServlet(ReplicationEndpoint):
NAME = "remote_knock"
PATH_ARGS = ("room_id", "user_id")
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.federation_handler = hs.get_federation_handler()
@@ -345,7 +345,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
return {}
- def _handle_request( # type: ignore
+ async def _handle_request( # type: ignore
self, request: Request, room_id: str, user_id: str, change: str
) -> Tuple[int, JsonDict]:
logger.info("user membership change: %s in %s", user_id, room_id)
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 7ced4c543c..2ad7a200bb 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -571,7 +571,7 @@ class ReplicationCommandHandler:
def on_REMOTE_SERVER_UP(
self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
):
- """"Called when get a new REMOTE_SERVER_UP command."""
+ """Called when get a new REMOTE_SERVER_UP command."""
self._replication_data_handler.on_remote_server_up(cmd.data)
self._notifier.notify_remote_server_up(cmd.data)
|