From 8c652a2b5f4ed07abd682543ad6a365c6d50b8d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Feb 2015 14:20:05 +0000 Subject: When we see a difference in current state, actually use state conflict resolution algorithm --- synapse/handlers/federation.py | 34 +++++++++++++++++++++++++++++++++ synapse/state.py | 43 ++++++++++++++++++++++++++++-------------- 2 files changed, 63 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 77c81fe2da..5b225ffb57 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -858,6 +858,40 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.debug("Different auth: %s", different_auth) + different_events = yield defer.gatherResults( + [ + self.store.get_event( + d, + allow_none=True, + allow_rejected=False, + ) + for d in different_auth + if d in have_events and not have_events[d] + ], + consumeErrors=True + ) + + if different_events: + local_view = dict(auth_events) + remote_view = dict(auth_events) + remote_view.update({ + (d.type, d.state_key) for d in different_events + }) + + new_state, _ = self.state.resolve_events( + [local_view, remote_view], + event + ) + + auth_events.update(new_state) + + current_state = set(e.event_id for e in auth_events.values()) + different_auth = event_auth_events - current_state + + context.current_state.update(auth_events) + context.state_group = None + + if different_auth and not event.internal_metadata.is_outlier(): # Only do auth resolution if we have something new to say. # We can't rove an auth failure. do_resolution = False diff --git a/synapse/state.py b/synapse/state.py index 98aaa2be53..fe5f3dc84b 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -259,13 +259,37 @@ class StateHandler(object): defer.returnValue((name, state, prev_states)) + new_state, prev_states = self._resolve_events( + state_groups.values(), event_type, state_key + ) + + if self._state_cache is not None: + cache = _StateCacheEntry( + state=new_state, + state_group=None, + ts=self.clock.time_msec() + ) + + self._state_cache[frozenset(event_ids)] = cache + + defer.returnValue((None, new_state, prev_states)) + + def resolve_events(self, state_sets, event): + if event.is_state(): + return self._resolve_events( + state_sets, event.type, event.state_key + ) + else: + return self._resolve_events(state_sets) + + def _resolve_events(self, state_sets, event_type=None, state_key=""): state = {} - for group, g_state in state_groups.items(): - for s in g_state: + for st in state_sets: + for e in st: state.setdefault( - (s.type, s.state_key), + (e.type, e.state_key), {} - )[s.event_id] = s + )[e.event_id] = e unconflicted_state = { k: v.values()[0] for k, v in state.items() @@ -302,16 +326,7 @@ class StateHandler(object): new_state = unconflicted_state new_state.update(resolved_state) - if self._state_cache is not None: - cache = _StateCacheEntry( - state=new_state, - state_group=None, - ts=self.clock.time_msec() - ) - - self._state_cache[frozenset(event_ids)] = cache - - defer.returnValue((None, new_state, prev_states)) + return new_state, prev_states @log_function def _resolve_state_events(self, conflicted_state, auth_events): -- cgit 1.4.1