summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/__init__.py8
-rw-r--r--synapse/federation/federation_base.py6
-rw-r--r--synapse/federation/federation_client.py1
-rw-r--r--synapse/federation/federation_server.py129
-rw-r--r--synapse/federation/replication.py73
-rw-r--r--synapse/federation/transport/server.py2
6 files changed, 82 insertions, 137 deletions
diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py
index 2e32d245ba..f5f0bdfca3 100644
--- a/synapse/federation/__init__.py
+++ b/synapse/federation/__init__.py
@@ -15,11 +15,3 @@
 
 """ This package includes all the federation specific logic.
 """
-
-from .replication import ReplicationLayer
-
-
-def initialize_http_replication(hs):
-    transport = hs.get_federation_transport_client()
-
-    return ReplicationLayer(hs, transport)
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 7918d3e442..79eaa31031 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -27,7 +27,13 @@ logger = logging.getLogger(__name__)
 
 class FederationBase(object):
     def __init__(self, hs):
+        self.hs = hs
+
+        self.server_name = hs.hostname
+        self.keyring = hs.get_keyring()
         self.spam_checker = hs.get_spam_checker()
+        self.store = hs.get_datastore()
+        self._clock = hs.get_clock()
 
     @defer.inlineCallbacks
     def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 813907f7f2..38440da5b5 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -58,6 +58,7 @@ class FederationClient(FederationBase):
             self._clear_tried_cache, 60 * 1000,
         )
         self.state = hs.get_state_handler()
+        self.transport_layer = hs.get_federation_transport_client()
 
     def _clear_tried_cache(self):
         """Clear pdu_destination_tried cache"""
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9849953c9b..bea7fd0b71 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -17,12 +17,14 @@ import logging
 import simplejson as json
 from twisted.internet import defer
 
-from synapse.api.errors import AuthError, FederationError, SynapseError
+from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.federation.federation_base import (
     FederationBase,
     event_from_pdu_json,
 )
+
+from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 import synapse.metrics
 from synapse.types import get_domain_from_id
@@ -52,50 +54,19 @@ class FederationServer(FederationBase):
         super(FederationServer, self).__init__(hs)
 
         self.auth = hs.get_auth()
+        self.handler = hs.get_handlers().federation_handler
 
         self._server_linearizer = async.Linearizer("fed_server")
         self._transaction_linearizer = async.Linearizer("fed_txn_handler")
 
+        self.transaction_actions = TransactionActions(self.store)
+
+        self.registry = hs.get_federation_registry()
+
         # We cache responses to state queries, as they take a while and often
         # come in waves.
         self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
 
-    def set_handler(self, handler):
-        """Sets the handler that the replication layer will use to communicate
-        receipt of new PDUs from other home servers. The required methods are
-        documented on :py:class:`.ReplicationHandler`.
-        """
-        self.handler = handler
-
-    def register_edu_handler(self, edu_type, handler):
-        if edu_type in self.edu_handlers:
-            raise KeyError("Already have an EDU handler for %s" % (edu_type,))
-
-        self.edu_handlers[edu_type] = handler
-
-    def register_query_handler(self, query_type, handler):
-        """Sets the handler callable that will be used to handle an incoming
-        federation Query of the given type.
-
-        Args:
-            query_type (str): Category name of the query, which should match
-                the string used by make_query.
-            handler (callable): Invoked to handle incoming queries of this type
-
-        handler is invoked as:
-            result = handler(args)
-
-        where 'args' is a dict mapping strings to strings of the query
-          arguments. It should return a Deferred that will eventually yield an
-          object to encode as JSON.
-        """
-        if query_type in self.query_handlers:
-            raise KeyError(
-                "Already have a Query handler for %s" % (query_type,)
-            )
-
-        self.query_handlers[query_type] = handler
-
     @defer.inlineCallbacks
     @log_function
     def on_backfill_request(self, origin, room_id, versions, limit):
@@ -229,16 +200,7 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     def received_edu(self, origin, edu_type, content):
         received_edus_counter.inc()
-
-        if edu_type in self.edu_handlers:
-            try:
-                yield self.edu_handlers[edu_type](origin, content)
-            except SynapseError as e:
-                logger.info("Failed to handle edu %r: %r", edu_type, e)
-            except Exception as e:
-                logger.exception("Failed to handle edu %r", edu_type)
-        else:
-            logger.warn("Received EDU of type %s with no handler", edu_type)
+        yield self.registry.on_edu(edu_type, origin, content)
 
     @defer.inlineCallbacks
     @log_function
@@ -328,14 +290,8 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     def on_query_request(self, query_type, args):
         received_queries_counter.inc(query_type)
