diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index ace7459538..09705c60be 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -282,8 +282,7 @@ class EventFederationStore(SQLBaseStore):
},
)
- def _handle_prev_events(self, txn, outlier, event_id, prev_events,
- room_id):
+ def _handle_mult_prev_events(self, txn, events):
"""
For the given event, update the event edges table and forward and
backward extremities tables.
@@ -293,68 +292,75 @@ class EventFederationStore(SQLBaseStore):
table="event_edges",
values=[
{
- "event_id": event_id,
+ "event_id": ev.event_id,
"prev_event_id": e_id,
- "room_id": room_id,
+ "room_id": ev.room_id,
"is_state": False,
}
- for e_id, _ in prev_events
+ for ev in events
+ for e_id, _ in ev.prev_events
],
)
- # Update the extremities table if this is not an outlier.
- if not outlier:
- for e_id, _ in prev_events:
- # TODO (erikj): This could be done as a bulk insert
- self._simple_delete_txn(
- txn,
- table="event_forward_extremities",
- keyvalues={
- "event_id": e_id,
- "room_id": room_id,
- }
- )
+ events_by_room = {}
+ for ev in events:
+ events_by_room.setdefault(ev.room_id, []).append(ev)
- # We only insert as a forward extremity the new event if there are
- # no other events that reference it as a prev event
- query = (
- "SELECT 1 FROM event_edges WHERE prev_event_id = ?"
- )
+ for room_id, room_events in events_by_room.items():
+ prevs = [
+ e_id for ev in room_events for e_id, _ in ev.prev_events
+ if not ev.internal_metadata.is_outlier()
+ ]
+ if prevs:
+ txn.execute(
+ "DELETE FROM event_forward_extremities"
+ " WHERE room_id = ?"
+ " AND event_id in (%s)" % (
+ ",".join(["?"] * len(prevs)),
+ ),
+ [room_id] + prevs,
+ )
- txn.execute(query, (event_id,))
+ query = (
+ "INSERT INTO event_forward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+ " )"
+ )
- if not txn.fetchone():
- query = (
- "INSERT INTO event_forward_extremities"
- " (event_id, room_id)"
- " VALUES (?, ?)"
- )
+ txn.executemany(
+ query,
+ [(ev.event_id, ev.room_id, ev.event_id) for ev in events]
+ )
- txn.execute(query, (event_id, room_id))
-
- query = (
- "INSERT INTO event_backward_extremities (event_id, room_id)"
- " SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- " )"
- " AND NOT EXISTS ("
- " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
- " AND outlier = ?"
- " )"
- )
+ query = (
+ "INSERT INTO event_backward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ " )"
+ " AND NOT EXISTS ("
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
+ " )"
+ )
- txn.executemany(query, [
- (e_id, room_id, e_id, room_id, e_id, room_id, False)
- for e_id, _ in prev_events
- ])
+ txn.executemany(query, [
+ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+ for ev in events for e_id, _ in ev.prev_events
+ if not ev.internal_metadata.is_outlier()
+ ])
- query = (
- "DELETE FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- )
- txn.execute(query, (event_id, room_id))
+ query = (
+ "DELETE FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ )
+ txn.executemany(
+ query,
+ [(ev.event_id, ev.room_id) for ev in events]
+ )
+ for room_id in events_by_room:
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, room_id
)
|