summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-02-02 15:58:14 +0000
committerErik Johnston <erik@matrix.org>2016-02-02 15:58:14 +0000
commit477b1ed6cfd130e5a004cda0c0b84509da2aa006 (patch)
tree634df4231daca6072969587818c01efa8c4f6398 /synapse/storage/stream.py
parentMerge pull request #545 from matrix-org/erikj/sync (diff)
downloadsynapse-477b1ed6cfd130e5a004cda0c0b84509da2aa006.tar.xz
Fetch events in a separate transaction.
This has a couple of benefits:

- It reduces the time of transactions, allowing other database requests
  to run.
- Fetching events is given a dedicated database thread, and so can't
  starve other database requests.
Diffstat (limited to '')
-rw-r--r--synapse/storage/stream.py55
1 files changed, 29 insertions, 26 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a03458c2fc..bcae3d718e 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -220,27 +220,29 @@ class StreamStore(SQLBaseStore):
 
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
 
-            self._set_before_and_after(ret, rows, topo_order=False)
+        rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
-            ret.reverse()
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            if rows:
-                key = "s%d" % min(r["stream_ordering"] for r in rows)
-            else:
-                # Assume we didn't get anything because there was nothing to
-                # get.
-                key = from_key
+        self._set_before_and_after(ret, rows, topo_order=False)
 
-            return ret, key
-        res = yield self.runInteraction("get_room_events_stream_for_room", f)
-        defer.returnValue(res)
+        ret.reverse()
 
+        if rows:
+            key = "s%d" % min(r["stream_ordering"] for r in rows)
+        else:
+            # Assume we didn't get anything because there was nothing to
+            # get.
+            key = from_key
+
+        defer.returnValue((ret, key))
+
+    @defer.inlineCallbacks
     def get_room_changes_for_user(self, user_id, from_key, to_key):
         if from_key is not None:
             from_id = RoomStreamToken.parse_stream_token(from_key).stream
@@ -249,14 +251,14 @@ class StreamStore(SQLBaseStore):
         to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
         if from_key == to_key:
-            return defer.succeed([])
+            defer.returnValue([])
 
         if from_id:
             has_changed = self._membership_stream_cache.has_entity_changed(
                 user_id, int(from_id)
             )
             if not has_changed:
-                return defer.succeed([])
+                defer.returnValue([])
 
         def f(txn):
             if from_id is not None:
@@ -281,17 +283,18 @@ class StreamStore(SQLBaseStore):
                 txn.execute(sql, (user_id, to_id,))
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
+
+        rows = yield self.runInteraction("get_room_changes_for_user", f)
 
-            self._set_before_and_after(ret, rows, topo_order=False)
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            return ret
+        self._set_before_and_after(ret, rows, topo_order=False)
 
-        return self.runInteraction("get_room_changes_for_user", f)
+        defer.returnValue(ret)
 
     def get_room_events_stream(
         self,