diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 74b4e23590..5d4b7843f3 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
+
from ._base import SQLBaseStore, cached
from syutil.base64util import encode_base64
@@ -33,16 +35,7 @@ class EventFederationStore(SQLBaseStore):
"""
def get_auth_chain(self, event_ids):
- return self.runInteraction(
- "get_auth_chain",
- self._get_auth_chain_txn,
- event_ids
- )
-
- def _get_auth_chain_txn(self, txn, event_ids):
- results = self._get_auth_chain_ids_txn(txn, event_ids)
-
- return self._get_events_txn(txn, results)
+ return self.get_auth_chain_ids(event_ids).addCallback(self._get_events)
def get_auth_chain_ids(self, event_ids):
return self.runInteraction(
@@ -79,6 +72,28 @@ class EventFederationStore(SQLBaseStore):
room_id,
)
+ def get_oldest_events_with_depth_in_room(self, room_id):
+ return self.runInteraction(
+ "get_oldest_events_with_depth_in_room",
+ self.get_oldest_events_with_depth_in_room_txn,
+ room_id,
+ )
+
+ def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
+ sql = (
+ "SELECT b.event_id, MAX(e.depth) FROM events as e"
+ " INNER JOIN event_edges as g"
+ " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+ " INNER JOIN event_backward_extremities as b"
+ " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+ " WHERE b.room_id = ? AND g.is_state is ?"
+ " GROUP BY b.event_id"
+ )
+
+ txn.execute(sql, (room_id, False,))
+
+ return dict(txn.fetchall())
+
def _get_oldest_events_in_room_txn(self, txn, room_id):
return self._simple_select_onecol_txn(
txn,
@@ -247,11 +262,13 @@ class EventFederationStore(SQLBaseStore):
do_insert = depth < min_depth if min_depth else True
if do_insert:
- self._simple_insert_txn(
+ self._simple_upsert_txn(
txn,
table="room_depth",
- values={
+ keyvalues={
"room_id": room_id,
+ },
+ values={
"min_depth": depth,
},
)
@@ -306,31 +323,27 @@ class EventFederationStore(SQLBaseStore):
txn.execute(query, (event_id, room_id))
- # Insert all the prev_events as a backwards thing, they'll get
- # deleted in a second if they're incorrect anyway.
- self._simple_insert_many_txn(
- txn,
- table="event_backward_extremities",
- values=[
- {
- "event_id": e_id,
- "room_id": room_id,
- }
- for e_id, _ in prev_events
- ],
+ query = (
+ "INSERT INTO event_backward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ " )"
+ " AND NOT EXISTS ("
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ?"
+ " )"
)
- # Also delete from the backwards extremities table all ones that
- # reference events that we have already seen
+ txn.executemany(query, [
+ (e_id, room_id, e_id, room_id, e_id, room_id, )
+ for e_id, _ in prev_events
+ ])
+
query = (
- "DELETE FROM event_backward_extremities WHERE EXISTS ("
- "SELECT 1 FROM events "
- "WHERE "
- "event_backward_extremities.event_id = events.event_id "
- "AND not events.outlier "
- ")"
+ "DELETE FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
)
- txn.execute(query)
+ txn.execute(query, (event_id, room_id))
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, room_id
@@ -349,7 +362,7 @@ class EventFederationStore(SQLBaseStore):
return self.runInteraction(
"get_backfill_events",
self._get_backfill_events, room_id, event_list, limit
- )
+ ).addCallback(self._get_events)
def _get_backfill_events(self, txn, room_id, event_list, limit):
logger.debug(
@@ -395,16 +408,26 @@ class EventFederationStore(SQLBaseStore):
front = new_front
event_results += new_front
- return self._get_events_txn(txn, event_results)
+ return event_results
+ @defer.inlineCallbacks
def get_missing_events(self, room_id, earliest_events, latest_events,
limit, min_depth):
- return self.runInteraction(
+ ids = yield self.runInteraction(
"get_missing_events",
self._get_missing_events,
room_id, earliest_events, latest_events, limit, min_depth
)
+ events = yield self._get_events(ids)
+
+ events = sorted(
+ [ev for ev in events if ev.depth >= min_depth],
+ key=lambda e: e.depth,
+ )
+
+ defer.returnValue(events[:limit])
+
def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
limit, min_depth):
@@ -436,14 +459,7 @@ class EventFederationStore(SQLBaseStore):
front = new_front
event_results |= new_front
- events = self._get_events_txn(txn, event_results)
-
- events = sorted(
- [ev for ev in events if ev.depth >= min_depth],
- key=lambda e: e.depth,
- )
-
- return events[:limit]
+ return event_results
def clean_room_for_join(self, room_id):
return self.runInteraction(
|