diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 3249060bcf..d3b46b24c1 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -165,10 +165,11 @@ class FederationClient(FederationBase):
for p in transaction_data["pdus"]
]
- for i, pdu in enumerate(pdus):
- pdus[i] = yield self._check_sigs_and_hash(pdu)
-
- # FIXME: We should handle signature failures more gracefully.
+ # FIXME: We should handle signature failures more gracefully.
+ pdus[:] = yield defer.gatherResults(
+ [self._check_sigs_and_hash(pdu) for pdu in pdus],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError)
defer.returnValue(pdus)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d85b1cf5de..46ce3699d7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -230,27 +230,65 @@ class FederationHandler(BaseHandler):
if not extremities:
extremities = yield self.store.get_oldest_events_in_room(room_id)
- pdus = yield self.replication_layer.backfill(
+ events = yield self.replication_layer.backfill(
dest,
room_id,
- limit,
+ limit=limit,
extremities=extremities,
)
- events = []
+ event_map = {e.event_id: e for e in events}
- for pdu in pdus:
- event = pdu
+ event_ids = set(e.event_id for e in events)
- # FIXME (erikj): Not sure this actually works :/
- context = yield self.state_handler.compute_event_context(event)
+ edges = [
+ ev.event_id
+ for ev in events
+ if set(e_id for e_id, _ in ev.prev_events) - event_ids
+ ]
- events.append((event, context))
+ # For each edge get the current state.
- yield self.store.persist_event(
- event,
- context=context,
- backfilled=True
+ auth_events = {}
+ events_to_state = {}
+ for e_id in edges:
+ state, auth = yield self.replication_layer.get_state_for_room(
+ destination=dest,
+ room_id=room_id,
+ event_id=e_id
+ )
+ auth_events.update({a.event_id: a for a in auth})
+ events_to_state[e_id] = state
+
+ yield defer.gatherResults(
+ [
+ self._handle_new_event(dest, a)
+ for a in auth_events.values()
+ ],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError)
+
+ yield defer.gatherResults(
+ [
+ self._handle_new_event(
+ dest, event_map[e_id],
+ state=events_to_state[e_id],
+ backfilled=True,
+ )
+ for e_id in events_to_state
+ ],
+ consumeErrors=True
+ ).addErrback(unwrapFirstError)
+
+ events.sort(key=lambda e: e.depth)
+
+ for event in events:
+ if event in events_to_state:
+ continue
+
+ yield self._handle_new_event(
+ dest, event,
+ backfilled=True,
)
defer.returnValue(events)
@@ -347,7 +385,7 @@ class FederationHandler(BaseHandler):
logger.info(e.message)
continue
except Exception as e:
- logger.warn(
+ logger.exception(
"Failed to backfill from %s because %s",
dom, e,
)
@@ -517,54 +555,9 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- auth_ids_to_deferred = {}
-
- def process_auth_ev(ev):
- auth_ids = [e_id for e_id, _ in ev.auth_events]
-
- prev_ds = [
- auth_ids_to_deferred[i]
- for i in auth_ids
- if i in auth_ids_to_deferred
- ]
-
- d = defer.Deferred()
-
- auth_ids_to_deferred[ev.event_id] = d
-
- @defer.inlineCallbacks
- def f(*_):
- ev.internal_metadata.outlier = True
-
- try:
- auth = {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
-
- yield self._handle_new_event(
- origin, ev, auth_events=auth
- )
- except:
- logger.exception(
- "Failed to handle auth event %s",
- ev.event_id,
- )
-
- d.callback(None)
-
- if prev_ds:
- dx = defer.DeferredList(prev_ds)
- dx.addBoth(f)
- else:
- f()
-
- for e in auth_chain:
- if e.event_id == event.event_id:
- return
- process_auth_ev(e)
-
- yield defer.DeferredList(auth_ids_to_deferred.values())
+ yield self._handle_auth_events(
+ origin, [e for e in auth_chain if e.event_id != event.event_id]
+ )
@defer.inlineCallbacks
def handle_state(e):
@@ -1348,3 +1341,52 @@ class FederationHandler(BaseHandler):
},
"missing": [e.event_id for e in missing_locals],
})
+
+ @defer.inlineCallbacks
+ def _handle_auth_events(self, origin, auth_events):
+ auth_ids_to_deferred = {}
+
+ def process_auth_ev(ev):
+ auth_ids = [e_id for e_id, _ in ev.auth_events]
+
+ prev_ds = [
+ auth_ids_to_deferred[i]
+ for i in auth_ids
+ if i in auth_ids_to_deferred
+ ]
+
+ d = defer.Deferred()
+
+ auth_ids_to_deferred[ev.event_id] = d
+
+ @defer.inlineCallbacks
+ def f(*_):
+ ev.internal_metadata.outlier = True
+
+ try:
+ auth = {
+ (e.type, e.state_key): e for e in auth_events
+ if e.event_id in auth_ids
+ }
+
+ yield self._handle_new_event(
+ origin, ev, auth_events=auth
+ )
+ except:
+ logger.exception(
+ "Failed to handle auth event %s",
+ ev.event_id,
+ )
+
+ d.callback(None)
+
+ if prev_ds:
+ dx = defer.DeferredList(prev_ds)
+ dx.addBoth(f)
+ else:
+ f()
+
+ for e in auth_events:
+ process_auth_ev(e)
+
+ yield defer.DeferredList(auth_ids_to_deferred.values())
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 114ccece65..1ba073884b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -19,6 +19,7 @@ from ._base import SQLBaseStore, cached
from syutil.base64util import encode_base64
import logging
+from Queue import PriorityQueue, Empty
logger = logging.getLogger(__name__)
@@ -363,7 +364,11 @@ class EventFederationStore(SQLBaseStore):
return self.runInteraction(
"get_backfill_events",
self._get_backfill_events, room_id, event_list, limit
- ).addCallback(self._get_events)
+ ).addCallback(
+ self._get_events
+ ).addCallback(
+ lambda l: sorted(l, key=lambda e: -e.depth)
+ )
def _get_backfill_events(self, txn, room_id, event_list, limit):
logger.debug(
@@ -371,43 +376,54 @@ class EventFederationStore(SQLBaseStore):
room_id, repr(event_list), limit
)
- event_results = event_list
+ event_results = set()
- front = event_list
+ # We want to make sure that we do a breadth-first, "depth" ordered
+ # search.
query = (
- "SELECT prev_event_id FROM event_edges "
- "WHERE room_id = ? AND event_id = ? "
- "LIMIT ?"
+ "SELECT depth, prev_event_id FROM event_edges"
+ " INNER JOIN events"
+ " ON prev_event_id = events.event_id"
+ " AND event_edges.room_id = events.room_id"
+ " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+ " AND event_edges.is_state = ?"
+ " LIMIT ?"
)
- # We iterate through all event_ids in `front` to select their previous
- # events. These are dumped in `new_front`.
- # We continue until we reach the limit *or* new_front is empty (i.e.,
- # we've run out of things to select
- while front and len(event_results) < limit:
+ queue = PriorityQueue()
- new_front = []
- for event_id in front:
- logger.debug(
- "_backfill_interaction: id=%s",
- event_id
- )
+ for event_id in event_list:
+ depth = self._simple_select_one_onecol_txn(
+ txn,
+ table="events",
+ keyvalues={
+ "event_id": event_id,
+ },
+ retcol="depth"
+ )
- txn.execute(
- query,
- (room_id, event_id, limit - len(event_results))
- )
+ queue.put((-depth, event_id))
- for row in txn.fetchall():
- logger.debug(
- "_backfill_interaction: got id=%s",
- *row
- )
- new_front.append(row[0])
+ while not queue.empty() and len(event_results) < limit:
+ try:
+ _, event_id = queue.get_nowait()
+ except Empty:
+ break
- front = new_front
- event_results += new_front
+ if event_id in event_results:
+ continue
+
+ event_results.add(event_id)
+
+ txn.execute(
+ query,
+ (room_id, event_id, False, limit - len(event_results))
+ )
+
+ for row in txn.fetchall():
+ if row[1] not in event_results:
+ queue.put((-row[0], row[1]))
return event_results
|