diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a03458c2fc..bcae3d718e 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -220,27 +220,29 @@ class StreamStore(SQLBaseStore):
rows = self.cursor_to_dict(txn)
- ret = self._get_events_txn(
- txn,
- [r["event_id"] for r in rows],
- get_prev_content=True
- )
+ return rows
- self._set_before_and_after(ret, rows, topo_order=False)
+ rows = yield self.runInteraction("get_room_events_stream_for_room", f)
- ret.reverse()
+ ret = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
- if rows:
- key = "s%d" % min(r["stream_ordering"] for r in rows)
- else:
- # Assume we didn't get anything because there was nothing to
- # get.
- key = from_key
+ self._set_before_and_after(ret, rows, topo_order=False)
- return ret, key
- res = yield self.runInteraction("get_room_events_stream_for_room", f)
- defer.returnValue(res)
+ ret.reverse()
+ if rows:
+ key = "s%d" % min(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = from_key
+
+ defer.returnValue((ret, key))
+
+ @defer.inlineCallbacks
def get_room_changes_for_user(self, user_id, from_key, to_key):
if from_key is not None:
from_id = RoomStreamToken.parse_stream_token(from_key).stream
@@ -249,14 +251,14 @@ class StreamStore(SQLBaseStore):
to_id = RoomStreamToken.parse_stream_token(to_key).stream
if from_key == to_key:
- return defer.succeed([])
+ defer.returnValue([])
if from_id:
has_changed = self._membership_stream_cache.has_entity_changed(
user_id, int(from_id)
)
if not has_changed:
- return defer.succeed([])
+ defer.returnValue([])
def f(txn):
if from_id is not None:
@@ -281,17 +283,18 @@ class StreamStore(SQLBaseStore):
txn.execute(sql, (user_id, to_id,))
rows = self.cursor_to_dict(txn)
- ret = self._get_events_txn(
- txn,
- [r["event_id"] for r in rows],
- get_prev_content=True
- )
+ return rows
+
+ rows = yield self.runInteraction("get_room_changes_for_user", f)
- self._set_before_and_after(ret, rows, topo_order=False)
+ ret = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
- return ret
+ self._set_before_and_after(ret, rows, topo_order=False)
- return self.runInteraction("get_room_changes_for_user", f)
+ defer.returnValue(ret)
def get_room_events_stream(
self,
|