diff options
author | Erik Johnston <erikj@jki.re> | 2016-09-07 09:39:58 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-07 09:39:58 +0100 |
commit | 94a83b534f0c482a4faa6f33433126615f14401a (patch) | |
tree | fd9f61ff3907fba599d387b5b70c5b6ec16ce9c6 /synapse/state.py | |
parent | Merge pull request #1073 from matrix-org/erikj/presence_fiddle (diff) | |
parent | Scale the batch size so that we're not bitten by the minimum (diff) | |
download | synapse-94a83b534f0c482a4faa6f33433126615f14401a.tar.xz |
Merge pull request #1065 from matrix-org/erikj/state_storage
Move to storing state_groups_state as deltas
Diffstat (limited to 'synapse/state.py')
-rw-r--r-- | synapse/state.py | 33 |
1 files changed, 31 insertions, 2 deletions
diff --git a/synapse/state.py b/synapse/state.py index cd792afed1..4520fa0415 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -55,12 +55,15 @@ def _gen_state_id(): class _StateCacheEntry(object): - __slots__ = ["state", "state_group", "state_id"] + __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"] - def __init__(self, state, state_group): + def __init__(self, state, state_group, prev_group=None, delta_ids=None): self.state = state self.state_group = state_group + self.prev_group = prev_group + self.delta_ids = delta_ids + # The `state_id` is a unique ID we generate that can be used as ID for # this collection of state. Usually this would be the same as the # state group, but on worker instances we can't generate a new state @@ -245,11 +248,20 @@ class StateHandler(object): if key in context.prev_state_ids: replaces = context.prev_state_ids[key] event.unsigned["replaces_state"] = replaces + context.current_state_ids = dict(context.prev_state_ids) context.current_state_ids[key] = event.event_id + + context.prev_group = entry.prev_group + context.delta_ids = entry.delta_ids + if context.delta_ids is not None: + context.delta_ids[key] = event.event_id else: context.current_state_ids = context.prev_state_ids + context.prev_group = entry.prev_group + context.delta_ids = entry.delta_ids + context.prev_state_events = [] defer.returnValue(context) @@ -283,6 +295,8 @@ class StateHandler(object): defer.returnValue(_StateCacheEntry( state=state_list, state_group=name, + prev_group=name, + delta_ids={}, )) with (yield self.resolve_linearizer.queue(group_names)): @@ -340,9 +354,24 @@ class StateHandler(object): if hasattr(self.store, "get_next_state_group"): state_group = self.store.get_next_state_group() + prev_group = None + delta_ids = None + for old_group, old_ids in state_groups_ids.items(): + if not set(new_state.iterkeys()) - set(old_ids.iterkeys()): + n_delta_ids = { + k: v + for k, v in new_state.items() + if old_ids.get(k) != v + } + if not delta_ids or len(n_delta_ids) < len(delta_ids): + prev_group = old_group + delta_ids = n_delta_ids + cache = _StateCacheEntry( state=new_state, state_group=state_group, + prev_group=prev_group, + delta_ids=delta_ids, ) if self._state_cache is not None: |