diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 64edadb624..2b3972cb14 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -92,7 +92,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
if self.CACHE:
self.response_cache = ResponseCache(
hs, "repl." + self.NAME, timeout_ms=30 * 60 * 1000
- )
+ ) # type: ResponseCache[str]
# We reserve `instance_name` as a parameter to sending requests, so we
# assert here that sub classes don't try and use the name.
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 5393b9a9e7..7a0dbb5b1a 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -62,7 +62,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.clock = hs.get_clock()
- self.federation_handler = hs.get_handlers().federation_handler
+ self.federation_handler = hs.get_federation_handler()
@staticmethod
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
@@ -254,20 +254,20 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
return 200, {}
-class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
+class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint):
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Request format:
- POST /_synapse/replication/store_room_on_invite/:room_id/:txn_id
+ POST /_synapse/replication/store_room_on_outlier_membership/:room_id/:txn_id
{
"room_version": "1",
}
"""
- NAME = "store_room_on_invite"
+ NAME = "store_room_on_outlier_membership"
PATH_ARGS = ("room_id",)
def __init__(self, hs):
@@ -282,7 +282,7 @@ class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
async def _handle_request(self, request, room_id):
content = parse_json_object_from_request(request)
room_version = KNOWN_ROOM_VERSIONS[content["room_version"]]
- await self.store.maybe_store_room_on_invite(room_id, room_version)
+ await self.store.maybe_store_room_on_outlier_membership(room_id, room_version)
return 200, {}
@@ -291,4 +291,4 @@ def register_servlets(hs, http_server):
ReplicationFederationSendEduRestServlet(hs).register(http_server)
ReplicationGetQueryRestServlet(hs).register(http_server)
ReplicationCleanRoomRestServlet(hs).register(http_server)
- ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
+ ReplicationStoreRoomOnOutlierMembershipRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 30680baee8..84e002f934 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -12,9 +12,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, List, Optional, Tuple
+
+from twisted.web.http import Request
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@@ -47,21 +48,28 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
def __init__(self, hs):
super().__init__(hs)
- self.federation_handler = hs.get_handlers().federation_handler
+ self.federation_handler = hs.get_federation_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@staticmethod
- async def _serialize_payload(
- requester, room_id, user_id, remote_room_hosts, content
- ):
+ async def _serialize_payload( # type: ignore
+ requester: Requester,
+ room_id: str,
+ user_id: str,
+ remote_room_hosts: List[str],
+ content: JsonDict,
+ ) -> JsonDict:
"""
Args:
- requester(Requester)
- room_id (str)
- user_id (str)
- remote_room_hosts (list[str]): Servers to try and join via
- content(dict): The event content to use for the join event
+ requester: The user making the request according to the access token
+ room_id: The ID of the room.
+ user_id: The ID of the user.
+ remote_room_hosts: Servers to try and join via
+ content: The event content to use for the join event
+
+ Returns:
+ A dict representing the payload of the request.
"""
return {
"requester": requester.serialize(),
@@ -69,7 +77,9 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
"content": content,
}
- async def _handle_request(self, request, room_id, user_id):
+ async def _handle_request( # type: ignore
+ self, request: Request, room_id: str, user_id: str
+ ) -> Tuple[int, JsonDict]:
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
@@ -77,8 +87,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
requester = Requester.deserialize(self.store, content["requester"])
- if requester.user:
- request.authenticated_entity = requester.user.to_string()
+ request.requester = requester
logger.info("remote_join: %s into room: %s", user_id, room_id)
@@ -119,14 +128,17 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
txn_id: Optional[str],
requester: Requester,
content: JsonDict,
- ):
+ ) -> JsonDict:
"""
Args:
- invite_event_id: ID of the invite to be rejected
- txn_id: optional transaction ID supplied by the client
- requester: user making the rejection request, according to the access token
- content: additional content to include in the rejection event.
+ invite_event_id: The ID of the invite to be rejected.
+ txn_id: Optional transaction ID supplied by the client
+ requester: User making the rejection request, according to the access token
+ content: Additional content to include in the rejection event.
Normally an empty dict.
+
+ Returns:
+ A dict representing the payload of the request.
"""
return {
"txn_id": txn_id,
@@ -134,7 +146,9 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
"content": content,
}
- async def _handle_request(self, request, invite_event_id):
+ async def _handle_request( # type: ignore
+ self, request: Request, invite_event_id: str
+ ) -> Tuple[int, JsonDict]:
content = parse_json_object_from_request(request)
txn_id = content["txn_id"]
@@ -142,8 +156,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
requester = Requester.deserialize(self.store, content["requester"])
- if requester.user:
- request.authenticated_entity = requester.user.to_string()
+ request.requester = requester
# hopefully we're now on the master, so this won't recurse!
event_id, stream_id = await self.member_handler.remote_reject_invite(
@@ -176,18 +189,25 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
self.distributor = hs.get_distributor()
@staticmethod
- async def _serialize_payload(room_id, user_id, change):
+ async def _serialize_payload( # type: ignore
+ room_id: str, user_id: str, change: str
+ ) -> JsonDict:
"""
Args:
- room_id (str)
- user_id (str)
- change (str): "left"
+ room_id: The ID of the room.
+ user_id: The ID of the user.
+ change: "left"
+
+ Returns:
+ A dict representing the payload of the request.
"""
assert change == "left"
return {}
- def _handle_request(self, request, room_id, user_id, change):
+ def _handle_request( # type: ignore
+ self, request: Request, room_id: str, user_id: str, change: str
+ ) -> Tuple[int, JsonDict]:
logger.info("user membership change: %s in %s", user_id, room_id)
user = UserID.from_string(user_id)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 9a3a694d5d..8fa104c8d3 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -46,6 +46,12 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"ratelimit": true,
"extra_users": [],
}
+
+ 200 OK
+
+ { "stream_id": 12345, "event_id": "$abcdef..." }
+
+ The returned event ID may not match the sent event if it was deduplicated.
"""
NAME = "send_event"
@@ -109,18 +115,23 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
ratelimit = content["ratelimit"]
extra_users = [UserID.from_string(u) for u in content["extra_users"]]
- if requester.user:
- request.authenticated_entity = requester.user.to_string()
+ request.requester = requester
logger.info(
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
)
- stream_id = await self.event_creation_handler.persist_and_notify_client_event(
+ event = await self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
- return 200, {"stream_id": stream_id}
+ return (
+ 200,
+ {
+ "stream_id": event.internal_metadata.stream_ordering,
+ "event_id": event.event_id,
+ },
+ )
def register_servlets(hs, http_server):
|