diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 5685cf2121..2a13026e9a 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -23,7 +23,8 @@ from prometheus_client import Counter, Gauge
from synapse.api.errors import HttpResponseException, SynapseError
from synapse.http import RequestTimedOutError
-from synapse.logging.opentracing import inject_active_span_byte_dict, trace
+from synapse.logging import opentracing
+from synapse.logging.opentracing import trace
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
@@ -235,7 +236,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [b"Bearer " + replication_secret]
- inject_active_span_byte_dict(headers, None, check_destination=False)
+ opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 289a397d68..043c25f63d 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -97,6 +97,76 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
return 200, {"event_id": event_id, "stream_id": stream_id}
+class ReplicationRemoteKnockRestServlet(ReplicationEndpoint):
+ """Perform a remote knock for the given user on the given room
+
+ Request format:
+
+ POST /_synapse/replication/remote_knock/:room_id/:user_id
+
+ {
+ "requester": ...,
+ "remote_room_hosts": [...],
+ "content": { ... }
+ }
+ """
+
+ NAME = "remote_knock"
+ PATH_ARGS = ("room_id", "user_id")
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.federation_handler = hs.get_federation_handler()
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+
+ @staticmethod
+ async def _serialize_payload( # type: ignore
+ requester: Requester,
+ room_id: str,
+ user_id: str,
+ remote_room_hosts: List[str],
+ content: JsonDict,
+ ):
+ """
+ Args:
+ requester: The user making the request, according to the access token.
+ room_id: The ID of the room to knock on.
+ user_id: The ID of the knocking user.
+ remote_room_hosts: Servers to try and send the knock via.
+ content: The event content to use for the knock event.
+ """
+ return {
+ "requester": requester.serialize(),
+ "remote_room_hosts": remote_room_hosts,
+ "content": content,
+ }
+
+ async def _handle_request( # type: ignore
+ self,
+ request: SynapseRequest,
+ room_id: str,
+ user_id: str,
+ ):
+ content = parse_json_object_from_request(request)
+
+ remote_room_hosts = content["remote_room_hosts"]
+ event_content = content["content"]
+
+ requester = Requester.deserialize(self.store, content["requester"])
+
+ request.requester = requester
+
+ logger.debug("remote_knock: %s on room: %s", user_id, room_id)
+
+ event_id, stream_id = await self.federation_handler.do_knock(
+ remote_room_hosts, room_id, user_id, event_content
+ )
+
+ return 200, {"event_id": event_id, "stream_id": stream_id}
+
+
class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
"""Rejects an out-of-band invite we have received from a remote server
@@ -167,6 +237,75 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
return 200, {"event_id": event_id, "stream_id": stream_id}
+class ReplicationRemoteRescindKnockRestServlet(ReplicationEndpoint):
+ """Rescinds a local knock made on a remote room
+
+ Request format:
+
+ POST /_synapse/replication/remote_rescind_knock/:event_id
+
+ {
+ "txn_id": ...,
+ "requester": ...,
+ "content": { ... }
+ }
+ """
+
+ NAME = "remote_rescind_knock"
+ PATH_ARGS = ("knock_event_id",)
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+ self.member_handler = hs.get_room_member_handler()
+
+ @staticmethod
+ async def _serialize_payload( # type: ignore
+ knock_event_id: str,
+ txn_id: Optional[str],
+ requester: Requester,
+ content: JsonDict,
+ ):
+ """
+ Args:
+ knock_event_id: The ID of the knock to be rescinded.
+ txn_id: An optional transaction ID supplied by the client.
+ requester: The user making the rescind request, according to the access token.
+ content: The content to include in the rescind event.
+ """
+ return {
+ "txn_id": txn_id,
+ "requester": requester.serialize(),
+ "content": content,
+ }
+
+ async def _handle_request( # type: ignore
+ self,
+ request: SynapseRequest,
+ knock_event_id: str,
+ ):
+ content = parse_json_object_from_request(request)
+
+ txn_id = content["txn_id"]
+ event_content = content["content"]
+
+ requester = Requester.deserialize(self.store, content["requester"])
+
+ request.requester = requester
+
+ # hopefully we're now on the master, so this won't recurse!
+ event_id, stream_id = await self.member_handler.remote_rescind_knock(
+ knock_event_id,
+ txn_id,
+ requester,
+ event_content,
+ )
+
+ return 200, {"event_id": event_id, "stream_id": stream_id}
+
+
class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
"""Notifies that a user has joined or left the room
|