diff options
Diffstat (limited to 'synapse/storage/receipts.py')
-rw-r--r-- | synapse/storage/receipts.py | 67 |
1 files changed, 3 insertions, 64 deletions
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index c0593e23ee..7118368d97 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -15,11 +15,10 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached -from synapse.util.caches import cache_counter, caches_by_name +from synapse.util.caches.room_change_cache import RoomStreamChangeCache from twisted.internet import defer -from blist import sorteddict import logging import ujson as json @@ -31,8 +30,8 @@ class ReceiptsStore(SQLBaseStore): def __init__(self, hs): super(ReceiptsStore, self).__init__(hs) - self._receipts_stream_cache = _RoomStreamChangeCache( - self._receipts_id_gen.get_max_token(None) + self._receipts_stream_cache = RoomStreamChangeCache( + "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None) ) @cached(num_args=2) @@ -370,63 +369,3 @@ class ReceiptsStore(SQLBaseStore): "data": json.dumps(data), } ) - - -class _RoomStreamChangeCache(object): - """Keeps track of the stream_id of the latest change in rooms. - - Given a list of rooms and stream key, it will give a subset of rooms that - may have changed since that key. If the key is too old then the cache - will simply return all rooms. - """ - def __init__(self, current_key, size_of_cache=10000): - self._size_of_cache = size_of_cache - self._room_to_key = {} - self._cache = sorteddict() - self._earliest_key = current_key - self.name = "ReceiptsRoomChangeCache" - caches_by_name[self.name] = self._cache - - @defer.inlineCallbacks - def get_rooms_changed(self, store, room_ids, key): - """Returns subset of room ids that have had new receipts since the - given key. If the key is too old it will just return the given list. - """ - if key > (yield self._get_earliest_key(store)): - keys = self._cache.keys() - i = keys.bisect_right(key) - - result = set( - self._cache[k] for k in keys[i:] - ).intersection(room_ids) - - cache_counter.inc_hits(self.name) - else: - result = room_ids - cache_counter.inc_misses(self.name) - - defer.returnValue(result) - - @defer.inlineCallbacks - def room_has_changed(self, store, room_id, key): - """Informs the cache that the room has been changed at the given key. - """ - if key > (yield self._get_earliest_key(store)): - old_key = self._room_to_key.get(room_id, None) - if old_key: - key = max(key, old_key) - self._cache.pop(old_key, None) - self._cache[key] = room_id - - while len(self._cache) > self._size_of_cache: - k, r = self._cache.popitem() - self._earliest_key = max(k, self._earliest_key) - self._room_to_key.pop(r, None) - - @defer.inlineCallbacks - def _get_earliest_key(self, store): - if self._earliest_key is None: - self._earliest_key = yield store.get_max_receipt_stream_id() - self._earliest_key = int(self._earliest_key) - - defer.returnValue(self._earliest_key) |