diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/events.py | 42 |
1 files changed, 31 insertions, 11 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 8659f605a5..b5e4b6f4b0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -301,15 +301,24 @@ class EventsStore(SQLBaseStore): latest_event_ids = yield self.get_latest_event_ids_in_room( room_id ) + latest_event_ids = frozenset(latest_event_ids) new_latest_event_ids = yield self._calculate_new_extremeties( - room_id, [ev for ev, _ in ev_ctx_rm] + room_id, [ev for ev, _ in ev_ctx_rm], latest_event_ids ) - if new_latest_event_ids == set(latest_event_ids): + if new_latest_event_ids == latest_event_ids: # No change in extremities, so no change in state continue - new_forward_extremeties[room_id] = new_latest_event_ids + to_add = new_latest_event_ids - latest_event_ids + to_remove = latest_event_ids - new_latest_event_ids + + new_forward_extremeties[room_id] = { + "full_list": new_latest_event_ids, + "to_add": to_add, + "to_remove": to_remove, + "prev_latest": latest_event_ids, + } state = yield self._calculate_state_delta( room_id, ev_ctx_rm, new_latest_event_ids @@ -329,15 +338,12 @@ class EventsStore(SQLBaseStore): persist_event_counter.inc_by(len(chunk)) @defer.inlineCallbacks - def _calculate_new_extremeties(self, room_id, events): + def _calculate_new_extremeties(self, room_id, events, latest_event_ids): """Calculates the new forward extremeties for a room given events to persist. Assumes that we are only persisting events for one room at a time. """ - latest_event_ids = yield self.get_latest_event_ids_in_room( - room_id - ) new_latest_event_ids = set(latest_event_ids) # First, add all the new events to the list new_latest_event_ids.update( @@ -573,12 +579,26 @@ class EventsStore(SQLBaseStore): txn, self.get_users_in_room, (room_id,) ) - for room_id, new_extrem in new_forward_extremeties.items(): - self._simple_delete_txn( + for room_id, new_extrem_dict in new_forward_extremeties.items(): + current_latest = self._simple_select_onecol_txn( txn, table="event_forward_extremities", keyvalues={"room_id": room_id}, + retcol="event_id" ) + + if set(current_latest) != new_extrem_dict["prev_latest"]: + raise RuntimeError( + "event_forward_extremities don't match that when we" + " calculated new extrems" + ) + + txn.executemany( + "DELETE FROM event_forward_extremities" + " WHERE room_id = ? AND event_id = ?", + ((room_id, event_id) for event_id in new_extrem_dict["to_remove"]) + ) + txn.call_after( self.get_latest_event_ids_in_room.invalidate, (room_id,) ) @@ -592,7 +612,7 @@ class EventsStore(SQLBaseStore): "room_id": room_id, } for room_id, new_extrem in new_forward_extremeties.items() - for ev_id in new_extrem + for ev_id in new_extrem["to_add"] ], ) # We now insert into stream_ordering_to_exterm a mapping from room_id, @@ -609,7 +629,7 @@ class EventsStore(SQLBaseStore): "stream_ordering": max_stream_order, } for room_id, new_extrem in new_forward_extremeties.items() - for event_id in new_extrem + for event_id in new_extrem["full_list"] ] ) |