diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 2f913adf20..34948c30cf 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached
from syutil.base64util import encode_base64
import logging
+from Queue import PriorityQueue
logger = logging.getLogger(__name__)
@@ -380,41 +381,41 @@ class EventFederationStore(SQLBaseStore):
event_results = set(event_list)
- front = event_list
+ # We want to make sure that we do a breadth-first, "depth" ordered
+ # search.
query = (
- "SELECT prev_event_id FROM event_edges "
- "WHERE room_id = ? AND event_id = ? "
- "LIMIT ?"
+ "SELECT depth, prev_event_id FROM event_edges"
+ " INNER JOIN events"
+ " ON prev_event_id = events.event_id"
+ " AND event_edges.room_id = events.room_id"
+ " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+ " LIMIT ?"
)
- # We iterate through all event_ids in `front` to select their previous
- # events. These are dumped in `new_front`.
- # We continue until we reach the limit *or* new_front is empty (i.e.,
- # we've run out of things to select
- while front and len(event_results) < limit:
+ queue = PriorityQueue()
- new_front = []
- for event_id in front:
- logger.debug(
- "_backfill_interaction: id=%s",
- event_id
- )
+ for event_id in event_list:
+ txn.execute(
+ query,
+ (room_id, event_id, limit - len(event_results))
+ )
- txn.execute(
- query,
- (room_id, event_id, limit - len(event_results))
- )
+ for row in txn.fetchall():
+ queue.put(row)
- for row in txn.fetchall():
- logger.debug(
- "_backfill_interaction: got id=%s",
- *row
- )
- new_front.append(row[0])
+ while not queue.empty() and len(event_results) < limit:
+ _, event_id = queue.get_nowait()
- front = new_front
- event_results += new_front
+ event_results.add(event_id)
+
+ txn.execute(
+ query,
+ (room_id, event_id, limit - len(event_results))
+ )
+
+ for row in txn.fetchall():
+ queue.put(row)
return self._get_events_txn(txn, event_results)
|