diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 114ccece65..1ba073884b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -19,6 +19,7 @@ from ._base import SQLBaseStore, cached
from syutil.base64util import encode_base64
import logging
+from Queue import PriorityQueue, Empty
logger = logging.getLogger(__name__)
@@ -363,7 +364,11 @@ class EventFederationStore(SQLBaseStore):
return self.runInteraction(
"get_backfill_events",
self._get_backfill_events, room_id, event_list, limit
- ).addCallback(self._get_events)
+ ).addCallback(
+ self._get_events
+ ).addCallback(
+ lambda l: sorted(l, key=lambda e: -e.depth)
+ )
def _get_backfill_events(self, txn, room_id, event_list, limit):
logger.debug(
@@ -371,43 +376,54 @@ class EventFederationStore(SQLBaseStore):
room_id, repr(event_list), limit
)
- event_results = event_list
+ event_results = set()
- front = event_list
+ # We want to make sure that we do a breadth-first, "depth" ordered
+ # search.
query = (
- "SELECT prev_event_id FROM event_edges "
- "WHERE room_id = ? AND event_id = ? "
- "LIMIT ?"
+ "SELECT depth, prev_event_id FROM event_edges"
+ " INNER JOIN events"
+ " ON prev_event_id = events.event_id"
+ " AND event_edges.room_id = events.room_id"
+ " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+ " AND event_edges.is_state = ?"
+ " LIMIT ?"
)
- # We iterate through all event_ids in `front` to select their previous
- # events. These are dumped in `new_front`.
- # We continue until we reach the limit *or* new_front is empty (i.e.,
- # we've run out of things to select
- while front and len(event_results) < limit:
+ queue = PriorityQueue()
- new_front = []
- for event_id in front:
- logger.debug(
- "_backfill_interaction: id=%s",
- event_id
- )
+ for event_id in event_list:
+ depth = self._simple_select_one_onecol_txn(
+ txn,
+ table="events",
+ keyvalues={
+ "event_id": event_id,
+ },
+ retcol="depth"
+ )
- txn.execute(
- query,
- (room_id, event_id, limit - len(event_results))
- )
+ queue.put((-depth, event_id))
- for row in txn.fetchall():
- logger.debug(
- "_backfill_interaction: got id=%s",
- *row
- )
- new_front.append(row[0])
+ while not queue.empty() and len(event_results) < limit:
+ try:
+ _, event_id = queue.get_nowait()
+ except Empty:
+ break
- front = new_front
- event_results += new_front
+ if event_id in event_results:
+ continue
+
+ event_results.add(event_id)
+
+ txn.execute(
+ query,
+ (room_id, event_id, False, limit - len(event_results))
+ )
+
+ for row in txn.fetchall():
+ if row[1] not in event_results:
+ queue.put((-row[0], row[1]))
return event_results
|