diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index e31bad258a..3a32a0019a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -37,6 +37,7 @@ from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.room_change_cache import RoomStreamChangeCache
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logutils import log_function
@@ -77,6 +78,12 @@ def upper_bound(token):
class StreamStore(SQLBaseStore):
+ def __init__(self, hs):
+ super(StreamStore, self).__init__(hs)
+
+ self._events_stream_cache = RoomStreamChangeCache(
+ "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
+ )
@defer.inlineCallbacks
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
@@ -157,6 +164,132 @@ class StreamStore(SQLBaseStore):
results = yield self.runInteraction("get_appservice_room_stream", f)
defer.returnValue(results)
+ @defer.inlineCallbacks
+ def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0):
+ from_id = RoomStreamToken.parse_stream_token(from_key).stream
+
+ room_ids = yield self._events_stream_cache.get_rooms_changed(
+ self, room_ids, from_id
+ )
+
+ if not room_ids:
+ defer.returnValue({})
+
+ results = {}
+ room_ids = list(room_ids)
+ for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
+ res = yield defer.gatherResults([
+ self.get_recent_room_events_stream_for_room(
+ room_id, from_key, to_key, limit
+ ).addCallback(lambda r, rm: (rm, r), room_id)
+ for room_id in room_ids
+ ])
+ results.update(dict(res))
+
+ defer.returnValue(results)
+
+ @defer.inlineCallbacks
+ def get_recent_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0):
+ if from_key is not None:
+ from_id = RoomStreamToken.parse_stream_token(from_key).stream
+ else:
+ from_id = None
+ to_id = RoomStreamToken.parse_stream_token(to_key).stream
+
+ if from_key == to_key:
+ defer.returnValue(([], from_key))
+
+ has_changed = yield self._events_stream_cache.get_room_has_changed(
+ room_id, from_id
+ )
+
+ if not has_changed:
+ defer.returnValue(([], from_key))
+
+ def f(txn):
+ if from_id is not None:
+ sql = (
+ "SELECT event_id, stream_ordering FROM events WHERE"
+ " room_id = ?"
+ " AND not outlier"
+ " AND stream_ordering > ? AND stream_ordering <= ?"
+ " ORDER BY stream_ordering DESC LIMIT ?"
+ )
+ txn.execute(sql, (room_id, from_id, to_id, limit))
+ else:
+ sql = (
+ "SELECT event_id, stream_ordering FROM events WHERE"
+ " room_id = ?"
+ " AND not outlier"
+ " AND stream_ordering <= ?"
+ " ORDER BY stream_ordering DESC LIMIT ?"
+ )
+ txn.execute(sql, (room_id, to_id, limit))
+
+ rows = self.cursor_to_dict(txn)
+
+ ret = self._get_events_txn(
+ txn,
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ ret.reverse()
+
+ self._set_before_and_after(ret, rows)
+
+ if rows:
+ key = "s%d" % min(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = from_key
+
+ return ret, key
+ res = yield self.runInteraction("get_recent_room_events_stream_for_room", f)
+ defer.returnValue(res)
+
+ def get_room_changes_for_user(self, user_id, from_key, to_key):
+ if from_key is not None:
+ from_id = RoomStreamToken.parse_stream_token(from_key).stream
+ else:
+ from_id = None
+ to_id = RoomStreamToken.parse_stream_token(to_key).stream
+
+ if from_key == to_key:
+ return defer.succeed([])
+
+ def f(txn):
+ if from_id is not None:
+ sql = (
+ "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m"
+ " WHERE e.event_id = m.event_id"
+ " AND m.user_id = ?"
+ " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
+ " ORDER BY e.stream_ordering ASC"
+ )
+ txn.execute(sql, (user_id, from_id, to_id,))
+ else:
+ sql = (
+ "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m"
+ " WHERE e.event_id = m.event_id"
+ " AND m.user_id = ?"
+ " AND stream_ordering <= ?"
+ " ORDER BY stream_ordering ASC"
+ )
+ txn.execute(sql, (user_id, to_id,))
+ rows = self.cursor_to_dict(txn)
+
+ ret = self._get_events_txn(
+ txn,
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ return ret
+
+ return self.runInteraction("get_room_changes_for_user", f)
+
@log_function
def get_room_events_stream(
self,
|