diff options
author | Erik Johnston <erik@matrix.org> | 2016-01-27 09:54:30 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-01-27 17:33:26 +0000 |
commit | b97f6626b6f9b91498d06a7ae113b9d20f1fc2ef (patch) | |
tree | d086f8f64c7b116e10ba5f819e5046484a73061e /synapse/storage | |
parent | Don't turn on profiling (diff) | |
download | synapse-b97f6626b6f9b91498d06a7ae113b9d20f1fc2ef.tar.xz |
Add cache to room stream
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/events.py | 2 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 65 | ||||
-rw-r--r-- | synapse/storage/stream.py | 133 |
3 files changed, 138 insertions, 62 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 298cb9bada..d96ea3a30e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -128,6 +128,8 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) + logger.info("Invalidating %r at %r", event.room_id, stream_ordering) + self._events_stream_cache.room_has_changed(None, event.room_id, stream_ordering) except _RollbackButIsFineException: pass diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index c0593e23ee..b7a4e77748 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached from synapse.util.caches import cache_counter, caches_by_name +from synapse.util.caches.room_change_cache import RoomStreamChangeCache from twisted.internet import defer @@ -31,8 +32,8 @@ class ReceiptsStore(SQLBaseStore): def __init__(self, hs): super(ReceiptsStore, self).__init__(hs) - self._receipts_stream_cache = _RoomStreamChangeCache( - self._receipts_id_gen.get_max_token(None) + self._receipts_stream_cache = RoomStreamChangeCache( + "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None) ) @cached(num_args=2) @@ -370,63 +371,3 @@ class ReceiptsStore(SQLBaseStore): "data": json.dumps(data), } ) - - -class _RoomStreamChangeCache(object): - """Keeps track of the stream_id of the latest change in rooms. - - Given a list of rooms and stream key, it will give a subset of rooms that - may have changed since that key. If the key is too old then the cache - will simply return all rooms. - """ - def __init__(self, current_key, size_of_cache=10000): - self._size_of_cache = size_of_cache - self._room_to_key = {} - self._cache = sorteddict() - self._earliest_key = current_key - self.name = "ReceiptsRoomChangeCache" - caches_by_name[self.name] = self._cache - - @defer.inlineCallbacks - def get_rooms_changed(self, store, room_ids, key): - """Returns subset of room ids that have had new receipts since the - given key. If the key is too old it will just return the given list. - """ - if key > (yield self._get_earliest_key(store)): - keys = self._cache.keys() - i = keys.bisect_right(key) - - result = set( - self._cache[k] for k in keys[i:] - ).intersection(room_ids) - - cache_counter.inc_hits(self.name) - else: - result = room_ids - cache_counter.inc_misses(self.name) - - defer.returnValue(result) - - @defer.inlineCallbacks - def room_has_changed(self, store, room_id, key): - """Informs the cache that the room has been changed at the given key. - """ - if key > (yield self._get_earliest_key(store)): - old_key = self._room_to_key.get(room_id, None) - if old_key: - key = max(key, old_key) - self._cache.pop(old_key, None) - self._cache[key] = room_id - - while len(self._cache) > self._size_of_cache: - k, r = self._cache.popitem() - self._earliest_key = max(k, self._earliest_key) - self._room_to_key.pop(r, None) - - @defer.inlineCallbacks - def _get_earliest_key(self, store): - if self._earliest_key is None: - self._earliest_key = yield store.get_max_receipt_stream_id() - self._earliest_key = int(self._earliest_key) - - defer.returnValue(self._earliest_key) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index e31bad258a..3a32a0019a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -37,6 +37,7 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.room_change_cache import RoomStreamChangeCache from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logutils import log_function @@ -77,6 +78,12 @@ def upper_bound(token): class StreamStore(SQLBaseStore): + def __init__(self, hs): + super(StreamStore, self).__init__(hs) + + self._events_stream_cache = RoomStreamChangeCache( + "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None) + ) @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): @@ -157,6 +164,132 @@ class StreamStore(SQLBaseStore): results = yield self.runInteraction("get_appservice_room_stream", f) defer.returnValue(results) + @defer.inlineCallbacks + def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0): + from_id = RoomStreamToken.parse_stream_token(from_key).stream + + room_ids = yield self._events_stream_cache.get_rooms_changed( + self, room_ids, from_id + ) + + if not room_ids: + defer.returnValue({}) + + results = {} + room_ids = list(room_ids) + for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)): + res = yield defer.gatherResults([ + self.get_recent_room_events_stream_for_room( + room_id, from_key, to_key, limit + ).addCallback(lambda r, rm: (rm, r), room_id) + for room_id in room_ids + ]) + results.update(dict(res)) + + defer.returnValue(results) + + @defer.inlineCallbacks + def get_recent_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0): + if from_key is not None: + from_id = RoomStreamToken.parse_stream_token(from_key).stream + else: + from_id = None + to_id = RoomStreamToken.parse_stream_token(to_key).stream + + if from_key == to_key: + defer.returnValue(([], from_key)) + + has_changed = yield self._events_stream_cache.get_room_has_changed( + room_id, from_id + ) + + if not has_changed: + defer.returnValue(([], from_key)) + + def f(txn): + if from_id is not None: + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering > ? AND stream_ordering <= ?" + " ORDER BY stream_ordering DESC LIMIT ?" + ) + txn.execute(sql, (room_id, from_id, to_id, limit)) + else: + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering <= ?" + " ORDER BY stream_ordering DESC LIMIT ?" + ) + txn.execute(sql, (room_id, to_id, limit)) + + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) + + ret.reverse() + + self._set_before_and_after(ret, rows) + + if rows: + key = "s%d" % min(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = from_key + + return ret, key + res = yield self.runInteraction("get_recent_room_events_stream_for_room", f) + defer.returnValue(res) + + def get_room_changes_for_user(self, user_id, from_key, to_key): + if from_key is not None: + from_id = RoomStreamToken.parse_stream_token(from_key).stream + else: + from_id = None + to_id = RoomStreamToken.parse_stream_token(to_key).stream + + if from_key == to_key: + return defer.succeed([]) + + def f(txn): + if from_id is not None: + sql = ( + "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m" + " WHERE e.event_id = m.event_id" + " AND m.user_id = ?" + " AND e.stream_ordering > ? AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + ) + txn.execute(sql, (user_id, from_id, to_id,)) + else: + sql = ( + "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m" + " WHERE e.event_id = m.event_id" + " AND m.user_id = ?" + " AND stream_ordering <= ?" + " ORDER BY stream_ordering ASC" + ) + txn.execute(sql, (user_id, to_id,)) + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) + + return ret + + return self.runInteraction("get_room_changes_for_user", f) + @log_function def get_room_events_stream( self, |