summary refs log tree commit diff
path: root/synapse/storage/receipts.py
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-01-29 14:15:12 +0000
committerMark Haines <mark.haines@matrix.org>2016-01-29 14:15:12 +0000
commit47374a33fc195bf44363006070268fd60ffff047 (patch)
treeea2931512014f5f7361bee5c7f768506adbf783a /synapse/storage/receipts.py
parentAdd config option for setting the trusted id servers, disabling checking the ... (diff)
parentBump AccountDataAndTagsChangeCache size (diff)
downloadsynapse-47374a33fc195bf44363006070268fd60ffff047.tar.xz
Merge remote-tracking branch 'origin/develop' into markjh/3pid
Diffstat (limited to 'synapse/storage/receipts.py')
-rw-r--r--synapse/storage/receipts.py79
1 files changed, 10 insertions, 69 deletions
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index c0593e23ee..8068c73740 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -15,11 +15,10 @@
 
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 from twisted.internet import defer
 
-from blist import sorteddict
 import logging
 import ujson as json
 
@@ -31,8 +30,8 @@ class ReceiptsStore(SQLBaseStore):
     def __init__(self, hs):
         super(ReceiptsStore, self).__init__(hs)
 
-        self._receipts_stream_cache = _RoomStreamChangeCache(
-            self._receipts_id_gen.get_max_token(None)
+        self._receipts_stream_cache = StreamChangeCache(
+            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None)
         )
 
     @cached(num_args=2)
@@ -78,8 +77,8 @@ class ReceiptsStore(SQLBaseStore):
         room_ids = set(room_ids)
 
         if from_key:
-            room_ids = yield self._receipts_stream_cache.get_rooms_changed(
-                self, room_ids, from_key
+            room_ids = yield self._receipts_stream_cache.get_entities_changed(
+                room_ids, from_key
             )
 
         results = yield self._get_linearized_receipts_for_rooms(
@@ -222,6 +221,11 @@ class ReceiptsStore(SQLBaseStore):
         # FIXME: This shouldn't invalidate the whole cache
         txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
 
+        txn.call_after(
+            self._receipts_stream_cache.entity_has_changed,
+            room_id, stream_id
+        )
+
         # We don't want to clobber receipts for more recent events, so we
         # have to compare orderings of existing receipts
         sql = (
@@ -309,9 +313,6 @@ class ReceiptsStore(SQLBaseStore):
 
         stream_id_manager = yield self._receipts_id_gen.get_next(self)
         with stream_id_manager as stream_id:
-            yield self._receipts_stream_cache.room_has_changed(
-                self, room_id, stream_id
-            )
             have_persisted = yield self.runInteraction(
                 "insert_linearized_receipt",
                 self.insert_linearized_receipt_txn,
@@ -370,63 +371,3 @@ class ReceiptsStore(SQLBaseStore):
                 "data": json.dumps(data),
             }
         )
-
-
-class _RoomStreamChangeCache(object):
-    """Keeps track of the stream_id of the latest change in rooms.
-
-    Given a list of rooms and stream key, it will give a subset of rooms that
-    may have changed since that key. If the key is too old then the cache
-    will simply return all rooms.
-    """
-    def __init__(self, current_key, size_of_cache=10000):
-        self._size_of_cache = size_of_cache
-        self._room_to_key = {}
-        self._cache = sorteddict()
-        self._earliest_key = current_key
-        self.name = "ReceiptsRoomChangeCache"
-        caches_by_name[self.name] = self._cache
-
-    @defer.inlineCallbacks
-    def get_rooms_changed(self, store, room_ids, key):
-        """Returns subset of room ids that have had new receipts since the
-        given key. If the key is too old it will just return the given list.
-        """
-        if key > (yield self._get_earliest_key(store)):
-            keys = self._cache.keys()
-            i = keys.bisect_right(key)
-
-            result = set(
-                self._cache[k] for k in keys[i:]
-            ).intersection(room_ids)
-
-            cache_counter.inc_hits(self.name)
-        else:
-            result = room_ids
-            cache_counter.inc_misses(self.name)
-
-        defer.returnValue(result)
-
-    @defer.inlineCallbacks
-    def room_has_changed(self, store, room_id, key):
-        """Informs the cache that the room has been changed at the given key.
-        """
-        if key > (yield self._get_earliest_key(store)):
-            old_key = self._room_to_key.get(room_id, None)
-            if old_key:
-                key = max(key, old_key)
-                self._cache.pop(old_key, None)
-            self._cache[key] = room_id
-
-            while len(self._cache) > self._size_of_cache:
-                k, r = self._cache.popitem()
-                self._earliest_key = max(k, self._earliest_key)
-                self._room_to_key.pop(r, None)
-
-    @defer.inlineCallbacks
-    def _get_earliest_key(self, store):
-        if self._earliest_key is None:
-            self._earliest_key = yield store.get_max_receipt_stream_id()
-            self._earliest_key = int(self._earliest_key)
-
-        defer.returnValue(self._earliest_key)