summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py74
1 files changed, 53 insertions, 21 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0ae0541bd3..70790aaa72 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -62,6 +62,9 @@ class FederationHandler(BaseHandler):
 
         self.pdu_codec = PduCodec(hs)
 
+        # When joining a room we need to queue any events for that room up
+        self.room_queues = {}
+
     @log_function
     @defer.inlineCallbacks
     def handle_new_event(self, event, snapshot):
@@ -95,22 +98,25 @@ class FederationHandler(BaseHandler):
 
         logger.debug("Got event: %s", event.event_id)
 
+        if event.room_id in self.room_queues:
+            self.room_queues[event.room_id].append(pdu)
+            return
+
         if state:
             state = [self.pdu_codec.event_from_pdu(p) for p in state]
             state = {(e.type, e.state_key): e for e in state}
-        yield self.state_handler.annotate_state_groups(event, state=state)
+
+        is_new_state = yield self.state_handler.annotate_state_groups(
+            event,
+            state=state
+        )
 
         logger.debug("Event: %s", event)
 
         if not backfilled:
             yield self.auth.check(event, None, raises=True)
 
-        if event.is_state and not backfilled:
-            is_new_state = yield self.state_handler.handle_new_state(
-                pdu
-            )
-        else:
-            is_new_state = False
+        is_new_state = is_new_state and not backfilled
 
         # TODO: Implement something in federation that allows us to
         # respond to PDU.
@@ -211,6 +217,8 @@ class FederationHandler(BaseHandler):
         assert(event.state_key == joinee)
         assert(event.room_id == room_id)
 
+        self.room_queues[room_id] = []
+
         event.event_id = self.event_factory.create_event_id()
         event.content = content
 
@@ -219,15 +227,14 @@ class FederationHandler(BaseHandler):
             self.pdu_codec.pdu_from_event(event)
         )
 
-        # TODO (erikj): Time out here.
-        d = defer.Deferred()
-        self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d)
-        reactor.callLater(10, d.cancel)
+        state = [self.pdu_codec.event_from_pdu(p) for p in state]
 
-        try:
-            yield d
-        except defer.CancelledError:
-            raise SynapseError(500, "Unable to join remote room")
+        logger.debug("do_invite_join state: %s", state)
+
+        is_new_state = yield self.state_handler.annotate_state_groups(
+            event,
+            state=state
+        )
 
         try:
             yield self.store.store_room(
@@ -239,6 +246,32 @@ class FederationHandler(BaseHandler):
             # FIXME
             pass
 
+        for e in state:
+            # FIXME: Auth these.
+            is_new_state = yield self.state_handler.annotate_state_groups(
+                e,
+                state=state
+            )
+
+            yield self.store.persist_event(
+                e,
+                backfilled=False,
+                is_new_state=False
+            )
+
+        yield self.store.persist_event(
+            event,
+            backfilled=False,
+            is_new_state=is_new_state
+        )
+
+        room_queue = self.room_queues[room_id]
+        del self.room_queues[room_id]
+
+        for p in room_queue:
+            p.outlier = True
+            yield self.on_receive_pdu(p, backfilled=False)
+
         defer.returnValue(True)
 
     @defer.inlineCallbacks
@@ -264,13 +297,9 @@ class FederationHandler(BaseHandler):
     def on_send_join_request(self, origin, pdu):
         event = self.pdu_codec.event_from_pdu(pdu)
 
-        yield self.state_handler.annotate_state_groups(event)
+        is_new_state= yield self.state_handler.annotate_state_groups(event)
         yield self.auth.check(event, None, raises=True)
 
-        is_new_state = yield self.state_handler.handle_new_state(
-            pdu
-        )
-
         # FIXME (erikj):  All this is duplicated above :(
 
         yield self.store.persist_event(
@@ -303,7 +332,10 @@ class FederationHandler(BaseHandler):
 
         yield self.replication_layer.send_pdu(new_pdu)
 
-        defer.returnValue(event.state_events.values())
+        defer.returnValue([
+            self.pdu_codec.pdu_from_event(e)
+            for e in event.state_events.values()
+        ])
 
     @defer.inlineCallbacks
     def get_state_for_pdu(self, pdu_id, pdu_origin):