diff options
Diffstat (limited to 'synapse/storage/event_federation.py')
-rw-r--r-- | synapse/storage/event_federation.py | 176 |
1 files changed, 105 insertions, 71 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 74b4e23590..1ba073884b 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -13,10 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 import logging +from Queue import PriorityQueue, Empty logger = logging.getLogger(__name__) @@ -33,16 +36,7 @@ class EventFederationStore(SQLBaseStore): """ def get_auth_chain(self, event_ids): - return self.runInteraction( - "get_auth_chain", - self._get_auth_chain_txn, - event_ids - ) - - def _get_auth_chain_txn(self, txn, event_ids): - results = self._get_auth_chain_ids_txn(txn, event_ids) - - return self._get_events_txn(txn, results) + return self.get_auth_chain_ids(event_ids).addCallback(self._get_events) def get_auth_chain_ids(self, event_ids): return self.runInteraction( @@ -79,6 +73,28 @@ class EventFederationStore(SQLBaseStore): room_id, ) + def get_oldest_events_with_depth_in_room(self, room_id): + return self.runInteraction( + "get_oldest_events_with_depth_in_room", + self.get_oldest_events_with_depth_in_room_txn, + room_id, + ) + + def get_oldest_events_with_depth_in_room_txn(self, txn, room_id): + sql = ( + "SELECT b.event_id, MAX(e.depth) FROM events as e" + " INNER JOIN event_edges as g" + " ON g.event_id = e.event_id AND g.room_id = e.room_id" + " INNER JOIN event_backward_extremities as b" + " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id" + " WHERE b.room_id = ? AND g.is_state is ?" + " GROUP BY b.event_id" + ) + + txn.execute(sql, (room_id, False,)) + + return dict(txn.fetchall()) + def _get_oldest_events_in_room_txn(self, txn, room_id): return self._simple_select_onecol_txn( txn, @@ -247,11 +263,13 @@ class EventFederationStore(SQLBaseStore): do_insert = depth < min_depth if min_depth else True if do_insert: - self._simple_insert_txn( + self._simple_upsert_txn( txn, table="room_depth", - values={ + keyvalues={ "room_id": room_id, + }, + values={ "min_depth": depth, }, ) @@ -306,31 +324,28 @@ class EventFederationStore(SQLBaseStore): txn.execute(query, (event_id, room_id)) - # Insert all the prev_events as a backwards thing, they'll get - # deleted in a second if they're incorrect anyway. - self._simple_insert_many_txn( - txn, - table="event_backward_extremities", - values=[ - { - "event_id": e_id, - "room_id": room_id, - } - for e_id, _ in prev_events - ], + query = ( + "INSERT INTO event_backward_extremities (event_id, room_id)" + " SELECT ?, ? WHERE NOT EXISTS (" + " SELECT 1 FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" + " )" + " AND NOT EXISTS (" + " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " + " AND outlier = ?" + " )" ) - # Also delete from the backwards extremities table all ones that - # reference events that we have already seen + txn.executemany(query, [ + (e_id, room_id, e_id, room_id, e_id, room_id, False) + for e_id, _ in prev_events + ]) + query = ( - "DELETE FROM event_backward_extremities WHERE EXISTS (" - "SELECT 1 FROM events " - "WHERE " - "event_backward_extremities.event_id = events.event_id " - "AND not events.outlier " - ")" + "DELETE FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" ) - txn.execute(query) + txn.execute(query, (event_id, room_id)) txn.call_after( self.get_latest_event_ids_in_room.invalidate, room_id @@ -349,6 +364,10 @@ class EventFederationStore(SQLBaseStore): return self.runInteraction( "get_backfill_events", self._get_backfill_events, room_id, event_list, limit + ).addCallback( + self._get_events + ).addCallback( + lambda l: sorted(l, key=lambda e: -e.depth) ) def _get_backfill_events(self, txn, room_id, event_list, limit): @@ -357,54 +376,75 @@ class EventFederationStore(SQLBaseStore): room_id, repr(event_list), limit ) - event_results = event_list + event_results = set() - front = event_list + # We want to make sure that we do a breadth-first, "depth" ordered + # search. query = ( - "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? " - "LIMIT ?" + "SELECT depth, prev_event_id FROM event_edges" + " INNER JOIN events" + " ON prev_event_id = events.event_id" + " AND event_edges.room_id = events.room_id" + " WHERE event_edges.room_id = ? AND event_edges.event_id = ?" + " AND event_edges.is_state = ?" + " LIMIT ?" ) - # We iterate through all event_ids in `front` to select their previous - # events. These are dumped in `new_front`. - # We continue until we reach the limit *or* new_front is empty (i.e., - # we've run out of things to select - while front and len(event_results) < limit: + queue = PriorityQueue() - new_front = [] - for event_id in front: - logger.debug( - "_backfill_interaction: id=%s", - event_id - ) + for event_id in event_list: + depth = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="depth" + ) - txn.execute( - query, - (room_id, event_id, limit - len(event_results)) - ) + queue.put((-depth, event_id)) - for row in txn.fetchall(): - logger.debug( - "_backfill_interaction: got id=%s", - *row - ) - new_front.append(row[0]) + while not queue.empty() and len(event_results) < limit: + try: + _, event_id = queue.get_nowait() + except Empty: + break - front = new_front - event_results += new_front + if event_id in event_results: + continue + + event_results.add(event_id) + + txn.execute( + query, + (room_id, event_id, False, limit - len(event_results)) + ) + + for row in txn.fetchall(): + if row[1] not in event_results: + queue.put((-row[0], row[1])) - return self._get_events_txn(txn, event_results) + return event_results + @defer.inlineCallbacks def get_missing_events(self, room_id, earliest_events, latest_events, limit, min_depth): - return self.runInteraction( + ids = yield self.runInteraction( "get_missing_events", self._get_missing_events, room_id, earliest_events, latest_events, limit, min_depth ) + events = yield self._get_events(ids) + + events = sorted( + [ev for ev in events if ev.depth >= min_depth], + key=lambda e: e.depth, + ) + + defer.returnValue(events[:limit]) + def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit, min_depth): @@ -436,14 +476,7 @@ class EventFederationStore(SQLBaseStore): front = new_front event_results |= new_front - events = self._get_events_txn(txn, event_results) - - events = sorted( - [ev for ev in events if ev.depth >= min_depth], - key=lambda e: e.depth, - ) - - return events[:limit] + return event_results def clean_room_for_join(self, room_id): return self.runInteraction( @@ -456,3 +489,4 @@ class EventFederationStore(SQLBaseStore): query = "DELETE FROM event_forward_extremities WHERE room_id = ?" txn.execute(query, (room_id,)) + txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id) |