diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9849953c9b..90302c9537 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -17,7 +17,7 @@ import logging
import simplejson as json
from twisted.internet import defer
-from synapse.api.errors import AuthError, FederationError, SynapseError
+from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import (
FederationBase,
@@ -56,6 +56,8 @@ class FederationServer(FederationBase):
self._server_linearizer = async.Linearizer("fed_server")
self._transaction_linearizer = async.Linearizer("fed_txn_handler")
+ self.registry = hs.get_federation_registry()
+
# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
@@ -67,35 +69,6 @@ class FederationServer(FederationBase):
"""
self.handler = handler
- def register_edu_handler(self, edu_type, handler):
- if edu_type in self.edu_handlers:
- raise KeyError("Already have an EDU handler for %s" % (edu_type,))
-
- self.edu_handlers[edu_type] = handler
-
- def register_query_handler(self, query_type, handler):
- """Sets the handler callable that will be used to handle an incoming
- federation Query of the given type.
-
- Args:
- query_type (str): Category name of the query, which should match
- the string used by make_query.
- handler (callable): Invoked to handle incoming queries of this type
-
- handler is invoked as:
- result = handler(args)
-
- where 'args' is a dict mapping strings to strings of the query
- arguments. It should return a Deferred that will eventually yield an
- object to encode as JSON.
- """
- if query_type in self.query_handlers:
- raise KeyError(
- "Already have a Query handler for %s" % (query_type,)
- )
-
- self.query_handlers[query_type] = handler
-
@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, room_id, versions, limit):
@@ -229,16 +202,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def received_edu(self, origin, edu_type, content):
received_edus_counter.inc()
-
- if edu_type in self.edu_handlers:
- try:
- yield self.edu_handlers[edu_type](origin, content)
- except SynapseError as e:
- logger.info("Failed to handle edu %r: %r", edu_type, e)
- except Exception as e:
- logger.exception("Failed to handle edu %r", edu_type)
- else:
- logger.warn("Received EDU of type %s with no handler", edu_type)
+ yield self.registry.on_edu(edu_type, origin, content)
@defer.inlineCallbacks
@log_function
@@ -328,14 +292,8 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.inc(query_type)
-
- if query_type in self.query_handlers:
- response = yield self.query_handlers[query_type](args)
- defer.returnValue((200, response))
- else:
- defer.returnValue(
- (404, "No handler for Query type '%s'" % (query_type,))
- )
+ resp = yield self.registry.on_query(query_type, args)
+ defer.returnValue((200, resp))
@defer.inlineCallbacks
def on_make_join_request(self, room_id, user_id):
@@ -607,3 +565,66 @@ class FederationServer(FederationBase):
origin, room_id, event_dict
)
defer.returnValue(ret)
+
+
+class FederationHandlerRegistry(object):
+ """Allows classes to register themselves as handlers for a given EDU or
+ query type for incoming federation traffic.
+ """
+ def __init__(self):
+ self.edu_handlers = {}
+ self.query_handlers = {}
+
+ def register_edu_handler(self, edu_type, handler):
+ """Sets the handler callable that will be used to handle an incoming
+ federation EDU of the given type.
+
+ Args:
+ edu_type (str): The type of the incoming EDU to register handler for
+ handler (Callable[[str, dict]]): A callable invoked on incoming EDU
+ of the given type. The arguments are the origin server name and
+ the EDU contents.
+ """
+ if edu_type in self.edu_handlers:
+ raise KeyError("Already have an EDU handler for %s" % (edu_type,))
+
+ self.edu_handlers[edu_type] = handler
+
+ def register_query_handler(self, query_type, handler):
+ """Sets the handler callable that will be used to handle an incoming
+ federation query of the given type.
+
+ Args:
+ query_type (str): Category name of the query, which should match
+ the string used by make_query.
+ handler (Callable[[dict], Deferred[dict]]): Invoked to handle
+ incoming queries of this type. The return will be yielded
+ on and the result used as the response to the query request.
+ """
+ if query_type in self.query_handlers:
+ raise KeyError(
+ "Already have a Query handler for %s" % (query_type,)
+ )
+
+ self.query_handlers[query_type] = handler
+
+ @defer.inlineCallbacks
+ def on_edu(self, edu_type, origin, content):
+ handler = self.edu_handlers.get(edu_type)
+ if not handler:
+ logger.warn("No handler registered for EDU type %s", edu_type)
+
+ try:
+ yield handler(origin, content)
+ except SynapseError as e:
+ logger.info("Failed to handle edu %r: %r", edu_type, e)
+ except Exception as e:
+ logger.exception("Failed to handle edu %r", edu_type)
+
+ def on_query(self, query_type, args):
+ handler = self.query_handlers.get(query_type)
+ if not handler:
+ logger.warn("No handler registered for query type %s", query_type)
+ raise NotFoundError("No handler for Query type '%s'" % (query_type,))
+
+ return handler(args)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 0e83453851..9e58dbe64e 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -41,10 +41,12 @@ class DeviceHandler(BaseHandler):
self._edu_updater = DeviceListEduUpdater(hs, self)
- self.federation.register_edu_handler(
+ federation_registry = hs.get_federation_registry()
+
+ federation_registry.register_edu_handler(
"m.device_list_update", self._edu_updater.incoming_device_list_update,
)
- self.federation.register_query_handler(
+ federation_registry.register_query_handler(
"user_devices", self.on_federation_query_user_devices,
)
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index d996aa90bb..f147a20b73 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -37,7 +37,7 @@ class DeviceMessageHandler(object):
self.is_mine = hs.is_mine
self.federation = hs.get_federation_sender()
- hs.get_replication_layer().register_edu_handler(
+ hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8580ada60a..e955cb1f3c 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -37,7 +37,7 @@ class DirectoryHandler(BaseHandler):
self.event_creation_handler = hs.get_event_creation_handler()
self.federation = hs.get_replication_layer()
- self.federation.register_query_handler(
+ hs.get_federation_registry().register_query_handler(
"directory", self.on_directory_query
)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 9aa95f89e6..57f50a4e27 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -40,7 +40,7 @@ class E2eKeysHandler(object):
# doesn't really work as part of the generic query API, because the
# query request requires an object POST, but we abuse the
# "query handler" interface.
- self.federation.register_query_handler(
+ hs.get_federation_registry().register_query_handler(
"client_keys", self.on_federation_query_client_keys
)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cb158ba962..b11ae78350 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -98,24 +98,26 @@ class PresenceHandler(object):
self.state = hs.get_state_handler()
- self.replication.register_edu_handler(
+ federation_registry = hs.get_federation_registry()
+
+ federation_registry.register_edu_handler(
"m.presence", self.incoming_presence
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_invite",
lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_accept",
lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_deny",
lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]),
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index c9c2879038..c386c79bbd 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -32,7 +32,7 @@ class ProfileHandler(BaseHandler):
super(ProfileHandler, self).__init__(hs)
self.federation = hs.get_replication_layer()
- self.federation.register_query_handler(
+ hs.get_federation_registry().register_query_handler(
"profile", self.on_profile_query
)
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 0525765272..3f215c2b4e 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -35,7 +35,7 @@ class ReceiptsHandler(BaseHandler):
self.store = hs.get_datastore()
self.hs = hs
self.federation = hs.get_federation_sender()
- hs.get_replication_layer().register_edu_handler(
+ hs.get_federation_registry().register_edu_handler(
"m.receipt", self._received_remote_receipt
)
self.clock = self.hs.get_clock()
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 82dedbbc99..77c0cf146f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -56,7 +56,7 @@ class TypingHandler(object):
self.federation = hs.get_federation_sender()
- hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
+ hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
hs.get_distributor().observe("user_left_room", self.user_left_room)
diff --git a/synapse/server.py b/synapse/server.py
index 5b6effbe31..1bc8d6f702 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -34,6 +34,7 @@ from synapse.events.builder import EventBuilderFactory
from synapse.events.spamcheck import SpamChecker
from synapse.federation import initialize_http_replication
from synapse.federation.send_queue import FederationRemoteSendQueue
+from synapse.federation.federation_server import FederationHandlerRegistry
from synapse.federation.transport.client import TransportLayerClient
from synapse.federation.transaction_queue import TransactionQueue
from synapse.handlers import Handlers
@@ -147,6 +148,7 @@ class HomeServer(object):
'groups_attestation_renewer',
'spam_checker',
'room_member_handler',
+ 'federation_registry',
]
def __init__(self, hostname, **kwargs):
@@ -387,6 +389,9 @@ class HomeServer(object):
def build_room_member_handler(self):
return RoomMemberHandler(self)
+ def build_federation_registry(self):
+ return FederationHandlerRegistry()
+
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
diff --git a/tests/handlers/test_directory.py b/tests/handlers/test_directory.py
index 5712773909..b103921498 100644
--- a/tests/handlers/test_directory.py
+++ b/tests/handlers/test_directory.py
@@ -35,21 +35,20 @@ class DirectoryTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
- self.mock_federation = Mock(spec=[
- "make_query",
- "register_edu_handler",
- ])
+ self.mock_federation = Mock()
+ self.mock_registry = Mock()
self.query_handlers = {}
def register_query_handler(query_type, handler):
self.query_handlers[query_type] = handler
- self.mock_federation.register_query_handler = register_query_handler
+ self.mock_registry.register_query_handler = register_query_handler
hs = yield setup_test_homeserver(
http_client=None,
resource_for_federation=Mock(),
replication_layer=self.mock_federation,
+ federation_registry=self.mock_registry,
)
hs.handlers = DirectoryHandlers(hs)
diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py
index a5f47181d7..73223ffbd3 100644
--- a/tests/handlers/test_profile.py
+++ b/tests/handlers/test_profile.py
@@ -37,23 +37,22 @@ class ProfileTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
- self.mock_federation = Mock(spec=[
- "make_query",
- "register_edu_handler",
- ])
+ self.mock_federation = Mock()
+ self.mock_registry = Mock()
self.query_handlers = {}
def register_query_handler(query_type, handler):
self.query_handlers[query_type] = handler
- self.mock_federation.register_query_handler = register_query_handler
+ self.mock_registry.register_query_handler = register_query_handler
hs = yield setup_test_homeserver(
http_client=None,
handlers=None,
resource_for_federation=Mock(),
replication_layer=self.mock_federation,
+ federation_registry=self.mock_registry,
ratelimiter=NonCallableMock(spec_set=[
"send_message",
])
|