diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/federation.py | 74 | ||||
-rw-r--r-- | synapse/handlers/message.py | 6 |
2 files changed, 56 insertions, 24 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0ae0541bd3..70790aaa72 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -62,6 +62,9 @@ class FederationHandler(BaseHandler): self.pdu_codec = PduCodec(hs) + # When joining a room we need to queue any events for that room up + self.room_queues = {} + @log_function @defer.inlineCallbacks def handle_new_event(self, event, snapshot): @@ -95,22 +98,25 @@ class FederationHandler(BaseHandler): logger.debug("Got event: %s", event.event_id) + if event.room_id in self.room_queues: + self.room_queues[event.room_id].append(pdu) + return + if state: state = [self.pdu_codec.event_from_pdu(p) for p in state] state = {(e.type, e.state_key): e for e in state} - yield self.state_handler.annotate_state_groups(event, state=state) + + is_new_state = yield self.state_handler.annotate_state_groups( + event, + state=state + ) logger.debug("Event: %s", event) if not backfilled: yield self.auth.check(event, None, raises=True) - if event.is_state and not backfilled: - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - else: - is_new_state = False + is_new_state = is_new_state and not backfilled # TODO: Implement something in federation that allows us to # respond to PDU. @@ -211,6 +217,8 @@ class FederationHandler(BaseHandler): assert(event.state_key == joinee) assert(event.room_id == room_id) + self.room_queues[room_id] = [] + event.event_id = self.event_factory.create_event_id() event.content = content @@ -219,15 +227,14 @@ class FederationHandler(BaseHandler): self.pdu_codec.pdu_from_event(event) ) - # TODO (erikj): Time out here. - d = defer.Deferred() - self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d) - reactor.callLater(10, d.cancel) + state = [self.pdu_codec.event_from_pdu(p) for p in state] - try: - yield d - except defer.CancelledError: - raise SynapseError(500, "Unable to join remote room") + logger.debug("do_invite_join state: %s", state) + + is_new_state = yield self.state_handler.annotate_state_groups( + event, + state=state + ) try: yield self.store.store_room( @@ -239,6 +246,32 @@ class FederationHandler(BaseHandler): # FIXME pass + for e in state: + # FIXME: Auth these. + is_new_state = yield self.state_handler.annotate_state_groups( + e, + state=state + ) + + yield self.store.persist_event( + e, + backfilled=False, + is_new_state=False + ) + + yield self.store.persist_event( + event, + backfilled=False, + is_new_state=is_new_state + ) + + room_queue = self.room_queues[room_id] + del self.room_queues[room_id] + + for p in room_queue: + p.outlier = True + yield self.on_receive_pdu(p, backfilled=False) + defer.returnValue(True) @defer.inlineCallbacks @@ -264,13 +297,9 @@ class FederationHandler(BaseHandler): def on_send_join_request(self, origin, pdu): event = self.pdu_codec.event_from_pdu(pdu) - yield self.state_handler.annotate_state_groups(event) + is_new_state= yield self.state_handler.annotate_state_groups(event) yield self.auth.check(event, None, raises=True) - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - # FIXME (erikj): All this is duplicated above :( yield self.store.persist_event( @@ -303,7 +332,10 @@ class FederationHandler(BaseHandler): yield self.replication_layer.send_pdu(new_pdu) - defer.returnValue(event.state_events.values()) + defer.returnValue([ + self.pdu_codec.pdu_from_event(e) + for e in event.state_events.values() + ]) @defer.inlineCallbacks def get_state_for_pdu(self, pdu_id, pdu_origin): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1c2cbce151..4aaf97a83e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -199,7 +199,7 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_current_state( + data = yield self.state_handler.get_current_state( room_id, event_type, state_key ) defer.returnValue(data) @@ -238,7 +238,7 @@ class MessageHandler(BaseHandler): yield self.auth.check_joined_room(room_id, user_id) # TODO: This is duplicating logic from snapshot_all_rooms - current_state = yield self.store.get_current_state(room_id) + current_state = yield self.state_handler.get_current_state(room_id) defer.returnValue([self.hs.serialize_event(c) for c in current_state]) @defer.inlineCallbacks @@ -315,7 +315,7 @@ class MessageHandler(BaseHandler): "end": end_token.to_string(), } - current_state = yield self.store.get_current_state( + current_state = yield self.state_handler.get_current_state( event.room_id ) d["state"] = [self.hs.serialize_event(c) for c in current_state] |