diff options
Diffstat (limited to 'synapse/state.py')
-rw-r--r-- | synapse/state.py | 92 |
1 files changed, 43 insertions, 49 deletions
diff --git a/synapse/state.py b/synapse/state.py index 5dcff27367..e69282860a 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -134,7 +134,9 @@ class StateHandler(object): @defer.inlineCallbacks @log_function def _handle_new_state(self, new_pdu): - tree = yield self.store.get_unresolved_state_tree(new_pdu) + tree, missing_branch = yield self.store.get_unresolved_state_tree( + new_pdu + ) new_branch, current_branch = tree logger.debug( @@ -142,6 +144,28 @@ class StateHandler(object): new_branch, current_branch ) + if missing_branch is not None: + # We're missing some PDUs. Fetch them. + # TODO (erikj): Limit this. + missing_prev = tree[missing_branch][-1] + + pdu_id = missing_prev.prev_state_id + origin = missing_prev.prev_state_origin + + is_missing = yield self.store.get_pdu(pdu_id, origin) is None + if not is_missing: + raise Exception("Conflict resolution failed") + + yield self._replication.get_pdu( + destination=missing_prev.origin, + pdu_origin=origin, + pdu_id=pdu_id, + outlier=True + ) + + updated_current = yield self._handle_new_state(new_pdu) + defer.returnValue(updated_current) + if not current_branch: # There is no current state defer.returnValue(True) @@ -151,65 +175,35 @@ class StateHandler(object): c = current_branch[-1] if n.pdu_id == c.pdu_id and n.origin == c.origin: - # We have all the PDUs we need, so we can just do the conflict - # resolution. + # We found a common ancestor! if len(current_branch) == 1: # This is a direct clobber so we can just... defer.returnValue(True) - conflict_res = [ - self._do_power_level_conflict_res, - self._do_chain_length_conflict_res, - self._do_hash_conflict_res, - ] - - for algo in conflict_res: - new_res, curr_res = algo(new_branch, current_branch) - - if new_res < curr_res: - defer.returnValue(False) - elif new_res > curr_res: - defer.returnValue(True) - - raise Exception("Conflict resolution failed.") - else: - # We need to ask for PDUs. - missing_prev = max( - new_branch[-1], current_branch[-1], - key=lambda x: x.depth - ) - - if not hasattr(missing_prev, "prev_state_id"): - # FIXME Hmm - # temporary fallback - for algo in conflict_res: - new_res, curr_res = algo(new_branch, current_branch) - - if new_res < curr_res: - defer.returnValue(False) - elif new_res > curr_res: - defer.returnValue(True) - return + # We didn't find a common ancestor. This is probably fine. + pass - pdu_id = missing_prev.prev_state_id - origin = missing_prev.prev_state_origin + result = self._do_conflict_res(new_branch, current_branch) + defer.returnValue(result) - is_missing = yield self.store.get_pdu(pdu_id, origin) is None + def _do_conflict_res(self, new_branch, current_branch): + conflict_res = [ + self._do_power_level_conflict_res, + self._do_chain_length_conflict_res, + self._do_hash_conflict_res, + ] - if not is_missing: - raise Exception("Conflict resolution failed.") + for algo in conflict_res: + new_res, curr_res = algo(new_branch, current_branch) - yield self._replication.get_pdu( - destination=missing_prev.origin, - pdu_origin=origin, - pdu_id=pdu_id, - outlier=True - ) + if new_res < curr_res: + defer.returnValue(False) + elif new_res > curr_res: + defer.returnValue(True) - updated_current = yield self._handle_new_state(new_pdu) - defer.returnValue(updated_current) + raise Exception("Conflict resolution failed.") def _do_power_level_conflict_res(self, new_branch, current_branch): max_power_new = max( |