diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/events/snapshot.py | 3 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 13 | ||||
-rw-r--r-- | synapse/state.py | 143 |
3 files changed, 97 insertions, 62 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 189212b0fa..368b5f6ae4 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -249,7 +249,7 @@ class EventContext(object): @defer.inlineCallbacks def update_state(self, state_group, prev_state_ids, current_state_ids, - delta_ids): + prev_group, delta_ids): """Replace the state in the context """ @@ -260,6 +260,7 @@ class EventContext(object): self.state_group = state_group self._prev_state_ids = prev_state_ids + self.prev_group = prev_group self._current_state_ids = current_state_ids self.delta_ids = delta_ids diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 14654d59f1..145c1a21d4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1980,10 +1980,6 @@ class FederationHandler(BaseHandler): current_state_ids.update(state_updates) - if context.delta_ids is not None: - delta_ids = dict(context.delta_ids) - delta_ids.update(state_updates) - prev_state_ids = yield context.get_prev_state_ids(self.store) prev_state_ids = dict(prev_state_ids) @@ -1991,11 +1987,13 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) }) + # create a new state group as a delta from the existing one. + prev_group = context.state_group state_group = yield self.store.store_state_group( event.event_id, event.room_id, - prev_group=context.prev_group, - delta_ids=delta_ids, + prev_group=prev_group, + delta_ids=state_updates, current_state_ids=current_state_ids, ) @@ -2003,7 +2001,8 @@ class FederationHandler(BaseHandler): state_group=state_group, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, - delta_ids=delta_ids, + prev_group=prev_group, + delta_ids=state_updates, ) @defer.inlineCallbacks diff --git a/synapse/state.py b/synapse/state.py index 32125c95df..033f55d967 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -471,69 +471,39 @@ class StateResolutionHandler(object): "Resolving state for %s with %d groups", room_id, len(state_groups_ids) ) - # build a map from state key to the event_ids which set that state. - # dict[(str, str), set[str]) - state = {} + # start by assuming we won't have any conflicted state, and build up the new + # state map by iterating through the state groups. If we discover a conflict, + # we give up and instead use `resolve_events_with_factory`. + # + # XXX: is this actually worthwhile, or should we just let + # resolve_events_with_factory do it? + new_state = {} + conflicted_state = False for st in itervalues(state_groups_ids): for key, e_id in iteritems(st): - state.setdefault(key, set()).add(e_id) - - # build a map from state key to the event_ids which set that state, - # including only those where there are state keys in conflict. - conflicted_state = { - k: list(v) - for k, v in iteritems(state) - if len(v) > 1 - } + if key in new_state: + conflicted_state = True + break + new_state[key] = e_id + if conflicted_state: + break if conflicted_state: logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_factory( - list(state_groups_ids.values()), + list(itervalues(state_groups_ids)), event_map=event_map, state_map_factory=state_map_factory, ) - else: - new_state = { - key: e_ids.pop() for key, e_ids in iteritems(state) - } - with Measure(self.clock, "state.create_group_ids"): - # if the new state matches any of the input state groups, we can - # use that state group again. Otherwise we will generate a state_id - # which will be used as a cache key for future resolutions, but - # not get persisted. - state_group = None - new_state_event_ids = frozenset(itervalues(new_state)) - for sg, events in iteritems(state_groups_ids): - if new_state_event_ids == frozenset(e_id for e_id in events): - state_group = sg - break + # if the new state matches any of the input state groups, we can + # use that state group again. Otherwise we will generate a state_id + # which will be used as a cache key for future resolutions, but + # not get persisted. - # TODO: We want to create a state group for this set of events, to - # increase cache hits, but we need to make sure that it doesn't - # end up as a prev_group without being added to the database - - prev_group = None - delta_ids = None - for old_group, old_ids in iteritems(state_groups_ids): - if not set(new_state) - set(old_ids): - n_delta_ids = { - k: v - for k, v in iteritems(new_state) - if old_ids.get(k) != v - } - if not delta_ids or len(n_delta_ids) < len(delta_ids): - prev_group = old_group - delta_ids = n_delta_ids - - cache = _StateCacheEntry( - state=new_state, - state_group=state_group, - prev_group=prev_group, - delta_ids=delta_ids, - ) + with Measure(self.clock, "state.create_group_ids"): + cache = _make_state_cache_entry(new_state, state_groups_ids) if self._state_cache is not None: self._state_cache[group_names] = cache @@ -541,6 +511,70 @@ class StateResolutionHandler(object): defer.returnValue(cache) +def _make_state_cache_entry( + new_state, + state_groups_ids, +): + """Given a resolved state, and a set of input state groups, pick one to base + a new state group on (if any), and return an appropriately-constructed + _StateCacheEntry. + + Args: + new_state (dict[(str, str), str]): resolved state map (mapping from + (type, state_key) to event_id) + + state_groups_ids (dict[int, dict[(str, str), str]]): + map from state group id to the state in that state group + (where 'state' is a map from state key to event id) + + Returns: + _StateCacheEntry + """ + # if the new state matches any of the input state groups, we can + # use that state group again. Otherwise we will generate a state_id + # which will be used as a cache key for future resolutions, but + # not get persisted. + + # first look for exact matches + new_state_event_ids = set(itervalues(new_state)) + for sg, state in iteritems(state_groups_ids): + if len(new_state_event_ids) != len(state): + continue + + old_state_event_ids = set(itervalues(state)) + if new_state_event_ids == old_state_event_ids: + # got an exact match. + return _StateCacheEntry( + state=new_state, + state_group=sg, + ) + + # TODO: We want to create a state group for this set of events, to + # increase cache hits, but we need to make sure that it doesn't + # end up as a prev_group without being added to the database + + # failing that, look for the closest match. + prev_group = None + delta_ids = None + + for old_group, old_state in iteritems(state_groups_ids): + n_delta_ids = { + k: v + for k, v in iteritems(new_state) + if old_state.get(k) != v + } + if not delta_ids or len(n_delta_ids) < len(delta_ids): + prev_group = old_group + delta_ids = n_delta_ids + + return _StateCacheEntry( + state=new_state, + state_group=None, + prev_group=prev_group, + delta_ids=delta_ids, + ) + + def _ordered_events(events): def key_func(e): return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest() @@ -582,7 +616,7 @@ def _seperate(state_sets): with them in different state sets. Args: - state_sets(list[dict[(str, str), str]]): + state_sets(iterable[dict[(str, str), str]]): List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. @@ -596,10 +630,11 @@ def _seperate(state_sets): conflicted_state is a dict mapping (type, state_key) to a set of event ids for conflicted state keys. """ - unconflicted_state = dict(state_sets[0]) + state_set_iterator = iter(state_sets) + unconflicted_state = dict(next(state_set_iterator)) conflicted_state = {} - for state_set in state_sets[1:]: + for state_set in state_set_iterator: for key, value in iteritems(state_set): # Check if there is an unconflicted entry for the state key. unconflicted_value = unconflicted_state.get(key) |