diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b575986fc3..5f86ed03fa 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -22,6 +22,7 @@ from synapse.api.constants import Membership
from synapse.util.logutils import log_function
from synapse.federation.pdu_codec import PduCodec, encode_event_id
from synapse.api.errors import SynapseError
+from synapse.util.async import run_on_reactor
from twisted.internet import defer, reactor
@@ -81,6 +82,8 @@ class FederationHandler(BaseHandler):
processing.
"""
+ yield run_on_reactor()
+
pdu = self.pdu_codec.pdu_from_event(event)
if not hasattr(pdu, "destinations") or not pdu.destinations:
@@ -102,6 +105,8 @@ class FederationHandler(BaseHandler):
self.room_queues[event.room_id].append(pdu)
return
+ logger.debug("Processing event: %s", event.event_id)
+
if state:
state = [self.pdu_codec.event_from_pdu(p) for p in state]
@@ -216,58 +221,65 @@ class FederationHandler(BaseHandler):
assert(event.state_key == joinee)
assert(event.room_id == room_id)
- self.room_queues[room_id] = []
-
- event.event_id = self.event_factory.create_event_id()
- event.content = content
+ event.outlier = False
- state = yield self.replication_layer.send_join(
- target_host,
- self.pdu_codec.pdu_from_event(event)
- )
+ self.room_queues[room_id] = []
- state = [self.pdu_codec.event_from_pdu(p) for p in state]
+ try:
+ event.event_id = self.event_factory.create_event_id()
+ event.content = content
- logger.debug("do_invite_join state: %s", state)
+ state = yield self.replication_layer.send_join(
+ target_host,
+ self.pdu_codec.pdu_from_event(event)
+ )
- is_new_state = yield self.state_handler.annotate_state_groups(
- event,
- state=state
- )
+ state = [self.pdu_codec.event_from_pdu(p) for p in state]
- try:
- yield self.store.store_room(
- room_id=room_id,
- room_creator_user_id="",
- is_public=False
- )
- except:
- # FIXME
- pass
+ logger.debug("do_invite_join state: %s", state)
- for e in state:
- # FIXME: Auth these.
is_new_state = yield self.state_handler.annotate_state_groups(
- e,
+ event,
+ state=state
)
+ logger.debug("do_invite_join event: %s", event)
+
+ try:
+ yield self.store.store_room(
+ room_id=room_id,
+ room_creator_user_id="",
+ is_public=False
+ )
+ except:
+ # FIXME
+ pass
+
+ for e in state:
+ # FIXME: Auth these.
+ e.outlier = True
+
+ yield self.state_handler.annotate_state_groups(
+ e,
+ )
+
+ yield self.store.persist_event(
+ e,
+ backfilled=False,
+ is_new_state=False
+ )
+
yield self.store.persist_event(
- e,
+ event,
backfilled=False,
- is_new_state=False
+ is_new_state=is_new_state
)
+ finally:
+ room_queue = self.room_queues[room_id]
+ del self.room_queues[room_id]
- yield self.store.persist_event(
- event,
- backfilled=False,
- is_new_state=is_new_state
- )
-
- room_queue = self.room_queues[room_id]
- del self.room_queues[room_id]
-
- for p in room_queue:
- yield self.on_receive_pdu(p, backfilled=False)
+ for p in room_queue:
+ yield self.on_receive_pdu(p, backfilled=False)
defer.returnValue(True)
|