diff options
-rw-r--r-- | synapse/state.py | 65 | ||||
-rw-r--r-- | synapse/storage/events.py | 25 | ||||
-rw-r--r-- | synapse/util/caches/lrucache.py | 73 |
3 files changed, 94 insertions, 69 deletions
diff --git a/synapse/state.py b/synapse/state.py index e096329721..41d32e664a 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -28,6 +28,7 @@ from collections import namedtuple import logging import hashlib +import os logger = logging.getLogger(__name__) @@ -35,8 +36,11 @@ logger = logging.getLogger(__name__) KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) -SIZE_OF_CACHE = 1000 -EVICTION_TIMEOUT_SECONDS = 20 +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) + + +SIZE_OF_CACHE = int(1000 * CACHE_SIZE_FACTOR) +EVICTION_TIMEOUT_SECONDS = 60 * 60 class _StateCacheEntry(object): @@ -86,16 +90,8 @@ class StateHandler(object): """ event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - cache = None - if self._state_cache is not None: - cache = self._state_cache.get(frozenset(event_ids), None) - - if cache: - cache.ts = self.clock.time_msec() - state = cache.state - else: - res = yield self.resolve_state_groups(room_id, event_ids) - state = res[1] + res = yield self.resolve_state_groups(room_id, event_ids) + state = res[1] if event_type: defer.returnValue(state.get((event_type, state_key))) @@ -187,20 +183,6 @@ class StateHandler(object): """ logger.debug("resolve_state_groups event_ids %s", event_ids) - if self._state_cache is not None: - cache = self._state_cache.get(frozenset(event_ids), None) - if cache and cache.state_group: - cache.ts = self.clock.time_msec() - prev_state = cache.state.get((event_type, state_key), None) - if prev_state: - prev_state = prev_state.event_id - prev_states = [prev_state] - else: - prev_states = [] - defer.returnValue( - (cache.state_group, cache.state, prev_states) - ) - state_groups = yield self.store.get_state_groups( room_id, event_ids ) @@ -210,7 +192,7 @@ class StateHandler(object): state_groups.keys() ) - group_names = set(state_groups.keys()) + group_names = frozenset(state_groups.keys()) if len(group_names) == 1: name, state_list = state_groups.items().pop() state = { @@ -224,16 +206,25 @@ class StateHandler(object): else: prev_states = [] - if self._state_cache is not None: - cache = _StateCacheEntry( - state=state, - state_group=name, - ts=self.clock.time_msec() - ) + defer.returnValue((name, state, prev_states)) + + if self._state_cache is not None: + cache = self._state_cache.get(group_names, None) + if cache and cache.state_group: + cache.ts = self.clock.time_msec() - self._state_cache[frozenset(event_ids)] = cache + event_dict = yield self.store.get_events(cache.state.values()) + state = {(e.type, e.state_key): e for e in event_dict.values()} - defer.returnValue((name, state, prev_states)) + prev_state = state.get((event_type, state_key), None) + if prev_state: + prev_state = prev_state.event_id + prev_states = [prev_state] + else: + prev_states = [] + defer.returnValue( + (cache.state_group, state, prev_states) + ) new_state, prev_states = self._resolve_events( state_groups.values(), event_type, state_key @@ -241,12 +232,12 @@ class StateHandler(object): if self._state_cache is not None: cache = _StateCacheEntry( - state=new_state, + state={key: event.event_id for key, event in new_state.items()}, state_group=None, ts=self.clock.time_msec() ) - self._state_cache[frozenset(event_ids)] = cache + self._state_cache[group_names] = cache defer.returnValue((None, new_state, prev_states)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e444b64cee..584e659d4a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -151,6 +151,31 @@ class EventsStore(SQLBaseStore): defer.returnValue(events[0] if events else None) + @defer.inlineCallbacks + def get_events(self, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False): + """Get events from the database + + Args: + event_ids (list): The event_ids of the events to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + + Returns: + Deferred : Dict from event_id to event. + """ + events = yield self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + defer.returnValue({e.event_id: e for e in events}) + @log_function def _persist_event_txn(self, txn, event, context, is_new_state=True, current_state=None): diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index f7423f2fab..f9df445a8d 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -29,6 +29,16 @@ def enumerate_leaves(node, depth): yield m +class _Node(object): + __slots__ = ["prev_node", "next_node", "key", "value"] + + def __init__(self, prev_node, next_node, key, value): + self.prev_node = prev_node + self.next_node = next_node + self.key = key + self.value = value + + class LruCache(object): """ Least-recently-used cache. @@ -38,10 +48,9 @@ class LruCache(object): def __init__(self, max_size, keylen=1, cache_type=dict): cache = cache_type() self.cache = cache # Used for introspection. - list_root = [] - list_root[:] = [list_root, list_root, None, None] - - PREV, NEXT, KEY, VALUE = 0, 1, 2, 3 + list_root = _Node(None, None, None, None) + list_root.next_node = list_root + list_root.prev_node = list_root lock = threading.Lock() @@ -55,36 +64,36 @@ class LruCache(object): def add_node(key, value): prev_node = list_root - next_node = prev_node[NEXT] - node = [prev_node, next_node, key, value] - prev_node[NEXT] = node - next_node[PREV] = node + next_node = prev_node.next_node + node = _Node(prev_node, next_node, key, value) + prev_node.next_node = node + next_node.prev_node = node cache[key] = node def move_node_to_front(node): - prev_node = node[PREV] - next_node = node[NEXT] - prev_node[NEXT] = next_node - next_node[PREV] = prev_node + prev_node = node.prev_node + next_node = node.next_node + prev_node.next_node = next_node + next_node.prev_node = prev_node prev_node = list_root - next_node = prev_node[NEXT] - node[PREV] = prev_node - node[NEXT] = next_node - prev_node[NEXT] = node - next_node[PREV] = node + next_node = prev_node.next_node + node.prev_node = prev_node + node.next_node = next_node + prev_node.next_node = node + next_node.prev_node = node def delete_node(node): - prev_node = node[PREV] - next_node = node[NEXT] - prev_node[NEXT] = next_node - next_node[PREV] = prev_node + prev_node = node.prev_node + next_node = node.next_node + prev_node.next_node = next_node + next_node.prev_node = prev_node @synchronized def cache_get(key, default=None): node = cache.get(key, None) if node is not None: move_node_to_front(node) - return node[VALUE] + return node.value else: return default @@ -93,25 +102,25 @@ class LruCache(object): node = cache.get(key, None) if node is not None: move_node_to_front(node) - node[VALUE] = value + node.value = value else: add_node(key, value) if len(cache) > max_size: - todelete = list_root[PREV] + todelete = list_root.prev_node delete_node(todelete) - cache.pop(todelete[KEY], None) + cache.pop(todelete.key, None) @synchronized def cache_set_default(key, value): node = cache.get(key, None) if node is not None: - return node[VALUE] + return node.value else: add_node(key, value) if len(cache) > max_size: - todelete = list_root[PREV] + todelete = list_root.prev_node delete_node(todelete) - cache.pop(todelete[KEY], None) + cache.pop(todelete.key, None) return value @synchronized @@ -119,8 +128,8 @@ class LruCache(object): node = cache.get(key, None) if node: delete_node(node) - cache.pop(node[KEY], None) - return node[VALUE] + cache.pop(node.key, None) + return node.value else: return default @@ -137,8 +146,8 @@ class LruCache(object): @synchronized def cache_clear(): - list_root[NEXT] = list_root - list_root[PREV] = list_root + list_root.next_node = list_root + list_root.prev_node = list_root cache.clear() @synchronized |