diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 74b4e23590..1ba073884b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -13,10 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
+
from ._base import SQLBaseStore, cached
from syutil.base64util import encode_base64
import logging
+from Queue import PriorityQueue, Empty
logger = logging.getLogger(__name__)
@@ -33,16 +36,7 @@ class EventFederationStore(SQLBaseStore):
"""
def get_auth_chain(self, event_ids):
- return self.runInteraction(
- "get_auth_chain",
- self._get_auth_chain_txn,
- event_ids
- )
-
- def _get_auth_chain_txn(self, txn, event_ids):
- results = self._get_auth_chain_ids_txn(txn, event_ids)
-
- return self._get_events_txn(txn, results)
+ return self.get_auth_chain_ids(event_ids).addCallback(self._get_events)
def get_auth_chain_ids(self, event_ids):
return self.runInteraction(
@@ -79,6 +73,28 @@ class EventFederationStore(SQLBaseStore):
room_id,
)
+ def get_oldest_events_with_depth_in_room(self, room_id):
+ return self.runInteraction(
+ "get_oldest_events_with_depth_in_room",
+ self.get_oldest_events_with_depth_in_room_txn,
+ room_id,
+ )
+
+ def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
+ sql = (
+ "SELECT b.event_id, MAX(e.depth) FROM events as e"
+ " INNER JOIN event_edges as g"
+ " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+ " INNER JOIN event_backward_extremities as b"
+ " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+ " WHERE b.room_id = ? AND g.is_state is ?"
+ " GROUP BY b.event_id"
+ )
+
+ txn.execute(sql, (room_id, False,))
+
+ return dict(txn.fetchall())
+
def _get_oldest_events_in_room_txn(self, txn, room_id):
return self._simple_select_onecol_txn(
txn,
@@ -247,11 +263,13 @@ class EventFederationStore(SQLBaseStore):
do_insert = depth < min_depth if min_depth else True
if do_insert:
- self._simple_insert_txn(
+ self._simple_upsert_txn(
txn,
table="room_depth",
- values={
+ keyvalues={
"room_id": room_id,
+ },
+ values={
"min_depth": depth,
},
)
@@ -306,31 +324,28 @@ class EventFederationStore(SQLBaseStore):
txn.execute(query, (event_id, room_id))
- # Insert all the prev_events as a backwards thing, they'll get
- # deleted in a second if they're incorrect anyway.
- self._simple_insert_many_txn(
- txn,
- table="event_backward_extremities",
- values=[
- {
- "event_id": e_id,
- "room_id": room_id,
- }
- for e_id, _ in prev_events
- ],
+ query = (
+ "INSERT INTO event_backward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ " )"
+ " AND NOT EXISTS ("
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
+ " )"
)
- # Also delete from the backwards extremities table all ones that
- # reference events that we have already seen
+ txn.executemany(query, [
+ (e_id, room_id, e_id, room_id, e_id, room_id, False)
+ for e_id, _ in prev_events
+ ])
+
query = (
- "DELETE FROM event_backward_extremities WHERE EXISTS ("
- "SELECT 1 FROM events "
- "WHERE "
- "event_backward_extremities.event_id = events.event_id "
- "AND not events.outlier "
- ")"
+ "DELETE FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
)
- txn.execute(query)
+ txn.execute(query, (event_id, room_id))
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, room_id
@@ -349,6 +364,10 @@ class EventFederationStore(SQLBaseStore):
return self.runInteraction(
"get_backfill_events",
self._get_backfill_events, room_id, event_list, limit
+ ).addCallback(
+ self._get_events
+ ).addCallback(
+ lambda l: sorted(l, key=lambda e: -e.depth)
)
def _get_backfill_events(self, txn, room_id, event_list, limit):
@@ -357,54 +376,75 @@ class EventFederationStore(SQLBaseStore):
room_id, repr(event_list), limit
)
- event_results = event_list
+ event_results = set()
- front = event_list
+ # We want to make sure that we do a breadth-first, "depth" ordered
+ # search.
query = (
- "SELECT prev_event_id FROM event_edges "
- "WHERE room_id = ? AND event_id = ? "
- "LIMIT ?"
+ "SELECT depth, prev_event_id FROM event_edges"
+ " INNER JOIN events"
+ " ON prev_event_id = events.event_id"
+ " AND event_edges.room_id = events.room_id"
+ " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+ " AND event_edges.is_state = ?"
+ " LIMIT ?"
)
- # We iterate through all event_ids in `front` to select their previous
- # events. These are dumped in `new_front`.
- # We continue until we reach the limit *or* new_front is empty (i.e.,
- # we've run out of things to select
- while front and len(event_results) < limit:
+ queue = PriorityQueue()
- new_front = []
- for event_id in front:
- logger.debug(
- "_backfill_interaction: id=%s",
- event_id
- )
+ for event_id in event_list:
+ depth = self._simple_select_one_onecol_txn(
+ txn,
+ table="events",
+ keyvalues={
+ "event_id": event_id,
+ },
+ retcol="depth"
+ )
- txn.execute(
- query,
- (room_id, event_id, limit - len(event_results))
- )
+ queue.put((-depth, event_id))
- for row in txn.fetchall():
- logger.debug(
- "_backfill_interaction: got id=%s",
- *row
- )
- new_front.append(row[0])
+ while not queue.empty() and len(event_results) < limit:
+ try:
+ _, event_id = queue.get_nowait()
+ except Empty:
+ break
- front = new_front
- event_results += new_front
+ if event_id in event_results:
+ continue
+
+ event_results.add(event_id)
+
+ txn.execute(
+ query,
+ (room_id, event_id, False, limit - len(event_results))
+ )
+
+ for row in txn.fetchall():
+ if row[1] not in event_results:
+ queue.put((-row[0], row[1]))
- return self._get_events_txn(txn, event_results)
+ return event_results
+ @defer.inlineCallbacks
def get_missing_events(self, room_id, earliest_events, latest_events,
limit, min_depth):
- return self.runInteraction(
+ ids = yield self.runInteraction(
"get_missing_events",
self._get_missing_events,
room_id, earliest_events, latest_events, limit, min_depth
)
+ events = yield self._get_events(ids)
+
+ events = sorted(
+ [ev for ev in events if ev.depth >= min_depth],
+ key=lambda e: e.depth,
+ )
+
+ defer.returnValue(events[:limit])
+
def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
limit, min_depth):
@@ -436,14 +476,7 @@ class EventFederationStore(SQLBaseStore):
front = new_front
event_results |= new_front
- events = self._get_events_txn(txn, event_results)
-
- events = sorted(
- [ev for ev in events if ev.depth >= min_depth],
- key=lambda e: e.depth,
- )
-
- return events[:limit]
+ return event_results
def clean_room_for_join(self, room_id):
return self.runInteraction(
@@ -456,3 +489,4 @@ class EventFederationStore(SQLBaseStore):
query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
txn.execute(query, (room_id,))
+ txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id)
|