summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-10-17 18:56:42 +0100
committerErik Johnston <erik@matrix.org>2014-10-17 18:56:42 +0100
commit5ffe5ab43fa090111a0141b04ce6342172f60724 (patch)
tree45af4a0c2fdbb3c89853645cafe1440b13c6d3f4 /synapse/federation
parentFinish implementing the new join dance. (diff)
downloadsynapse-5ffe5ab43fa090111a0141b04ce6342172f60724.tar.xz
Use state groups to get current state. Make join dance actually work.
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/replication.py17
-rw-r--r--synapse/federation/transport.py57
2 files changed, 68 insertions, 6 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index d482193851..8c7d510ef6 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -403,12 +403,19 @@ class ReplicationLayer(object):
             defer.returnValue(
                 (404, "No handler for Query type '%s'" % (query_type, ))
             )
+
     @defer.inlineCallbacks
     def on_make_join_request(self, context, user_id):
         pdu = yield self.handler.on_make_join_request(context, user_id)
         defer.returnValue(pdu.get_dict())
 
     @defer.inlineCallbacks
+    def on_invite_request(self, origin, content):
+        pdu = Pdu(**content)
+        ret_pdu = yield self.handler.on_send_join_request(origin, pdu)
+        defer.returnValue((200, ret_pdu.get_dict()))
+
+    @defer.inlineCallbacks
     def on_send_join_request(self, origin, content):
         pdu = Pdu(**content)
         state = yield self.handler.on_send_join_request(origin, pdu)
@@ -426,8 +433,9 @@ class ReplicationLayer(object):
 
         defer.returnValue(Pdu(**pdu_dict))
 
+    @defer.inlineCallbacks
     def send_join(self, destination, pdu):
-        return self.transport_layer.send_join(
+        _, content = yield self.transport_layer.send_join(
             destination,
             pdu.context,
             pdu.pdu_id,
@@ -435,6 +443,13 @@ class ReplicationLayer(object):
             pdu.get_dict(),
         )
 
+        logger.debug("Got content: %s", content)
+        pdus = [Pdu(outlier=True, **p) for p in content.get("pdus", [])]
+        for pdu in pdus:
+            yield self._handle_new_pdu(destination, pdu)
+
+        defer.returnValue(pdus)
+
     @defer.inlineCallbacks
     @log_function
     def _get_persisted_pdu(self, pdu_id, pdu_origin):
diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py
index a0d34fd24d..de64702e2f 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport.py
@@ -229,13 +229,36 @@ class TransportLayer(object):
             pdu_id,
         )
 
-        response = yield self.client.put_json(
+        code, content = yield self.client.put_json(
             destination=destination,
             path=path,
             data=content,
         )
 
-        defer.returnValue(response)
+        if not 200 <= code < 300:
+            raise RuntimeError("Got %d from send_join", code)
+
+        defer.returnValue(json.loads(content))
+
+    @defer.inlineCallbacks
+    @log_function
+    def send_invite(self, destination, context, pdu_id, origin, content):
+        path = PREFIX + "/invite/%s/%s/%s" % (
+            context,
+            origin,
+            pdu_id,
+        )
+
+        code, content = yield self.client.put_json(
+            destination=destination,
+            path=path,
+            data=content,
+        )
+
+        if not 200 <= code < 300:
+            raise RuntimeError("Got %d from send_invite", code)
+
+        defer.returnValue(json.loads(content))
 
     @defer.inlineCallbacks
     def _authenticate_request(self, request):
@@ -297,9 +320,13 @@ class TransportLayer(object):
         @defer.inlineCallbacks
         def new_handler(request, *args, **kwargs):
             (origin, content) = yield self._authenticate_request(request)
-            response = yield handler(
-                origin, content, request.args, *args, **kwargs
-            )
+            try:
+                response = yield handler(
+                    origin, content, request.args, *args, **kwargs
+                )
+            except:
+                logger.exception("Callback failed")
+                raise
             defer.returnValue(response)
         return new_handler
 
@@ -419,6 +446,17 @@ class TransportLayer(object):
             )
         )
 
+        self.server.register_path(
+            "PUT",
+            re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)/([^/]*)$"),
+            self._with_authentication(
+                lambda origin, content, query, context, pdu_origin, pdu_id:
+                self._on_invite_request(
+                    origin, content, query,
+                )
+            )
+        )
+
     @defer.inlineCallbacks
     @log_function
     def _on_send_request(self, origin, content, query, transaction_id):
@@ -524,6 +562,15 @@ class TransportLayer(object):
 
         defer.returnValue((200, content))
 
+    @defer.inlineCallbacks
+    @log_function
+    def _on_invite_request(self, origin, content, query):
+        content = yield self.request_handler.on_invite_request(
+            origin, content,
+        )
+
+        defer.returnValue((200, content))
+
 
 class TransportReceivedHandler(object):
     """ Callbacks used when we receive a transaction