diff options
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r-- | synapse/storage/stream.py | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index e31bad258a..3a32a0019a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -37,6 +37,7 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.room_change_cache import RoomStreamChangeCache from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logutils import log_function @@ -77,6 +78,12 @@ def upper_bound(token): class StreamStore(SQLBaseStore): + def __init__(self, hs): + super(StreamStore, self).__init__(hs) + + self._events_stream_cache = RoomStreamChangeCache( + "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None) + ) @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): @@ -157,6 +164,132 @@ class StreamStore(SQLBaseStore): results = yield self.runInteraction("get_appservice_room_stream", f) defer.returnValue(results) + @defer.inlineCallbacks + def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0): + from_id = RoomStreamToken.parse_stream_token(from_key).stream + + room_ids = yield self._events_stream_cache.get_rooms_changed( + self, room_ids, from_id + ) + + if not room_ids: + defer.returnValue({}) + + results = {} + room_ids = list(room_ids) + for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)): + res = yield defer.gatherResults([ + self.get_recent_room_events_stream_for_room( + room_id, from_key, to_key, limit + ).addCallback(lambda r, rm: (rm, r), room_id) + for room_id in room_ids + ]) + results.update(dict(res)) + + defer.returnValue(results) + + @defer.inlineCallbacks + def get_recent_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0): + if from_key is not None: + from_id = RoomStreamToken.parse_stream_token(from_key).stream + else: + from_id = None + to_id = RoomStreamToken.parse_stream_token(to_key).stream + + if from_key == to_key: + defer.returnValue(([], from_key)) + + has_changed = yield self._events_stream_cache.get_room_has_changed( + room_id, from_id + ) + + if not has_changed: + defer.returnValue(([], from_key)) + + def f(txn): + if from_id is not None: + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering > ? AND stream_ordering <= ?" + " ORDER BY stream_ordering DESC LIMIT ?" + ) + txn.execute(sql, (room_id, from_id, to_id, limit)) + else: + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering <= ?" + " ORDER BY stream_ordering DESC LIMIT ?" + ) + txn.execute(sql, (room_id, to_id, limit)) + + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) + + ret.reverse() + + self._set_before_and_after(ret, rows) + + if rows: + key = "s%d" % min(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = from_key + + return ret, key + res = yield self.runInteraction("get_recent_room_events_stream_for_room", f) + defer.returnValue(res) + + def get_room_changes_for_user(self, user_id, from_key, to_key): + if from_key is not None: + from_id = RoomStreamToken.parse_stream_token(from_key).stream + else: + from_id = None + to_id = RoomStreamToken.parse_stream_token(to_key).stream + + if from_key == to_key: + return defer.succeed([]) + + def f(txn): + if from_id is not None: + sql = ( + "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m" + " WHERE e.event_id = m.event_id" + " AND m.user_id = ?" + " AND e.stream_ordering > ? AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + ) + txn.execute(sql, (user_id, from_id, to_id,)) + else: + sql = ( + "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m" + " WHERE e.event_id = m.event_id" + " AND m.user_id = ?" + " AND stream_ordering <= ?" + " ORDER BY stream_ordering ASC" + ) + txn.execute(sql, (user_id, to_id,)) + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) + + return ret + + return self.runInteraction("get_room_changes_for_user", f) + @log_function def get_room_events_stream( self, |