summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-07-16 15:12:54 +0100
committerGitHub <noreply@github.com>2020-07-16 15:12:54 +0100
commitf2e38ca86711a8f80cf45d3182e426ed8967fc81 (patch)
tree5a46223ed7b3e50f018d96a09776b7e442619377 /synapse/federation
parentAdd ability to run multiple pusher instances (#7855) (diff)
downloadsynapse-f2e38ca86711a8f80cf45d3182e426ed8967fc81.tar.xz
Allow moving typing off master (#7869)
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_server.py125
1 files changed, 74 insertions, 51 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8c53330c49..23625ba995 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -15,7 +15,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Awaitable,
+    Callable,
+    Dict,
+    List,
+    Match,
+    Optional,
+    Tuple,
+    Union,
+)
 
 from canonicaljson import json
 from prometheus_client import Counter, Histogram
@@ -56,6 +67,9 @@ from synapse.util import glob_to_regex, unwrapFirstError
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -768,11 +782,30 @@ class FederationHandlerRegistry(object):
     query type for incoming federation traffic.
     """
 
-    def __init__(self):
-        self.edu_handlers = {}
-        self.query_handlers = {}
+    def __init__(self, hs: "HomeServer"):
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+        self.clock = hs.get_clock()
+        self._instance_name = hs.get_instance_name()
 
-    def register_edu_handler(self, edu_type: str, handler: Callable[[str, dict], None]):
+        # These are safe to load in monolith mode, but will explode if we try
+        # and use them. However we have guards before we use them to ensure that
+        # we don't route to ourselves, and in monolith mode that will always be
+        # the case.
+        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+        self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+        self.edu_handlers = (
+            {}
+        )  # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
+        self.query_handlers = {}  # type: Dict[str, Callable[[dict], Awaitable[None]]]
+
+        # Map from type to instance name that we should route EDU handling to.
+        self._edu_type_to_instance = {}  # type: Dict[str, str]
+
+    def register_edu_handler(
+        self, edu_type: str, handler: Callable[[str, dict], Awaitable[None]]
+    ):
         """Sets the handler callable that will be used to handle an incoming
         federation EDU of the given type.
 
@@ -809,66 +842,56 @@ class FederationHandlerRegistry(object):
 
         self.query_handlers[query_type] = handler
 
+    def register_instance_for_edu(self, edu_type: str, instance_name: str):
+        """Register that the EDU handler is on a different instance than master.
+        """
+        self._edu_type_to_instance[edu_type] = instance_name
+
     async def on_edu(self, edu_type: str, origin: str, content: dict):
+        if not self.config.use_presence and edu_type == "m.presence":
+            return
+
+        # Check if we have a handler on this instance
         handler = self.edu_handlers.get(edu_type)
-        if not handler:
-            logger.warning("No handler registered for EDU type %s", edu_type)
+        if handler:
+            with start_active_span_from_edu(content, "handle_edu"):
+                try:
+                    await handler(origin, content)
+                except SynapseError as e:
+                    logger.info("Failed to handle edu %r: %r", edu_type, e)
+                except Exception:
+                    logger.exception("Failed to handle edu %r", edu_type)
             return
 
-        with start_active_span_from_edu(content, "handle_edu"):
+        # Check if we can route it somewhere else that isn't us
+        route_to = self._edu_type_to_instance.get(edu_type, "master")
+        if route_to != self._instance_name:
             try:
-                await handler(origin, content)
+                await self._send_edu(
+                    instance_name=route_to,
+                    edu_type=edu_type,
+                    origin=origin,
+                    content=content,
+                )
             except SynapseError as e:
                 logger.info("Failed to handle edu %r: %r", edu_type, e)
             except Exception:
                 logger.exception("Failed to handle edu %r", edu_type)
-
-    def on_query(self, query_type: str, args: dict) -> defer.Deferred:
-        handler = self.query_handlers.get(query_type)
-        if not handler:
-            logger.warning("No handler registered for query type %s", query_type)
-            raise NotFoundError("No handler for Query type '%s'" % (query_type,))
-
-        return handler(args)
-
-
-class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
-    """A FederationHandlerRegistry for worker processes.
-
-    When receiving EDU or queries it will check if an appropriate handler has
-    been registered on the worker, if there isn't one then it calls off to the
-    master process.
-    """
-
-    def __init__(self, hs):
-        self.config = hs.config
-        self.http_client = hs.get_simple_http_client()
-        self.clock = hs.get_clock()
-
-        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
-        self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
-
-        super(ReplicationFederationHandlerRegistry, self).__init__()
-
-    async def on_edu(self, edu_type: str, origin: str, content: dict):
-        """Overrides FederationHandlerRegistry
-        """
-        if not self.config.use_presence and edu_type == "m.presence":
             return
 
-        handler = self.edu_handlers.get(edu_type)
-        if handler:
-            return await super(ReplicationFederationHandlerRegistry, self).on_edu(
-                edu_type, origin, content
-            )
-
-        return await self._send_edu(edu_type=edu_type, origin=origin, content=content)
+        # Oh well, let's just log and move on.
+        logger.warning("No handler registered for EDU type %s", edu_type)
 
     async def on_query(self, query_type: str, args: dict):
-        """Overrides FederationHandlerRegistry
-        """
         handler = self.query_handlers.get(query_type)
         if handler:
             return await handler(args)
 
-        return await self._get_query_client(query_type=query_type, args=args)
+        # Check if we can route it somewhere else that isn't us
+        if self._instance_name == "master":
+            return await self._get_query_client(query_type=query_type, args=args)
+
+        # Uh oh, no handler! Let's raise an exception so the request returns an
+        # error.
+        logger.warning("No handler registered for query type %s", query_type)
+        raise NotFoundError("No handler for Query type '%s'" % (query_type,))