diff options
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r-- | synapse/federation/federation_server.py | 24 |
1 files changed, 17 insertions, 7 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 4b6ab470d0..171d25c945 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random from typing import ( TYPE_CHECKING, Any, @@ -48,7 +49,6 @@ from synapse.events import EventBase from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction -from synapse.http.endpoint import parse_server_name from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import ( make_deferred_yieldable, @@ -65,6 +65,7 @@ from synapse.types import JsonDict, get_domain_from_id from synapse.util import glob_to_regex, json_decoder, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from synapse.util.stringutils import parse_server_name if TYPE_CHECKING: from synapse.server import HomeServer @@ -845,7 +846,6 @@ class FederationHandlerRegistry: def __init__(self, hs: "HomeServer"): self.config = hs.config - self.http_client = hs.get_simple_http_client() self.clock = hs.get_clock() self._instance_name = hs.get_instance_name() @@ -861,8 +861,10 @@ class FederationHandlerRegistry: ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]] self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[None]]] - # Map from type to instance name that we should route EDU handling to. - self._edu_type_to_instance = {} # type: Dict[str, str] + # Map from type to instance names that we should route EDU handling to. + # We randomly choose one instance from the list to route to for each new + # EDU received. + self._edu_type_to_instance = {} # type: Dict[str, List[str]] def register_edu_handler( self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]] @@ -906,7 +908,12 @@ class FederationHandlerRegistry: def register_instance_for_edu(self, edu_type: str, instance_name: str): """Register that the EDU handler is on a different instance than master. """ - self._edu_type_to_instance[edu_type] = instance_name + self._edu_type_to_instance[edu_type] = [instance_name] + + def register_instances_for_edu(self, edu_type: str, instance_names: List[str]): + """Register that the EDU handler is on multiple instances. + """ + self._edu_type_to_instance[edu_type] = instance_names async def on_edu(self, edu_type: str, origin: str, content: dict): if not self.config.use_presence and edu_type == "m.presence": @@ -925,8 +932,11 @@ class FederationHandlerRegistry: return # Check if we can route it somewhere else that isn't us - route_to = self._edu_type_to_instance.get(edu_type, "master") - if route_to != self._instance_name: + instances = self._edu_type_to_instance.get(edu_type, ["master"]) + if self._instance_name not in instances: + # Pick an instance randomly so that we don't overload one. + route_to = random.choice(instances) + try: await self._send_edu( instance_name=route_to, |