summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/event_federation.py80
1 files changed, 49 insertions, 31 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 5d4b7843f3..1ba073884b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -19,6 +19,7 @@ from ._base import SQLBaseStore, cached
 from syutil.base64util import encode_base64
 
 import logging
+from Queue import PriorityQueue, Empty
 
 
 logger = logging.getLogger(__name__)
@@ -330,12 +331,13 @@ class EventFederationStore(SQLBaseStore):
                 " WHERE event_id = ? AND room_id = ?"
                 " )"
                 " AND NOT EXISTS ("
-                " SELECT 1 FROM events WHERE event_id = ? AND room_id = ?"
+                " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+                " AND outlier = ?"
                 " )"
             )
 
             txn.executemany(query, [
-                (e_id, room_id, e_id, room_id, e_id, room_id, )
+                (e_id, room_id, e_id, room_id, e_id, room_id, False)
                 for e_id, _ in prev_events
             ])
 
@@ -362,7 +364,11 @@ class EventFederationStore(SQLBaseStore):
         return self.runInteraction(
             "get_backfill_events",
             self._get_backfill_events, room_id, event_list, limit
-        ).addCallback(self._get_events)
+        ).addCallback(
+            self._get_events
+        ).addCallback(
+            lambda l: sorted(l, key=lambda e: -e.depth)
+        )
 
     def _get_backfill_events(self, txn, room_id, event_list, limit):
         logger.debug(
@@ -370,43 +376,54 @@ class EventFederationStore(SQLBaseStore):
             room_id, repr(event_list), limit
         )
 
-        event_results = event_list
+        event_results = set()
 
-        front = event_list
+        # We want to make sure that we do a breadth-first, "depth" ordered
+        # search.
 
         query = (
-            "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? "
-            "LIMIT ?"
+            "SELECT depth, prev_event_id FROM event_edges"
+            " INNER JOIN events"
+            " ON prev_event_id = events.event_id"
+            " AND event_edges.room_id = events.room_id"
+            " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+            " AND event_edges.is_state = ?"
+            " LIMIT ?"
         )
 
-        # We iterate through all event_ids in `front` to select their previous
-        # events. These are dumped in `new_front`.
-        # We continue until we reach the limit *or* new_front is empty (i.e.,
-        # we've run out of things to select
-        while front and len(event_results) < limit:
+        queue = PriorityQueue()
 
-            new_front = []
-            for event_id in front:
-                logger.debug(
-                    "_backfill_interaction: id=%s",
-                    event_id
-                )
+        for event_id in event_list:
+            depth = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "event_id": event_id,
+                },
+                retcol="depth"
+            )
 
-                txn.execute(
-                    query,
-                    (room_id, event_id, limit - len(event_results))
-                )
+            queue.put((-depth, event_id))
 
-                for row in txn.fetchall():
-                    logger.debug(
-                        "_backfill_interaction: got id=%s",
-                        *row
-                    )
-                    new_front.append(row[0])
+        while not queue.empty() and len(event_results) < limit:
+            try:
+                _, event_id = queue.get_nowait()
+            except Empty:
+                break
 
-            front = new_front
-            event_results += new_front
+            if event_id in event_results:
+                continue
+
+            event_results.add(event_id)
+
+            txn.execute(
+                query,
+                (room_id, event_id, False, limit - len(event_results))
+            )
+
+            for row in txn.fetchall():
+                if row[1] not in event_results:
+                    queue.put((-row[0], row[1]))
 
         return event_results
 
@@ -472,3 +489,4 @@ class EventFederationStore(SQLBaseStore):
         query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
 
         txn.execute(query, (room_id,))
+        txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id)