diff options
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r-- | synapse/storage/stream.py | 59 |
1 files changed, 31 insertions, 28 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index a03458c2fc..2c49a5e499 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -168,7 +168,7 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) - for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)): + for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield defer.gatherResults([ self.get_room_events_stream_for_room( room_id, from_key, to_key, limit @@ -220,28 +220,30 @@ class StreamStore(SQLBaseStore): rows = self.cursor_to_dict(txn) - ret = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) + return rows - self._set_before_and_after(ret, rows, topo_order=False) + rows = yield self.runInteraction("get_room_events_stream_for_room", f) - ret.reverse() + ret = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) - if rows: - key = "s%d" % min(r["stream_ordering"] for r in rows) - else: - # Assume we didn't get anything because there was nothing to - # get. - key = from_key + self._set_before_and_after(ret, rows, topo_order=False) - return ret, key - res = yield self.runInteraction("get_room_events_stream_for_room", f) - defer.returnValue(res) + ret.reverse() - def get_room_changes_for_user(self, user_id, from_key, to_key): + if rows: + key = "s%d" % min(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = from_key + + defer.returnValue((ret, key)) + + @defer.inlineCallbacks + def get_membership_changes_for_user(self, user_id, from_key, to_key): if from_key is not None: from_id = RoomStreamToken.parse_stream_token(from_key).stream else: @@ -249,14 +251,14 @@ class StreamStore(SQLBaseStore): to_id = RoomStreamToken.parse_stream_token(to_key).stream if from_key == to_key: - return defer.succeed([]) + defer.returnValue([]) if from_id: has_changed = self._membership_stream_cache.has_entity_changed( user_id, int(from_id) ) if not has_changed: - return defer.succeed([]) + defer.returnValue([]) def f(txn): if from_id is not None: @@ -281,17 +283,18 @@ class StreamStore(SQLBaseStore): txn.execute(sql, (user_id, to_id,)) rows = self.cursor_to_dict(txn) - ret = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) + return rows + + rows = yield self.runInteraction("get_membership_changes_for_user", f) - self._set_before_and_after(ret, rows, topo_order=False) + ret = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) - return ret + self._set_before_and_after(ret, rows, topo_order=False) - return self.runInteraction("get_room_changes_for_user", f) + defer.returnValue(ret) def get_room_events_stream( self, |