diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8659f605a5..b5e4b6f4b0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -301,15 +301,24 @@ class EventsStore(SQLBaseStore):
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
+ latest_event_ids = frozenset(latest_event_ids)
new_latest_event_ids = yield self._calculate_new_extremeties(
- room_id, [ev for ev, _ in ev_ctx_rm]
+ room_id, [ev for ev, _ in ev_ctx_rm], latest_event_ids
)
- if new_latest_event_ids == set(latest_event_ids):
+ if new_latest_event_ids == latest_event_ids:
# No change in extremities, so no change in state
continue
- new_forward_extremeties[room_id] = new_latest_event_ids
+ to_add = new_latest_event_ids - latest_event_ids
+ to_remove = latest_event_ids - new_latest_event_ids
+
+ new_forward_extremeties[room_id] = {
+ "full_list": new_latest_event_ids,
+ "to_add": to_add,
+ "to_remove": to_remove,
+ "prev_latest": latest_event_ids,
+ }
state = yield self._calculate_state_delta(
room_id, ev_ctx_rm, new_latest_event_ids
@@ -329,15 +338,12 @@ class EventsStore(SQLBaseStore):
persist_event_counter.inc_by(len(chunk))
@defer.inlineCallbacks
- def _calculate_new_extremeties(self, room_id, events):
+ def _calculate_new_extremeties(self, room_id, events, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to
persist.
Assumes that we are only persisting events for one room at a time.
"""
- latest_event_ids = yield self.get_latest_event_ids_in_room(
- room_id
- )
new_latest_event_ids = set(latest_event_ids)
# First, add all the new events to the list
new_latest_event_ids.update(
@@ -573,12 +579,26 @@ class EventsStore(SQLBaseStore):
txn, self.get_users_in_room, (room_id,)
)
- for room_id, new_extrem in new_forward_extremeties.items():
- self._simple_delete_txn(
+ for room_id, new_extrem_dict in new_forward_extremeties.items():
+ current_latest = self._simple_select_onecol_txn(
txn,
table="event_forward_extremities",
keyvalues={"room_id": room_id},
+ retcol="event_id"
)
+
+ if set(current_latest) != new_extrem_dict["prev_latest"]:
+ raise RuntimeError(
+ "event_forward_extremities don't match that when we"
+ " calculated new extrems"
+ )
+
+ txn.executemany(
+ "DELETE FROM event_forward_extremities"
+ " WHERE room_id = ? AND event_id = ?",
+ ((room_id, event_id) for event_id in new_extrem_dict["to_remove"])
+ )
+
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, (room_id,)
)
@@ -592,7 +612,7 @@ class EventsStore(SQLBaseStore):
"room_id": room_id,
}
for room_id, new_extrem in new_forward_extremeties.items()
- for ev_id in new_extrem
+ for ev_id in new_extrem["to_add"]
],
)
# We now insert into stream_ordering_to_exterm a mapping from room_id,
@@ -609,7 +629,7 @@ class EventsStore(SQLBaseStore):
"stream_ordering": max_stream_order,
}
for room_id, new_extrem in new_forward_extremeties.items()
- for event_id in new_extrem
+ for event_id in new_extrem["full_list"]
]
)
|