summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-02-02 16:23:54 +0000
committerErik Johnston <erik@matrix.org>2016-02-02 16:23:54 +0000
commit43e13dbd4d22632a56903826df56cc8aa28be944 (patch)
tree96557786f18ba3f815553d3e4e34354135a7128c /synapse/storage/stream.py
parentMerge pull request #545 from matrix-org/erikj/sync (diff)
parents/get_room_changes_for_user/get_membership_changes_for_user/ (diff)
downloadsynapse-43e13dbd4d22632a56903826df56cc8aa28be944.tar.xz
Merge pull request #549 from matrix-org/erikj/sync
Fetch events in a separate transaction.
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py57
1 files changed, 30 insertions, 27 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a03458c2fc..338a9d40d5 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -220,28 +220,30 @@ class StreamStore(SQLBaseStore):
 
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
 
-            self._set_before_and_after(ret, rows, topo_order=False)
+        rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
-            ret.reverse()
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            if rows:
-                key = "s%d" % min(r["stream_ordering"] for r in rows)
-            else:
-                # Assume we didn't get anything because there was nothing to
-                # get.
-                key = from_key
+        self._set_before_and_after(ret, rows, topo_order=False)
 
-            return ret, key
-        res = yield self.runInteraction("get_room_events_stream_for_room", f)
-        defer.returnValue(res)
+        ret.reverse()
 
-    def get_room_changes_for_user(self, user_id, from_key, to_key):
+        if rows:
+            key = "s%d" % min(r["stream_ordering"] for r in rows)
+        else:
+            # Assume we didn't get anything because there was nothing to
+            # get.
+            key = from_key
+
+        defer.returnValue((ret, key))
+
+    @defer.inlineCallbacks
+    def get_membership_changes_for_user(self, user_id, from_key, to_key):
         if from_key is not None:
             from_id = RoomStreamToken.parse_stream_token(from_key).stream
         else:
@@ -249,14 +251,14 @@ class StreamStore(SQLBaseStore):
         to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
         if from_key == to_key:
-            return defer.succeed([])
+            defer.returnValue([])
 
         if from_id:
             has_changed = self._membership_stream_cache.has_entity_changed(
                 user_id, int(from_id)
             )
             if not has_changed:
-                return defer.succeed([])
+                defer.returnValue([])
 
         def f(txn):
             if from_id is not None:
@@ -281,17 +283,18 @@ class StreamStore(SQLBaseStore):
                 txn.execute(sql, (user_id, to_id,))
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
+
+        rows = yield self.runInteraction("get_membership_changes_for_user", f)
 
-            self._set_before_and_after(ret, rows, topo_order=False)
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            return ret
+        self._set_before_and_after(ret, rows, topo_order=False)
 
-        return self.runInteraction("get_room_changes_for_user", f)
+        defer.returnValue(ret)
 
     def get_room_events_stream(
         self,