summary refs log tree commit diff
path: root/synapse/federation/federation_server.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-07-26 11:44:22 +0100
committerErik Johnston <erik@matrix.org>2018-08-06 15:23:31 +0100
commita3f5bf79a0fc0ea6d59069945f53717a3e9c6581 (patch)
tree5e8cabe33af2923ff80502b9e3e4eda6118af65b /synapse/federation/federation_server.py
parentAdd replication APIs for persisting federation events (diff)
downloadsynapse-a3f5bf79a0fc0ea6d59069945f53717a3e9c6581.tar.xz
Add EDU/query handling over replication
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r--synapse/federation/federation_server.py43
1 files changed, 43 insertions, 0 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bf89d568af..941e30a596 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -33,6 +33,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 from synapse.http.endpoint import parse_server_name
+from synapse.replication.http.federation import (
+    ReplicationFederationSendEduRestServlet,
+    ReplicationGetQueryRestServlet,
+)
 from synapse.types import get_domain_from_id
 from synapse.util import async
 from synapse.util.caches.response_cache import ResponseCache
@@ -745,6 +749,8 @@ class FederationHandlerRegistry(object):
         if edu_type in self.edu_handlers:
             raise KeyError("Already have an EDU handler for %s" % (edu_type,))
 
+        logger.info("Registering federation EDU handler for %r", edu_type)
+
         self.edu_handlers[edu_type] = handler
 
     def register_query_handler(self, query_type, handler):
@@ -763,6 +769,8 @@ class FederationHandlerRegistry(object):
                 "Already have a Query handler for %s" % (query_type,)
             )
 
+        logger.info("Registering federation query handler for %r", query_type)
+
         self.query_handlers[query_type] = handler
 
     @defer.inlineCallbacks
@@ -785,3 +793,38 @@ class FederationHandlerRegistry(object):
             raise NotFoundError("No handler for Query type '%s'" % (query_type,))
 
         return handler(args)
+
+
+class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
+    def __init__(self, hs):
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+        self.clock = hs.get_clock()
+
+        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+        self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+        super(ReplicationFederationHandlerRegistry, self).__init__()
+
+    def on_edu(self, edu_type, origin, content):
+        handler = self.edu_handlers.get(edu_type)
+        if handler:
+            return super(ReplicationFederationHandlerRegistry, self).on_edu(
+                edu_type, origin, content,
+            )
+
+        return self._send_edu(
+                edu_type=edu_type,
+                origin=origin,
+                content=content,
+        )
+
+    def on_query(self, query_type, args):
+        handler = self.query_handlers.get(query_type)
+        if handler:
+            return handler(args)
+
+        return self._get_query_client(
+                query_type=query_type,
+                args=args,
+        )