From 0ac2a79faa918280767c18e4db7ec29d7d3a3afb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Feb 2015 17:24:14 +0000 Subject: Initial stab at implementing a batched get_missing_pdus request --- synapse/storage/event_federation.py | 63 ++++++++++++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 5 deletions(-) (limited to 'synapse/storage/event_federation.py') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 3fbc090224..22bf7ad832 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -32,15 +32,15 @@ class EventFederationStore(SQLBaseStore): and backfilling from another server respectively. """ - def get_auth_chain(self, event_ids): + def get_auth_chain(self, event_ids, have_ids=set()): return self.runInteraction( "get_auth_chain", self._get_auth_chain_txn, - event_ids + event_ids, have_ids ) - def _get_auth_chain_txn(self, txn, event_ids): - results = self._get_auth_chain_ids_txn(txn, event_ids) + def _get_auth_chain_txn(self, txn, event_ids, have_ids): + results = self._get_auth_chain_ids_txn(txn, event_ids, have_ids) return self._get_events_txn(txn, results) @@ -51,8 +51,9 @@ class EventFederationStore(SQLBaseStore): event_ids ) - def _get_auth_chain_ids_txn(self, txn, event_ids): + def _get_auth_chain_ids_txn(self, txn, event_ids, have_ids): results = set() + have_ids = set(have_ids) base_sql = ( "SELECT auth_id FROM event_auth WHERE event_id = ?" @@ -64,6 +65,10 @@ class EventFederationStore(SQLBaseStore): for f in front: txn.execute(base_sql, (f,)) new_front.update([r[0] for r in txn.fetchall()]) + + new_front -= results + new_front -= have_ids + front = new_front results.update(front) @@ -378,3 +383,51 @@ class EventFederationStore(SQLBaseStore): event_results += new_front return self._get_events_txn(txn, event_results) + + def get_missing_events(self, room_id, earliest_events, latest_events, + limit, min_depth): + return self.runInteraction( + "get_missing_events", + self._get_missing_events, + room_id, earliest_events, latest_events, limit, min_depth + ) + + def _get_missing_events(self, txn, room_id, earliest_events, latest_events, + limit, min_depth): + + earliest_events = set(earliest_events) + front = set(latest_events) - earliest_events + + event_results = set() + + query = ( + "SELECT prev_event_id FROM event_edges " + "WHERE room_id = ? AND event_id = ? AND is_state = 0 " + "LIMIT ?" + ) + + while front and len(event_results) < limit: + new_front = set() + for event_id in front: + txn.execute( + query, + (room_id, event_id, limit - len(event_results)) + ) + + for e_id, in txn.fetchall(): + new_front.add(e_id) + + new_front -= earliest_events + new_front -= event_results + + front = new_front + event_results |= new_front + + events = self._get_events_txn(txn, event_results) + + events = sorted( + [ev for ev in events if ev.depth >= min_depth], + key=lambda e: e.depth, + ) + + return events[:limit] -- cgit 1.4.1 From 42b972bccd0cf7d903befb498f9c1bbd5c4e6583 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Feb 2015 14:35:23 +0000 Subject: Revert get_auth_chain changes --- synapse/storage/event_federation.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'synapse/storage/event_federation.py') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 22bf7ad832..2deda8ac50 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -32,15 +32,15 @@ class EventFederationStore(SQLBaseStore): and backfilling from another server respectively. """ - def get_auth_chain(self, event_ids, have_ids=set()): + def get_auth_chain(self, event_ids): return self.runInteraction( "get_auth_chain", self._get_auth_chain_txn, - event_ids, have_ids + event_ids ) - def _get_auth_chain_txn(self, txn, event_ids, have_ids): - results = self._get_auth_chain_ids_txn(txn, event_ids, have_ids) + def _get_auth_chain_txn(self, txn, event_ids): + results = self._get_auth_chain_ids_txn(txn, event_ids) return self._get_events_txn(txn, results) @@ -51,9 +51,8 @@ class EventFederationStore(SQLBaseStore): event_ids ) - def _get_auth_chain_ids_txn(self, txn, event_ids, have_ids): + def _get_auth_chain_ids_txn(self, txn, event_ids): results = set() - have_ids = set(have_ids) base_sql = ( "SELECT auth_id FROM event_auth WHERE event_id = ?" @@ -67,7 +66,6 @@ class EventFederationStore(SQLBaseStore): new_front.update([r[0] for r in txn.fetchall()]) new_front -= results - new_front -= have_ids front = new_front results.update(front) -- cgit 1.4.1