diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index c9cf838255..d500051714 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -1,7 +1,7 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
-# Copyright (C) 2023 New Vector, Ltd
+# Copyright (C) 2023-2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING
from synapse.http.server import JsonResource
from synapse.replication.http import (
account_data,
+ delayed_events,
devices,
federation,
login,
@@ -64,3 +65,4 @@ class ReplicationRestResource(JsonResource):
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
+ delayed_events.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 9aa8d90bfe..0002538680 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -128,9 +128,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
# We reserve `instance_name` as a parameter to sending requests, so we
# assert here that sub classes don't try and use the name.
- assert (
- "instance_name" not in self.PATH_ARGS
- ), "`instance_name` is a reserved parameter name"
+ assert "instance_name" not in self.PATH_ARGS, (
+ "`instance_name` is a reserved parameter name"
+ )
assert (
"instance_name"
not in signature(self.__class__._serialize_payload).parameters
diff --git a/synapse/replication/http/delayed_events.py b/synapse/replication/http/delayed_events.py
new file mode 100644
index 0000000000..229022070c
--- /dev/null
+++ b/synapse/replication/http/delayed_events.py
@@ -0,0 +1,62 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2024 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+
+import logging
+from typing import TYPE_CHECKING, Dict, Optional, Tuple
+
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict, JsonMapping
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationAddedDelayedEventRestServlet(ReplicationEndpoint):
+ """Handle a delayed event being added by another worker.
+
+ Request format:
+
+ POST /_synapse/replication/delayed_event_added/
+
+ {}
+ """
+
+ NAME = "added_delayed_event"
+ PATH_ARGS = ()
+ CACHE = False
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.handler = hs.get_delayed_events_handler()
+
+ @staticmethod
+ async def _serialize_payload(next_send_ts: int) -> JsonDict: # type: ignore[override]
+ return {"next_send_ts": next_send_ts}
+
+ async def _handle_request( # type: ignore[override]
+ self, request: Request, content: JsonDict
+ ) -> Tuple[int, Dict[str, Optional[JsonMapping]]]:
+ self.handler.on_added(int(content["next_send_ts"]))
+
+ return 200, {}
+
+
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+ ReplicationAddedDelayedEventRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 9c537427df..940f418396 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -119,7 +119,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
return payload
- async def _handle_request(self, request: Request, content: JsonDict) -> Tuple[int, JsonDict]: # type: ignore[override]
+ async def _handle_request( # type: ignore[override]
+ self, request: Request, content: JsonDict
+ ) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_fed_send_events_parse"):
room_id = content["room_id"]
backfilled = content["backfilled"]
diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py
index de07e75b46..48e254cdb1 100644
--- a/synapse/replication/http/push.py
+++ b/synapse/replication/http/push.py
@@ -48,7 +48,7 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
"""
- NAME = "add_user_account_data"
+ NAME = "remove_pusher"
PATH_ARGS = ("user_id",)
CACHE = False
@@ -98,7 +98,9 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
self._store = hs.get_datastores().main
@staticmethod
- async def _serialize_payload(user_id: str, old_room_id: str, new_room_id: str) -> JsonDict: # type: ignore[override]
+ async def _serialize_payload( # type: ignore[override]
+ user_id: str, old_room_id: str, new_room_id: str
+ ) -> JsonDict:
return {}
async def _handle_request( # type: ignore[override]
@@ -109,7 +111,6 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
old_room_id: str,
new_room_id: str,
) -> Tuple[int, JsonDict]:
-
await self._store.copy_push_rules_from_room_to_room_for_user(
old_room_id, new_room_id, user_id
)
|