summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/9042.feature1
-rw-r--r--synapse/federation/federation_server.py21
2 files changed, 17 insertions, 5 deletions
diff --git a/changelog.d/9042.feature b/changelog.d/9042.feature
new file mode 100644
index 0000000000..4ec319f1f2
--- /dev/null
+++ b/changelog.d/9042.feature
@@ -0,0 +1 @@
+Add experimental support for handling and persistence of to-device messages to happen on worker processes.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 35e345ce70..e5339aca23 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -15,6 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import random
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -860,8 +861,10 @@ class FederationHandlerRegistry:
         )  # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
         self.query_handlers = {}  # type: Dict[str, Callable[[dict], Awaitable[None]]]
 
-        # Map from type to instance name that we should route EDU handling to.
-        self._edu_type_to_instance = {}  # type: Dict[str, str]
+        # Map from type to instance names that we should route EDU handling to.
+        # We randomly choose one instance from the list to route to for each new
+        # EDU received.
+        self._edu_type_to_instance = {}  # type: Dict[str, List[str]]
 
     def register_edu_handler(
         self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
@@ -905,7 +908,12 @@ class FederationHandlerRegistry:
     def register_instance_for_edu(self, edu_type: str, instance_name: str):
         """Register that the EDU handler is on a different instance than master.
         """
-        self._edu_type_to_instance[edu_type] = instance_name
+        self._edu_type_to_instance[edu_type] = [instance_name]
+
+    def register_instances_for_edu(self, edu_type: str, instance_names: List[str]):
+        """Register that the EDU handler is on multiple instances.
+        """
+        self._edu_type_to_instance[edu_type] = instance_names
 
     async def on_edu(self, edu_type: str, origin: str, content: dict):
         if not self.config.use_presence and edu_type == "m.presence":
@@ -924,8 +932,11 @@ class FederationHandlerRegistry:
             return
 
         # Check if we can route it somewhere else that isn't us
-        route_to = self._edu_type_to_instance.get(edu_type, "master")
-        if route_to != self._instance_name:
+        instances = self._edu_type_to_instance.get(edu_type, ["master"])
+        if self._instance_name not in instances:
+            # Pick an instance randomly so that we don't overload one.
+            route_to = random.choice(instances)
+
             try:
                 await self._send_edu(
                     instance_name=route_to,