diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/events.py | 3 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 67 | ||||
-rw-r--r-- | synapse/storage/stream.py | 157 |
3 files changed, 160 insertions, 67 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 298cb9bada..80187722ea 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -128,6 +128,9 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) + self._events_stream_cache.room_has_changed( + None, event.room_id, stream_ordering + ) except _RollbackButIsFineException: pass diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index c0593e23ee..7118368d97 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -15,11 +15,10 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached -from synapse.util.caches import cache_counter, caches_by_name +from synapse.util.caches.room_change_cache import RoomStreamChangeCache from twisted.internet import defer -from blist import sorteddict import logging import ujson as json @@ -31,8 +30,8 @@ class ReceiptsStore(SQLBaseStore): def __init__(self, hs): super(ReceiptsStore, self).__init__(hs) - self._receipts_stream_cache = _RoomStreamChangeCache( - self._receipts_id_gen.get_max_token(None) + self._receipts_stream_cache = RoomStreamChangeCache( + "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None) ) @cached(num_args=2) @@ -370,63 +369,3 @@ class ReceiptsStore(SQLBaseStore): "data": json.dumps(data), } ) - - -class _RoomStreamChangeCache(object): - """Keeps track of the stream_id of the latest change in rooms. - - Given a list of rooms and stream key, it will give a subset of rooms that - may have changed since that key. If the key is too old then the cache - will simply return all rooms. - """ - def __init__(self, current_key, size_of_cache=10000): - self._size_of_cache = size_of_cache - self._room_to_key = {} - self._cache = sorteddict() - self._earliest_key = current_key - self.name = "ReceiptsRoomChangeCache" - caches_by_name[self.name] = self._cache - - @defer.inlineCallbacks - def get_rooms_changed(self, store, room_ids, key): - """Returns subset of room ids that have had new receipts since the - given key. If the key is too old it will just return the given list. - """ - if key > (yield self._get_earliest_key(store)): - keys = self._cache.keys() - i = keys.bisect_right(key) - - result = set( - self._cache[k] for k in keys[i:] - ).intersection(room_ids) - - cache_counter.inc_hits(self.name) - else: - result = room_ids - cache_counter.inc_misses(self.name) - - defer.returnValue(result) - - @defer.inlineCallbacks - def room_has_changed(self, store, room_id, key): - """Informs the cache that the room has been changed at the given key. - """ - if key > (yield self._get_earliest_key(store)): - old_key = self._room_to_key.get(room_id, None) - if old_key: - key = max(key, old_key) - self._cache.pop(old_key, None) - self._cache[key] = room_id - - while len(self._cache) > self._size_of_cache: - k, r = self._cache.popitem() - self._earliest_key = max(k, self._earliest_key) - self._room_to_key.pop(r, None) - - @defer.inlineCallbacks - def _get_earliest_key(self, store): - if self._earliest_key is None: - self._earliest_key = yield store.get_max_receipt_stream_id() - self._earliest_key = int(self._earliest_key) - - defer.returnValue(self._earliest_key) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index e31bad258a..5096b46864 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -37,6 +37,7 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.room_change_cache import RoomStreamChangeCache from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logutils import log_function @@ -77,6 +78,12 @@ def upper_bound(token): class StreamStore(SQLBaseStore): + def __init__(self, hs): + super(StreamStore, self).__init__(hs) + + self._events_stream_cache = RoomStreamChangeCache( + "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None) + ) @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): @@ -157,6 +164,134 @@ class StreamStore(SQLBaseStore): results = yield self.runInteraction("get_appservice_room_stream", f) defer.returnValue(results) + @defer.inlineCallbacks + def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0): + from_id = RoomStreamToken.parse_stream_token(from_key).stream + + room_ids = yield self._events_stream_cache.get_rooms_changed( + self, room_ids, from_id + ) + + if not room_ids: + defer.returnValue({}) + + results = {} + room_ids = list(room_ids) + for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)): + res = yield defer.gatherResults([ + self.get_room_events_stream_for_room( + room_id, from_key, to_key, limit + ).addCallback(lambda r, rm: (rm, r), room_id) + for room_id in room_ids + ]) + results.update(dict(res)) + + defer.returnValue(results) + + @defer.inlineCallbacks + def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0): + if from_key is not None: + from_id = RoomStreamToken.parse_stream_token(from_key).stream + else: + from_id = None + to_id = RoomStreamToken.parse_stream_token(to_key).stream + + if from_key == to_key: + defer.returnValue(([], from_key)) + + has_changed = yield self._events_stream_cache.get_room_has_changed( + room_id, from_id + ) + + if not has_changed: + defer.returnValue(([], from_key)) + + def f(txn): + if from_id is not None: + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering > ? AND stream_ordering <= ?" + " ORDER BY stream_ordering DESC LIMIT ?" + ) + txn.execute(sql, (room_id, from_id, to_id, limit)) + else: + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering <= ?" + " ORDER BY stream_ordering DESC LIMIT ?" + ) + txn.execute(sql, (room_id, to_id, limit)) + + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) + + self._set_before_and_after(ret, rows, topo_order=False) + + ret.reverse() + + if rows: + key = "s%d" % min(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = from_key + + return ret, key + res = yield self.runInteraction("get_room_events_stream_for_room", f) + defer.returnValue(res) + + def get_room_changes_for_user(self, user_id, from_key, to_key): + if from_key is not None: + from_id = RoomStreamToken.parse_stream_token(from_key).stream + else: + from_id = None + to_id = RoomStreamToken.parse_stream_token(to_key).stream + + if from_key == to_key: + return defer.succeed([]) + + def f(txn): + if from_id is not None: + sql = ( + "SELECT m.event_id, stream_ordering FROM events AS e," + " room_memberships AS m" + " WHERE e.event_id = m.event_id" + " AND m.user_id = ?" + " AND e.stream_ordering > ? AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + ) + txn.execute(sql, (user_id, from_id, to_id,)) + else: + sql = ( + "SELECT m.event_id, stream_ordering FROM events AS e," + " room_memberships AS m" + " WHERE e.event_id = m.event_id" + " AND m.user_id = ?" + " AND stream_ordering <= ?" + " ORDER BY stream_ordering ASC" + ) + txn.execute(sql, (user_id, to_id,)) + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) + + return ret + + return self.runInteraction("get_room_changes_for_user", f) + @log_function def get_room_events_stream( self, @@ -174,7 +309,8 @@ class StreamStore(SQLBaseStore): "SELECT c.room_id FROM history_visibility AS h" " INNER JOIN current_state_events AS c" " ON h.event_id = c.event_id" - " WHERE c.room_id IN (%s) AND h.history_visibility = 'world_readable'" % ( + " WHERE c.room_id IN (%s)" + " AND h.history_visibility = 'world_readable'" % ( ",".join(map(lambda _: "?", room_ids)) ) ) @@ -434,6 +570,18 @@ class StreamStore(SQLBaseStore): row["topological_ordering"], row["stream_ordering"],) ) + def get_max_topological_token_for_stream_and_room(self, room_id, stream_key): + sql = ( + "SELECT max(topological_ordering) FROM events" + " WHERE room_id = ? AND stream_ordering < ?" + ) + return self._execute( + "get_max_topological_token_for_stream_and_room", None, + sql, room_id, stream_key, + ).addCallback( + lambda r: r[0][0] if r else 0 + ) + def _get_max_topological_txn(self, txn): txn.execute( "SELECT MAX(topological_ordering) FROM events" @@ -445,10 +593,13 @@ class StreamStore(SQLBaseStore): return rows[0][0] if rows else 0 @staticmethod - def _set_before_and_after(events, rows): + def _set_before_and_after(events, rows, topo_order=True): for event, row in zip(events, rows): stream = row["stream_ordering"] - topo = event.depth + if topo_order: + topo = event.depth + else: + topo = None internal = event.internal_metadata internal.before = str(RoomStreamToken(topo, stream - 1)) internal.after = str(RoomStreamToken(topo, stream)) |