diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index bea5335f89..01020566cf 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -66,6 +66,7 @@ class ReplicationLayer(object):
self.handler = None
self.edu_handlers = {}
+ self.query_handlers = {}
self._order = 0
@@ -84,6 +85,27 @@ class ReplicationLayer(object):
self.edu_handlers[edu_type] = handler
+ def register_query_handler(self, query_type, handler):
+ """Sets the handler callable that will be used to handle an incoming
+ federation Query of the given type.
+
+ Args:
+ query_type (str): Category name of the query, which should match
+ the string used by make_query.
+ handler (callable): Invoked to handle incoming queries of this type
+
+ handler is invoked as:
+ result = handler(args)
+
+ where 'args' is a dict mapping strings to strings of the query
+ arguments. It should return a Deferred that will eventually yield an
+ object to encode as JSON.
+ """
+ if query_type in self.query_handlers:
+ raise KeyError("Already have a Query handler for %s" % (query_type))
+
+ self.query_handlers[query_type] = handler
+
@defer.inlineCallbacks
@log_function
def send_pdu(self, pdu):
@@ -137,6 +159,24 @@ class ReplicationLayer(object):
# TODO, add errback, etc.
self._transaction_queue.enqueue_edu(edu)
+ @log_function
+ def make_query(self, destination, query_type, args):
+ """Sends a federation Query to a remote homeserver of the given type
+ and arguments.
+
+ Args:
+ destination (str): Domain name of the remote homeserver
+ query_type (str): Category of the query type; should match the
+ handler name used in register_query_handler().
+ args (dict): Mapping of strings to strings containing the details
+ of the query request.
+
+ Returns:
+ a Deferred which will eventually yield a JSON object from the
+ response
+ """
+ return self.transport_layer.make_query(destination, query_type, args)
+
@defer.inlineCallbacks
@log_function
def paginate(self, dest, context, limit):
@@ -341,6 +381,16 @@ class ReplicationLayer(object):
)
@defer.inlineCallbacks
+ def on_query_request(self, query_type, args):
+ if query_type in self.query_handlers:
+ response = yield self.query_handlers[query_type](args)
+ defer.returnValue((200, response))
+ else:
+ defer.returnValue((404, "No handler for Query type '%s'"
+ % (query_type)
+ ))
+
+ @defer.inlineCallbacks
@log_function
def _get_persisted_pdu(self, pdu_id, pdu_origin):
""" Get a PDU from the database with given origin and id.
|