diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-09-08 13:43:43 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-09-08 13:43:43 +0100 |
commit | fa9d36e050d942e76b2bd6a6d2c190215a1dac39 (patch) | |
tree | 806152a98d66013b94882c85cb1cb6129be62e78 /synapse/state.py | |
parent | Add a new method to enqueue the device messages rather than sending a dummy EDU (diff) | |
parent | Log delta files we're applying (diff) | |
download | synapse-fa9d36e050d942e76b2bd6a6d2c190215a1dac39.tar.xz |
Merge branch 'develop' into markjh/direct_to_device_federation
Diffstat (limited to 'synapse/state.py')
-rw-r--r-- | synapse/state.py | 33 |
1 files changed, 31 insertions, 2 deletions
diff --git a/synapse/state.py b/synapse/state.py index cd792afed1..4520fa0415 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -55,12 +55,15 @@ def _gen_state_id(): class _StateCacheEntry(object): - __slots__ = ["state", "state_group", "state_id"] + __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"] - def __init__(self, state, state_group): + def __init__(self, state, state_group, prev_group=None, delta_ids=None): self.state = state self.state_group = state_group + self.prev_group = prev_group + self.delta_ids = delta_ids + # The `state_id` is a unique ID we generate that can be used as ID for # this collection of state. Usually this would be the same as the # state group, but on worker instances we can't generate a new state @@ -245,11 +248,20 @@ class StateHandler(object): if key in context.prev_state_ids: replaces = context.prev_state_ids[key] event.unsigned["replaces_state"] = replaces + context.current_state_ids = dict(context.prev_state_ids) context.current_state_ids[key] = event.event_id + + context.prev_group = entry.prev_group + context.delta_ids = entry.delta_ids + if context.delta_ids is not None: + context.delta_ids[key] = event.event_id else: context.current_state_ids = context.prev_state_ids + context.prev_group = entry.prev_group + context.delta_ids = entry.delta_ids + context.prev_state_events = [] defer.returnValue(context) @@ -283,6 +295,8 @@ class StateHandler(object): defer.returnValue(_StateCacheEntry( state=state_list, state_group=name, + prev_group=name, + delta_ids={}, )) with (yield self.resolve_linearizer.queue(group_names)): @@ -340,9 +354,24 @@ class StateHandler(object): if hasattr(self.store, "get_next_state_group"): state_group = self.store.get_next_state_group() + prev_group = None + delta_ids = None + for old_group, old_ids in state_groups_ids.items(): + if not set(new_state.iterkeys()) - set(old_ids.iterkeys()): + n_delta_ids = { + k: v + for k, v in new_state.items() + if old_ids.get(k) != v + } + if not delta_ids or len(n_delta_ids) < len(delta_ids): + prev_group = old_group + delta_ids = n_delta_ids + cache = _StateCacheEntry( state=new_state, state_group=state_group, + prev_group=prev_group, + delta_ids=delta_ids, ) if self._state_cache is not None: |