summary refs log tree commit diff
path: root/synapse/replication/http
diff options
context:
space:
mode:
authorSean Quah <8349537+squahtx@users.noreply.github.com>2022-07-07 13:19:31 +0100
committerGitHub <noreply@github.com>2022-07-07 12:19:31 +0000
commit1391a76cd2b287daebe61f7d8ea03b258ed522f5 (patch)
tree9a0265253ca0dd85b3fd12df1b2b3068f360554d /synapse/replication/http
parentUse a single query in `ProfileHandler.get_profile` (#13209) (diff)
downloadsynapse-1391a76cd2b287daebe61f7d8ea03b258ed522f5.tar.xz
Faster room joins: fix race in recalculation of current room state (#13151)
Bounce recalculation of current state to the correct event persister and
move recalculation of current state into the event persistence queue, to
avoid concurrent updates to a room's current state.

Also give recalculation of a room's current state a real stream
ordering.

Signed-off-by: Sean Quah <seanq@matrix.org>
Diffstat (limited to 'synapse/replication/http')
-rw-r--r--synapse/replication/http/__init__.py2
-rw-r--r--synapse/replication/http/state.py75
2 files changed, 77 insertions, 0 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index aec040ee19..53aa7fa4c6 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -25,6 +25,7 @@ from synapse.replication.http import (
     push,
     register,
     send_event,
+    state,
     streams,
 )
 
@@ -48,6 +49,7 @@ class ReplicationRestResource(JsonResource):
         streams.register_servlets(hs, self)
         account_data.register_servlets(hs, self)
         push.register_servlets(hs, self)
+        state.register_servlets(hs, self)
 
         # The following can't currently be instantiated on workers.
         if hs.config.worker.worker_app is None:
diff --git a/synapse/replication/http/state.py b/synapse/replication/http/state.py
new file mode 100644
index 0000000000..838b7584e5
--- /dev/null
+++ b/synapse/replication/http/state.py
@@ -0,0 +1,75 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING, Tuple
+
+from twisted.web.server import Request
+
+from synapse.api.errors import SynapseError
+from synapse.http.server import HttpServer
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationUpdateCurrentStateRestServlet(ReplicationEndpoint):
+    """Recalculates the current state for a room, and persists it.
+
+    The API looks like:
+
+        POST /_synapse/replication/update_current_state/:room_id
+
+        {}
+
+        200 OK
+
+        {}
+    """
+
+    NAME = "update_current_state"
+    PATH_ARGS = ("room_id",)
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self._state_handler = hs.get_state_handler()
+        self._events_shard_config = hs.config.worker.events_shard_config
+        self._instance_name = hs.get_instance_name()
+
+    @staticmethod
+    async def _serialize_payload(room_id: str) -> JsonDict:  # type: ignore[override]
+        return {}
+
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, room_id: str
+    ) -> Tuple[int, JsonDict]:
+        writer_instance = self._events_shard_config.get_instance(room_id)
+        if writer_instance != self._instance_name:
+            raise SynapseError(
+                400, "/update_current_state request was routed to the wrong worker"
+            )
+
+        await self._state_handler.update_current_state(room_id)
+
+        return 200, {}
+
+
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+    if hs.get_instance_name() in hs.config.worker.writers.events:
+        ReplicationUpdateCurrentStateRestServlet(hs).register(http_server)