diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index d482193851..8c7d510ef6 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -403,12 +403,19 @@ class ReplicationLayer(object):
defer.returnValue(
(404, "No handler for Query type '%s'" % (query_type, ))
)
+
@defer.inlineCallbacks
def on_make_join_request(self, context, user_id):
pdu = yield self.handler.on_make_join_request(context, user_id)
defer.returnValue(pdu.get_dict())
@defer.inlineCallbacks
+ def on_invite_request(self, origin, content):
+ pdu = Pdu(**content)
+ ret_pdu = yield self.handler.on_send_join_request(origin, pdu)
+ defer.returnValue((200, ret_pdu.get_dict()))
+
+ @defer.inlineCallbacks
def on_send_join_request(self, origin, content):
pdu = Pdu(**content)
state = yield self.handler.on_send_join_request(origin, pdu)
@@ -426,8 +433,9 @@ class ReplicationLayer(object):
defer.returnValue(Pdu(**pdu_dict))
+ @defer.inlineCallbacks
def send_join(self, destination, pdu):
- return self.transport_layer.send_join(
+ _, content = yield self.transport_layer.send_join(
destination,
pdu.context,
pdu.pdu_id,
@@ -435,6 +443,13 @@ class ReplicationLayer(object):
pdu.get_dict(),
)
+ logger.debug("Got content: %s", content)
+ pdus = [Pdu(outlier=True, **p) for p in content.get("pdus", [])]
+ for pdu in pdus:
+ yield self._handle_new_pdu(destination, pdu)
+
+ defer.returnValue(pdus)
+
@defer.inlineCallbacks
@log_function
def _get_persisted_pdu(self, pdu_id, pdu_origin):
diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py
index a0d34fd24d..de64702e2f 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport.py
@@ -229,13 +229,36 @@ class TransportLayer(object):
pdu_id,
)
- response = yield self.client.put_json(
+ code, content = yield self.client.put_json(
destination=destination,
path=path,
data=content,
)
- defer.returnValue(response)
+ if not 200 <= code < 300:
+ raise RuntimeError("Got %d from send_join", code)
+
+ defer.returnValue(json.loads(content))
+
+ @defer.inlineCallbacks
+ @log_function
+ def send_invite(self, destination, context, pdu_id, origin, content):
+ path = PREFIX + "/invite/%s/%s/%s" % (
+ context,
+ origin,
+ pdu_id,
+ )
+
+ code, content = yield self.client.put_json(
+ destination=destination,
+ path=path,
+ data=content,
+ )
+
+ if not 200 <= code < 300:
+ raise RuntimeError("Got %d from send_invite", code)
+
+ defer.returnValue(json.loads(content))
@defer.inlineCallbacks
def _authenticate_request(self, request):
@@ -297,9 +320,13 @@ class TransportLayer(object):
@defer.inlineCallbacks
def new_handler(request, *args, **kwargs):
(origin, content) = yield self._authenticate_request(request)
- response = yield handler(
- origin, content, request.args, *args, **kwargs
- )
+ try:
+ response = yield handler(
+ origin, content, request.args, *args, **kwargs
+ )
+ except:
+ logger.exception("Callback failed")
+ raise
defer.returnValue(response)
return new_handler
@@ -419,6 +446,17 @@ class TransportLayer(object):
)
)
+ self.server.register_path(
+ "PUT",
+ re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)/([^/]*)$"),
+ self._with_authentication(
+ lambda origin, content, query, context, pdu_origin, pdu_id:
+ self._on_invite_request(
+ origin, content, query,
+ )
+ )
+ )
+
@defer.inlineCallbacks
@log_function
def _on_send_request(self, origin, content, query, transaction_id):
@@ -524,6 +562,15 @@ class TransportLayer(object):
defer.returnValue((200, content))
+ @defer.inlineCallbacks
+ @log_function
+ def _on_invite_request(self, origin, content, query):
+ content = yield self.request_handler.on_invite_request(
+ origin, content,
+ )
+
+ defer.returnValue((200, content))
+
class TransportReceivedHandler(object):
""" Callbacks used when we receive a transaction
|