From 00cb3eb24b277bb37bd1b7d8449c08a37cb4b014 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 16:37:41 +0000 Subject: Cache tags and account data --- synapse/util/caches/stream_change_cache.py | 95 ++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 synapse/util/caches/stream_change_cache.py (limited to 'synapse/util/caches/stream_change_cache.py') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py new file mode 100644 index 0000000000..33b37f7f29 --- /dev/null +++ b/synapse/util/caches/stream_change_cache.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.caches import cache_counter, caches_by_name + + +from blist import sorteddict +import logging + + +logger = logging.getLogger(__name__) + + +class StreamChangeCache(object): + """Keeps track of the stream positions of the latest change in a set of entities. + + Typically the entity will be a room or user id. + + Given a list of entities and a stream position, it will give a subset of + entities that may have changed since that position. If position key is too + old then the cache will simply return all given entities. + """ + def __init__(self, name, current_stream_pos, max_size=10000): + self._max_size = max_size + self._entity_to_key = {} + self._cache = sorteddict() + self._earliest_known_stream_pos = current_stream_pos + self.name = name + caches_by_name[self.name] = self._cache + + def get_entity_has_changed(self, entity, stream_pos): + assert type(stream_pos) is int + + if stream_pos <= self._earliest_known_stream_pos: + return True + + latest_entity_change_pos = self._entity_to_key.get(entity, None) + if latest_entity_change_pos is None: + return True + + if stream_pos < latest_entity_change_pos: + return True + + return False + + def get_entities_changed(self, entities, stream_pos): + """Returns subset of entities that have had new things since the + given position. If the position is too old it will just return the given list. + """ + assert type(stream_pos) is int + + if stream_pos > self._earliest_known_stream_pos: + keys = self._cache.keys() + i = keys.bisect_right(stream_pos) + + result = set( + self._cache[k] for k in keys[i:] + ).intersection(entities) + + cache_counter.inc_hits(self.name) + else: + result = entities + cache_counter.inc_misses(self.name) + + return result + + def entity_has_changed(self, entitiy, stream_pos): + """Informs the cache that the entitiy has been changed at the given + position. + """ + assert type(stream_pos) is int + + if stream_pos > self._earliest_known_stream_pos: + old_pos = self._entity_to_key.get(entitiy, None) + if old_pos: + stream_pos = max(stream_pos, old_pos) + self._cache.pop(old_pos, None) + self._cache[stream_pos] = entitiy + + while len(self._cache) > self._max_size: + k, r = self._cache.popitem() + self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos) + self._entity_to_key.pop(r, None) -- cgit 1.5.1 From 45cf827c8fe7163a51f1d0d7c9e2531da9b58c8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 16:39:18 +0000 Subject: Change name and doc has_entity_changed --- synapse/storage/account_data.py | 2 +- synapse/storage/stream.py | 2 +- synapse/storage/tags.py | 2 +- synapse/util/caches/stream_change_cache.py | 4 +++- 4 files changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/util/caches/stream_change_cache.py') diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index 95294c3f6c..62e49e1c0e 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -128,7 +128,7 @@ class AccountDataStore(SQLBaseStore): return (global_account_data, account_data_by_room) - changed = self._account_data_stream_cache.get_entity_has_changed( + changed = self._account_data_stream_cache.has_entity_changed( user_id, int(stream_id) ) if not changed: diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c7d7893328..6e81d46c60 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -200,7 +200,7 @@ class StreamStore(SQLBaseStore): defer.returnValue(([], from_key)) if from_id: - has_changed = yield self._events_stream_cache.get_entity_has_changed( + has_changed = yield self._events_stream_cache.has_entity_changed( room_id, from_id ) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 50af899192..75ce04092d 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -88,7 +88,7 @@ class TagsStore(SQLBaseStore): room_ids = [row[0] for row in txn.fetchall()] return room_ids - changed = self._tags_stream_cache.get_entity_has_changed(user_id, int(stream_id)) + changed = self._tags_stream_cache.has_entity_changed(user_id, int(stream_id)) if not changed: defer.returnValue({}) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 33b37f7f29..3ca0e57780 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -40,7 +40,9 @@ class StreamChangeCache(object): self.name = name caches_by_name[self.name] = self._cache - def get_entity_has_changed(self, entity, stream_pos): + def has_entity_changed(self, entity, stream_pos): + """Returns True if the entity may have been updated since stream_pos + """ assert type(stream_pos) is int if stream_pos <= self._earliest_known_stream_pos: -- cgit 1.5.1 From 0663c5bd52d52e095258e312fb62c2a9cb3f200a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 17:27:28 +0000 Subject: Include cache hits with has_entity_changed --- synapse/util/caches/stream_change_cache.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/util/caches/stream_change_cache.py') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 3ca0e57780..b48f5cb273 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -46,15 +46,19 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos <= self._earliest_known_stream_pos: + cache_counter.inc_misses(self.name) return True latest_entity_change_pos = self._entity_to_key.get(entity, None) if latest_entity_change_pos is None: + cache_counter.inc_misses(self.name) return True if stream_pos < latest_entity_change_pos: + cache_counter.inc_misses(self.name) return True + cache_counter.inc_hits(self.name) return False def get_entities_changed(self, entities, stream_pos): -- cgit 1.5.1 From 82cf3a8043a6ffe1f57d78bf88eba06ad0c53fbe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 17:44:04 +0000 Subject: Fix inequalities --- synapse/util/caches/stream_change_cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/util/caches/stream_change_cache.py') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index b48f5cb273..483e0cdf96 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -45,7 +45,7 @@ class StreamChangeCache(object): """ assert type(stream_pos) is int - if stream_pos <= self._earliest_known_stream_pos: + if stream_pos < self._earliest_known_stream_pos: cache_counter.inc_misses(self.name) return True @@ -67,7 +67,7 @@ class StreamChangeCache(object): """ assert type(stream_pos) is int - if stream_pos > self._earliest_known_stream_pos: + if stream_pos >= self._earliest_known_stream_pos: keys = self._cache.keys() i = keys.bisect_right(stream_pos) -- cgit 1.5.1 From 40431251cba7ef1b623559db972600ece40818a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 18:05:43 +0000 Subject: Correctly update _entity_to_key --- synapse/util/caches/stream_change_cache.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse/util/caches/stream_change_cache.py') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 483e0cdf96..22a9f8f467 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -82,18 +82,19 @@ class StreamChangeCache(object): return result - def entity_has_changed(self, entitiy, stream_pos): - """Informs the cache that the entitiy has been changed at the given + def entity_has_changed(self, entity, stream_pos): + """Informs the cache that the entity has been changed at the given position. """ assert type(stream_pos) is int if stream_pos > self._earliest_known_stream_pos: - old_pos = self._entity_to_key.get(entitiy, None) + old_pos = self._entity_to_key.get(entity, None) if old_pos: stream_pos = max(stream_pos, old_pos) self._cache.pop(old_pos, None) - self._cache[stream_pos] = entitiy + self._cache[stream_pos] = entity + self._entity_to_key[entity] = stream_pos while len(self._cache) > self._max_size: k, r = self._cache.popitem() -- cgit 1.5.1 From 3f5dd18bd44ae426d3b1ff062dd64acbad72f8ae Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 18:11:41 +0000 Subject: If the same as the earliest key, assume nothing has changed. --- synapse/util/caches/stream_change_cache.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/util/caches/stream_change_cache.py') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 22a9f8f467..c673b1bdfc 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -49,6 +49,11 @@ class StreamChangeCache(object): cache_counter.inc_misses(self.name) return True + if stream_pos == self._earliest_known_stream_pos: + # If the same as the earliest key, assume nothing has changed. + cache_counter.inc_hits(self.name) + return False + latest_entity_change_pos = self._entity_to_key.get(entity, None) if latest_entity_change_pos is None: cache_counter.inc_misses(self.name) -- cgit 1.5.1 From 18579534ea67f2d98c189e2ddeccc4bfecb491eb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 14:37:59 +0000 Subject: Prefill stream change caches --- synapse/storage/__init__.py | 49 +++++++++++++++++++++++++++++- synapse/storage/account_data.py | 9 ------ synapse/storage/stream.py | 8 ----- synapse/util/caches/stream_change_cache.py | 5 ++- 4 files changed, 52 insertions(+), 19 deletions(-) (limited to 'synapse/util/caches/stream_change_cache.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index eb88842308..95ae97d507 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -45,9 +45,10 @@ from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore - from util.id_generators import IdGenerator, StreamIdGenerator +from synapse.util.caches.stream_change_cache import StreamChangeCache + import logging @@ -117,8 +118,54 @@ class DataStore(RoomMemberStore, RoomStore, self._push_rule_id_gen = IdGenerator("push_rules", "id", self) self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) + events_max = self._stream_id_gen.get_max_token(None) + event_cache_prefill = self._get_cache_dict( + db_conn, "events", + entity_column="room_id", + stream_column="stream_ordering", + max_value=events_max, + ) + self._events_stream_cache = StreamChangeCache( + "EventsRoomStreamChangeCache", events_max, + prefilled_cache=event_cache_prefill, + ) + + account_max = self._account_data_id_gen.get_max_token(None) + account_cache_prefill = self._get_cache_dict( + db_conn, "account_data", + entity_column="user_id", + stream_column="stream_id", + max_value=account_max, + ) + self._account_data_stream_cache = StreamChangeCache( + "AccountDataAndTagsChangeCache", account_max, + prefilled_cache=account_cache_prefill, + ) + super(DataStore, self).__init__(hs) + def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value): + sql = ( + "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s" + " WHERE %(stream)s > max(? - 100000, 0)" + " GROUP BY %(entity)s" + " ORDER BY MAX(%(stream)s) DESC" + " LIMIT 10000" + ) % { + "table": table, + "entity": entity_column, + "stream": stream_column, + } + + txn = db_conn.cursor() + txn.execute(sql, (int(max_value),)) + rows = txn.fetchall() + + return { + row[0]: row[1] + for row in rows + } + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent): now = int(self._clock.time_msec()) diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index ed6587429b..625d062eb1 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -14,7 +14,6 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer import ujson as json @@ -24,14 +23,6 @@ logger = logging.getLogger(__name__) class AccountDataStore(SQLBaseStore): - def __init__(self, hs): - super(AccountDataStore, self).__init__(hs) - - self._account_data_stream_cache = StreamChangeCache( - "AccountDataAndTagsChangeCache", - self._account_data_id_gen.get_max_token(None), - max_size=10000, - ) def get_account_data_for_user(self, user_id): """Get all the client account_data for a user. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6e81d46c60..e245d2f914 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -37,7 +37,6 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks -from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logutils import log_function @@ -78,13 +77,6 @@ def upper_bound(token): class StreamStore(SQLBaseStore): - def __init__(self, hs): - super(StreamStore, self).__init__(hs) - - self._events_stream_cache = StreamChangeCache( - "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None) - ) - @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): # NB this lives here instead of appservice.py so we can reuse the diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index c673b1bdfc..891cb619fa 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -32,7 +32,7 @@ class StreamChangeCache(object): entities that may have changed since that position. If position key is too old then the cache will simply return all given entities. """ - def __init__(self, name, current_stream_pos, max_size=10000): + def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}): self._max_size = max_size self._entity_to_key = {} self._cache = sorteddict() @@ -40,6 +40,9 @@ class StreamChangeCache(object): self.name = name caches_by_name[self.name] = self._cache + for entity, stream_pos in prefilled_cache.items(): + self.entity_has_changed(entity, stream_pos) + def has_entity_changed(self, entity, stream_pos): """Returns True if the entity may have been updated since stream_pos """ -- cgit 1.5.1 From e70165039cd718af0d4aca46df8857e805c39e28 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 16:24:34 +0000 Subject: If stream pos is greater then earliest known key and entity hasn't changed, then entity hasn't changed --- synapse/util/caches/stream_change_cache.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) (limited to 'synapse/util/caches/stream_change_cache.py') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 891cb619fa..b37f1c0725 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -52,15 +52,10 @@ class StreamChangeCache(object): cache_counter.inc_misses(self.name) return True - if stream_pos == self._earliest_known_stream_pos: - # If the same as the earliest key, assume nothing has changed. - cache_counter.inc_hits(self.name) - return False - latest_entity_change_pos = self._entity_to_key.get(entity, None) if latest_entity_change_pos is None: - cache_counter.inc_misses(self.name) - return True + cache_counter.inc_hits(self.name) + return False if stream_pos < latest_entity_change_pos: cache_counter.inc_misses(self.name) @@ -98,7 +93,7 @@ class StreamChangeCache(object): if stream_pos > self._earliest_known_stream_pos: old_pos = self._entity_to_key.get(entity, None) - if old_pos: + if old_pos is not None: stream_pos = max(stream_pos, old_pos) self._cache.pop(old_pos, None) self._cache[stream_pos] = entity -- cgit 1.5.1