author     Erik Johnston <erik@matrix.org>  2015-06-22 11:42:10 +0100
committer  Erik Johnston <erik@matrix.org>  2015-06-22 11:42:10 +0100
commit     96be533f1f7dde8a0eb62d26c05f50f5e517b90a (patch)
tree       df62a82eab17e00aa464ba6c023b7d6bfb86da5e
parent     Properly cache get_server_verify_keys (diff)
download   synapse-96be533f1f7dde8a0eb62d26c05f50f5e517b90a.tar.xz
Use new store.persist_events function in federation handler
-rw-r--r--  synapse/handlers/federation.py | 189
-rw-r--r--  synapse/storage/events.py      |  11
2 files changed, 130 insertions(+), 70 deletions(-)
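
Note: the hunks below repeat one small pattern at several call sites: for each event, build a map of its auth events keyed by (type, state_key), wrap the event in a small "event info" dict, and hand the whole list to a new _handle_new_events for batched persistence. A minimal standalone sketch of that dict shape (the helper names here are illustrative, not part of the commit):

    # Illustrative helpers, not Synapse code. "event", "state" and
    # "auth_events" are the keys the new _handle_new_events reads.
    def auth_map_for(event, auth_chain):
        # Index the subset of auth_chain referenced by event.auth_events,
        # keyed by (type, state_key), as each hunk below does inline.
        auth_ids = set(e_id for e_id, _ in event.auth_events)
        return {
            (e.type, e.state_key): e
            for e in auth_chain
            if e.event_id in auth_ids
        }

    def make_event_info(event, auth_chain=None, state=None):
        # "event" is required; "state" and "auth_events" are optional
        # and are read with .get() in _prep_event.
        info = {"event": event}
        if state is not None:
            info["state"] = state
        if auth_chain is not None:
            info["auth_events"] = auth_map_for(event, auth_chain)
        return info
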
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c8baf90b4a..214f02a188 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -138,26 +138,25 @@ class FederationHandler(BaseHandler):
         if state and auth_chain is not None:
             # If we have any state or auth_chain given to us by the replication
             # layer, then we should handle them (if we haven't before.)
+
+            event_infos = []
+
             for e in itertools.chain(auth_chain, state):
                 if e.event_id in seen_ids:
                     continue
-
                 e.internal_metadata.outlier = True
-                try:
-                    auth_ids = [e_id for e_id, _ in e.auth_events]
-                    auth = {
-                        (e.type, e.state_key): e for e in auth_chain
-                        if e.event_id in auth_ids
-                    }
-                    yield self._handle_new_event(
-                        origin, e, auth_events=auth
-                    )
-                    seen_ids.add(e.event_id)
-                except:
-                    logger.exception(
-                        "Failed to handle state event %s",
-                        e.event_id,
-                    )
+                auth_ids = [e_id for e_id, _ in e.auth_events]
+                auth = {
+                    (e.type, e.state_key): e for e in auth_chain
+                    if e.event_id in auth_ids
+                }
+                event_infos.append({
+                    "event": e,
+                    "auth_events": auth,
+                })
+                seen_ids.add(e.event_id)
+
+            yield self._handle_new_events(origin, event_infos)
 
         try:
             _, event_stream_id, max_stream_id = yield self._handle_new_event(
@@ -292,38 +291,29 @@ class FederationHandler(BaseHandler):
         ).addErrback(unwrapFirstError)
         auth_events.update({a.event_id: a for a in results})
 
-        yield defer.gatherResults(
-            [
-                self._handle_new_event(
-                    dest, a,
-                    auth_events={
-                        (auth_events[a_id].type, auth_events[a_id].state_key):
-                        auth_events[a_id]
-                        for a_id, _ in a.auth_events
-                    },
-                )
-                for a in auth_events.values()
-                if a.event_id not in seen_events
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
-
-        yield defer.gatherResults(
-            [
-                self._handle_new_event(
-                    dest, event_map[e_id],
-                    state=events_to_state[e_id],
-                    backfilled=True,
-                    auth_events={
-                        (auth_events[a_id].type, auth_events[a_id].state_key):
-                        auth_events[a_id]
-                        for a_id, _ in event_map[e_id].auth_events
-                    },
-                )
-                for e_id in events_to_state
-            ],
-            consumeErrors=True
-        ).addErrback(unwrapFirstError)
+        ev_infos = []
+        for a in auth_events.values():
+            if a.event_id in seen_events:
+                continue
+            ev_infos.append({
+                "event": a,
+                "auth_events": {
+                    (auth_events[a_id].type, auth_events[a_id].state_key):
+                    auth_events[a_id]
+                    for a_id, _ in a.auth_events
+                }
+            })
+
+        for e_id in events_to_state:
+            ev_infos.append({
+                "event": event_map[e_id],
+                "state": events_to_state[e_id],
+                "auth_events": {
+                    (auth_events[a_id].type, auth_events[a_id].state_key):
+                    auth_events[a_id]
+                    for a_id, _ in event_map[e_id].auth_events
+                }
+            })
 
         events.sort(key=lambda e: e.depth)
 
@@ -331,10 +321,14 @@ class FederationHandler(BaseHandler):
             if event in events_to_state:
                 continue
 
-            yield self._handle_new_event(
-                dest, event,
-                backfilled=True,
-            )
+            ev_infos.append({
+                "event": event,
+            })
+
+        yield self._handle_new_events(
+            dest, ev_infos,
+            backfilled=True,
+        )
 
         defer.returnValue(events)
 
@@ -600,32 +594,26 @@ class FederationHandler(BaseHandler):
                 # FIXME
                 pass
 
-            yield self._handle_auth_events(
-                origin, [e for e in auth_chain if e.event_id != event.event_id]
-            )
+            # yield self._handle_auth_events(
+            #     origin, [e for e in auth_chain if e.event_id != event.event_id]
+            # )
 
-            @defer.inlineCallbacks
-            def handle_state(e):
+            ev_infos = []
+            for e in itertools.chain(state, auth_chain):
                 if e.event_id == event.event_id:
-                    return
+                    continue
 
                 e.internal_metadata.outlier = True
-                try:
-                    auth_ids = [e_id for e_id, _ in e.auth_events]
-                    auth = {
+                auth_ids = [e_id for e_id, _ in e.auth_events]
+                ev_infos.append({
+                    "event": e,
+                    "auth_events": {
                         (e.type, e.state_key): e for e in auth_chain
                         if e.event_id in auth_ids
                     }
-                    yield self._handle_new_event(
-                        origin, e, auth_events=auth
-                    )
-                except:
-                    logger.exception(
-                        "Failed to handle state event %s",
-                        e.event_id,
-                    )
+                })
 
-            yield defer.DeferredList([handle_state(e) for e in state])
+            yield self._handle_new_events(origin, ev_infos)
 
             auth_ids = [e_id for e_id, _ in event.auth_events]
             auth_events = {
@@ -1006,6 +994,67 @@ class FederationHandler(BaseHandler):
         defer.returnValue((context, event_stream_id, max_stream_id))
 
     @defer.inlineCallbacks
+    def _handle_new_events(self, origin, event_infos, backfilled=False):
+        contexts = yield defer.gatherResults(
+            [
+                self._prep_event(
+                    origin,
+                    ev_info["event"],
+                    state=ev_info.get("state"),
+                    backfilled=backfilled,
+                    auth_events=ev_info.get("auth_events"),
+                )
+                for ev_info in event_infos
+            ]
+        )
+
+        yield self.store.persist_events(
+            [
+                (ev_info["event"], context)
+                for ev_info, context in itertools.izip(event_infos, contexts)
+            ],
+            backfilled=backfilled,
+            is_new_state=(not backfilled),
+        )
+
+    @defer.inlineCallbacks
+    def _prep_event(self, origin, event, state=None, backfilled=False,
+                    current_state=None, auth_events=None):
+        outlier = event.internal_metadata.is_outlier()
+
+        context = yield self.state_handler.compute_event_context(
+            event, old_state=state, outlier=outlier,
+        )
+
+        if not auth_events:
+            auth_events = context.current_state
+
+        # This is a hack to fix some old rooms where the initial join event
+        # didn't reference the create event in its auth events.
+        if event.type == EventTypes.Member and not event.auth_events:
+            if len(event.prev_events) == 1 and event.depth < 5:
+                c = yield self.store.get_event(
+                    event.prev_events[0][0],
+                    allow_none=True,
+                )
+                if c and c.type == EventTypes.Create:
+                    auth_events[(c.type, c.state_key)] = c
+
+        try:
+            yield self.do_auth(
+                origin, event, context, auth_events=auth_events
+            )
+        except AuthError as e:
+            logger.warn(
+                "Rejecting %s because %s",
+                event.event_id, e.msg
+            )
+
+            context.rejected = RejectedReason.AUTH_ERROR
+
+        defer.returnValue(context)
+
+    @defer.inlineCallbacks
     def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
                       missing):
         # Just go through and process each event in `remote_auth_chain`. We
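
Note: the new _handle_new_events/_prep_event pair above replaces the per-event _handle_new_event calls: contexts are computed concurrently with defer.gatherResults, and the (event, context) pairs are written through a single store.persist_events call. Events that fail auth are no longer dropped by a bare except; _prep_event marks the context with RejectedReason.AUTH_ERROR and still returns it. A simplified, standalone sketch of that gather-then-batch shape (prep_event and persist_events are stand-ins for the handler and store methods, not the real signatures):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def handle_new_events(prep_event, persist_events, event_infos,
                          backfilled=False):
        # prep_event must return a Deferred (e.g. an inlineCallbacks
        # coroutine). Compute one context per event concurrently ...
        contexts = yield defer.gatherResults([
            prep_event(
                ev_info["event"],
                state=ev_info.get("state"),
                auth_events=ev_info.get("auth_events"),
            )
            for ev_info in event_infos
        ])

        # ... then persist the whole batch with a single store call.
        yield persist_events(
            [
                (ev_info["event"], context)
                for ev_info, context in zip(event_infos, contexts)
            ],
            backfilled=backfilled,
            is_new_state=(not backfilled),
        )
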
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5fe11cf3fb..e02a8066d6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -45,6 +45,17 @@ EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
 class EventsStore(SQLBaseStore):
+    def persist_events(self, events_and_contexts, backfilled=False,
+                       is_new_state=True):
+        return defer.gatherResults([
+            self.persist_event(
+                event, context,
+                backfilled=backfilled,
+                is_new_state=is_new_state,
+            )
+            for event, context in events_and_contexts
+        ])
+
     @defer.inlineCallbacks
     @log_function
     def persist_event(self, event, context, backfilled=False,
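
Note: for now the new EventsStore.persist_events is just a fan-out over the existing persist_event via defer.gatherResults; the gain is that callers hand over the whole batch in one call, so the store can later change how the batch is written without touching them again. A hypothetical caller (persist_outliers and its argument values are illustrative, not from the commit):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def persist_outliers(store, events_and_contexts):
        # Before this commit a caller looped (or gathered) itself:
        #     for event, context in events_and_contexts:
        #         yield store.persist_event(event, context, backfilled=True)
        # With the new helper the batch goes through a single entry point.
        yield store.persist_events(
            events_and_contexts,
            backfilled=True,
            is_new_state=False,
        )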