diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 53feaa1960..f0aa2193fb 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -235,80 +235,21 @@ class EventFederationStore(SQLBaseStore):
],
)
- self._update_extremeties(txn, events)
+ self._update_backward_extremeties(txn, events)
- def _update_extremeties(self, txn, events):
- """Updates the event_*_extremities tables based on the new/updated
+ def _update_backward_extremeties(self, txn, events):
+ """Updates the event_backward_extremities tables based on the new/updated
events being persisted.
This is called for new events *and* for events that were outliers, but
- are are now being persisted as non-outliers.
+ are now being persisted as non-outliers.
+
+ Forward extremities are handled when we first start persisting the events.
"""
events_by_room = {}
for ev in events:
events_by_room.setdefault(ev.room_id, []).append(ev)
- for room_id, room_events in events_by_room.items():
- prevs = [
- e_id for ev in room_events for e_id, _ in ev.prev_events
- if not ev.internal_metadata.is_outlier()
- ]
- if prevs:
- txn.execute(
- "DELETE FROM event_forward_extremities"
- " WHERE room_id = ?"
- " AND event_id in (%s)" % (
- ",".join(["?"] * len(prevs)),
- ),
- [room_id] + prevs,
- )
-
- query = (
- "INSERT INTO event_forward_extremities (event_id, room_id)"
- " SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
- " )"
- )
-
- txn.executemany(
- query,
- [
- (ev.event_id, ev.room_id, ev.event_id) for ev in events
- if not ev.internal_metadata.is_outlier()
- ]
- )
-
- # We now insert into stream_ordering_to_exterm a mapping from room_id,
- # new stream_ordering to new forward extremeties in the room.
- # This allows us to later efficiently look up the forward extremeties
- # for a room before a given stream_ordering
- max_stream_ord = max(
- ev.internal_metadata.stream_ordering for ev in events
- )
- new_extrem = {}
- for room_id in events_by_room:
- event_ids = self._simple_select_onecol_txn(
- txn,
- table="event_forward_extremities",
- keyvalues={"room_id": room_id},
- retcol="event_id",
- )
- new_extrem[room_id] = event_ids
-
- self._simple_insert_many_txn(
- txn,
- table="stream_ordering_to_exterm",
- values=[
- {
- "room_id": room_id,
- "event_id": event_id,
- "stream_ordering": max_stream_ord,
- }
- for room_id, extrem_evs in new_extrem.items()
- for event_id in extrem_evs
- ]
- )
-
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
@@ -339,11 +280,6 @@ class EventFederationStore(SQLBaseStore):
]
)
- for room_id in events_by_room:
- txn.call_after(
- self.get_latest_event_ids_in_room.invalidate, (room_id,)
- )
-
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
# We want to make the cache more effective, so we clamp to the last
# change before the given ordering.
|