From b97f6626b6f9b91498d06a7ae113b9d20f1fc2ef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Jan 2016 09:54:30 +0000 Subject: Add cache to room stream --- synapse/util/caches/room_change_cache.py | 86 ++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 synapse/util/caches/room_change_cache.py (limited to 'synapse/util') diff --git a/synapse/util/caches/room_change_cache.py b/synapse/util/caches/room_change_cache.py new file mode 100644 index 0000000000..3a873c9c30 --- /dev/null +++ b/synapse/util/caches/room_change_cache.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.caches import cache_counter, caches_by_name + + +from blist import sorteddict +import logging + + +logger = logging.getLogger(__name__) + + +class RoomStreamChangeCache(object): + """Keeps track of the stream_id of the latest change in rooms. + + Given a list of rooms and stream key, it will give a subset of rooms that + may have changed since that key. If the key is too old then the cache + will simply return all rooms. + """ + def __init__(self, name, current_key, size_of_cache=10000): + self._size_of_cache = size_of_cache + self._room_to_key = {} + self._cache = sorteddict() + self._earliest_known_key = current_key + self.name = name + caches_by_name[self.name] = self._cache + + def get_room_has_changed(self, room_id, key): + if key <= self._earliest_known_key: + return True + + room_key = self._room_to_key.get(room_id, None) + if room_key is None: + return True + + if key < room_key: + return True + + return False + + def get_rooms_changed(self, store, room_ids, key): + """Returns subset of room ids that have had new things since the + given key. If the key is too old it will just return the given list. + """ + if key > self._earliest_known_key: + keys = self._cache.keys() + i = keys.bisect_right(key) + + result = set( + self._cache[k] for k in keys[i:] + ).intersection(room_ids) + + cache_counter.inc_hits(self.name) + else: + result = room_ids + cache_counter.inc_misses(self.name) + + return result + + def room_has_changed(self, store, room_id, key): + """Informs the cache that the room has been changed at the given key. + """ + if key > self._earliest_known_key: + old_key = self._room_to_key.get(room_id, None) + if old_key: + key = max(key, old_key) + self._cache.pop(old_key, None) + self._cache[key] = room_id + + while len(self._cache) > self._size_of_cache: + k, r = self._cache.popitem() + self._earliest_key = max(k, self._earliest_key) + self._room_to_key.pop(r, None) -- cgit 1.5.1 From e1941442d442fe62570551071edfd936304697e7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 15:02:37 +0000 Subject: Invalidate caches properly. Remove unused arg --- synapse/storage/events.py | 9 ++++++--- synapse/storage/receipts.py | 10 ++++++---- synapse/storage/stream.py | 2 +- synapse/util/caches/room_change_cache.py | 4 ++-- 4 files changed, 15 insertions(+), 10 deletions(-) (limited to 'synapse/util') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 80187722ea..2d2270b297 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -128,9 +128,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - self._events_stream_cache.room_has_changed( - None, event.room_id, stream_ordering - ) except _RollbackButIsFineException: pass @@ -213,6 +210,12 @@ class EventsStore(SQLBaseStore): for event, _ in events_and_contexts: txn.call_after(self._invalidate_get_event_cache, event.event_id) + if not backfilled: + txn.call_after( + self._events_stream_cache.room_has_changed, + event.room_id, event.internal_metadata.stream_ordering, + ) + depth_updates = {} for event, _ in events_and_contexts: if event.internal_metadata.is_outlier(): diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 7118368d97..5ffbfdec51 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -78,7 +78,7 @@ class ReceiptsStore(SQLBaseStore): if from_key: room_ids = yield self._receipts_stream_cache.get_rooms_changed( - self, room_ids, from_key + room_ids, from_key ) results = yield self._get_linearized_receipts_for_rooms( @@ -221,6 +221,11 @@ class ReceiptsStore(SQLBaseStore): # FIXME: This shouldn't invalidate the whole cache txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after( + self._receipts_stream_cache.room_has_changed, + room_id, stream_id + ) + # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts sql = ( @@ -308,9 +313,6 @@ class ReceiptsStore(SQLBaseStore): stream_id_manager = yield self._receipts_id_gen.get_next(self) with stream_id_manager as stream_id: - yield self._receipts_stream_cache.room_has_changed( - self, room_id, stream_id - ) have_persisted = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 5096b46864..67e7e6a76f 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -169,7 +169,7 @@ class StreamStore(SQLBaseStore): from_id = RoomStreamToken.parse_stream_token(from_key).stream room_ids = yield self._events_stream_cache.get_rooms_changed( - self, room_ids, from_id + room_ids, from_id ) if not room_ids: diff --git a/synapse/util/caches/room_change_cache.py b/synapse/util/caches/room_change_cache.py index 3a873c9c30..eb2ab5f1e4 100644 --- a/synapse/util/caches/room_change_cache.py +++ b/synapse/util/caches/room_change_cache.py @@ -51,7 +51,7 @@ class RoomStreamChangeCache(object): return False - def get_rooms_changed(self, store, room_ids, key): + def get_rooms_changed(self, room_ids, key): """Returns subset of room ids that have had new things since the given key. If the key is too old it will just return the given list. """ @@ -70,7 +70,7 @@ class RoomStreamChangeCache(object): return result - def room_has_changed(self, store, room_id, key): + def room_has_changed(self, room_id, key): """Informs the cache that the room has been changed at the given key. """ if key > self._earliest_known_key: -- cgit 1.5.1 From c23a8c783382a0789c757e16e104cf08654e6cf8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 15:55:26 +0000 Subject: Ensure keys to RoomStreamChangeCache are ints --- synapse/storage/stream.py | 11 ++++++----- synapse/util/caches/room_change_cache.py | 6 ++++++ 2 files changed, 12 insertions(+), 5 deletions(-) (limited to 'synapse/util') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 67e7e6a76f..6a724193e1 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -199,12 +199,13 @@ class StreamStore(SQLBaseStore): if from_key == to_key: defer.returnValue(([], from_key)) - has_changed = yield self._events_stream_cache.get_room_has_changed( - room_id, from_id - ) + if from_id: + has_changed = yield self._events_stream_cache.get_room_has_changed( + room_id, from_id + ) - if not has_changed: - defer.returnValue(([], from_key)) + if not has_changed: + defer.returnValue(([], from_key)) def f(txn): if from_id is not None: diff --git a/synapse/util/caches/room_change_cache.py b/synapse/util/caches/room_change_cache.py index eb2ab5f1e4..e8bfedd72f 100644 --- a/synapse/util/caches/room_change_cache.py +++ b/synapse/util/caches/room_change_cache.py @@ -39,6 +39,8 @@ class RoomStreamChangeCache(object): caches_by_name[self.name] = self._cache def get_room_has_changed(self, room_id, key): + assert type(key) is int + if key <= self._earliest_known_key: return True @@ -55,6 +57,8 @@ class RoomStreamChangeCache(object): """Returns subset of room ids that have had new things since the given key. If the key is too old it will just return the given list. """ + assert type(key) is int + if key > self._earliest_known_key: keys = self._cache.keys() i = keys.bisect_right(key) @@ -73,6 +77,8 @@ class RoomStreamChangeCache(object): def room_has_changed(self, room_id, key): """Informs the cache that the room has been changed at the given key. """ + assert type(key) is int + if key > self._earliest_known_key: old_key = self._room_to_key.get(room_id, None) if old_key: -- cgit 1.5.1 From 00cb3eb24b277bb37bd1b7d8449c08a37cb4b014 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 16:37:41 +0000 Subject: Cache tags and account data --- synapse/storage/account_data.py | 20 ++++++- synapse/storage/events.py | 2 +- synapse/storage/receipts.py | 8 +-- synapse/storage/stream.py | 8 +-- synapse/storage/tags.py | 14 +++++ synapse/util/caches/room_change_cache.py | 92 ----------------------------- synapse/util/caches/stream_change_cache.py | 95 ++++++++++++++++++++++++++++++ 7 files changed, 137 insertions(+), 102 deletions(-) delete mode 100644 synapse/util/caches/room_change_cache.py create mode 100644 synapse/util/caches/stream_change_cache.py (limited to 'synapse/util') diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index 9c6597e012..95294c3f6c 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer import ujson as json @@ -23,6 +24,13 @@ logger = logging.getLogger(__name__) class AccountDataStore(SQLBaseStore): + def __init__(self, hs): + super(AccountDataStore, self).__init__(hs) + + self._account_data_stream_cache = StreamChangeCache( + "AccountDataChangeCache", self._account_data_id_gen.get_max_token(None), + max_size=1000, + ) def get_account_data_for_user(self, user_id): """Get all the client account_data for a user. @@ -83,7 +91,7 @@ class AccountDataStore(SQLBaseStore): "get_account_data_for_room", get_account_data_for_room_txn ) - def get_updated_account_data_for_user(self, user_id, stream_id): + def get_updated_account_data_for_user(self, user_id, stream_id, room_ids=None): """Get all the client account_data for a that's changed. Args: @@ -120,6 +128,12 @@ class AccountDataStore(SQLBaseStore): return (global_account_data, account_data_by_room) + changed = self._account_data_stream_cache.get_entity_has_changed( + user_id, int(stream_id) + ) + if not changed: + defer.returnValue(({}, {})) + return self.runInteraction( "get_updated_account_data_for_user", get_updated_account_data_for_user_txn ) @@ -186,6 +200,10 @@ class AccountDataStore(SQLBaseStore): "content": content_json, } ) + txn.call_after( + self._account_data_stream_cache.entity_has_changed, + user_id, next_id, + ) self._update_max_stream_id(txn, next_id) with (yield self._account_data_id_gen.get_next(self)) as next_id: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2d2270b297..5e85552029 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -212,7 +212,7 @@ class EventsStore(SQLBaseStore): if not backfilled: txn.call_after( - self._events_stream_cache.room_has_changed, + self._events_stream_cache.entity_has_changed, event.room_id, event.internal_metadata.stream_ordering, ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 5ffbfdec51..8068c73740 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -15,7 +15,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached -from synapse.util.caches.room_change_cache import RoomStreamChangeCache +from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer @@ -30,7 +30,7 @@ class ReceiptsStore(SQLBaseStore): def __init__(self, hs): super(ReceiptsStore, self).__init__(hs) - self._receipts_stream_cache = RoomStreamChangeCache( + self._receipts_stream_cache = StreamChangeCache( "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None) ) @@ -77,7 +77,7 @@ class ReceiptsStore(SQLBaseStore): room_ids = set(room_ids) if from_key: - room_ids = yield self._receipts_stream_cache.get_rooms_changed( + room_ids = yield self._receipts_stream_cache.get_entities_changed( room_ids, from_key ) @@ -222,7 +222,7 @@ class ReceiptsStore(SQLBaseStore): txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) txn.call_after( - self._receipts_stream_cache.room_has_changed, + self._receipts_stream_cache.entity_has_changed, room_id, stream_id ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6a724193e1..c7d7893328 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -37,7 +37,7 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks -from synapse.util.caches.room_change_cache import RoomStreamChangeCache +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logutils import log_function @@ -81,7 +81,7 @@ class StreamStore(SQLBaseStore): def __init__(self, hs): super(StreamStore, self).__init__(hs) - self._events_stream_cache = RoomStreamChangeCache( + self._events_stream_cache = StreamChangeCache( "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None) ) @@ -168,7 +168,7 @@ class StreamStore(SQLBaseStore): def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0): from_id = RoomStreamToken.parse_stream_token(from_key).stream - room_ids = yield self._events_stream_cache.get_rooms_changed( + room_ids = yield self._events_stream_cache.get_entities_changed( room_ids, from_id ) @@ -200,7 +200,7 @@ class StreamStore(SQLBaseStore): defer.returnValue(([], from_key)) if from_id: - has_changed = yield self._events_stream_cache.get_room_has_changed( + has_changed = yield self._events_stream_cache.get_entity_has_changed( room_id, from_id ) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 4c39e07cbd..50af899192 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -15,6 +15,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached +from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer import ujson as json @@ -24,6 +25,13 @@ logger = logging.getLogger(__name__) class TagsStore(SQLBaseStore): + def __init__(self, hs): + super(TagsStore, self).__init__(hs) + + self._tags_stream_cache = StreamChangeCache( + "TagsChangeCache", self._account_data_id_gen.get_max_token(None), + max_size=1000, + ) def get_max_account_data_stream_id(self): """Get the current max stream id for the private user data stream @@ -80,6 +88,10 @@ class TagsStore(SQLBaseStore): room_ids = [row[0] for row in txn.fetchall()] return room_ids + changed = self._tags_stream_cache.get_entity_has_changed(user_id, int(stream_id)) + if not changed: + defer.returnValue({}) + room_ids = yield self.runInteraction( "get_updated_tags", get_updated_tags_txn ) @@ -177,6 +189,8 @@ class TagsStore(SQLBaseStore): next_id(int): The the revision to advance to. """ + txn.call_after(self._tags_stream_cache.entity_has_changed, user_id, next_id) + update_max_id_sql = ( "UPDATE account_data_max_stream_id" " SET stream_id = ?" diff --git a/synapse/util/caches/room_change_cache.py b/synapse/util/caches/room_change_cache.py deleted file mode 100644 index e8bfedd72f..0000000000 --- a/synapse/util/caches/room_change_cache.py +++ /dev/null @@ -1,92 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.util.caches import cache_counter, caches_by_name - - -from blist import sorteddict -import logging - - -logger = logging.getLogger(__name__) - - -class RoomStreamChangeCache(object): - """Keeps track of the stream_id of the latest change in rooms. - - Given a list of rooms and stream key, it will give a subset of rooms that - may have changed since that key. If the key is too old then the cache - will simply return all rooms. - """ - def __init__(self, name, current_key, size_of_cache=10000): - self._size_of_cache = size_of_cache - self._room_to_key = {} - self._cache = sorteddict() - self._earliest_known_key = current_key - self.name = name - caches_by_name[self.name] = self._cache - - def get_room_has_changed(self, room_id, key): - assert type(key) is int - - if key <= self._earliest_known_key: - return True - - room_key = self._room_to_key.get(room_id, None) - if room_key is None: - return True - - if key < room_key: - return True - - return False - - def get_rooms_changed(self, room_ids, key): - """Returns subset of room ids that have had new things since the - given key. If the key is too old it will just return the given list. - """ - assert type(key) is int - - if key > self._earliest_known_key: - keys = self._cache.keys() - i = keys.bisect_right(key) - - result = set( - self._cache[k] for k in keys[i:] - ).intersection(room_ids) - - cache_counter.inc_hits(self.name) - else: - result = room_ids - cache_counter.inc_misses(self.name) - - return result - - def room_has_changed(self, room_id, key): - """Informs the cache that the room has been changed at the given key. - """ - assert type(key) is int - - if key > self._earliest_known_key: - old_key = self._room_to_key.get(room_id, None) - if old_key: - key = max(key, old_key) - self._cache.pop(old_key, None) - self._cache[key] = room_id - - while len(self._cache) > self._size_of_cache: - k, r = self._cache.popitem() - self._earliest_key = max(k, self._earliest_key) - self._room_to_key.pop(r, None) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py new file mode 100644 index 0000000000..33b37f7f29 --- /dev/null +++ b/synapse/util/caches/stream_change_cache.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.caches import cache_counter, caches_by_name + + +from blist import sorteddict +import logging + + +logger = logging.getLogger(__name__) + + +class StreamChangeCache(object): + """Keeps track of the stream positions of the latest change in a set of entities. + + Typically the entity will be a room or user id. + + Given a list of entities and a stream position, it will give a subset of + entities that may have changed since that position. If position key is too + old then the cache will simply return all given entities. + """ + def __init__(self, name, current_stream_pos, max_size=10000): + self._max_size = max_size + self._entity_to_key = {} + self._cache = sorteddict() + self._earliest_known_stream_pos = current_stream_pos + self.name = name + caches_by_name[self.name] = self._cache + + def get_entity_has_changed(self, entity, stream_pos): + assert type(stream_pos) is int + + if stream_pos <= self._earliest_known_stream_pos: + return True + + latest_entity_change_pos = self._entity_to_key.get(entity, None) + if latest_entity_change_pos is None: + return True + + if stream_pos < latest_entity_change_pos: + return True + + return False + + def get_entities_changed(self, entities, stream_pos): + """Returns subset of entities that have had new things since the + given position. If the position is too old it will just return the given list. + """ + assert type(stream_pos) is int + + if stream_pos > self._earliest_known_stream_pos: + keys = self._cache.keys() + i = keys.bisect_right(stream_pos) + + result = set( + self._cache[k] for k in keys[i:] + ).intersection(entities) + + cache_counter.inc_hits(self.name) + else: + result = entities + cache_counter.inc_misses(self.name) + + return result + + def entity_has_changed(self, entitiy, stream_pos): + """Informs the cache that the entitiy has been changed at the given + position. + """ + assert type(stream_pos) is int + + if stream_pos > self._earliest_known_stream_pos: + old_pos = self._entity_to_key.get(entitiy, None) + if old_pos: + stream_pos = max(stream_pos, old_pos) + self._cache.pop(old_pos, None) + self._cache[stream_pos] = entitiy + + while len(self._cache) > self._max_size: + k, r = self._cache.popitem() + self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos) + self._entity_to_key.pop(r, None) -- cgit 1.5.1 From 45cf827c8fe7163a51f1d0d7c9e2531da9b58c8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 16:39:18 +0000 Subject: Change name and doc has_entity_changed --- synapse/storage/account_data.py | 2 +- synapse/storage/stream.py | 2 +- synapse/storage/tags.py | 2 +- synapse/util/caches/stream_change_cache.py | 4 +++- 4 files changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/util') diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index 95294c3f6c..62e49e1c0e 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -128,7 +128,7 @@ class AccountDataStore(SQLBaseStore): return (global_account_data, account_data_by_room) - changed = self._account_data_stream_cache.get_entity_has_changed( + changed = self._account_data_stream_cache.has_entity_changed( user_id, int(stream_id) ) if not changed: diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c7d7893328..6e81d46c60 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -200,7 +200,7 @@ class StreamStore(SQLBaseStore): defer.returnValue(([], from_key)) if from_id: - has_changed = yield self._events_stream_cache.get_entity_has_changed( + has_changed = yield self._events_stream_cache.has_entity_changed( room_id, from_id ) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 50af899192..75ce04092d 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -88,7 +88,7 @@ class TagsStore(SQLBaseStore): room_ids = [row[0] for row in txn.fetchall()] return room_ids - changed = self._tags_stream_cache.get_entity_has_changed(user_id, int(stream_id)) + changed = self._tags_stream_cache.has_entity_changed(user_id, int(stream_id)) if not changed: defer.returnValue({}) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 33b37f7f29..3ca0e57780 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -40,7 +40,9 @@ class StreamChangeCache(object): self.name = name caches_by_name[self.name] = self._cache - def get_entity_has_changed(self, entity, stream_pos): + def has_entity_changed(self, entity, stream_pos): + """Returns True if the entity may have been updated since stream_pos + """ assert type(stream_pos) is int if stream_pos <= self._earliest_known_stream_pos: -- cgit 1.5.1 From 0663c5bd52d52e095258e312fb62c2a9cb3f200a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 17:27:28 +0000 Subject: Include cache hits with has_entity_changed --- synapse/util/caches/stream_change_cache.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 3ca0e57780..b48f5cb273 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -46,15 +46,19 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos <= self._earliest_known_stream_pos: + cache_counter.inc_misses(self.name) return True latest_entity_change_pos = self._entity_to_key.get(entity, None) if latest_entity_change_pos is None: + cache_counter.inc_misses(self.name) return True if stream_pos < latest_entity_change_pos: + cache_counter.inc_misses(self.name) return True + cache_counter.inc_hits(self.name) return False def get_entities_changed(self, entities, stream_pos): -- cgit 1.5.1 From 82cf3a8043a6ffe1f57d78bf88eba06ad0c53fbe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 17:44:04 +0000 Subject: Fix inequalities --- synapse/util/caches/stream_change_cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index b48f5cb273..483e0cdf96 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -45,7 +45,7 @@ class StreamChangeCache(object): """ assert type(stream_pos) is int - if stream_pos <= self._earliest_known_stream_pos: + if stream_pos < self._earliest_known_stream_pos: cache_counter.inc_misses(self.name) return True @@ -67,7 +67,7 @@ class StreamChangeCache(object): """ assert type(stream_pos) is int - if stream_pos > self._earliest_known_stream_pos: + if stream_pos >= self._earliest_known_stream_pos: keys = self._cache.keys() i = keys.bisect_right(stream_pos) -- cgit 1.5.1 From 40431251cba7ef1b623559db972600ece40818a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 18:05:43 +0000 Subject: Correctly update _entity_to_key --- synapse/util/caches/stream_change_cache.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 483e0cdf96..22a9f8f467 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -82,18 +82,19 @@ class StreamChangeCache(object): return result - def entity_has_changed(self, entitiy, stream_pos): - """Informs the cache that the entitiy has been changed at the given + def entity_has_changed(self, entity, stream_pos): + """Informs the cache that the entity has been changed at the given position. """ assert type(stream_pos) is int if stream_pos > self._earliest_known_stream_pos: - old_pos = self._entity_to_key.get(entitiy, None) + old_pos = self._entity_to_key.get(entity, None) if old_pos: stream_pos = max(stream_pos, old_pos) self._cache.pop(old_pos, None) - self._cache[stream_pos] = entitiy + self._cache[stream_pos] = entity + self._entity_to_key[entity] = stream_pos while len(self._cache) > self._max_size: k, r = self._cache.popitem() -- cgit 1.5.1 From 3f5dd18bd44ae426d3b1ff062dd64acbad72f8ae Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 18:11:41 +0000 Subject: If the same as the earliest key, assume nothing has changed. --- synapse/util/caches/stream_change_cache.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 22a9f8f467..c673b1bdfc 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -49,6 +49,11 @@ class StreamChangeCache(object): cache_counter.inc_misses(self.name) return True + if stream_pos == self._earliest_known_stream_pos: + # If the same as the earliest key, assume nothing has changed. + cache_counter.inc_hits(self.name) + return False + latest_entity_change_pos = self._entity_to_key.get(entity, None) if latest_entity_change_pos is None: cache_counter.inc_misses(self.name) -- cgit 1.5.1 From 50e18938a96583b2dc8cbc69aa7182a35e3435af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 10:00:45 +0000 Subject: Reset size on clear --- synapse/util/caches/lrucache.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index e6a66dc041..5182cadb9e 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -37,6 +37,7 @@ class LruCache(object): """ def __init__(self, max_size, keylen=1, cache_type=dict): cache = cache_type() + self.cache = cache # Used for introspection. self.size = 0 list_root = [] list_root[:] = [list_root, list_root, None, None] @@ -142,6 +143,7 @@ class LruCache(object): list_root[NEXT] = list_root list_root[PREV] = list_root cache.clear() + self.size = 0 @synchronized def cache_len(): -- cgit 1.5.1 From 766526e1142e7ad0ffb43bd075b0ff2d6265e4cb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 10:11:21 +0000 Subject: Make TreeCache keep track of its own size. --- synapse/util/caches/lrucache.py | 9 +++------ synapse/util/caches/treecache.py | 7 +++++++ 2 files changed, 10 insertions(+), 6 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 5182cadb9e..ca9ffdf1b4 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -38,7 +38,6 @@ class LruCache(object): def __init__(self, max_size, keylen=1, cache_type=dict): cache = cache_type() self.cache = cache # Used for introspection. - self.size = 0 list_root = [] list_root[:] = [list_root, list_root, None, None] @@ -61,7 +60,6 @@ class LruCache(object): prev_node[NEXT] = node next_node[PREV] = node cache[key] = node - self.size += 1 def move_node_to_front(node): prev_node = node[PREV] @@ -80,7 +78,6 @@ class LruCache(object): next_node = node[NEXT] prev_node[NEXT] = next_node next_node[PREV] = prev_node - self.size -= 1 @synchronized def cache_get(key, default=None): @@ -99,7 +96,7 @@ class LruCache(object): node[VALUE] = value else: add_node(key, value) - if self.size > max_size: + if len(cache) > max_size: todelete = list_root[PREV] delete_node(todelete) cache.pop(todelete[KEY], None) @@ -111,7 +108,7 @@ class LruCache(object): return node[VALUE] else: add_node(key, value) - if self.size > max_size: + if len(cache) > max_size: todelete = list_root[PREV] delete_node(todelete) cache.pop(todelete[KEY], None) @@ -147,7 +144,7 @@ class LruCache(object): @synchronized def cache_len(): - return self.size + return len(cache) @synchronized def cache_contains(key): diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 3b58860910..a29ea8144e 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -8,6 +8,7 @@ class TreeCache(object): Keys must be tuples. """ def __init__(self): + self.size = 0 self.root = {} def __setitem__(self, key, value): @@ -21,6 +22,7 @@ class TreeCache(object): for k in key[:-1]: node = node.setdefault(k, {}) node[key[-1]] = value + self.size += 1 def get(self, key, default=None): node = self.root @@ -31,6 +33,7 @@ class TreeCache(object): return node.get(key[-1], default) def clear(self): + self.size = 0 self.root = {} def pop(self, key, default=None): @@ -57,4 +60,8 @@ class TreeCache(object): break node_and_keys[i+1][0].pop(k) + self.size -= 1 return popped + + def __len__(self): + return self.size -- cgit 1.5.1 From a30364c1f99bdd7d5cb0fe82ebdfe52d996defef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 10:44:46 +0000 Subject: Correctly bookkeep the size of TreeCache --- synapse/util/caches/treecache.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index a29ea8144e..3331ea9eba 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -21,7 +21,7 @@ class TreeCache(object): node = self.root for k in key[:-1]: node = node.setdefault(k, {}) - node[key[-1]] = value + node[key[-1]] = _Entry(value) self.size += 1 def get(self, key, default=None): @@ -30,7 +30,7 @@ class TreeCache(object): node = node.get(k, None) if node is None: return default - return node.get(key[-1], default) + return node.get(key[-1], _Entry(default)).value def clear(self): self.size = 0 @@ -60,8 +60,33 @@ class TreeCache(object): break node_and_keys[i+1][0].pop(k) - self.size -= 1 + popped, cnt = _strip_and_count_entires(popped) + self.size -= cnt return popped def __len__(self): return self.size + + +class _Entry(object): + __slots__ = ["value"] + + def __init__(self, value): + object.__setattr__(self, "value", value) + + +def _strip_and_count_entires(d): + """Takes an _Entry or dict with leaves of _Entry's, and either returns the + value or a dictionary with _Entry's replaced by their values. + + Also returns the count of _Entry's + """ + if isinstance(d, dict): + cnt = 0 + for key, value in d.items(): + v, n = _strip_and_count_entires(value) + d[key] = v + cnt += n + return d, cnt + else: + return d.value, 1 -- cgit 1.5.1 From c046630c330b5abc4403f15afa2ffb13b7b8aa08 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 11:17:54 +0000 Subject: Remove spurious self.size --- synapse/util/caches/lrucache.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index ca9ffdf1b4..f7423f2fab 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -140,7 +140,6 @@ class LruCache(object): list_root[NEXT] = list_root list_root[PREV] = list_root cache.clear() - self.size = 0 @synchronized def cache_len(): -- cgit 1.5.1 From fb7299800feb0b8bc0f8429fc5b840abea0609d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 11:29:14 +0000 Subject: Directly set self.value --- synapse/util/caches/treecache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 3331ea9eba..29d02f7e95 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -72,7 +72,7 @@ class _Entry(object): __slots__ = ["value"] def __init__(self, value): - object.__setattr__(self, "value", value) + self.value = value def _strip_and_count_entires(d): -- cgit 1.5.1