diff options
author | Michael Telatynski <7t3chguy@gmail.com> | 2018-07-24 17:17:46 +0100 |
---|---|---|
committer | Michael Telatynski <7t3chguy@gmail.com> | 2018-07-24 17:17:46 +0100 |
commit | 87951d3891efb5bccedf72c12b3da0d6ab482253 (patch) | |
tree | de7d997567c66c5a4d8743c1f3b9d6b474f5cfd9 /synapse/state.py | |
parent | if inviter_display_name == ""||None then default to inviter MXID (diff) | |
parent | Merge pull request #3595 from matrix-org/erikj/use_deltas (diff) | |
download | synapse-87951d3891efb5bccedf72c12b3da0d6ab482253.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into t3chguy/default_inviter_display_name_3pid
Diffstat (limited to 'synapse/state.py')
-rw-r--r-- | synapse/state.py | 591 |
1 files changed, 403 insertions, 188 deletions
diff --git a/synapse/state.py b/synapse/state.py index 390799fbd5..033f55d967 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -14,23 +14,25 @@ # limitations under the License. +import hashlib +import logging +from collections import namedtuple + +from six import iteritems, iterkeys, itervalues + +from frozendict import frozendict + from twisted.internet import defer from synapse import event_auth -from synapse.util.logutils import log_function -from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.events.snapshot import EventContext from synapse.util.async import Linearizer from synapse.util.caches import CACHE_SIZE_FACTOR - -from collections import namedtuple -from frozendict import frozendict - -import logging -import hashlib +from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.logutils import log_function +from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -58,7 +60,11 @@ class _StateCacheEntry(object): __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"] def __init__(self, state, state_group, prev_group=None, delta_ids=None): + # dict[(str, str), str] map from (type, state_key) to event_id self.state = frozendict(state) + + # the ID of a state group if one and only one is involved. + # otherwise, None otherwise? self.state_group = state_group self.prev_group = prev_group @@ -81,31 +87,19 @@ class _StateCacheEntry(object): class StateHandler(object): - """ Responsible for doing state conflict resolution. + """Fetches bits of state from the stores, and does state resolution + where necessary """ def __init__(self, hs): self.clock = hs.get_clock() self.store = hs.get_datastore() self.hs = hs - - # dict of set of event_ids -> _StateCacheEntry. - self._state_cache = None - self.resolve_linearizer = Linearizer(name="state_resolve_lock") + self._state_resolution_handler = hs.get_state_resolution_handler() def start_caching(self): - logger.debug("start_caching") - - self._state_cache = ExpiringCache( - cache_name="state_cache", - clock=self.clock, - max_len=SIZE_OF_CACHE, - expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000, - iterable=True, - reset_expiry_on_get=True, - ) - - self._state_cache.start() + # TODO: remove this shim + self._state_resolution_handler.start_caching() @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key="", @@ -127,7 +121,7 @@ class StateHandler(object): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_state") - ret = yield self.resolve_state_groups(room_id, latest_event_ids) + ret = yield self.resolve_state_groups_for_events(room_id, latest_event_ids) state = ret.state if event_type: @@ -138,27 +132,36 @@ class StateHandler(object): defer.returnValue(event) return - state_map = yield self.store.get_events(state.values(), get_prev_content=False) + state_map = yield self.store.get_events(list(state.values()), + get_prev_content=False) state = { - key: state_map[e_id] for key, e_id in state.items() if e_id in state_map + key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map } defer.returnValue(state) @defer.inlineCallbacks - def get_current_state_ids(self, room_id, event_type=None, state_key="", - latest_event_ids=None): + def get_current_state_ids(self, room_id, latest_event_ids=None): + """Get the current state, or the state at a set of events, for a room + + Args: + room_id (str): + + latest_event_ids (iterable[str]|None): if given, the forward + extremities to resolve. If None, we look them up from the + database (via a cache) + + Returns: + Deferred[dict[(str, str), str)]]: the state dict, mapping from + (event_type, state_key) -> event_id + """ if not latest_event_ids: latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_state_ids") - ret = yield self.resolve_state_groups(room_id, latest_event_ids) + ret = yield self.resolve_state_groups_for_events(room_id, latest_event_ids) state = ret.state - if event_type: - defer.returnValue(state.get((event_type, state_key))) - return - defer.returnValue(state) @defer.inlineCallbacks @@ -166,7 +169,7 @@ class StateHandler(object): if not latest_event_ids: latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_user_in_room") - entry = yield self.resolve_state_groups(room_id, latest_event_ids) + entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids) joined_users = yield self.store.get_joined_users_from_state(room_id, entry) defer.returnValue(joined_users) @@ -175,7 +178,7 @@ class StateHandler(object): if not latest_event_ids: latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_hosts_in_room") - entry = yield self.resolve_state_groups(room_id, latest_event_ids) + entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids) joined_hosts = yield self.store.get_joined_hosts(room_id, entry) defer.returnValue(joined_hosts) @@ -183,8 +186,15 @@ class StateHandler(object): def compute_event_context(self, event, old_state=None): """Build an EventContext structure for the event. + This works out what the current state should be for the event, and + generates a new state group if necessary. + Args: event (synapse.events.EventBase): + old_state (dict|None): The state at the event if it can't be + calculated from existing events. This is normally only specified + when receiving an event from federation where we don't have the + prev events for, e.g. when backfilling. Returns: synapse.events.snapshot.EventContext: """ @@ -193,113 +203,158 @@ class StateHandler(object): # If this is an outlier, then we know it shouldn't have any current # state. Certainly store.get_current_state won't return any, and # persisting the event won't store the state group. - context = EventContext() if old_state: - context.prev_state_ids = { + prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } if event.is_state(): - context.current_state_ids = dict(context.prev_state_ids) + current_state_ids = dict(prev_state_ids) key = (event.type, event.state_key) - context.current_state_ids[key] = event.event_id + current_state_ids[key] = event.event_id else: - context.current_state_ids = context.prev_state_ids + current_state_ids = prev_state_ids else: - context.current_state_ids = {} - context.prev_state_ids = {} - context.prev_state_events = [] - context.state_group = self.store.get_next_state_group() + current_state_ids = {} + prev_state_ids = {} + + # We don't store state for outliers, so we don't generate a state + # group for it. + context = EventContext.with_state( + state_group=None, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + ) + defer.returnValue(context) if old_state: - context = EventContext() - context.prev_state_ids = { + # We already have the state, so we don't need to calculate it. + # Let's just correctly fill out the context and create a + # new state group for it. + + prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } - context.state_group = self.store.get_next_state_group() if event.is_state(): key = (event.type, event.state_key) - if key in context.prev_state_ids: - replaces = context.prev_state_ids[key] + if key in prev_state_ids: + replaces = prev_state_ids[key] if replaces != event.event_id: # Paranoia check event.unsigned["replaces_state"] = replaces - context.current_state_ids = dict(context.prev_state_ids) - context.current_state_ids[key] = event.event_id + current_state_ids = dict(prev_state_ids) + current_state_ids[key] = event.event_id else: - context.current_state_ids = context.prev_state_ids + current_state_ids = prev_state_ids + + state_group = yield self.store.store_state_group( + event.event_id, + event.room_id, + prev_group=None, + delta_ids=None, + current_state_ids=current_state_ids, + ) + + context = EventContext.with_state( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + ) - context.prev_state_events = [] defer.returnValue(context) logger.debug("calling resolve_state_groups from compute_event_context") - entry = yield self.resolve_state_groups( + entry = yield self.resolve_state_groups_for_events( event.room_id, [e for e, _ in event.prev_events], ) - curr_state = entry.state + prev_state_ids = entry.state + prev_group = None + delta_ids = None - context = EventContext() - context.prev_state_ids = curr_state if event.is_state(): - context.state_group = self.store.get_next_state_group() + # If this is a state event then we need to create a new state + # group for the state after this event. key = (event.type, event.state_key) - if key in context.prev_state_ids: - replaces = context.prev_state_ids[key] + if key in prev_state_ids: + replaces = prev_state_ids[key] event.unsigned["replaces_state"] = replaces - context.current_state_ids = dict(context.prev_state_ids) - context.current_state_ids[key] = event.event_id + current_state_ids = dict(prev_state_ids) + current_state_ids[key] = event.event_id if entry.state_group: - context.prev_group = entry.state_group - context.delta_ids = { + # If the state at the event has a state group assigned then + # we can use that as the prev group + prev_group = entry.state_group + delta_ids = { key: event.event_id } elif entry.prev_group: - context.prev_group = entry.prev_group - context.delta_ids = dict(entry.delta_ids) - context.delta_ids[key] = event.event_id + # If the state at the event only has a prev group, then we can + # use that as a prev group too. + prev_group = entry.prev_group + delta_ids = dict(entry.delta_ids) + delta_ids[key] = event.event_id + + state_group = yield self.store.store_state_group( + event.event_id, + event.room_id, + prev_group=prev_group, + delta_ids=delta_ids, + current_state_ids=current_state_ids, + ) else: + current_state_ids = prev_state_ids + prev_group = entry.prev_group + delta_ids = entry.delta_ids + if entry.state_group is None: - entry.state_group = self.store.get_next_state_group() + entry.state_group = yield self.store.store_state_group( + event.event_id, + event.room_id, + prev_group=entry.prev_group, + delta_ids=entry.delta_ids, + current_state_ids=current_state_ids, + ) entry.state_id = entry.state_group - context.state_group = entry.state_group - context.current_state_ids = context.prev_state_ids - context.prev_group = entry.prev_group - context.delta_ids = entry.delta_ids + state_group = entry.state_group + + context = EventContext.with_state( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=prev_group, + delta_ids=delta_ids, + ) - context.prev_state_events = [] defer.returnValue(context) @defer.inlineCallbacks - @log_function - def resolve_state_groups(self, room_id, event_ids): + def resolve_state_groups_for_events(self, room_id, event_ids): """ Given a list of event_ids this method fetches the state at each event, resolves conflicts between them and returns them. + Args: + room_id (str): + event_ids (list[str]): + Returns: - a Deferred tuple of (`state_group`, `state`, `prev_state`). - `state_group` is the name of a state group if one and only one is - involved. `state` is a map from (type, state_key) to event, and - `prev_state` is a list of event ids. + Deferred[_StateCacheEntry]: resolved state """ logger.debug("resolve_state_groups event_ids %s", event_ids) + # map from state group id to the state in that state group (where + # 'state' is a map from state key to event id) + # dict[int, dict[(str, str), str]] state_groups_ids = yield self.store.get_state_groups_ids( room_id, event_ids ) - logger.debug( - "resolve_state_groups state_groups %s", - state_groups_ids.keys() - ) - - group_names = frozenset(state_groups_ids.keys()) - if len(group_names) == 1: - name, state_list = state_groups_ids.items().pop() + if len(state_groups_ids) == 1: + name, state_list = list(state_groups_ids.items()).pop() prev_group, delta_ids = yield self.store.get_state_group_delta(name) @@ -310,6 +365,102 @@ class StateHandler(object): delta_ids=delta_ids, )) + result = yield self._state_resolution_handler.resolve_state_groups( + room_id, state_groups_ids, None, self._state_map_factory, + ) + defer.returnValue(result) + + def _state_map_factory(self, ev_ids): + return self.store.get_events( + ev_ids, get_prev_content=False, check_redacted=False, + ) + + def resolve_events(self, state_sets, event): + logger.info( + "Resolving state for %s with %d groups", event.room_id, len(state_sets) + ) + state_set_ids = [{ + (ev.type, ev.state_key): ev.event_id + for ev in st + } for st in state_sets] + + state_map = { + ev.event_id: ev + for st in state_sets + for ev in st + } + + with Measure(self.clock, "state._resolve_events"): + new_state = resolve_events_with_state_map(state_set_ids, state_map) + + new_state = { + key: state_map[ev_id] for key, ev_id in iteritems(new_state) + } + + return new_state + + +class StateResolutionHandler(object): + """Responsible for doing state conflict resolution. + + Note that the storage layer depends on this handler, so all functions must + be storage-independent. + """ + def __init__(self, hs): + self.clock = hs.get_clock() + + # dict of set of event_ids -> _StateCacheEntry. + self._state_cache = None + self.resolve_linearizer = Linearizer(name="state_resolve_lock") + + def start_caching(self): + logger.debug("start_caching") + + self._state_cache = ExpiringCache( + cache_name="state_cache", + clock=self.clock, + max_len=SIZE_OF_CACHE, + expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000, + iterable=True, + reset_expiry_on_get=True, + ) + + self._state_cache.start() + + @defer.inlineCallbacks + @log_function + def resolve_state_groups( + self, room_id, state_groups_ids, event_map, state_map_factory, + ): + """Resolves conflicts between a set of state groups + + Always generates a new state group (unless we hit the cache), so should + not be called for a single state group + + Args: + room_id (str): room we are resolving for (used for logging) + state_groups_ids (dict[int, dict[(str, str), str]]): + map from state group id to the state in that state group + (where 'state' is a map from state key to event id) + + event_map(dict[str,FrozenEvent]|None): + a dict from event_id to event, for any events that we happen to + have in flight (eg, those currently being persisted). This will be + used as a starting point fof finding the state we need; any missing + events will be requested via state_map_factory. + + If None, all events will be fetched via state_map_factory. + + Returns: + Deferred[_StateCacheEntry]: resolved state + """ + logger.debug( + "resolve_state_groups state_groups %s", + state_groups_ids.keys() + ) + + group_names = frozenset(state_groups_ids.keys()) + with (yield self.resolve_linearizer.queue(group_names)): if self._state_cache is not None: cache = self._state_cache.get(group_names, None) @@ -320,112 +471,128 @@ class StateHandler(object): "Resolving state for %s with %d groups", room_id, len(state_groups_ids) ) - state = {} - for st in state_groups_ids.values(): - for key, e_id in st.items(): - state.setdefault(key, set()).add(e_id) - - conflicted_state = { - k: list(v) - for k, v in state.items() - if len(v) > 1 - } + # start by assuming we won't have any conflicted state, and build up the new + # state map by iterating through the state groups. If we discover a conflict, + # we give up and instead use `resolve_events_with_factory`. + # + # XXX: is this actually worthwhile, or should we just let + # resolve_events_with_factory do it? + new_state = {} + conflicted_state = False + for st in itervalues(state_groups_ids): + for key, e_id in iteritems(st): + if key in new_state: + conflicted_state = True + break + new_state[key] = e_id + if conflicted_state: + break if conflicted_state: logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): - new_state = yield resolve_events( - state_groups_ids.values(), - state_map_factory=lambda ev_ids: self.store.get_events( - ev_ids, get_prev_content=False, check_redacted=False, - ), + new_state = yield resolve_events_with_factory( + list(itervalues(state_groups_ids)), + event_map=event_map, + state_map_factory=state_map_factory, ) - else: - new_state = { - key: e_ids.pop() for key, e_ids in state.items() - } - state_group = None - new_state_event_ids = frozenset(new_state.values()) - for sg, events in state_groups_ids.items(): - if new_state_event_ids == frozenset(e_id for e_id in events): - state_group = sg - break + # if the new state matches any of the input state groups, we can + # use that state group again. Otherwise we will generate a state_id + # which will be used as a cache key for future resolutions, but + # not get persisted. - # TODO: We want to create a state group for this set of events, to - # increase cache hits, but we need to make sure that it doesn't - # end up as a prev_group without being added to the database - - prev_group = None - delta_ids = None - for old_group, old_ids in state_groups_ids.iteritems(): - if not set(new_state) - set(old_ids): - n_delta_ids = { - k: v - for k, v in new_state.iteritems() - if old_ids.get(k) != v - } - if not delta_ids or len(n_delta_ids) < len(delta_ids): - prev_group = old_group - delta_ids = n_delta_ids - - cache = _StateCacheEntry( - state=new_state, - state_group=state_group, - prev_group=prev_group, - delta_ids=delta_ids, - ) + with Measure(self.clock, "state.create_group_ids"): + cache = _make_state_cache_entry(new_state, state_groups_ids) if self._state_cache is not None: self._state_cache[group_names] = cache defer.returnValue(cache) - def resolve_events(self, state_sets, event): - logger.info( - "Resolving state for %s with %d groups", event.room_id, len(state_sets) - ) - state_set_ids = [{ - (ev.type, ev.state_key): ev.event_id - for ev in st - } for st in state_sets] - state_map = { - ev.event_id: ev - for st in state_sets - for ev in st - } +def _make_state_cache_entry( + new_state, + state_groups_ids, +): + """Given a resolved state, and a set of input state groups, pick one to base + a new state group on (if any), and return an appropriately-constructed + _StateCacheEntry. - with Measure(self.clock, "state._resolve_events"): - new_state = resolve_events(state_set_ids, state_map) + Args: + new_state (dict[(str, str), str]): resolved state map (mapping from + (type, state_key) to event_id) - new_state = { - key: state_map[ev_id] for key, ev_id in new_state.items() - } + state_groups_ids (dict[int, dict[(str, str), str]]): + map from state group id to the state in that state group + (where 'state' is a map from state key to event id) - return new_state + Returns: + _StateCacheEntry + """ + # if the new state matches any of the input state groups, we can + # use that state group again. Otherwise we will generate a state_id + # which will be used as a cache key for future resolutions, but + # not get persisted. + + # first look for exact matches + new_state_event_ids = set(itervalues(new_state)) + for sg, state in iteritems(state_groups_ids): + if len(new_state_event_ids) != len(state): + continue + + old_state_event_ids = set(itervalues(state)) + if new_state_event_ids == old_state_event_ids: + # got an exact match. + return _StateCacheEntry( + state=new_state, + state_group=sg, + ) + + # TODO: We want to create a state group for this set of events, to + # increase cache hits, but we need to make sure that it doesn't + # end up as a prev_group without being added to the database + + # failing that, look for the closest match. + prev_group = None + delta_ids = None + + for old_group, old_state in iteritems(state_groups_ids): + n_delta_ids = { + k: v + for k, v in iteritems(new_state) + if old_state.get(k) != v + } + if not delta_ids or len(n_delta_ids) < len(delta_ids): + prev_group = old_group + delta_ids = n_delta_ids + + return _StateCacheEntry( + state=new_state, + state_group=None, + prev_group=prev_group, + delta_ids=delta_ids, + ) def _ordered_events(events): def key_func(e): - return -int(e.depth), hashlib.sha1(e.event_id).hexdigest() + return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest() return sorted(events, key=key_func) -def resolve_events(state_sets, state_map_factory): +def resolve_events_with_state_map(state_sets, state_map): """ Args: state_sets(list): List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. - state_map_factory(dict|callable): If callable, then will be called - with a list of event_ids that are needed, and should return with - a Deferred of dict of event_id to event. Otherwise, should be - a dict from event_id to event of all events in state_sets. + state_map(dict): a dict from event_id to event, for all events in + state_sets. Returns - dict[(str, str), synapse.events.FrozenEvent] is a map from - (type, state_key) to event. + dict[(str, str), str]: + a map from (type, state_key) to event_id. """ if len(state_sets) == 1: return state_sets[0] @@ -434,13 +601,6 @@ def resolve_events(state_sets, state_map_factory): state_sets, ) - if callable(state_map_factory): - return _resolve_with_state_fac( - unconflicted_state, conflicted_state, state_map_factory - ) - - state_map = state_map_factory - auth_events = _create_auth_events_from_maps( unconflicted_state, conflicted_state, state_map ) @@ -454,12 +614,28 @@ def _seperate(state_sets): """Takes the state_sets and figures out which keys are conflicted and which aren't. i.e., which have multiple different event_ids associated with them in different state sets. + + Args: + state_sets(iterable[dict[(str, str), str]]): + List of dicts of (type, state_key) -> event_id, which are the + different state groups to resolve. + + Returns: + (dict[(str, str), str], dict[(str, str), set[str]]): + A tuple of (unconflicted_state, conflicted_state), where: + + unconflicted_state is a dict mapping (type, state_key)->event_id + for unconflicted state keys. + + conflicted_state is a dict mapping (type, state_key) to a set of + event ids for conflicted state keys. """ - unconflicted_state = dict(state_sets[0]) + state_set_iterator = iter(state_sets) + unconflicted_state = dict(next(state_set_iterator)) conflicted_state = {} - for state_set in state_sets[1:]: - for key, value in state_set.iteritems(): + for state_set in state_set_iterator: + for key, value in iteritems(state_set): # Check if there is an unconflicted entry for the state key. unconflicted_value = unconflicted_state.get(key) if unconflicted_value is None: @@ -484,24 +660,63 @@ def _seperate(state_sets): @defer.inlineCallbacks -def _resolve_with_state_fac(unconflicted_state, conflicted_state, - state_map_factory): +def resolve_events_with_factory(state_sets, event_map, state_map_factory): + """ + Args: + state_sets(list): List of dicts of (type, state_key) -> event_id, + which are the different state groups to resolve. + + event_map(dict[str,FrozenEvent]|None): + a dict from event_id to event, for any events that we happen to + have in flight (eg, those currently being persisted). This will be + used as a starting point fof finding the state we need; any missing + events will be requested via state_map_factory. + + If None, all events will be fetched via state_map_factory. + + state_map_factory(func): will be called + with a list of event_ids that are needed, and should return with + a Deferred of dict of event_id to event. + + Returns + Deferred[dict[(str, str), str]]: + a map from (type, state_key) to event_id. + """ + if len(state_sets) == 1: + defer.returnValue(state_sets[0]) + + unconflicted_state, conflicted_state = _seperate( + state_sets, + ) + needed_events = set( event_id - for event_ids in conflicted_state.itervalues() + for event_ids in itervalues(conflicted_state) for event_id in event_ids ) + if event_map is not None: + needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d conflicted events", len(needed_events)) + # dict[str, FrozenEvent]: a map from state event id to event. Only includes + # the state events which are in conflict (and those in event_map) state_map = yield state_map_factory(needed_events) + if event_map is not None: + state_map.update(event_map) + # get the ids of the auth events which allow us to authenticate the + # conflicted state, picking only from the unconflicting state. + # + # dict[(str, str), str]: a map from state key to event id auth_events = _create_auth_events_from_maps( unconflicted_state, conflicted_state, state_map ) - new_needed_events = set(auth_events.itervalues()) + new_needed_events = set(itervalues(auth_events)) new_needed_events -= needed_events + if event_map is not None: + new_needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d auth events", len(new_needed_events)) @@ -515,7 +730,7 @@ def _resolve_with_state_fac(unconflicted_state, conflicted_state, def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_map): auth_events = {} - for event_ids in conflicted_state.itervalues(): + for event_ids in itervalues(conflicted_state): for event_id in event_ids: if event_id in state_map: keys = event_auth.auth_types_for_event(state_map[event_id]) @@ -527,10 +742,10 @@ def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_ma return auth_events -def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ids, +def _resolve_with_state(unconflicted_state_ids, conflicted_state_ids, auth_event_ids, state_map): conflicted_state = {} - for key, event_ids in conflicted_state_ds.iteritems(): + for key, event_ids in iteritems(conflicted_state_ids): events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map] if len(events) > 1: conflicted_state[key] = events @@ -539,7 +754,7 @@ def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ auth_events = { key: state_map[ev_id] - for key, ev_id in auth_event_ids.items() + for key, ev_id in iteritems(auth_event_ids) if ev_id in state_map } @@ -547,12 +762,12 @@ def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ resolved_state = _resolve_state_events( conflicted_state, auth_events ) - except: + except Exception: logger.exception("Failed to resolve state") raise new_state = unconflicted_state_ids - for key, event in resolved_state.iteritems(): + for key, event in iteritems(resolved_state): new_state[key] = event.event_id return new_state @@ -577,7 +792,7 @@ def _resolve_state_events(conflicted_state, auth_events): auth_events.update(resolved_state) - for key, events in conflicted_state.items(): + for key, events in iteritems(conflicted_state): if key[0] == EventTypes.JoinRules: logger.debug("Resolving conflicted join rules %r", events) resolved_state[key] = _resolve_auth_events( @@ -587,7 +802,7 @@ def _resolve_state_events(conflicted_state, auth_events): auth_events.update(resolved_state) - for key, events in conflicted_state.items(): + for key, events in iteritems(conflicted_state): if key[0] == EventTypes.Member: logger.debug("Resolving conflicted member lists %r", events) resolved_state[key] = _resolve_auth_events( @@ -597,7 +812,7 @@ def _resolve_state_events(conflicted_state, auth_events): auth_events.update(resolved_state) - for key, events in conflicted_state.items(): + for key, events in iteritems(conflicted_state): if key not in resolved_state: logger.debug("Resolving conflicted state %r:%r", key, events) resolved_state[key] = _resolve_normal_events( |