diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/event_federation.py | 80 |
1 files changed, 49 insertions, 31 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 5d4b7843f3..1ba073884b 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -19,6 +19,7 @@ from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 import logging +from Queue import PriorityQueue, Empty logger = logging.getLogger(__name__) @@ -330,12 +331,13 @@ class EventFederationStore(SQLBaseStore): " WHERE event_id = ? AND room_id = ?" " )" " AND NOT EXISTS (" - " SELECT 1 FROM events WHERE event_id = ? AND room_id = ?" + " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " + " AND outlier = ?" " )" ) txn.executemany(query, [ - (e_id, room_id, e_id, room_id, e_id, room_id, ) + (e_id, room_id, e_id, room_id, e_id, room_id, False) for e_id, _ in prev_events ]) @@ -362,7 +364,11 @@ class EventFederationStore(SQLBaseStore): return self.runInteraction( "get_backfill_events", self._get_backfill_events, room_id, event_list, limit - ).addCallback(self._get_events) + ).addCallback( + self._get_events + ).addCallback( + lambda l: sorted(l, key=lambda e: -e.depth) + ) def _get_backfill_events(self, txn, room_id, event_list, limit): logger.debug( @@ -370,43 +376,54 @@ class EventFederationStore(SQLBaseStore): room_id, repr(event_list), limit ) - event_results = event_list + event_results = set() - front = event_list + # We want to make sure that we do a breadth-first, "depth" ordered + # search. query = ( - "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? " - "LIMIT ?" + "SELECT depth, prev_event_id FROM event_edges" + " INNER JOIN events" + " ON prev_event_id = events.event_id" + " AND event_edges.room_id = events.room_id" + " WHERE event_edges.room_id = ? AND event_edges.event_id = ?" + " AND event_edges.is_state = ?" + " LIMIT ?" ) - # We iterate through all event_ids in `front` to select their previous - # events. These are dumped in `new_front`. - # We continue until we reach the limit *or* new_front is empty (i.e., - # we've run out of things to select - while front and len(event_results) < limit: + queue = PriorityQueue() - new_front = [] - for event_id in front: - logger.debug( - "_backfill_interaction: id=%s", - event_id - ) + for event_id in event_list: + depth = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="depth" + ) - txn.execute( - query, - (room_id, event_id, limit - len(event_results)) - ) + queue.put((-depth, event_id)) - for row in txn.fetchall(): - logger.debug( - "_backfill_interaction: got id=%s", - *row - ) - new_front.append(row[0]) + while not queue.empty() and len(event_results) < limit: + try: + _, event_id = queue.get_nowait() + except Empty: + break - front = new_front - event_results += new_front + if event_id in event_results: + continue + + event_results.add(event_id) + + txn.execute( + query, + (room_id, event_id, False, limit - len(event_results)) + ) + + for row in txn.fetchall(): + if row[1] not in event_results: + queue.put((-row[0], row[1])) return event_results @@ -472,3 +489,4 @@ class EventFederationStore(SQLBaseStore): query = "DELETE FROM event_forward_extremities WHERE room_id = ?" txn.execute(query, (room_id,)) + txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id) |