summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/event_federation.py55
1 files changed, 28 insertions, 27 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 2f913adf20..34948c30cf 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached
 from syutil.base64util import encode_base64
 
 import logging
+from Queue import PriorityQueue
 
 
 logger = logging.getLogger(__name__)
@@ -380,41 +381,41 @@ class EventFederationStore(SQLBaseStore):
 
         event_results = set(event_list)
 
-        front = event_list
+        # We want to make sure that we do a breadth-first, "depth" ordered
+        # search.
 
         query = (
-            "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? "
-            "LIMIT ?"
+            "SELECT depth, prev_event_id FROM event_edges"
+            " INNER JOIN events"
+            " ON prev_event_id = events.event_id"
+            " AND event_edges.room_id = events.room_id"
+            " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+            " LIMIT ?"
         )
 
-        # We iterate through all event_ids in `front` to select their previous
-        # events. These are dumped in `new_front`.
-        # We continue until we reach the limit *or* new_front is empty (i.e.,
-        # we've run out of things to select
-        while front and len(event_results) < limit:
+        queue = PriorityQueue()
 
-            new_front = []
-            for event_id in front:
-                logger.debug(
-                    "_backfill_interaction: id=%s",
-                    event_id
-                )
+        for event_id in event_list:
+            txn.execute(
+                query,
+                (room_id, event_id, limit - len(event_results))
+            )
 
-                txn.execute(
-                    query,
-                    (room_id, event_id, limit - len(event_results))
-                )
+            for row in txn.fetchall():
+                queue.put(row)
 
-                for row in txn.fetchall():
-                    logger.debug(
-                        "_backfill_interaction: got id=%s",
-                        *row
-                    )
-                    new_front.append(row[0])
+        while not queue.empty() and len(event_results) < limit:
+            _, event_id = queue.get_nowait()
 
-            front = new_front
-            event_results += new_front
+            event_results.add(event_id)
+
+            txn.execute(
+                query,
+                (room_id, event_id, limit - len(event_results))
+            )
+
+            for row in txn.fetchall():
+                queue.put(row)
 
         return self._get_events_txn(txn, event_results)