diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index c9cf838255..d500051714 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -1,7 +1,7 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
-# Copyright (C) 2023 New Vector, Ltd
+# Copyright (C) 2023-2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING
from synapse.http.server import JsonResource
from synapse.replication.http import (
account_data,
+ delayed_events,
devices,
federation,
login,
@@ -64,3 +65,4 @@ class ReplicationRestResource(JsonResource):
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
+ delayed_events.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 9aa8d90bfe..0002538680 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -128,9 +128,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
# We reserve `instance_name` as a parameter to sending requests, so we
# assert here that sub classes don't try and use the name.
- assert (
- "instance_name" not in self.PATH_ARGS
- ), "`instance_name` is a reserved parameter name"
+ assert "instance_name" not in self.PATH_ARGS, (
+ "`instance_name` is a reserved parameter name"
+ )
assert (
"instance_name"
not in signature(self.__class__._serialize_payload).parameters
diff --git a/synapse/replication/http/delayed_events.py b/synapse/replication/http/delayed_events.py
new file mode 100644
index 0000000000..229022070c
--- /dev/null
+++ b/synapse/replication/http/delayed_events.py
@@ -0,0 +1,62 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2024 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+
+import logging
+from typing import TYPE_CHECKING, Dict, Optional, Tuple
+
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict, JsonMapping
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationAddedDelayedEventRestServlet(ReplicationEndpoint):
+ """Handle a delayed event being added by another worker.
+
+ Request format:
+
+ POST /_synapse/replication/delayed_event_added/
+
+ {}
+ """
+
+ NAME = "added_delayed_event"
+ PATH_ARGS = ()
+ CACHE = False
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.handler = hs.get_delayed_events_handler()
+
+ @staticmethod
+ async def _serialize_payload(next_send_ts: int) -> JsonDict: # type: ignore[override]
+ return {"next_send_ts": next_send_ts}
+
+ async def _handle_request( # type: ignore[override]
+ self, request: Request, content: JsonDict
+ ) -> Tuple[int, Dict[str, Optional[JsonMapping]]]:
+ self.handler.on_added(int(content["next_send_ts"]))
+
+ return 200, {}
+
+
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+ ReplicationAddedDelayedEventRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 9c537427df..940f418396 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -119,7 +119,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
return payload
- async def _handle_request(self, request: Request, content: JsonDict) -> Tuple[int, JsonDict]: # type: ignore[override]
+ async def _handle_request( # type: ignore[override]
+ self, request: Request, content: JsonDict
+ ) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_fed_send_events_parse"):
room_id = content["room_id"]
backfilled = content["backfilled"]
diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py
index de07e75b46..48e254cdb1 100644
--- a/synapse/replication/http/push.py
+++ b/synapse/replication/http/push.py
@@ -48,7 +48,7 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
"""
- NAME = "add_user_account_data"
+ NAME = "remove_pusher"
PATH_ARGS = ("user_id",)
CACHE = False
@@ -98,7 +98,9 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
self._store = hs.get_datastores().main
@staticmethod
- async def _serialize_payload(user_id: str, old_room_id: str, new_room_id: str) -> JsonDict: # type: ignore[override]
+ async def _serialize_payload( # type: ignore[override]
+ user_id: str, old_room_id: str, new_room_id: str
+ ) -> JsonDict:
return {}
async def _handle_request( # type: ignore[override]
@@ -109,7 +111,6 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
old_room_id: str,
new_room_id: str,
) -> Tuple[int, JsonDict]:
-
await self._store.copy_push_rules_from_room_to_room_for_user(
old_room_id, new_room_id, user_id
)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 3dddbb70b4..0bd5478cd3 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -18,8 +18,8 @@
# [This file includes modifications made by New Vector Limited]
#
#
-"""A replication client for use by synapse workers.
-"""
+"""A replication client for use by synapse workers."""
+
import logging
from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, Tuple
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index b7a7e77597..6ab5356660 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -23,6 +23,7 @@
The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""
+
import abc
import logging
from typing import List, Optional, Tuple, Type, TypeVar
@@ -494,7 +495,7 @@ class LockReleasedCommand(Command):
class NewActiveTaskCommand(_SimpleCommand):
- """Sent to inform instance handling background tasks that a new active task is available to run.
+ """Sent to inform instance handling background tasks that a new task is ready to run.
Format::
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 72a42cb6cc..1fafbb48c3 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -727,7 +727,7 @@ class ReplicationCommandHandler:
) -> None:
"""Called when get a new NEW_ACTIVE_TASK command."""
if self._task_scheduler:
- self._task_scheduler.launch_task_by_id(cmd.data)
+ self._task_scheduler.on_new_task(cmd.data)
def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection."""
@@ -857,7 +857,7 @@ UpdateRow = TypeVar("UpdateRow")
def _batch_updates(
- updates: Iterable[Tuple[UpdateToken, UpdateRow]]
+ updates: Iterable[Tuple[UpdateToken, UpdateRow]],
) -> Iterator[Tuple[UpdateToken, List[UpdateRow]]]:
"""Collect stream updates with the same token together
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 4471cc8f0c..fb9c539122 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -23,6 +23,7 @@ protocols.
An explanation of this protocol is available in docs/tcp_replication.md
"""
+
import fcntl
import logging
import struct
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index c0329378ac..d647a2b332 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -18,8 +18,7 @@
# [This file includes modifications made by New Vector Limited]
#
#
-"""The server side of the replication stream.
-"""
+"""The server side of the replication stream."""
import logging
import random
@@ -307,7 +306,7 @@ class ReplicationStreamer:
def _batch_updates(
- updates: List[Tuple[Token, StreamRow]]
+ updates: List[Tuple[Token, StreamRow]],
) -> List[Tuple[Optional[Token], StreamRow]]:
"""Takes a list of updates of form [(token, row)] and sets the token to
None for all rows where the next row has the same token. This is used to
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index d021904de7..ebf5964d29 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -247,7 +247,7 @@ class _StreamFromIdGen(Stream):
def current_token_without_instance(
- current_token: Callable[[], int]
+ current_token: Callable[[], int],
) -> Callable[[str], int]:
"""Takes a current token callback function for a single writer stream
that doesn't take an instance name parameter and wraps it in a function that
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ea0803dfc2..05b55fb033 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -200,9 +200,9 @@ class EventsStream(_StreamFromIdGen):
# we rely on get_all_new_forward_event_rows strictly honouring the limit, so
# that we know it is safe to just take upper_limit = event_rows[-1][0].
- assert (
- len(event_rows) <= target_row_count
- ), "get_all_new_forward_event_rows did not honour row limit"
+ assert len(event_rows) <= target_row_count, (
+ "get_all_new_forward_event_rows did not honour row limit"
+ )
# if we hit the limit on event_updates, there's no point in going beyond the
# last stream_id in the batch for the other sources.
|