summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-10-29 16:59:24 +0000
committerErik Johnston <erik@matrix.org>2014-10-29 16:59:24 +0000
commite7858b6d7ef37849a3d2d5004743cdd21ec330a8 (patch)
treebe917c0b61075a287f169ec5210a0b2dab563603 /synapse/handlers
parentDon't reference PDU when persisting event (diff)
downloadsynapse-e7858b6d7ef37849a3d2d5004743cdd21ec330a8.tar.xz
Start filling out and using new events tables
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py4
-rw-r--r--synapse/handlers/federation.py90
2 files changed, 55 insertions, 39 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index cd6c35f194..787a01efc5 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -16,6 +16,8 @@
 from twisted.internet import defer
 from synapse.api.errors import LimitExceededError
 
+from synapse.util.async import run_on_reactor
+
 class BaseHandler(object):
 
     def __init__(self, hs):
@@ -45,6 +47,8 @@ class BaseHandler(object):
     @defer.inlineCallbacks
     def _on_new_room_event(self, event, snapshot, extra_destinations=[],
                            extra_users=[], suppress_auth=False):
+        yield run_on_reactor()
+
         snapshot.fill_out_prev_events(event)
 
         yield self.state_handler.annotate_state_groups(event)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b575986fc3..5f86ed03fa 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -22,6 +22,7 @@ from synapse.api.constants import Membership
 from synapse.util.logutils import log_function
 from synapse.federation.pdu_codec import PduCodec, encode_event_id
 from synapse.api.errors import SynapseError
+from synapse.util.async import run_on_reactor
 
 from twisted.internet import defer, reactor
 
@@ -81,6 +82,8 @@ class FederationHandler(BaseHandler):
             processing.
         """
 
+        yield run_on_reactor()
+
         pdu = self.pdu_codec.pdu_from_event(event)
 
         if not hasattr(pdu, "destinations") or not pdu.destinations:
@@ -102,6 +105,8 @@ class FederationHandler(BaseHandler):
             self.room_queues[event.room_id].append(pdu)
             return
 
+        logger.debug("Processing event: %s", event.event_id)
+
         if state:
             state = [self.pdu_codec.event_from_pdu(p) for p in state]
 
@@ -216,58 +221,65 @@ class FederationHandler(BaseHandler):
         assert(event.state_key == joinee)
         assert(event.room_id == room_id)
 
-        self.room_queues[room_id] = []
-
-        event.event_id = self.event_factory.create_event_id()
-        event.content = content
+        event.outlier = False
 
-        state = yield self.replication_layer.send_join(
-            target_host,
-            self.pdu_codec.pdu_from_event(event)
-        )
+        self.room_queues[room_id] = []
 
-        state = [self.pdu_codec.event_from_pdu(p) for p in state]
+        try:
+            event.event_id = self.event_factory.create_event_id()
+            event.content = content
 
-        logger.debug("do_invite_join state: %s", state)
+            state = yield self.replication_layer.send_join(
+                target_host,
+                self.pdu_codec.pdu_from_event(event)
+            )
 
-        is_new_state = yield self.state_handler.annotate_state_groups(
-            event,
-            state=state
-        )
+            state = [self.pdu_codec.event_from_pdu(p) for p in state]
 
-        try:
-            yield self.store.store_room(
-                room_id=room_id,
-                room_creator_user_id="",
-                is_public=False
-            )
-        except:
-            # FIXME
-            pass
+            logger.debug("do_invite_join state: %s", state)
 
-        for e in state:
-            # FIXME: Auth these.
             is_new_state = yield self.state_handler.annotate_state_groups(
-                e,
+                event,
+                state=state
             )
 
+            logger.debug("do_invite_join event: %s", event)
+
+            try:
+                yield self.store.store_room(
+                    room_id=room_id,
+                    room_creator_user_id="",
+                    is_public=False
+                )
+            except:
+                # FIXME
+                pass
+
+            for e in state:
+                # FIXME: Auth these.
+                e.outlier = True
+
+                yield self.state_handler.annotate_state_groups(
+                    e,
+                )
+
+                yield self.store.persist_event(
+                    e,
+                    backfilled=False,
+                    is_new_state=False
+                )
+
             yield self.store.persist_event(
-                e,
+                event,
                 backfilled=False,
-                is_new_state=False
+                is_new_state=is_new_state
             )
+        finally:
+            room_queue = self.room_queues[room_id]
+            del self.room_queues[room_id]
 
-        yield self.store.persist_event(
-            event,
-            backfilled=False,
-            is_new_state=is_new_state
-        )
-
-        room_queue = self.room_queues[room_id]
-        del self.room_queues[room_id]
-
-        for p in room_queue:
-            yield self.on_receive_pdu(p, backfilled=False)
+            for p in room_queue:
+                yield self.on_receive_pdu(p, backfilled=False)
 
         defer.returnValue(True)