diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 5d4b7843f3..80eff8e6f2 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -19,6 +19,7 @@ from ._base import SQLBaseStore, cached
from syutil.base64util import encode_base64
import logging
+from Queue import PriorityQueue
logger = logging.getLogger(__name__)
@@ -330,12 +331,13 @@ class EventFederationStore(SQLBaseStore):
" WHERE event_id = ? AND room_id = ?"
" )"
" AND NOT EXISTS ("
- " SELECT 1 FROM events WHERE event_id = ? AND room_id = ?"
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
" )"
)
txn.executemany(query, [
- (e_id, room_id, e_id, room_id, e_id, room_id, )
+ (e_id, room_id, e_id, room_id, e_id, room_id, False)
for e_id, _ in prev_events
])
@@ -370,43 +372,43 @@ class EventFederationStore(SQLBaseStore):
room_id, repr(event_list), limit
)
- event_results = event_list
+ event_results = set(event_list)
- front = event_list
+ # We want to make sure that we do a breadth-first, "depth" ordered
+ # search.
query = (
- "SELECT prev_event_id FROM event_edges "
- "WHERE room_id = ? AND event_id = ? "
- "LIMIT ?"
+ "SELECT depth, prev_event_id FROM event_edges"
+ " INNER JOIN events"
+ " ON prev_event_id = events.event_id"
+ " AND event_edges.room_id = events.room_id"
+ " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+ " LIMIT ?"
)
- # We iterate through all event_ids in `front` to select their previous
- # events. These are dumped in `new_front`.
- # We continue until we reach the limit *or* new_front is empty (i.e.,
- # we've run out of things to select
- while front and len(event_results) < limit:
+ queue = PriorityQueue()
- new_front = []
- for event_id in front:
- logger.debug(
- "_backfill_interaction: id=%s",
- event_id
- )
+ for event_id in event_list:
+ txn.execute(
+ query,
+ (room_id, event_id, limit - len(event_results))
+ )
- txn.execute(
- query,
- (room_id, event_id, limit - len(event_results))
- )
+ for row in txn.fetchall():
+ queue.put(row)
- for row in txn.fetchall():
- logger.debug(
- "_backfill_interaction: got id=%s",
- *row
- )
- new_front.append(row[0])
+ while not queue.empty() and len(event_results) < limit:
+ _, event_id = queue.get_nowait()
- front = new_front
- event_results += new_front
+ event_results.add(event_id)
+
+ txn.execute(
+ query,
+ (room_id, event_id, limit - len(event_results))
+ )
+
+ for row in txn.fetchall():
+ queue.put(row)
return event_results
|