diff options
author | Erik Johnston <erikj@jki.re> | 2016-09-01 15:58:19 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-01 15:58:19 +0100 |
commit | c1c38da5867bf0a0eae18bab9940e92b14943fc3 (patch) | |
tree | 7987f31945df6ac73c1714d26bb96db78234fb89 /synapse/state.py | |
parent | Fix typo in log line (diff) | |
parent | Linearize state resolution to help caches (diff) | |
download | synapse-c1c38da5867bf0a0eae18bab9940e92b14943fc3.tar.xz |
Merge pull request #1061 from matrix-org/erikj/linearize_resolution
Linearize state resolution to help caches
Diffstat (limited to 'synapse/state.py')
-rw-r--r-- | synapse/state.py | 115 |
1 files changed, 59 insertions, 56 deletions
diff --git a/synapse/state.py b/synapse/state.py index b31bbcdbd2..cd792afed1 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.api.auth import AuthEventTypes from synapse.events.snapshot import EventContext +from synapse.util.async import Linearizer from collections import namedtuple @@ -84,6 +85,7 @@ class StateHandler(object): # dict of set of event_ids -> _StateCacheEntry. self._state_cache = None + self.resolve_linearizer = Linearizer() def start_caching(self): logger.debug("start_caching") @@ -283,69 +285,70 @@ class StateHandler(object): state_group=name, )) - if self._state_cache is not None: - cache = self._state_cache.get(group_names, None) - if cache: - defer.returnValue(cache) + with (yield self.resolve_linearizer.queue(group_names)): + if self._state_cache is not None: + cache = self._state_cache.get(group_names, None) + if cache: + defer.returnValue(cache) - logger.info( - "Resolving state for %s with %d groups", room_id, len(state_groups_ids) - ) - - state = {} - for st in state_groups_ids.values(): - for key, e_id in st.items(): - state.setdefault(key, set()).add(e_id) + logger.info( + "Resolving state for %s with %d groups", room_id, len(state_groups_ids) + ) - conflicted_state = { - k: list(v) - for k, v in state.items() - if len(v) > 1 - } + state = {} + for st in state_groups_ids.values(): + for key, e_id in st.items(): + state.setdefault(key, set()).add(e_id) - if conflicted_state: - logger.info("Resolving conflicted state for %r", room_id) - state_map = yield self.store.get_events( - [e_id for st in state_groups_ids.values() for e_id in st.values()], - get_prev_content=False - ) - state_sets = [ - [state_map[e_id] for key, e_id in st.items() if e_id in state_map] - for st in state_groups_ids.values() - ] - new_state, _ = self._resolve_events( - state_sets, event_type, state_key - ) - new_state = { - key: e.event_id for key, e in new_state.items() - } - else: - new_state = { - key: e_ids.pop() for key, e_ids in state.items() + conflicted_state = { + k: list(v) + for k, v in state.items() + if len(v) > 1 } - state_group = None - new_state_event_ids = frozenset(new_state.values()) - for sg, events in state_groups_ids.items(): - if new_state_event_ids == frozenset(e_id for e_id in events): - state_group = sg - break - if state_group is None: - # Worker instances don't have access to this method, but we want - # to set the state_group on the main instance to increase cache - # hits. - if hasattr(self.store, "get_next_state_group"): - state_group = self.store.get_next_state_group() - - cache = _StateCacheEntry( - state=new_state, - state_group=state_group, - ) + if conflicted_state: + logger.info("Resolving conflicted state for %r", room_id) + state_map = yield self.store.get_events( + [e_id for st in state_groups_ids.values() for e_id in st.values()], + get_prev_content=False + ) + state_sets = [ + [state_map[e_id] for key, e_id in st.items() if e_id in state_map] + for st in state_groups_ids.values() + ] + new_state, _ = self._resolve_events( + state_sets, event_type, state_key + ) + new_state = { + key: e.event_id for key, e in new_state.items() + } + else: + new_state = { + key: e_ids.pop() for key, e_ids in state.items() + } + + state_group = None + new_state_event_ids = frozenset(new_state.values()) + for sg, events in state_groups_ids.items(): + if new_state_event_ids == frozenset(e_id for e_id in events): + state_group = sg + break + if state_group is None: + # Worker instances don't have access to this method, but we want + # to set the state_group on the main instance to increase cache + # hits. + if hasattr(self.store, "get_next_state_group"): + state_group = self.store.get_next_state_group() + + cache = _StateCacheEntry( + state=new_state, + state_group=state_group, + ) - if self._state_cache is not None: - self._state_cache[group_names] = cache + if self._state_cache is not None: + self._state_cache[group_names] = cache - defer.returnValue(cache) + defer.returnValue(cache) def resolve_events(self, state_sets, event): logger.info( |