diff options
author | Erik Johnston <erik@matrix.org> | 2018-02-16 11:16:29 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-03-14 16:41:20 +0000 |
commit | 2ad7c767eacd9067593cc6e7a4816cf298203401 (patch) | |
tree | f47c138b66ba7832fae6a5b88b6ba9e32a64f797 | |
parent | Merge pull request #2995 from matrix-org/erikj/enable_membership_worker (diff) | |
download | synapse-2ad7c767eacd9067593cc6e7a4816cf298203401.tar.xz |
Refactor event storage to not require state
This is in preparation for using contexts that may or may not have the current_state_ids set. This will allow us to avoid unnecessarily pulling out state for an event on the master process when using workers.
-rw-r--r-- | synapse/storage/events.py | 73 |
1 files changed, 40 insertions, 33 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3890878170..3259a58c12 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,6 +19,7 @@ from synapse.storage.events_worker import EventsWorkerStore from twisted.internet import defer from synapse.events import USE_FROZEN_DICTS +from synapse.events.snapshot import EventContext from synapse.util.async import ObservableDeferred from synapse.util.logcontext import ( @@ -42,7 +43,6 @@ import ujson as json # these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 -from synapse.events.snapshot import EventContext # noqa: F401 logger = logging.getLogger(__name__) @@ -504,61 +504,68 @@ class EventsStore(EventsWorkerStore): defer.returnValue({}) # map from state_group to ((type, key) -> event_id) state map - state_groups = {} - missing_event_ids = [] - was_updated = False + state_groups_map = {} + for ev, ctx in events_context: + if ctx.state_group is None: + # I don't think this can happen, but let's double-check + raise Exception( + "Context for new extremity event %s has no state " + "group" % (ev.event_id, ), + ) + + if ctx.state_group in state_groups_map: + continue + + if isinstance(ctx, EventContext) and ctx.current_state_ids: + state_groups_map[ctx.state_group] = ctx.current_state_ids + + # We need to map new_latest_event_ids to their state groups. First, lets + # check if the event is one we're persisting and then we can pull the + # state group from its context. + # Otherwise we need to pull the state group from the database. + missing_event_ids = [] # List of events we need to fetch groups for + state_groups_to_resolve = set() # State groups of new_latest_event_ids for event_id in new_latest_event_ids: - # First search in the list of new events we're adding, - # and then use the current state from that + # First search in the list of new events we're adding. for ev, ctx in events_context: if event_id == ev.event_id: - if ctx.current_state_ids is None: - raise Exception("Unknown current state") - - if ctx.state_group is None: - # I don't think this can happen, but let's double-check - raise Exception( - "Context for new extremity event %s has no state " - "group" % (event_id, ), - ) - - # If we've already seen the state group don't bother adding - # it to the state sets again - if ctx.state_group not in state_groups: - state_groups[ctx.state_group] = ctx.current_state_ids - if ctx.delta_ids or hasattr(ev, "state_key"): - was_updated = True + state_groups_to_resolve.add(ctx.state_group) break else: # If we couldn't find it, then we'll need to pull # the state from the database - was_updated = True missing_event_ids.append(event_id) - if not was_updated: - return - if missing_event_ids: # Now pull out the state for any missing events from DB event_to_groups = yield self._get_state_group_for_events( missing_event_ids, ) + state_groups_to_resolve.update(event_to_groups.itervalues()) - groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys()) - - if groups: - group_to_state = yield self._get_state_for_groups(groups) - state_groups.update(group_to_state) + # Now that we have calculated state_groups_to_resolve we need to get + # their state so we can resolve to a single state set. + missing_state = state_groups_to_resolve - set(state_groups_map) + if missing_state: + group_to_state = yield self._get_state_for_groups(missing_state) + state_groups_map.update(group_to_state) - if len(state_groups) == 1: + if len(state_groups_to_resolve) == 1: # If there is only one state group, then we know what the current # state is. - defer.returnValue(state_groups.values()[0]) + defer.returnValue(state_groups_map[state_groups_to_resolve.pop()]) + + # Ok, we need to defer to the state handler to resolve our state sets. def get_events(ev_ids): return self.get_events( ev_ids, get_prev_content=False, check_redacted=False, ) + + state_groups = { + sg: state_groups_map[sg] for sg in state_groups_to_resolve + } + events_map = {ev.event_id: ev for ev, _ in events_context} logger.debug("calling resolve_state_groups from preserve_events") res = yield self._state_resolution_handler.resolve_state_groups( |