diff options
author | Matthew Hodgson <matthew@matrix.org> | 2018-07-25 17:27:49 +0100 |
---|---|---|
committer | Matthew Hodgson <matthew@matrix.org> | 2018-07-25 17:27:49 +0100 |
commit | 2565804030b14eab5f1455e8a860c1fbd71d45fc (patch) | |
tree | 7cdac37c0d7a0611b54f32312fdf5028c0c91b44 /synapse/storage/events.py | |
parent | flake8 (diff) | |
parent | Merge pull request #3603 from matrix-org/erikj/handle_outliers (diff) | |
download | synapse-2565804030b14eab5f1455e8a860c1fbd71d45fc.tar.xz |
Merge branch 'develop' into matthew/filter_members
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 196 |
1 files changed, 131 insertions, 65 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 19c05dc8d6..200f5ec95f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ import logging from collections import OrderedDict, deque, namedtuple from functools import wraps -from six import iteritems, itervalues +from six import iteritems from six.moves import range from canonicaljson import json @@ -142,15 +142,14 @@ class _EventPeristenceQueue(object): try: queue = self._get_drainining_queue(room_id) for item in queue: - # handle_queue_loop runs in the sentinel logcontext, so - # there is no need to preserve_fn when running the - # callbacks on the deferred. try: ret = yield per_item_callback(item) + except Exception: + with PreserveLoggingContext(): + item.deferred.errback() + else: with PreserveLoggingContext(): item.deferred.callback(ret) - except Exception: - item.deferred.errback() finally: queue = self._event_persist_queues.pop(room_id, None) if queue: @@ -344,11 +343,14 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties = {} # map room_id->(type,state_key)->event_id tracking the full - # state in each room after adding these events + # state in each room after adding these events. + # This is simply used to prefill the get_current_state_ids + # cache current_state_for_room = {} - # map room_id->(to_delete, to_insert) where each entry is - # a map (type,key)->event_id giving the state delta in each + # map room_id->(to_delete, to_insert) where to_delete is a list + # of type/state keys to remove from current state, and to_insert + # is a map (type,key)->event_id giving the state delta in each # room state_delta_for_room = {} @@ -418,28 +420,40 @@ class EventsStore(EventsWorkerStore): logger.info( "Calculating state delta for room %s", room_id, ) - with Measure( - self._clock, - "persist_events.get_new_state_after_events", + self._clock, + "persist_events.get_new_state_after_events", ): - current_state = yield self._get_new_state_after_events( + res = yield self._get_new_state_after_events( room_id, ev_ctx_rm, latest_event_ids, new_latest_event_ids, ) - - if current_state is not None: - current_state_for_room[room_id] = current_state + current_state, delta_ids = res + + # If either are not None then there has been a change, + # and we need to work out the delta (or use that + # given) + if delta_ids is not None: + # If there is a delta we know that we've + # only added or replaced state, never + # removed keys entirely. + state_delta_for_room[room_id] = ([], delta_ids) + elif current_state is not None: with Measure( - self._clock, - "persist_events.calculate_state_delta", + self._clock, + "persist_events.calculate_state_delta", ): delta = yield self._calculate_state_delta( room_id, current_state, ) - state_delta_for_room[room_id] = delta + state_delta_for_room[room_id] = delta + + # If we have the current_state then lets prefill + # the cache with it. + if current_state is not None: + current_state_for_room[room_id] = current_state yield self.runInteraction( "persist_events", @@ -538,9 +552,15 @@ class EventsStore(EventsWorkerStore): the new forward extremities for the room. Returns: - Deferred[dict[(str,str), str]|None]: - None if there are no changes to the room state, or - a dict of (type, state_key) -> event_id]. + Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]: + Returns a tuple of two state maps, the first being the full new current + state and the second being the delta to the existing current state. + If both are None then there has been no change. + + If there has been a change then we only return the delta if its + already been calculated. Conversely if we do know the delta then + the new current state is only returned if we've already calculated + it. """ if not new_latest_event_ids: @@ -548,13 +568,19 @@ class EventsStore(EventsWorkerStore): # map from state_group to ((type, key) -> event_id) state map state_groups_map = {} + + # Map from (prev state group, new state group) -> delta state dict + state_group_deltas = {} + for ev, ctx in events_context: if ctx.state_group is None: - # I don't think this can happen, but let's double-check - raise Exception( - "Context for new extremity event %s has no state " - "group" % (ev.event_id, ), - ) + # This should only happen for outlier events. + if not ev.internal_metadata.is_outlier(): + raise Exception( + "Context for new event %s has no state " + "group" % (ev.event_id, ), + ) + continue if ctx.state_group in state_groups_map: continue @@ -566,6 +592,9 @@ class EventsStore(EventsWorkerStore): if current_state_ids is not None: state_groups_map[ctx.state_group] = current_state_ids + if ctx.prev_group: + state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids + # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can # pull the state group from its context. @@ -579,7 +608,7 @@ class EventsStore(EventsWorkerStore): for event_id in new_latest_event_ids: # First search in the list of new events we're adding. for ev, ctx in events_context: - if event_id == ev.event_id: + if event_id == ev.event_id and ctx.state_group is not None: event_id_to_state_group[event_id] = ctx.state_group break else: @@ -607,7 +636,26 @@ class EventsStore(EventsWorkerStore): # If they old and new groups are the same then we don't need to do # anything. if old_state_groups == new_state_groups: - return + defer.returnValue((None, None)) + + if len(new_state_groups) == 1 and len(old_state_groups) == 1: + # If we're going from one state group to another, lets check if + # we have a delta for that transition. If we do then we can just + # return that. + + new_state_group = next(iter(new_state_groups)) + old_state_group = next(iter(old_state_groups)) + + delta_ids = state_group_deltas.get( + (old_state_group, new_state_group,), None + ) + if delta_ids is not None: + # We have a delta from the existing to new current state, + # so lets just return that. If we happen to already have + # the current state in memory then lets also return that, + # but it doesn't matter if we don't. + new_state = state_groups_map.get(new_state_group) + defer.returnValue((new_state, delta_ids)) # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -619,7 +667,7 @@ class EventsStore(EventsWorkerStore): if len(new_state_groups) == 1: # If there is only one state group, then we know what the current # state is. - defer.returnValue(state_groups_map[new_state_groups.pop()]) + defer.returnValue((state_groups_map[new_state_groups.pop()], None)) # Ok, we need to defer to the state handler to resolve our state sets. @@ -638,7 +686,7 @@ class EventsStore(EventsWorkerStore): room_id, state_groups, events_map, get_events ) - defer.returnValue(res.state) + defer.returnValue((res.state, None)) @defer.inlineCallbacks def _calculate_state_delta(self, room_id, current_state): @@ -647,17 +695,16 @@ class EventsStore(EventsWorkerStore): Assumes that we are only persisting events for one room at a time. Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, - i.e. (type, state_key) -> event_id. `to_delete` are the entries to - first be deleted from current_state_events, `to_insert` are entries - to insert. + tuple[list, dict] (to_delete, to_insert): where to_delete are the + type/state_keys to remove from current_state_events and `to_insert` + are the updates to current_state_events. """ existing_state = yield self.get_current_state_ids(room_id) - to_delete = { - key: ev_id for key, ev_id in iteritems(existing_state) - if ev_id != current_state.get(key) - } + to_delete = [ + key for key in existing_state + if key not in current_state + ] to_insert = { key: ev_id for key, ev_id in iteritems(current_state) @@ -684,10 +731,10 @@ class EventsStore(EventsWorkerStore): delete_existing (bool): True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. - state_delta_for_room (dict[str, (list[str], list[str])]): + state_delta_for_room (dict[str, (list, dict)]): The current-state delta for each room. For each room, a tuple - (to_delete, to_insert), being a list of event ids to be removed - from the current state, and a list of event ids to be added to + (to_delete, to_insert), being a list of type/state keys to be + removed from the current state, and a state set to be added to the current state. new_forward_extremeties (dict[str, list[str]]): The new forward extremities for each room. For each room, a @@ -765,9 +812,46 @@ class EventsStore(EventsWorkerStore): def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple + + # First we add entries to the current_state_delta_stream. We + # do this before updating the current_state_events table so + # that we can use it to calculate the `prev_event_id`. (This + # allows us to not have to pull out the existing state + # unnecessarily). + sql = """ + INSERT INTO current_state_delta_stream + (stream_id, room_id, type, state_key, event_id, prev_event_id) + SELECT ?, ?, ?, ?, ?, ( + SELECT event_id FROM current_state_events + WHERE room_id = ? AND type = ? AND state_key = ? + ) + """ + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, None, + room_id, etype, state_key, + ) + for etype, state_key in to_delete + # We sanity check that we're deleting rather than updating + if (etype, state_key) not in to_insert + )) + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, ev_id, + room_id, etype, state_key, + ) + for (etype, state_key), ev_id in iteritems(to_insert) + )) + + # Now we actually update the current_state_events table + txn.executemany( - "DELETE FROM current_state_events WHERE event_id = ?", - [(ev_id,) for ev_id in itervalues(to_delete)], + "DELETE FROM current_state_events" + " WHERE room_id = ? AND type = ? AND state_key = ?", + ( + (room_id, etype, state_key) + for etype, state_key in itertools.chain(to_delete, to_insert) + ), ) self._simple_insert_many_txn( @@ -784,25 +868,6 @@ class EventsStore(EventsWorkerStore): ], ) - state_deltas = {key: None for key in to_delete} - state_deltas.update(to_insert) - - self._simple_insert_many_txn( - txn, - table="current_state_delta_stream", - values=[ - { - "stream_id": max_stream_order, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": ev_id, - "prev_event_id": to_delete.get(key, None), - } - for key, ev_id in iteritems(state_deltas) - ] - ) - txn.call_after( self._curr_state_delta_stream_cache.entity_has_changed, room_id, max_stream_order, @@ -816,7 +881,8 @@ class EventsStore(EventsWorkerStore): # and which we have added, then we invlidate the caches for all # those users. members_changed = set( - state_key for ev_type, state_key in state_deltas + state_key + for ev_type, state_key in itertools.chain(to_delete, to_insert) if ev_type == EventTypes.Member ) @@ -1072,7 +1138,7 @@ class EventsStore(EventsWorkerStore): ): txn.executemany( "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] + [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts] ) def _store_event_txn(self, txn, events_and_contexts): |