summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py59
1 files changed, 31 insertions, 28 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a03458c2fc..2c49a5e499 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -168,7 +168,7 @@ class StreamStore(SQLBaseStore):
 
         results = {}
         room_ids = list(room_ids)
-        for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
+        for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
             res = yield defer.gatherResults([
                 self.get_room_events_stream_for_room(
                     room_id, from_key, to_key, limit
@@ -220,28 +220,30 @@ class StreamStore(SQLBaseStore):
 
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
 
-            self._set_before_and_after(ret, rows, topo_order=False)
+        rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
-            ret.reverse()
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            if rows:
-                key = "s%d" % min(r["stream_ordering"] for r in rows)
-            else:
-                # Assume we didn't get anything because there was nothing to
-                # get.
-                key = from_key
+        self._set_before_and_after(ret, rows, topo_order=False)
 
-            return ret, key
-        res = yield self.runInteraction("get_room_events_stream_for_room", f)
-        defer.returnValue(res)
+        ret.reverse()
 
-    def get_room_changes_for_user(self, user_id, from_key, to_key):
+        if rows:
+            key = "s%d" % min(r["stream_ordering"] for r in rows)
+        else:
+            # Assume we didn't get anything because there was nothing to
+            # get.
+            key = from_key
+
+        defer.returnValue((ret, key))
+
+    @defer.inlineCallbacks
+    def get_membership_changes_for_user(self, user_id, from_key, to_key):
         if from_key is not None:
             from_id = RoomStreamToken.parse_stream_token(from_key).stream
         else:
@@ -249,14 +251,14 @@ class StreamStore(SQLBaseStore):
         to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
         if from_key == to_key:
-            return defer.succeed([])
+            defer.returnValue([])
 
         if from_id:
             has_changed = self._membership_stream_cache.has_entity_changed(
                 user_id, int(from_id)
             )
             if not has_changed:
-                return defer.succeed([])
+                defer.returnValue([])
 
         def f(txn):
             if from_id is not None:
@@ -281,17 +283,18 @@ class StreamStore(SQLBaseStore):
                 txn.execute(sql, (user_id, to_id,))
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
+
+        rows = yield self.runInteraction("get_membership_changes_for_user", f)
 
-            self._set_before_and_after(ret, rows, topo_order=False)
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            return ret
+        self._set_before_and_after(ret, rows, topo_order=False)
 
-        return self.runInteraction("get_room_changes_for_user", f)
+        defer.returnValue(ret)
 
     def get_room_events_stream(
         self,