diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index c287c4e269..5393b9a9e7 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -15,8 +15,6 @@
import logging
-from twisted.internet import defer
-
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.events.snapshot import EventContext
@@ -59,7 +57,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
PATH_ARGS = ()
def __init__(self, hs):
- super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.store = hs.get_datastore()
self.storage = hs.get_storage()
@@ -67,18 +65,18 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
self.federation_handler = hs.get_handlers().federation_handler
@staticmethod
- @defer.inlineCallbacks
- def _serialize_payload(store, event_and_contexts, backfilled):
+ async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
"""
Args:
store
+ room_id (str)
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
"""
event_payloads = []
for event, context in event_and_contexts:
- serialized_context = yield context.serialize(event, store)
+ serialized_context = await context.serialize(event, store)
event_payloads.append(
{
@@ -91,7 +89,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
}
)
- payload = {"events": event_payloads, "backfilled": backfilled}
+ payload = {
+ "events": event_payloads,
+ "backfilled": backfilled,
+ "room_id": room_id,
+ }
return payload
@@ -99,6 +101,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)
+ room_id = content["room_id"]
backfilled = content["backfilled"]
event_payloads = content["events"]
@@ -123,7 +126,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
logger.info("Got %d events from federation", len(event_and_contexts))
max_stream_id = await self.federation_handler.persist_events_and_notify(
- event_and_contexts, backfilled
+ room_id, event_and_contexts, backfilled
)
return 200, {"max_stream_id": max_stream_id}
@@ -147,14 +150,14 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
PATH_ARGS = ("edu_type",)
def __init__(self, hs):
- super(ReplicationFederationSendEduRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.registry = hs.get_federation_registry()
@staticmethod
- def _serialize_payload(edu_type, origin, content):
+ async def _serialize_payload(edu_type, origin, content):
return {"origin": origin, "content": content}
async def _handle_request(self, request, edu_type):
@@ -190,14 +193,14 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
CACHE = False
def __init__(self, hs):
- super(ReplicationGetQueryRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.registry = hs.get_federation_registry()
@staticmethod
- def _serialize_payload(query_type, args):
+ async def _serialize_payload(query_type, args):
"""
Args:
query_type (str)
@@ -233,12 +236,12 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
PATH_ARGS = ("room_id",)
def __init__(self, hs):
- super(ReplicationCleanRoomRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.store = hs.get_datastore()
@staticmethod
- def _serialize_payload(room_id, args):
+ async def _serialize_payload(room_id, args):
"""
Args:
room_id (str)
@@ -273,7 +276,7 @@ class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
self.store = hs.get_datastore()
@staticmethod
- def _serialize_payload(room_id, room_version):
+ async def _serialize_payload(room_id, room_version):
return {"room_version": room_version.identifier}
async def _handle_request(self, request, room_id):
|