diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/state.py | 92 | ||||
-rw-r--r-- | synapse/storage/pdu.py | 9 |
2 files changed, 49 insertions, 52 deletions
diff --git a/synapse/state.py b/synapse/state.py index 5dcff27367..e69282860a 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -134,7 +134,9 @@ class StateHandler(object): @defer.inlineCallbacks @log_function def _handle_new_state(self, new_pdu): - tree = yield self.store.get_unresolved_state_tree(new_pdu) + tree, missing_branch = yield self.store.get_unresolved_state_tree( + new_pdu + ) new_branch, current_branch = tree logger.debug( @@ -142,6 +144,28 @@ class StateHandler(object): new_branch, current_branch ) + if missing_branch is not None: + # We're missing some PDUs. Fetch them. + # TODO (erikj): Limit this. + missing_prev = tree[missing_branch][-1] + + pdu_id = missing_prev.prev_state_id + origin = missing_prev.prev_state_origin + + is_missing = yield self.store.get_pdu(pdu_id, origin) is None + if not is_missing: + raise Exception("Conflict resolution failed") + + yield self._replication.get_pdu( + destination=missing_prev.origin, + pdu_origin=origin, + pdu_id=pdu_id, + outlier=True + ) + + updated_current = yield self._handle_new_state(new_pdu) + defer.returnValue(updated_current) + if not current_branch: # There is no current state defer.returnValue(True) @@ -151,65 +175,35 @@ class StateHandler(object): c = current_branch[-1] if n.pdu_id == c.pdu_id and n.origin == c.origin: - # We have all the PDUs we need, so we can just do the conflict - # resolution. + # We found a common ancestor! if len(current_branch) == 1: # This is a direct clobber so we can just... defer.returnValue(True) - conflict_res = [ - self._do_power_level_conflict_res, - self._do_chain_length_conflict_res, - self._do_hash_conflict_res, - ] - - for algo in conflict_res: - new_res, curr_res = algo(new_branch, current_branch) - - if new_res < curr_res: - defer.returnValue(False) - elif new_res > curr_res: - defer.returnValue(True) - - raise Exception("Conflict resolution failed.") - else: - # We need to ask for PDUs. - missing_prev = max( - new_branch[-1], current_branch[-1], - key=lambda x: x.depth - ) - - if not hasattr(missing_prev, "prev_state_id"): - # FIXME Hmm - # temporary fallback - for algo in conflict_res: - new_res, curr_res = algo(new_branch, current_branch) - - if new_res < curr_res: - defer.returnValue(False) - elif new_res > curr_res: - defer.returnValue(True) - return + # We didn't find a common ancestor. This is probably fine. + pass - pdu_id = missing_prev.prev_state_id - origin = missing_prev.prev_state_origin + result = self._do_conflict_res(new_branch, current_branch) + defer.returnValue(result) - is_missing = yield self.store.get_pdu(pdu_id, origin) is None + def _do_conflict_res(self, new_branch, current_branch): + conflict_res = [ + self._do_power_level_conflict_res, + self._do_chain_length_conflict_res, + self._do_hash_conflict_res, + ] - if not is_missing: - raise Exception("Conflict resolution failed.") + for algo in conflict_res: + new_res, curr_res = algo(new_branch, current_branch) - yield self._replication.get_pdu( - destination=missing_prev.origin, - pdu_origin=origin, - pdu_id=pdu_id, - outlier=True - ) + if new_res < curr_res: + defer.returnValue(False) + elif new_res > curr_res: + defer.returnValue(True) - updated_current = yield self._handle_new_state(new_pdu) - defer.returnValue(updated_current) + raise Exception("Conflict resolution failed.") def _do_power_level_conflict_res(self, new_branch, current_branch): max_power_new = max( diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index 0bf97e37ee..3cbce2d0a1 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -308,8 +308,8 @@ class PduStore(SQLBaseStore): @defer.inlineCallbacks def get_oldest_pdus_in_context(self, context): - """Get a list of Pdus that we haven't backfilled beyond yet (and haven't - seen). This list is used when we want to backfill backwards and is the + """Get a list of Pdus that we haven't backfilled beyond yet (and havent + seen). This list is used when we want to backfill backwards and is the list we send to the remote server. Args: @@ -524,13 +524,16 @@ class StatePduStore(SQLBaseStore): txn, new_pdu, current ) + missing_branch = None for branch, prev_state, state in enum_branches: if state: return_value[branch].append(state) else: + # We don't have prev_state :( + missing_branch = branch break - return return_value + return (return_value, missing_branch) def update_current_state(self, pdu_id, origin, context, pdu_type, state_key): |