summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/stream.py62
1 files changed, 49 insertions, 13 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 56304999dc..f18fb63c5e 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -40,7 +40,7 @@ from synapse.util.caches.descriptors import cached
 from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
 from synapse.util.logcontext import preserve_fn
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
 import logging
 
@@ -614,19 +614,55 @@ class StreamStore(SQLBaseStore):
             results["stream_ordering"],
         )
 
-        query_before = (
-            "SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " WHERE room_id = ? AND %s"
-            " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
-        ) % (upper_bound(token, self.database_engine, inclusive=False),)
+        if isinstance(self.database_engine, Sqlite3Engine):
+            # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
+            # So we give pass it to SQLite3 as the UNION ALL of the two queries.
+
+            query_before = (
+                "SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND topological_ordering < ?"
+                " UNION ALL"
+                " SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
+            )
+            before_args = (
+                room_id, token.topological,
+                room_id, token.topological, token.stream,
+                before_limit,
+            )
+
+            query_after = (
+                "SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND topological_ordering > ?"
+                " UNION ALL"
+                " SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
+                " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
+            )
+            after_args = (
+                room_id, token.topological,
+                room_id, token.topological, token.stream,
+                after_limit,
+            )
+        else:
+            query_before = (
+                "SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND %s"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
+            ) % (upper_bound(token, self.database_engine, inclusive=False),)
+
+            before_args = (room_id, before_limit),
+
+            query_after = (
+                "SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND %s"
+                " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
+            ) % (lower_bound(token, self.database_engine, inclusive=False),)
 
-        query_after = (
-            "SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " WHERE room_id = ? AND %s"
-            " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
-        ) % (lower_bound(token, self.database_engine, inclusive=False),)
+            after_args = (room_id, after_limit)
 
-        txn.execute(query_before, (room_id, before_limit))
+        txn.execute(query_before, before_args)
 
         rows = self.cursor_to_dict(txn)
         events_before = [r["event_id"] for r in rows]
@@ -642,7 +678,7 @@ class StreamStore(SQLBaseStore):
                 token.stream - 1,
             ))
 
-        txn.execute(query_after, (room_id, after_limit))
+        txn.execute(query_after, after_args)
 
         rows = self.cursor_to_dict(txn)
         events_after = [r["event_id"] for r in rows]