summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/federation_client.py6
-rw-r--r--synapse/handlers/federation.py164
2 files changed, 108 insertions, 62 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 4b3bf97835..6febc8618c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -168,7 +168,11 @@ class FederationClient(FederationBase):
         for i, pdu in enumerate(pdus):
             pdus[i] = yield self._check_sigs_and_hash(pdu)
 
-            # FIXME: We should handle signature failures more gracefully.
+        # FIXME: We should handle signature failures more gracefully.
+        pdus[:] = yield defer.gatherResults(
+            [self._check_sigs_and_hash(pdu) for pdu in pdus],
+            consumeErrors=True,
+        ).addErrback(unwrapFirstError)
 
         defer.returnValue(pdus)
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d85b1cf5de..46ce3699d7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -230,27 +230,65 @@ class FederationHandler(BaseHandler):
         if not extremities:
             extremities = yield self.store.get_oldest_events_in_room(room_id)
 
-        pdus = yield self.replication_layer.backfill(
+        events = yield self.replication_layer.backfill(
             dest,
             room_id,
-            limit,
+            limit=limit,
             extremities=extremities,
         )
 
-        events = []
+        event_map = {e.event_id: e for e in events}
 
-        for pdu in pdus:
-            event = pdu
+        event_ids = set(e.event_id for e in events)
 
-            # FIXME (erikj): Not sure this actually works :/
-            context = yield self.state_handler.compute_event_context(event)
+        edges = [
+            ev.event_id
+            for ev in events
+            if set(e_id for e_id, _ in ev.prev_events) - event_ids
+        ]
 
-            events.append((event, context))
+        # For each edge get the current state.
 
-            yield self.store.persist_event(
-                event,
-                context=context,
-                backfilled=True
+        auth_events = {}
+        events_to_state = {}
+        for e_id in edges:
+            state, auth = yield self.replication_layer.get_state_for_room(
+                destination=dest,
+                room_id=room_id,
+                event_id=e_id
+            )
+            auth_events.update({a.event_id: a for a in auth})
+            events_to_state[e_id] = state
+
+        yield defer.gatherResults(
+            [
+                self._handle_new_event(dest, a)
+                for a in auth_events.values()
+            ],
+            consumeErrors=True,
+        ).addErrback(unwrapFirstError)
+
+        yield defer.gatherResults(
+            [
+                self._handle_new_event(
+                    dest, event_map[e_id],
+                    state=events_to_state[e_id],
+                    backfilled=True,
+                )
+                for e_id in events_to_state
+            ],
+            consumeErrors=True
+        ).addErrback(unwrapFirstError)
+
+        events.sort(key=lambda e: e.depth)
+
+        for event in events:
+            if event in events_to_state:
+                continue
+
+            yield self._handle_new_event(
+                dest, event,
+                backfilled=True,
             )
 
         defer.returnValue(events)
@@ -347,7 +385,7 @@ class FederationHandler(BaseHandler):
                     logger.info(e.message)
                     continue
                 except Exception as e:
-                    logger.warn(
+                    logger.exception(
                         "Failed to backfill from %s because %s",
                         dom, e,
                     )
@@ -517,54 +555,9 @@ class FederationHandler(BaseHandler):
                 # FIXME
                 pass
 
-            auth_ids_to_deferred = {}
-
-            def process_auth_ev(ev):
-                auth_ids = [e_id for e_id, _ in ev.auth_events]
-
-                prev_ds = [
-                    auth_ids_to_deferred[i]
-                    for i in auth_ids
-                    if i in auth_ids_to_deferred
-                ]
-
-                d = defer.Deferred()
-
-                auth_ids_to_deferred[ev.event_id] = d
-
-                @defer.inlineCallbacks
-                def f(*_):
-                    ev.internal_metadata.outlier = True
-
-                    try:
-                        auth = {
-                            (e.type, e.state_key): e for e in auth_chain
-                            if e.event_id in auth_ids
-                        }
-
-                        yield self._handle_new_event(
-                            origin, ev, auth_events=auth
-                        )
-                    except:
-                        logger.exception(
-                            "Failed to handle auth event %s",
-                            ev.event_id,
-                        )
-
-                    d.callback(None)
-
-                if prev_ds:
-                    dx = defer.DeferredList(prev_ds)
-                    dx.addBoth(f)
-                else:
-                    f()
-
-            for e in auth_chain:
-                if e.event_id == event.event_id:
-                    return
-                process_auth_ev(e)
-
-            yield defer.DeferredList(auth_ids_to_deferred.values())
+            yield self._handle_auth_events(
+                origin, [e for e in auth_chain if e.event_id != event.event_id]
+            )
 
             @defer.inlineCallbacks
             def handle_state(e):
@@ -1348,3 +1341,52 @@ class FederationHandler(BaseHandler):
             },
             "missing": [e.event_id for e in missing_locals],
         })
+
+    @defer.inlineCallbacks
+    def _handle_auth_events(self, origin, auth_events):
+        auth_ids_to_deferred = {}
+
+        def process_auth_ev(ev):
+            auth_ids = [e_id for e_id, _ in ev.auth_events]
+
+            prev_ds = [
+                auth_ids_to_deferred[i]
+                for i in auth_ids
+                if i in auth_ids_to_deferred
+            ]
+
+            d = defer.Deferred()
+
+            auth_ids_to_deferred[ev.event_id] = d
+
+            @defer.inlineCallbacks
+            def f(*_):
+                ev.internal_metadata.outlier = True
+
+                try:
+                    auth = {
+                        (e.type, e.state_key): e for e in auth_events
+                        if e.event_id in auth_ids
+                    }
+
+                    yield self._handle_new_event(
+                        origin, ev, auth_events=auth
+                    )
+                except:
+                    logger.exception(
+                        "Failed to handle auth event %s",
+                        ev.event_id,
+                    )
+
+                d.callback(None)
+
+            if prev_ds:
+                dx = defer.DeferredList(prev_ds)
+                dx.addBoth(f)
+            else:
+                f()
+
+        for e in auth_events:
+            process_auth_ev(e)
+
+        yield defer.DeferredList(auth_ids_to_deferred.values())