summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-07-04 19:44:55 +0100
committerMark Haines <mark.haines@matrix.org>2016-07-04 19:44:55 +0100
commit0fb76c71ac4bdd00e7524cf11668c13754d29a08 (patch)
tree0f24365a8a18af295318d93d00a75e2a57011d1c /synapse/storage/stream.py
parentUse a query that postgresql optimises better for get_events_around (diff)
downloadsynapse-0fb76c71ac4bdd00e7524cf11668c13754d29a08.tar.xz
Use different SQL for postgres and sqlite3 for when using multicolumn indexes
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py100
1 files changed, 50 insertions, 50 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 4dd11284e5..23b3a40aaf 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -40,6 +40,7 @@ from synapse.util.caches.descriptors import cached
 from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
 from synapse.util.logcontext import preserve_fn
+from synapse.storage.engines import PostgresEngine
 
 import logging
 
@@ -54,25 +55,41 @@ _STREAM_TOKEN = "stream"
 _TOPOLOGICAL_TOKEN = "topological"
 
 
-def lower_bound(token):
+def lower_bound(token, engine, inclusive=""):
     if token.topological is None:
-        return "(%d < %s)" % (token.stream, "stream_ordering")
+        return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
     else:
-        return "(%d < %s OR (%d = %s AND %d < %s))" % (
+        if isinstance(engine, PostgresEngine):
+            # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
+            # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
+            # use the later form when running against postgres.
+            return "((%d,%d) <%s (%s,%s))" % (
+                token.topological, token.stream, inclusive,
+                "topological_ordering", "stream_ordering",
+            )
+        return "(%d < %s OR (%d = %s AND %d <%s %s))" % (
             token.topological, "topological_ordering",
             token.topological, "topological_ordering",
-            token.stream, "stream_ordering",
+            token.stream, inclusive, "stream_ordering",
         )
 
 
-def upper_bound(token):
+def upper_bound(token, engine, inclusive="="):
     if token.topological is None:
-        return "(%d >= %s)" % (token.stream, "stream_ordering")
+        return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
     else:
-        return "(%d > %s OR (%d = %s AND %d >= %s))" % (
+        if isinstance(engine, PostgresEngine):
+            # Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
+            # as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
+            # use the later form when running against postgres.
+            return "((%d,%d) >%s (%s,%s))" % (
+                token.topological, token.stream, inclusive,
+                "topological_ordering", "stream_ordering",
+            )
+        return "(%d > %s OR (%d = %s AND %d >%s %s))" % (
             token.topological, "topological_ordering",
             token.topological, "topological_ordering",
-            token.stream, "stream_ordering",
+            token.stream, inclusive, "stream_ordering",
         )
 
 
@@ -308,18 +325,22 @@ class StreamStore(SQLBaseStore):
         args = [False, room_id]
         if direction == 'b':
             order = "DESC"
-            bounds = upper_bound(RoomStreamToken.parse(from_key))
+            bounds = upper_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
             if to_key:
-                bounds = "%s AND %s" % (
-                    bounds, lower_bound(RoomStreamToken.parse(to_key))
-                )
+                bounds = "%s AND %s" % (bounds, lower_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
         else:
             order = "ASC"
-            bounds = lower_bound(RoomStreamToken.parse(from_key))
+            bounds = lower_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
             if to_key:
-                bounds = "%s AND %s" % (
-                    bounds, upper_bound(RoomStreamToken.parse(to_key))
-                )
+                bounds = "%s AND %s" % (bounds, upper_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
 
         if int(limit) > 0:
             args.append(int(limit))
@@ -586,35 +607,24 @@ class StreamStore(SQLBaseStore):
             retcols=["stream_ordering", "topological_ordering"],
         )
 
-        stream_ordering = results["stream_ordering"]
-        topological_ordering = results["topological_ordering"]
+        token = RoomStreamToken(
+            results["topological_ordering"],
+            results["stream_ordering"],
+        )
 
         query_before = (
             "SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " WHERE room_id = ? AND topological_ordering < ?"
-            " UNION ALL "
-            " SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
+            " WHERE room_id = ? AND %s"
             " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
-        )
+        ) % (upper_bound(token, self.database_engine, inclusive=""),)
 
         query_after = (
             "SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " WHERE room_id = ? AND topological_ordering > ?"
-            " UNION ALL"
-            " SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
+            " WHERE room_id = ? AND %s"
             " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
-        )
+        ) % (lower_bound(token, self.database_engine, inclusive=""),)
 
-        txn.execute(
-            query_before,
-            (
-                room_id, topological_ordering,
-                room_id, topological_ordering, stream_ordering,
-                before_limit,
-            )
-        )
+        txn.execute(query_before, (room_id, before_limit))
 
         rows = self.cursor_to_dict(txn)
         events_before = [r["event_id"] for r in rows]
@@ -626,18 +636,11 @@ class StreamStore(SQLBaseStore):
             ))
         else:
             start_token = str(RoomStreamToken(
-                topological_ordering,
-                stream_ordering - 1,
+                token.topological,
+                token.stream - 1,
             ))
 
-        txn.execute(
-            query_after,
-            (
-                room_id, topological_ordering,
-                room_id, topological_ordering, stream_ordering,
-                after_limit,
-            )
-        )
+        txn.execute(query_after, (room_id, after_limit))
 
         rows = self.cursor_to_dict(txn)
         events_after = [r["event_id"] for r in rows]
@@ -648,10 +651,7 @@ class StreamStore(SQLBaseStore):
                 rows[-1]["stream_ordering"],
             ))
         else:
-            end_token = str(RoomStreamToken(
-                topological_ordering,
-                stream_ordering,
-            ))
+            end_token = str(token)
 
         return {
             "before": {