From d787e41b2046572d42c13b6aa9b4636f98f7f9e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 14:44:48 +0000 Subject: Measure StateHandler._resolve_events --- synapse/state.py | 78 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 40 insertions(+), 38 deletions(-) (limited to 'synapse/state.py') diff --git a/synapse/state.py b/synapse/state.py index b9a1387520..e096329721 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.api.auth import AuthEventTypes @@ -263,48 +264,49 @@ class StateHandler(object): from (type, state_key) to event. prev_states is a list of event_ids. :rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str]) """ - state = {} - for st in state_sets: - for e in st: - state.setdefault( - (e.type, e.state_key), - {} - )[e.event_id] = e - - unconflicted_state = { - k: v.values()[0] for k, v in state.items() - if len(v.values()) == 1 - } - - conflicted_state = { - k: v.values() - for k, v in state.items() - if len(v.values()) > 1 - } + with Measure(self.clock, "state._resolve_events"): + state = {} + for st in state_sets: + for e in st: + state.setdefault( + (e.type, e.state_key), + {} + )[e.event_id] = e + + unconflicted_state = { + k: v.values()[0] for k, v in state.items() + if len(v.values()) == 1 + } - if event_type: - prev_states_events = conflicted_state.get( - (event_type, state_key), [] - ) - prev_states = [s.event_id for s in prev_states_events] - else: - prev_states = [] + conflicted_state = { + k: v.values() + for k, v in state.items() + if len(v.values()) > 1 + } - auth_events = { - k: e for k, e in unconflicted_state.items() - if k[0] in AuthEventTypes - } + if event_type: + prev_states_events = conflicted_state.get( + (event_type, state_key), [] + ) + prev_states = [s.event_id for s in prev_states_events] + else: + prev_states = [] - try: - resolved_state = self._resolve_state_events( - conflicted_state, auth_events - ) - except: - logger.exception("Failed to resolve state") - raise + auth_events = { + k: e for k, e in unconflicted_state.items() + if k[0] in AuthEventTypes + } + + try: + resolved_state = self._resolve_state_events( + conflicted_state, auth_events + ) + except: + logger.exception("Failed to resolve state") + raise - new_state = unconflicted_state - new_state.update(resolved_state) + new_state = unconflicted_state + new_state.update(resolved_state) return new_state, prev_states -- cgit 1.4.1 From 99f929f36b396b7152b3840c11e8debc5505f673 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 15:31:13 +0000 Subject: Make StateHandler._state_cache only store event_ids. --- synapse/state.py | 24 +++++++++++++++++------- synapse/storage/events.py | 25 +++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 7 deletions(-) (limited to 'synapse/state.py') diff --git a/synapse/state.py b/synapse/state.py index e096329721..9d90a437d3 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -28,6 +28,7 @@ from collections import namedtuple import logging import hashlib +import os logger = logging.getLogger(__name__) @@ -35,8 +36,11 @@ logger = logging.getLogger(__name__) KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) -SIZE_OF_CACHE = 1000 -EVICTION_TIMEOUT_SECONDS = 20 +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) + + +SIZE_OF_CACHE = int(5000 * CACHE_SIZE_FACTOR) +EVICTION_TIMEOUT_SECONDS = 60 * 60 class _StateCacheEntry(object): @@ -92,7 +96,9 @@ class StateHandler(object): if cache: cache.ts = self.clock.time_msec() - state = cache.state + + event_dict = yield self.store.get_events(cache.state.values()) + state = {(e.type, e.state_key): e for e in event_dict.values()} else: res = yield self.resolve_state_groups(room_id, event_ids) state = res[1] @@ -191,14 +197,18 @@ class StateHandler(object): cache = self._state_cache.get(frozenset(event_ids), None) if cache and cache.state_group: cache.ts = self.clock.time_msec() - prev_state = cache.state.get((event_type, state_key), None) + + event_dict = yield self.store.get_events(cache.state.values()) + state = {(e.type, e.state_key): e for e in event_dict.values()} + + prev_state = state.get((event_type, state_key), None) if prev_state: prev_state = prev_state.event_id prev_states = [prev_state] else: prev_states = [] defer.returnValue( - (cache.state_group, cache.state, prev_states) + (cache.state_group, state, prev_states) ) state_groups = yield self.store.get_state_groups( @@ -226,7 +236,7 @@ class StateHandler(object): if self._state_cache is not None: cache = _StateCacheEntry( - state=state, + state={key: event.event_id for key, event in state.items()}, state_group=name, ts=self.clock.time_msec() ) @@ -241,7 +251,7 @@ class StateHandler(object): if self._state_cache is not None: cache = _StateCacheEntry( - state=new_state, + state={key: event.event_id for key, event in new_state.items()}, state_group=None, ts=self.clock.time_msec() ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e444b64cee..584e659d4a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -151,6 +151,31 @@ class EventsStore(SQLBaseStore): defer.returnValue(events[0] if events else None) + @defer.inlineCallbacks + def get_events(self, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False): + """Get events from the database + + Args: + event_ids (list): The event_ids of the events to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + + Returns: + Deferred : Dict from event_id to event. + """ + events = yield self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + defer.returnValue({e.event_id: e for e in events}) + @log_function def _persist_event_txn(self, txn, event, context, is_new_state=True, current_state=None): -- cgit 1.4.1 From d531ebcb57de61bad0ac2e4231280d41d8db4404 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 18:02:36 +0000 Subject: Key StateHandler._state_cache off of state groups --- synapse/state.py | 61 +++++++++++++++++++------------------------------------- 1 file changed, 21 insertions(+), 40 deletions(-) (limited to 'synapse/state.py') diff --git a/synapse/state.py b/synapse/state.py index 9d90a437d3..14c0430017 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -90,18 +90,8 @@ class StateHandler(object): """ event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - cache = None - if self._state_cache is not None: - cache = self._state_cache.get(frozenset(event_ids), None) - - if cache: - cache.ts = self.clock.time_msec() - - event_dict = yield self.store.get_events(cache.state.values()) - state = {(e.type, e.state_key): e for e in event_dict.values()} - else: - res = yield self.resolve_state_groups(room_id, event_ids) - state = res[1] + res = yield self.resolve_state_groups(room_id, event_ids) + state = res[1] if event_type: defer.returnValue(state.get((event_type, state_key))) @@ -193,24 +183,6 @@ class StateHandler(object): """ logger.debug("resolve_state_groups event_ids %s", event_ids) - if self._state_cache is not None: - cache = self._state_cache.get(frozenset(event_ids), None) - if cache and cache.state_group: - cache.ts = self.clock.time_msec() - - event_dict = yield self.store.get_events(cache.state.values()) - state = {(e.type, e.state_key): e for e in event_dict.values()} - - prev_state = state.get((event_type, state_key), None) - if prev_state: - prev_state = prev_state.event_id - prev_states = [prev_state] - else: - prev_states = [] - defer.returnValue( - (cache.state_group, state, prev_states) - ) - state_groups = yield self.store.get_state_groups( room_id, event_ids ) @@ -220,7 +192,7 @@ class StateHandler(object): state_groups.keys() ) - group_names = set(state_groups.keys()) + group_names = frozenset(state_groups.keys()) if len(group_names) == 1: name, state_list = state_groups.items().pop() state = { @@ -234,16 +206,25 @@ class StateHandler(object): else: prev_states = [] - if self._state_cache is not None: - cache = _StateCacheEntry( - state={key: event.event_id for key, event in state.items()}, - state_group=name, - ts=self.clock.time_msec() - ) + defer.returnValue((name, state, prev_states)) + + if self._state_cache is not None: + cache = self._state_cache.get(group_names, None) + if cache and cache.state_group: + cache.ts = self.clock.time_msec() - self._state_cache[frozenset(event_ids)] = cache + event_dict = yield self.store.get_events(cache.state.values()) + state = {(e.type, e.state_key): e for e in event_dict.values()} - defer.returnValue((name, state, prev_states)) + prev_state = state.get((event_type, state_key), None) + if prev_state: + prev_state = prev_state.event_id + prev_states = [prev_state] + else: + prev_states = [] + defer.returnValue( + (cache.state_group, state, prev_states) + ) new_state, prev_states = self._resolve_events( state_groups.values(), event_type, state_key @@ -256,7 +237,7 @@ class StateHandler(object): ts=self.clock.time_msec() ) - self._state_cache[frozenset(event_ids)] = cache + self._state_cache[group_names] = cache defer.returnValue((None, new_state, prev_states)) -- cgit 1.4.1 From 9e2e994395327956f846113566fd18c01f12441a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 09:28:07 +0000 Subject: Reduce cache size --- synapse/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/state.py') diff --git a/synapse/state.py b/synapse/state.py index 14c0430017..41d32e664a 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -39,7 +39,7 @@ KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) -SIZE_OF_CACHE = int(5000 * CACHE_SIZE_FACTOR) +SIZE_OF_CACHE = int(1000 * CACHE_SIZE_FACTOR) EVICTION_TIMEOUT_SECONDS = 60 * 60 -- cgit 1.4.1