diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0ae0541bd3..70790aaa72 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -62,6 +62,9 @@ class FederationHandler(BaseHandler):
self.pdu_codec = PduCodec(hs)
+ # When joining a room we need to queue any events for that room up
+ self.room_queues = {}
+
@log_function
@defer.inlineCallbacks
def handle_new_event(self, event, snapshot):
@@ -95,22 +98,25 @@ class FederationHandler(BaseHandler):
logger.debug("Got event: %s", event.event_id)
+ if event.room_id in self.room_queues:
+ self.room_queues[event.room_id].append(pdu)
+ return
+
if state:
state = [self.pdu_codec.event_from_pdu(p) for p in state]
state = {(e.type, e.state_key): e for e in state}
- yield self.state_handler.annotate_state_groups(event, state=state)
+
+ is_new_state = yield self.state_handler.annotate_state_groups(
+ event,
+ state=state
+ )
logger.debug("Event: %s", event)
if not backfilled:
yield self.auth.check(event, None, raises=True)
- if event.is_state and not backfilled:
- is_new_state = yield self.state_handler.handle_new_state(
- pdu
- )
- else:
- is_new_state = False
+ is_new_state = is_new_state and not backfilled
# TODO: Implement something in federation that allows us to
# respond to PDU.
@@ -211,6 +217,8 @@ class FederationHandler(BaseHandler):
assert(event.state_key == joinee)
assert(event.room_id == room_id)
+ self.room_queues[room_id] = []
+
event.event_id = self.event_factory.create_event_id()
event.content = content
@@ -219,15 +227,14 @@ class FederationHandler(BaseHandler):
self.pdu_codec.pdu_from_event(event)
)
- # TODO (erikj): Time out here.
- d = defer.Deferred()
- self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d)
- reactor.callLater(10, d.cancel)
+ state = [self.pdu_codec.event_from_pdu(p) for p in state]
- try:
- yield d
- except defer.CancelledError:
- raise SynapseError(500, "Unable to join remote room")
+ logger.debug("do_invite_join state: %s", state)
+
+ is_new_state = yield self.state_handler.annotate_state_groups(
+ event,
+ state=state
+ )
try:
yield self.store.store_room(
@@ -239,6 +246,32 @@ class FederationHandler(BaseHandler):
# FIXME
pass
+ for e in state:
+ # FIXME: Auth these.
+ is_new_state = yield self.state_handler.annotate_state_groups(
+ e,
+ state=state
+ )
+
+ yield self.store.persist_event(
+ e,
+ backfilled=False,
+ is_new_state=False
+ )
+
+ yield self.store.persist_event(
+ event,
+ backfilled=False,
+ is_new_state=is_new_state
+ )
+
+ room_queue = self.room_queues[room_id]
+ del self.room_queues[room_id]
+
+ for p in room_queue:
+ p.outlier = True
+ yield self.on_receive_pdu(p, backfilled=False)
+
defer.returnValue(True)
@defer.inlineCallbacks
@@ -264,13 +297,9 @@ class FederationHandler(BaseHandler):
def on_send_join_request(self, origin, pdu):
event = self.pdu_codec.event_from_pdu(pdu)
- yield self.state_handler.annotate_state_groups(event)
+ is_new_state= yield self.state_handler.annotate_state_groups(event)
yield self.auth.check(event, None, raises=True)
- is_new_state = yield self.state_handler.handle_new_state(
- pdu
- )
-
# FIXME (erikj): All this is duplicated above :(
yield self.store.persist_event(
@@ -303,7 +332,10 @@ class FederationHandler(BaseHandler):
yield self.replication_layer.send_pdu(new_pdu)
- defer.returnValue(event.state_events.values())
+ defer.returnValue([
+ self.pdu_codec.pdu_from_event(e)
+ for e in event.state_events.values()
+ ])
@defer.inlineCallbacks
def get_state_for_pdu(self, pdu_id, pdu_origin):
|