diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 4 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 90 |
2 files changed, 55 insertions, 39 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index cd6c35f194..787a01efc5 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -16,6 +16,8 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError +from synapse.util.async import run_on_reactor + class BaseHandler(object): def __init__(self, hs): @@ -45,6 +47,8 @@ class BaseHandler(object): @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], extra_users=[], suppress_auth=False): + yield run_on_reactor() + snapshot.fill_out_prev_events(event) yield self.state_handler.annotate_state_groups(event) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b575986fc3..5f86ed03fa 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -22,6 +22,7 @@ from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.federation.pdu_codec import PduCodec, encode_event_id from synapse.api.errors import SynapseError +from synapse.util.async import run_on_reactor from twisted.internet import defer, reactor @@ -81,6 +82,8 @@ class FederationHandler(BaseHandler): processing. """ + yield run_on_reactor() + pdu = self.pdu_codec.pdu_from_event(event) if not hasattr(pdu, "destinations") or not pdu.destinations: @@ -102,6 +105,8 @@ class FederationHandler(BaseHandler): self.room_queues[event.room_id].append(pdu) return + logger.debug("Processing event: %s", event.event_id) + if state: state = [self.pdu_codec.event_from_pdu(p) for p in state] @@ -216,58 +221,65 @@ class FederationHandler(BaseHandler): assert(event.state_key == joinee) assert(event.room_id == room_id) - self.room_queues[room_id] = [] - - event.event_id = self.event_factory.create_event_id() - event.content = content + event.outlier = False - state = yield self.replication_layer.send_join( - target_host, - self.pdu_codec.pdu_from_event(event) - ) + self.room_queues[room_id] = [] - state = [self.pdu_codec.event_from_pdu(p) for p in state] + try: + event.event_id = self.event_factory.create_event_id() + event.content = content - logger.debug("do_invite_join state: %s", state) + state = yield self.replication_layer.send_join( + target_host, + self.pdu_codec.pdu_from_event(event) + ) - is_new_state = yield self.state_handler.annotate_state_groups( - event, - state=state - ) + state = [self.pdu_codec.event_from_pdu(p) for p in state] - try: - yield self.store.store_room( - room_id=room_id, - room_creator_user_id="", - is_public=False - ) - except: - # FIXME - pass + logger.debug("do_invite_join state: %s", state) - for e in state: - # FIXME: Auth these. is_new_state = yield self.state_handler.annotate_state_groups( - e, + event, + state=state ) + logger.debug("do_invite_join event: %s", event) + + try: + yield self.store.store_room( + room_id=room_id, + room_creator_user_id="", + is_public=False + ) + except: + # FIXME + pass + + for e in state: + # FIXME: Auth these. + e.outlier = True + + yield self.state_handler.annotate_state_groups( + e, + ) + + yield self.store.persist_event( + e, + backfilled=False, + is_new_state=False + ) + yield self.store.persist_event( - e, + event, backfilled=False, - is_new_state=False + is_new_state=is_new_state ) + finally: + room_queue = self.room_queues[room_id] + del self.room_queues[room_id] - yield self.store.persist_event( - event, - backfilled=False, - is_new_state=is_new_state - ) - - room_queue = self.room_queues[room_id] - del self.room_queues[room_id] - - for p in room_queue: - yield self.on_receive_pdu(p, backfilled=False) + for p in room_queue: + yield self.on_receive_pdu(p, backfilled=False) defer.returnValue(True) |