summary refs log tree commit diff
path: root/synapse/federation/federation_server.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2018-08-15 14:59:02 +0100
committerGitHub <noreply@github.com>2018-08-15 14:59:02 +0100
commitdc56c47dc0991c4ddd5de59224c5f55c8fad5287 (patch)
tree083791ba6e3b5a80e4b8d51482b986e30bddd309 /synapse/federation/federation_server.py
parentMerge pull request #3687 from matrix-org/neilj/admin_email (diff)
parentMerge branch 'develop' of github.com:matrix-org/synapse into erikj/split_fede... (diff)
downloadsynapse-dc56c47dc0991c4ddd5de59224c5f55c8fad5287.tar.xz
Merge pull request #3653 from matrix-org/erikj/split_federation
Move more federation APIs to workers
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r--synapse/federation/federation_server.py54
1 files changed, 54 insertions, 0 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a23136784a..3e0cd294a1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -39,6 +39,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 from synapse.http.endpoint import parse_server_name
+from synapse.replication.http.federation import (
+    ReplicationFederationSendEduRestServlet,
+    ReplicationGetQueryRestServlet,
+)
 from synapse.types import get_domain_from_id
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
@@ -760,6 +764,8 @@ class FederationHandlerRegistry(object):
         if edu_type in self.edu_handlers:
             raise KeyError("Already have an EDU handler for %s" % (edu_type,))
 
+        logger.info("Registering federation EDU handler for %r", edu_type)
+
         self.edu_handlers[edu_type] = handler
 
     def register_query_handler(self, query_type, handler):
@@ -778,6 +784,8 @@ class FederationHandlerRegistry(object):
                 "Already have a Query handler for %s" % (query_type,)
             )
 
+        logger.info("Registering federation query handler for %r", query_type)
+
         self.query_handlers[query_type] = handler
 
     @defer.inlineCallbacks
@@ -800,3 +808,49 @@ class FederationHandlerRegistry(object):
             raise NotFoundError("No handler for Query type '%s'" % (query_type,))
 
         return handler(args)
+
+
+class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
+    """A FederationHandlerRegistry for worker processes.
+
+    When receiving EDU or queries it will check if an appropriate handler has
+    been registered on the worker, if there isn't one then it calls off to the
+    master process.
+    """
+
+    def __init__(self, hs):
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+        self.clock = hs.get_clock()
+
+        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+        self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+        super(ReplicationFederationHandlerRegistry, self).__init__()
+
+    def on_edu(self, edu_type, origin, content):
+        """Overrides FederationHandlerRegistry
+        """
+        handler = self.edu_handlers.get(edu_type)
+        if handler:
+            return super(ReplicationFederationHandlerRegistry, self).on_edu(
+                edu_type, origin, content,
+            )
+
+        return self._send_edu(
+                edu_type=edu_type,
+                origin=origin,
+                content=content,
+        )
+
+    def on_query(self, query_type, args):
+        """Overrides FederationHandlerRegistry
+        """
+        handler = self.query_handlers.get(query_type)
+        if handler:
+            return handler(args)
+
+        return self._get_query_client(
+                query_type=query_type,
+                args=args,
+        )