summary refs log tree commit diff
path: root/synapse/replication/http/delayed_events.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/replication/http/delayed_events.py62
1 files changed, 62 insertions, 0 deletions
diff --git a/synapse/replication/http/delayed_events.py b/synapse/replication/http/delayed_events.py
new file mode 100644

index 0000000000..229022070c --- /dev/null +++ b/synapse/replication/http/delayed_events.py
@@ -0,0 +1,62 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. +# + +import logging +from typing import TYPE_CHECKING, Dict, Optional, Tuple + +from twisted.web.server import Request + +from synapse.http.server import HttpServer +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import JsonDict, JsonMapping + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class ReplicationAddedDelayedEventRestServlet(ReplicationEndpoint): + """Handle a delayed event being added by another worker. + + Request format: + + POST /_synapse/replication/delayed_event_added/ + + {} + """ + + NAME = "added_delayed_event" + PATH_ARGS = () + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.handler = hs.get_delayed_events_handler() + + @staticmethod + async def _serialize_payload(next_send_ts: int) -> JsonDict: # type: ignore[override] + return {"next_send_ts": next_send_ts} + + async def _handle_request( # type: ignore[override] + self, request: Request, content: JsonDict + ) -> Tuple[int, Dict[str, Optional[JsonMapping]]]: + self.handler.on_added(int(content["next_send_ts"])) + + return 200, {} + + +def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + ReplicationAddedDelayedEventRestServlet(hs).register(http_server)