-
-        if query_type in self.query_handlers:
-            response = yield self.query_handlers[query_type](args)
-            defer.returnValue((200, response))
-        else:
-            defer.returnValue(
-                (404, "No handler for Query type '%s'" % (query_type,))
-            )
+        resp = yield self.registry.on_query(query_type, args)
+        defer.returnValue((200, resp))
 
     @defer.inlineCallbacks
     def on_make_join_request(self, room_id, user_id):
@@ -607,3 +563,66 @@ class FederationServer(FederationBase):
             origin, room_id, event_dict
         )
         defer.returnValue(ret)
+
+
+class FederationHandlerRegistry(object):
+    """Allows classes to register themselves as handlers for a given EDU or
+    query type for incoming federation traffic.
+    """
+    def __init__(self):
+        self.edu_handlers = {}
+        self.query_handlers = {}
+
+    def register_edu_handler(self, edu_type, handler):
+        """Sets the handler callable that will be used to handle an incoming
+        federation EDU of the given type.
+
+        Args:
+            edu_type (str): The type of the incoming EDU to register handler for
+            handler (Callable[[str, dict]]): A callable invoked on incoming EDU
+                of the given type. The arguments are the origin server name and
+                the EDU contents.
+        """
+        if edu_type in self.edu_handlers:
+            raise KeyError("Already have an EDU handler for %s" % (edu_type,))
+
+        self.edu_handlers[edu_type] = handler
+
+    def register_query_handler(self, query_type, handler):
+        """Sets the handler callable that will be used to handle an incoming
+        federation query of the given type.
+
+        Args:
+            query_type (str): Category name of the query, which should match
+                the string used by make_query.
+            handler (Callable[[dict], Deferred[dict]]): Invoked to handle
+                incoming queries of this type. The return will be yielded
+                on and the result used as the response to the query request.
+        """
+        if query_type in self.query_handlers:
+            raise KeyError(
+                "Already have a Query handler for %s" % (query_type,)
+            )
+
+        self.query_handlers[query_type] = handler
+
+    @defer.inlineCallbacks
+    def on_edu(self, edu_type, origin, content):
+        handler = self.edu_handlers.get(edu_type)
+        if not handler:
+            logger.warn("No handler registered for EDU type %s", edu_type)
+
+        try:
+            yield handler(origin, content)
+        except SynapseError as e:
+            logger.info("Failed to handle edu %r: %r", edu_type, e)
+        except Exception as e:
+            logger.exception("Failed to handle edu %r", edu_type)
+
+    def on_query(self, query_type, args):
+        handler = self.query_handlers.get(query_type)
+        if not handler:
+            logger.warn("No handler registered for query type %s", query_type)
+            raise NotFoundError("No handler for Query type '%s'" % (query_type,))
+
+        return handler(args)
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
deleted file mode 100644
index 62d865ec4b..0000000000
--- a/synapse/federation/replication.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""This layer is responsible for replicating with remote home servers using
-a given transport.
-"""
-
-from .federation_client import FederationClient
-from .federation_server import FederationServer
-
-from .persistence import TransactionActions
-
-import logging
-
-
-logger = logging.getLogger(__name__)
-
-
-class ReplicationLayer(FederationClient, FederationServer):
-    """This layer is responsible for replicating with remote home servers over
-    the given transport. I.e., does the sending and receiving of PDUs to
-    remote home servers.
-
-    The layer communicates with the rest of the server via a registered
-    ReplicationHandler.
-
-    In more detail, the layer:
-        * Receives incoming data and processes it into transactions and pdus.
-        * Fetches any PDUs it thinks it might have missed.
-        * Keeps the current state for contexts up to date by applying the
-          suitable conflict resolution.
-        * Sends outgoing pdus wrapped in transactions.
-        * Fills out the references to previous pdus/transactions appropriately
-          for outgoing data.
-    """
-
-    def __init__(self, hs, transport_layer):
-        self.server_name = hs.hostname
-
-        self.keyring = hs.get_keyring()
-
-        self.transport_layer = transport_layer
-
-        self.federation_client = self
-
-        self.store = hs.get_datastore()
-
-        self.handler = None
-        self.edu_handlers = {}
-        self.query_handlers = {}
-
-        self._clock = hs.get_clock()
-
-        self.transaction_actions = TransactionActions(self.store)
-
-        self.hs = hs
-
-        super(ReplicationLayer, self).__init__(hs)
-
-    def __str__(self):
-        return "<ReplicationLayer(%s)>" % self.server_name
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 06c16ba4fa..a66a6b0692 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1190,7 +1190,7 @@ GROUP_ATTESTATION_SERVLET_CLASSES = (
 def register_servlets(hs, resource, authenticator, ratelimiter):
     for servletclass in FEDERATION_SERVLET_CLASSES:
         servletclass(
-            handler=hs.get_replication_layer(),
+            handler=hs.get_federation_server(),
             authenticator=authenticator,
             ratelimiter=ratelimiter,
             server_name=hs.hostname,