summary refs log tree commit diff
path: root/synapse/replication/http/federation.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-01-18 19:35:29 +0000
committerGitHub <noreply@github.com>2023-01-18 19:35:29 +0000
commit9187fd940e2b2bbfd4df7204053cc26b2707aad4 (patch)
treef01de70e9daf00857dff25751e8dbc7c162e5fe5 /synapse/replication/http/federation.py
parentChange default room version to 10. Implements MSC3904 (#14111) (diff)
downloadsynapse-9187fd940e2b2bbfd4df7204053cc26b2707aad4.tar.xz
Wait for streams to catch up when processing HTTP replication. (#14820)
This should hopefully mitigate a class of races where data gets out of
sync due a HTTP replication request racing with the replication streams.
Diffstat (limited to 'synapse/replication/http/federation.py')
-rw-r--r--synapse/replication/http/federation.py28
1 files changed, 9 insertions, 19 deletions
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index d3abafed28..53ad327030 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -21,7 +21,6 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
 from synapse.events.snapshot import EventContext
 from synapse.http.server import HttpServer
-from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
 from synapse.types import JsonDict
 from synapse.util.metrics import Measure
@@ -114,10 +113,8 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
 
         return payload
 
-    async def _handle_request(self, request: Request) -> Tuple[int, JsonDict]:  # type: ignore[override]
+    async def _handle_request(self, request: Request, content: JsonDict) -> Tuple[int, JsonDict]:  # type: ignore[override]
         with Measure(self.clock, "repl_fed_send_events_parse"):
-            content = parse_json_object_from_request(request)
-
             room_id = content["room_id"]
             backfilled = content["backfilled"]
 
@@ -181,13 +178,10 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
         return {"origin": origin, "content": content}
 
     async def _handle_request(  # type: ignore[override]
-        self, request: Request, edu_type: str
+        self, request: Request, content: JsonDict, edu_type: str
     ) -> Tuple[int, JsonDict]:
-        with Measure(self.clock, "repl_fed_send_edu_parse"):
-            content = parse_json_object_from_request(request)
-
-            origin = content["origin"]
-            edu_content = content["content"]
+        origin = content["origin"]
+        edu_content = content["content"]
 
         logger.info("Got %r edu from %s", edu_type, origin)
 
@@ -231,13 +225,10 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
         return {"args": args}
 
     async def _handle_request(  # type: ignore[override]
-        self, request: Request, query_type: str
+        self, request: Request, content: JsonDict, query_type: str
     ) -> Tuple[int, JsonDict]:
-        with Measure(self.clock, "repl_fed_query_parse"):
-            content = parse_json_object_from_request(request)
-
-            args = content["args"]
-            args["origin"] = content["origin"]
+        args = content["args"]
+        args["origin"] = content["origin"]
 
         logger.info("Got %r query from %s", query_type, args["origin"])
 
@@ -274,7 +265,7 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
         return {}
 
     async def _handle_request(  # type: ignore[override]
-        self, request: Request, room_id: str
+        self, request: Request, content: JsonDict, room_id: str
     ) -> Tuple[int, JsonDict]:
         await self.store.clean_room_for_join(room_id)
 
@@ -307,9 +298,8 @@ class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint):
         return {"room_version": room_version.identifier}
 
     async def _handle_request(  # type: ignore[override]
-        self, request: Request, room_id: str
+        self, request: Request, content: JsonDict, room_id: str
     ) -> Tuple[int, JsonDict]:
-        content = parse_json_object_from_request(request)
         room_version = KNOWN_ROOM_VERSIONS[content["room_version"]]
         await self.store.maybe_store_room_on_outlier_membership(room_id, room_version)
         return 200, {}