diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 49a3251372..7e23b565b9 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -17,7 +17,8 @@ import logging
from twisted.internet import defer
-from synapse.events import event_type_from_format_version
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.events import make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@@ -37,6 +38,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
{
"events": [{
"event": { .. serialized event .. },
+ "room_version": .., // "1", "2", "3", etc: the version of the room
+ // containing the event
+ "event_format_version": .., // 1,2,3 etc: the event format version
"internal_metadata": { .. serialized internal_metadata .. },
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
@@ -72,6 +76,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
event_payloads.append(
{
"event": event.get_pdu_json(),
+ "room_version": event.room_version.identifier,
"event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
@@ -94,12 +99,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
event_and_contexts = []
for event_payload in event_payloads:
event_dict = event_payload["event"]
- format_ver = event_payload["event_format_version"]
+ room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]]
internal_metadata = event_payload["internal_metadata"]
rejected_reason = event_payload["rejected_reason"]
- EventType = event_type_from_format_version(format_ver)
- event = EventType(event_dict, internal_metadata, rejected_reason)
+ event = make_event_from_dict(
+ event_dict, room_ver, internal_metadata, rejected_reason
+ )
context = EventContext.deserialize(
self.storage, event_payload["context"]
@@ -211,7 +217,7 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
Request format:
- POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
+ POST /_synapse/replication/fed_cleanup_room/:room_id/:txn_id
{}
"""
@@ -238,8 +244,41 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
return 200, {}
+class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
+ """Called to clean up any data in DB for a given room, ready for the
+ server to join the room.
+
+ Request format:
+
+ POST /_synapse/replication/store_room_on_invite/:room_id/:txn_id
+
+ {
+ "room_version": "1",
+ }
+ """
+
+ NAME = "store_room_on_invite"
+ PATH_ARGS = ("room_id",)
+
+ def __init__(self, hs):
+ super().__init__(hs)
+
+ self.store = hs.get_datastore()
+
+ @staticmethod
+ def _serialize_payload(room_id, room_version):
+ return {"room_version": room_version.identifier}
+
+ async def _handle_request(self, request, room_id):
+ content = parse_json_object_from_request(request)
+ room_version = KNOWN_ROOM_VERSIONS[content["room_version"]]
+ await self.store.maybe_store_room_on_invite(room_id, room_version)
+ return 200, {}
+
+
def register_servlets(hs, http_server):
ReplicationFederationSendEventsRestServlet(hs).register(http_server)
ReplicationFederationSendEduRestServlet(hs).register(http_server)
ReplicationGetQueryRestServlet(hs).register(http_server)
ReplicationCleanRoomRestServlet(hs).register(http_server)
+ ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
|