diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 43 |
1 files changed, 27 insertions, 16 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index dd28c2efe3..af56f1ee57 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -342,8 +342,20 @@ class EventsStore(SQLBaseStore): # NB: Assumes that we are only persisting events for one room # at a time. + + # map room_id->list[event_ids] giving the new forward + # extremities in each room new_forward_extremeties = {} + + # map room_id->(type,state_key)->event_id tracking the full + # state in each room after adding these events current_state_for_room = {} + + # map room_id->(to_delete, to_insert) where each entry is + # a map (type,key)->event_id giving the state delta in each + # room + state_delta_for_room = {} + if not backfilled: with Measure(self._clock, "_calculate_state_and_extrem"): # Work out the new "current state" for each room. @@ -393,11 +405,12 @@ class EventsStore(SQLBaseStore): ev_ctx_rm, new_latest_event_ids, ) if current_state is not None: + current_state_for_room[room_id] = current_state delta = yield self._calculate_state_delta( room_id, current_state, ) if delta is not None: - current_state_for_room[room_id] = delta + state_delta_for_room[room_id] = delta yield self.runInteraction( "persist_events", @@ -405,7 +418,7 @@ class EventsStore(SQLBaseStore): events_and_contexts=chunk, backfilled=backfilled, delete_existing=delete_existing, - current_state_for_room=current_state_for_room, + state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) @@ -422,7 +435,7 @@ class EventsStore(SQLBaseStore): event_counter.inc(event.type, origin_type, origin_entity) - for room_id, (_, _, new_state) in current_state_for_room.iteritems(): + for room_id, new_state in current_state_for_room.iteritems(): self.get_current_state_ids.prefill( (room_id, ), new_state ) @@ -586,10 +599,10 @@ class EventsStore(SQLBaseStore): Assumes that we are only persisting events for one room at a time. Returns: - 3-tuple (to_delete, to_insert, new_state) where both are state dicts, + 2-tuple (to_delete, to_insert) where both are state dicts, i.e. (type, state_key) -> event_id. `to_delete` are the entries to first be deleted from current_state_events, `to_insert` are entries - to insert. `new_state` is the full set of state. + to insert. """ existing_state = yield self.get_current_state_ids(room_id) @@ -610,7 +623,7 @@ class EventsStore(SQLBaseStore): if ev_id in events_to_insert } - defer.returnValue((to_delete, to_insert, current_state)) + defer.returnValue((to_delete, to_insert)) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, @@ -670,7 +683,7 @@ class EventsStore(SQLBaseStore): @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled, - delete_existing=False, current_state_for_room={}, + delete_existing=False, state_delta_for_room={}, new_forward_extremeties={}): """Insert some number of room events into the necessary database tables. @@ -686,7 +699,7 @@ class EventsStore(SQLBaseStore): delete_existing (bool): True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. - current_state_for_room (dict[str, (list[str], list[str])]): + state_delta_for_room (dict[str, (list[str], list[str])]): The current-state delta for each room. For each room, a tuple (to_delete, to_insert), being a list of event ids to be removed from the current state, and a list of event ids to be added to @@ -698,7 +711,7 @@ class EventsStore(SQLBaseStore): """ max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - self._update_current_state_txn(txn, current_state_for_room, max_stream_order) + self._update_current_state_txn(txn, state_delta_for_room, max_stream_order) self._update_forward_extremities_txn( txn, @@ -742,9 +755,8 @@ class EventsStore(SQLBaseStore): events_and_contexts=events_and_contexts, ) - # Insert into the state_groups, state_groups_state, and - # event_to_state_groups tables. - self._store_mult_state_groups_txn(txn, events_and_contexts) + # Insert into event_to_state_groups. + self._store_event_state_mappings_txn(txn, events_and_contexts) # _store_rejected_events_txn filters out any events which were # rejected, and returns the filtered list. @@ -764,7 +776,7 @@ class EventsStore(SQLBaseStore): def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): for room_id, current_state_tuple in state_delta_by_room.iteritems(): - to_delete, to_insert, _ = current_state_tuple + to_delete, to_insert = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", [(ev_id,) for ev_id in to_delete.itervalues()], @@ -979,10 +991,9 @@ class EventsStore(SQLBaseStore): # an outlier in the database. We now have some state at that # so we need to update the state_groups table with that state. - # insert into the state_group, state_groups_state and - # event_to_state_groups tables. + # insert into event_to_state_groups. try: - self._store_mult_state_groups_txn(txn, ((event, context),)) + self._store_event_state_mappings_txn(txn, ((event, context),)) except Exception: logger.exception("") raise |