summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py5
-rw-r--r--synapse/storage/event_push_actions.py37
-rw-r--r--synapse/storage/events.py158
-rw-r--r--synapse/storage/media_repository.py44
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/registration.py61
-rw-r--r--synapse/storage/room.py44
-rw-r--r--synapse/storage/schema/delta/33/remote_media_ts.py31
-rw-r--r--synapse/storage/stream.py148
9 files changed, 389 insertions, 141 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 32c6677d47..d766a30299 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -807,6 +807,11 @@ class SQLBaseStore(object):
         if txn.rowcount > 1:
             raise StoreError(500, "more than one row matched")
 
+    def _simple_delete(self, table, keyvalues, desc):
+        return self.runInteraction(
+            desc, self._simple_delete_txn, table, keyvalues
+        )
+
     @staticmethod
     def _simple_delete_txn(txn, table, keyvalues):
         sql = "DELETE FROM %s WHERE %s" % (
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 940e11d7a2..3d93285f84 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -16,6 +16,8 @@
 from ._base import SQLBaseStore
 from twisted.internet import defer
 from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.types import RoomStreamToken
+from .stream import lower_bound
 
 import logging
 import ujson as json
@@ -73,6 +75,9 @@ class EventPushActionsStore(SQLBaseStore):
 
             stream_ordering = results[0][0]
             topological_ordering = results[0][1]
+            token = RoomStreamToken(
+                topological_ordering, stream_ordering
+            )
 
             sql = (
                 "SELECT sum(notif), sum(highlight)"
@@ -80,15 +85,10 @@ class EventPushActionsStore(SQLBaseStore):
                 " WHERE"
                 " user_id = ?"
                 " AND room_id = ?"
-                " AND ("
-                "       topological_ordering > ?"
-                "       OR (topological_ordering = ? AND stream_ordering > ?)"
-                ")"
-            )
-            txn.execute(sql, (
-                user_id, room_id,
-                topological_ordering, topological_ordering, stream_ordering
-            ))
+                " AND %s"
+            ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+            txn.execute(sql, (user_id, room_id))
             row = txn.fetchone()
             if row:
                 return {
@@ -152,7 +152,7 @@ class EventPushActionsStore(SQLBaseStore):
             if max_stream_ordering is not None:
                 sql += " AND ep.stream_ordering <= ?"
                 args.append(max_stream_ordering)
-            sql += " ORDER BY ep.stream_ordering ASC LIMIT ?"
+            sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
             args.append(limit)
             txn.execute(sql, args)
             return txn.fetchall()
@@ -176,14 +176,16 @@ class EventPushActionsStore(SQLBaseStore):
             if max_stream_ordering is not None:
                 sql += " AND ep.stream_ordering <= ?"
                 args.append(max_stream_ordering)
-            sql += " ORDER BY ep.stream_ordering ASC"
+            sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
+            args.append(limit)
             txn.execute(sql, args)
             return txn.fetchall()
         no_read_receipt = yield self.runInteraction(
             "get_unread_push_actions_for_user_in_range", get_no_receipt
         )
 
-        defer.returnValue([
+        # Make a list of dicts from the two sets of results.
+        notifs = [
             {
                 "event_id": row[0],
                 "room_id": row[1],
@@ -191,7 +193,16 @@ class EventPushActionsStore(SQLBaseStore):
                 "actions": json.loads(row[3]),
                 "received_ts": row[4],
             } for row in after_read_receipt + no_read_receipt
-        ])
+        ]
+
+        # Now sort it so it's ordered correctly, since currently it will
+        # contain results from the first query, correctly ordered, followed
+        # by results from the second query, but we want them all ordered
+        # by received_ts
+        notifs.sort(key=lambda r: -(r['received_ts'] or 0))
+
+        # Now return the first `limit`
+        defer.returnValue(notifs[:limit])
 
     @defer.inlineCallbacks
     def get_time_of_last_push_action_before(self, stream_ordering):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6d978ffcd5..b582942164 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,6 +23,7 @@ from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
 
 from canonicaljson import encode_canonical_json
 from collections import deque, namedtuple
@@ -355,7 +356,6 @@ class EventsStore(SQLBaseStore):
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
-            txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
 
             # Add an entry to the current_state_resets table to record the point
             # where we clobbered the current state
@@ -666,12 +666,6 @@ class EventsStore(SQLBaseStore):
                 (event.room_id, event.type, event.state_key,)
             )
 
-            if event.type in [EventTypes.Name, EventTypes.Aliases]:
-                txn.call_after(
-                    self.get_room_name_and_aliases.invalidate,
-                    (event.room_id,)
-                )
-
             self._simple_upsert_txn(
                 txn,
                 "current_state_events",
@@ -1288,6 +1282,156 @@ class EventsStore(SQLBaseStore):
             )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
+    def delete_old_state(self, room_id, topological_ordering):
+        return self.runInteraction(
+            "delete_old_state",
+            self._delete_old_state_txn, room_id, topological_ordering
+        )
+
+    def _delete_old_state_txn(self, txn, room_id, topological_ordering):
+        """Deletes old room state
+        """
+
+        # Tables that should be pruned:
+        #     event_auth
+        #     event_backward_extremities
+        #     event_content_hashes
+        #     event_destinations
+        #     event_edge_hashes
+        #     event_edges
+        #     event_forward_extremities
+        #     event_json
+        #     event_push_actions
+        #     event_reference_hashes
+        #     event_search
+        #     event_signatures
+        #     event_to_state_groups
+        #     events
+        #     rejections
+        #     room_depth
+        #     state_groups
+        #     state_groups_state
+
+        # First ensure that we're not about to delete all the forward extremeties
+        txn.execute(
+            "SELECT e.event_id, e.depth FROM events as e "
+            "INNER JOIN event_forward_extremities as f "
+            "ON e.event_id = f.event_id "
+            "AND e.room_id = f.room_id "
+            "WHERE f.room_id = ?",
+            (room_id,)
+        )
+        rows = txn.fetchall()
+        max_depth = max(row[0] for row in rows)
+
+        if max_depth <= topological_ordering:
+            # We need to ensure we don't delete all the events from the datanase
+            # otherwise we wouldn't be able to send any events (due to not
+            # having any backwards extremeties)
+            raise SynapseError(
+                400, "topological_ordering is greater than forward extremeties"
+            )
+
+        txn.execute(
+            "SELECT event_id, state_key FROM events"
+            " LEFT JOIN state_events USING (room_id, event_id)"
+            " WHERE room_id = ? AND topological_ordering < ?",
+            (room_id, topological_ordering,)
+        )
+        event_rows = txn.fetchall()
+
+        # We calculate the new entries for the backward extremeties by finding
+        # all events that point to events that are to be purged
+        txn.execute(
+            "SELECT e.event_id FROM events as e"
+            " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
+            " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
+            " WHERE e.room_id = ? AND e.topological_ordering < ?"
+            " AND e2.topological_ordering >= ?",
+            (room_id, topological_ordering, topological_ordering)
+        )
+        new_backwards_extrems = txn.fetchall()
+
+        # Get all state groups that are only referenced by events that are
+        # to be deleted.
+        txn.execute(
+            "SELECT state_group FROM event_to_state_groups"
+            " INNER JOIN events USING (event_id)"
+            " WHERE state_group IN ("
+            "   SELECT DISTINCT state_group FROM events"
+            "   INNER JOIN event_to_state_groups USING (event_id)"
+            "   WHERE room_id = ? AND topological_ordering < ?"
+            " )"
+            " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
+            (room_id, topological_ordering, topological_ordering)
+        )
+        state_rows = txn.fetchall()
+        txn.executemany(
+            "DELETE FROM state_groups_state WHERE state_group = ?",
+            state_rows
+        )
+        txn.executemany(
+            "DELETE FROM state_groups WHERE id = ?",
+            state_rows
+        )
+        # Delete all non-state
+        txn.executemany(
+            "DELETE FROM event_to_state_groups WHERE event_id = ?",
+            [(event_id,) for event_id, _ in event_rows]
+        )
+
+        txn.execute(
+            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+            (topological_ordering, room_id,)
+        )
+
+        # Delete all remote non-state events
+        to_delete = [
+            (event_id,) for event_id, state_key in event_rows
+            if state_key is None and not self.hs.is_mine_id(event_id)
+        ]
+        for table in (
+            "events",
+            "event_json",
+            "event_auth",
+            "event_content_hashes",
+            "event_destinations",
+            "event_edge_hashes",
+            "event_edges",
+            "event_forward_extremities",
+            "event_push_actions",
+            "event_reference_hashes",
+            "event_search",
+            "event_signatures",
+            "rejections",
+            "event_backward_extremities",
+        ):
+            txn.executemany(
+                "DELETE FROM %s WHERE event_id = ?" % (table,),
+                to_delete
+            )
+
+        # Update backward extremeties
+        txn.executemany(
+            "INSERT INTO event_backward_extremities (room_id, event_id)"
+            " VALUES (?, ?)",
+            [(room_id, event_id) for event_id, in new_backwards_extrems]
+        )
+
+        txn.executemany(
+            "DELETE FROM events WHERE event_id = ?",
+            to_delete
+        )
+        # Mark all state and own events as outliers
+        txn.executemany(
+            "UPDATE events SET outlier = ?"
+            " WHERE event_id = ?",
+            [
+                (True, event_id,) for event_id, state_key in event_rows
+                if state_key is not None or self.hs.is_mine_id(event_id)
+            ]
+        )
+
 
 AllNewEventsResult = namedtuple("AllNewEventsResult", [
     "new_forward_events", "new_backfill_events",
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index a820fcf07f..4c0f82353d 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -157,10 +157,25 @@ class MediaRepositoryStore(SQLBaseStore):
                 "created_ts": time_now_ms,
                 "upload_name": upload_name,
                 "filesystem_id": filesystem_id,
+                "last_access_ts": time_now_ms,
             },
             desc="store_cached_remote_media",
         )
 
+    def update_cached_last_access_time(self, origin_id_tuples, time_ts):
+        def update_cache_txn(txn):
+            sql = (
+                "UPDATE remote_media_cache SET last_access_ts = ?"
+                " WHERE media_origin = ? AND media_id = ?"
+            )
+
+            txn.executemany(sql, (
+                (time_ts, media_origin, media_id)
+                for media_origin, media_id in origin_id_tuples
+            ))
+
+        return self.runInteraction("update_cached_last_access_time", update_cache_txn)
+
     def get_remote_media_thumbnails(self, origin, media_id):
         return self._simple_select_list(
             "remote_media_cache_thumbnails",
@@ -190,3 +205,32 @@ class MediaRepositoryStore(SQLBaseStore):
             },
             desc="store_remote_media_thumbnail",
         )
+
+    def get_remote_media_before(self, before_ts):
+        sql = (
+            "SELECT media_origin, media_id, filesystem_id"
+            " FROM remote_media_cache"
+            " WHERE last_access_ts < ?"
+        )
+
+        return self._execute(
+            "get_remote_media_before", self.cursor_to_dict, sql, before_ts
+        )
+
+    def delete_remote_media(self, media_origin, media_id):
+        def delete_remote_media_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                "remote_media_cache",
+                keyvalues={
+                    "media_origin": media_origin, "media_id": media_id
+                },
+            )
+            self._simple_delete_txn(
+                txn,
+                "remote_media_cache_thumbnails",
+                keyvalues={
+                    "media_origin": media_origin, "media_id": media_id
+                },
+            )
+        return self.runInteraction("delete_remote_media", delete_remote_media_txn)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c8487c8838..8801669a6b 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 32
+SCHEMA_VERSION = 33
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 3de9e0f709..0a68341494 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -77,7 +77,7 @@ class RegistrationStore(SQLBaseStore):
     @defer.inlineCallbacks
     def register(self, user_id, token, password_hash,
                  was_guest=False, make_guest=False, appservice_id=None,
-                 create_profile_with_localpart=None):
+                 create_profile_with_localpart=None, admin=False):
         """Attempts to register an account.
 
         Args:
@@ -104,6 +104,7 @@ class RegistrationStore(SQLBaseStore):
             make_guest,
             appservice_id,
             create_profile_with_localpart,
+            admin
         )
         self.get_user_by_id.invalidate((user_id,))
         self.is_guest.invalidate((user_id,))
@@ -118,6 +119,7 @@ class RegistrationStore(SQLBaseStore):
         make_guest,
         appservice_id,
         create_profile_with_localpart,
+        admin,
     ):
         now = int(self.clock.time())
 
@@ -125,29 +127,33 @@ class RegistrationStore(SQLBaseStore):
 
         try:
             if was_guest:
-                txn.execute("UPDATE users SET"
-                            " password_hash = ?,"
-                            " upgrade_ts = ?,"
-                            " is_guest = ?"
-                            " WHERE name = ?",
-                            [password_hash, now, 1 if make_guest else 0, user_id])
+                self._simple_update_one_txn(
+                    txn,
+                    "users",
+                    keyvalues={
+                        "name": user_id,
+                    },
+                    updatevalues={
+                        "password_hash": password_hash,
+                        "upgrade_ts": now,
+                        "is_guest": 1 if make_guest else 0,
+                        "appservice_id": appservice_id,
+                        "admin": 1 if admin else 0,
+                    }
+                )
             else:
-                txn.execute("INSERT INTO users "
-                            "("
-                            "   name,"
-                            "   password_hash,"
-                            "   creation_ts,"
-                            "   is_guest,"
-                            "   appservice_id"
-                            ") "
-                            "VALUES (?,?,?,?,?)",
-                            [
-                                user_id,
-                                password_hash,
-                                now,
-                                1 if make_guest else 0,
-                                appservice_id,
-                            ])
+                self._simple_insert_txn(
+                    txn,
+                    "users",
+                    values={
+                        "name": user_id,
+                        "password_hash": password_hash,
+                        "creation_ts": now,
+                        "is_guest": 1 if make_guest else 0,
+                        "appservice_id": appservice_id,
+                        "admin": 1 if admin else 0,
+                    }
+                )
         except self.database_engine.module.IntegrityError:
             raise StoreError(
                 400, "User ID already taken.", errcode=Codes.USER_IN_USE
@@ -384,6 +390,15 @@ class RegistrationStore(SQLBaseStore):
             defer.returnValue(ret['user_id'])
         defer.returnValue(None)
 
+    def user_delete_threepids(self, user_id):
+        return self._simple_delete(
+            "user_threepids",
+            keyvalues={
+                "user_id": user_id,
+            },
+            desc="user_delete_threepids",
+        )
+
     @defer.inlineCallbacks
     def count_all_users(self):
         """Counts all users registered on the homeserver."""
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 97f9f1929c..8251f58670 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -18,7 +18,6 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
 from .engines import PostgresEngine, Sqlite3Engine
 
 import collections
@@ -192,49 +191,6 @@ class RoomStore(SQLBaseStore):
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
 
-    @cachedInlineCallbacks()
-    def get_room_name_and_aliases(self, room_id):
-        def get_room_name(txn):
-            sql = (
-                "SELECT name FROM room_names"
-                " INNER JOIN current_state_events USING (room_id, event_id)"
-                " WHERE room_id = ?"
-                " LIMIT 1"
-            )
-
-            txn.execute(sql, (room_id,))
-            rows = txn.fetchall()
-            if rows:
-                return rows[0][0]
-            else:
-                return None
-
-            return [row[0] for row in txn.fetchall()]
-
-        def get_room_aliases(txn):
-            sql = (
-                "SELECT content FROM current_state_events"
-                " INNER JOIN events USING (room_id, event_id)"
-                " WHERE room_id = ?"
-            )
-            txn.execute(sql, (room_id,))
-            return [row[0] for row in txn.fetchall()]
-
-        name = yield self.runInteraction("get_room_name", get_room_name)
-        alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases)
-
-        aliases = []
-
-        for c in alias_contents:
-            try:
-                content = json.loads(c)
-            except:
-                continue
-
-            aliases.extend(content.get('aliases', []))
-
-        defer.returnValue((name, aliases))
-
     def add_event_report(self, room_id, event_id, user_id, reason, content,
                          received_ts):
         next_id = self._event_reports_id_gen.get_next()
diff --git a/synapse/storage/schema/delta/33/remote_media_ts.py b/synapse/storage/schema/delta/33/remote_media_ts.py
new file mode 100644
index 0000000000..55ae43f395
--- /dev/null
+++ b/synapse/storage/schema/delta/33/remote_media_ts.py
@@ -0,0 +1,31 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+
+ALTER_TABLE = "ALTER TABLE remote_media_cache ADD COLUMN last_access_ts BIGINT"
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    cur.execute(ALTER_TABLE)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    cur.execute(
+        database_engine.convert_param_style(
+            "UPDATE remote_media_cache SET last_access_ts = ?"
+        ),
+        (int(time.time() * 1000),)
+    )
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index b9ad965fd6..c33ac5a8d7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -40,6 +40,7 @@ from synapse.util.caches.descriptors import cached
 from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
 from synapse.util.logcontext import preserve_fn
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
 import logging
 
@@ -54,25 +55,43 @@ _STREAM_TOKEN = "stream"
 _TOPOLOGICAL_TOKEN = "topological"
 
 
-def lower_bound(token):
+def lower_bound(token, engine, inclusive=False):
+    inclusive = "=" if inclusive else ""
     if token.topological is None:
-        return "(%d < %s)" % (token.stream, "stream_ordering")
+        return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
     else:
-        return "(%d < %s OR (%d = %s AND %d < %s))" % (
+        if isinstance(engine, PostgresEngine):
+            # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
+            # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
+            # use the later form when running against postgres.
+            return "((%d,%d) <%s (%s,%s))" % (
+                token.topological, token.stream, inclusive,
+                "topological_ordering", "stream_ordering",
+            )
+        return "(%d < %s OR (%d = %s AND %d <%s %s))" % (
             token.topological, "topological_ordering",
             token.topological, "topological_ordering",
-            token.stream, "stream_ordering",
+            token.stream, inclusive, "stream_ordering",
         )
 
 
-def upper_bound(token):
+def upper_bound(token, engine, inclusive=True):
+    inclusive = "=" if inclusive else ""
     if token.topological is None:
-        return "(%d >= %s)" % (token.stream, "stream_ordering")
+        return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
     else:
-        return "(%d > %s OR (%d = %s AND %d >= %s))" % (
+        if isinstance(engine, PostgresEngine):
+            # Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
+            # as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
+            # use the later form when running against postgres.
+            return "((%d,%d) >%s (%s,%s))" % (
+                token.topological, token.stream, inclusive,
+                "topological_ordering", "stream_ordering",
+            )
+        return "(%d > %s OR (%d = %s AND %d >%s %s))" % (
             token.topological, "topological_ordering",
             token.topological, "topological_ordering",
-            token.stream, "stream_ordering",
+            token.stream, inclusive, "stream_ordering",
         )
 
 
@@ -308,18 +327,22 @@ class StreamStore(SQLBaseStore):
         args = [False, room_id]
         if direction == 'b':
             order = "DESC"
-            bounds = upper_bound(RoomStreamToken.parse(from_key))
+            bounds = upper_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
             if to_key:
-                bounds = "%s AND %s" % (
-                    bounds, lower_bound(RoomStreamToken.parse(to_key))
-                )
+                bounds = "%s AND %s" % (bounds, lower_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
         else:
             order = "ASC"
-            bounds = lower_bound(RoomStreamToken.parse(from_key))
+            bounds = lower_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
             if to_key:
-                bounds = "%s AND %s" % (
-                    bounds, upper_bound(RoomStreamToken.parse(to_key))
-                )
+                bounds = "%s AND %s" % (bounds, upper_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
 
         if int(limit) > 0:
             args.append(int(limit))
@@ -487,13 +510,13 @@ class StreamStore(SQLBaseStore):
             row["topological_ordering"], row["stream_ordering"],)
         )
 
-    def get_max_topological_token_for_stream_and_room(self, room_id, stream_key):
+    def get_max_topological_token(self, room_id, stream_key):
         sql = (
             "SELECT max(topological_ordering) FROM events"
             " WHERE room_id = ? AND stream_ordering < ?"
         )
         return self._execute(
-            "get_max_topological_token_for_stream_and_room", None,
+            "get_max_topological_token", None,
             sql, room_id, stream_key,
         ).addCallback(
             lambda r: r[0][0] if r else 0
@@ -586,32 +609,60 @@ class StreamStore(SQLBaseStore):
             retcols=["stream_ordering", "topological_ordering"],
         )
 
-        stream_ordering = results["stream_ordering"]
-        topological_ordering = results["topological_ordering"]
-
-        query_before = (
-            "SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " WHERE room_id = ? AND (topological_ordering < ?"
-            " OR (topological_ordering = ? AND stream_ordering < ?))"
-            " ORDER BY topological_ordering DESC, stream_ordering DESC"
-            " LIMIT ?"
+        token = RoomStreamToken(
+            results["topological_ordering"],
+            results["stream_ordering"],
         )
 
-        query_after = (
-            "SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " WHERE room_id = ? AND (topological_ordering > ?"
-            " OR (topological_ordering = ? AND stream_ordering > ?))"
-            " ORDER BY topological_ordering ASC, stream_ordering ASC"
-            " LIMIT ?"
-        )
+        if isinstance(self.database_engine, Sqlite3Engine):
+            # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
+            # So we give pass it to SQLite3 as the UNION ALL of the two queries.
+
+            query_before = (
+                "SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND topological_ordering < ?"
+                " UNION ALL"
+                " SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
+            )
+            before_args = (
+                room_id, token.topological,
+                room_id, token.topological, token.stream,
+                before_limit,
+            )
 
-        txn.execute(
-            query_before,
-            (
-                room_id, topological_ordering, topological_ordering,
-                stream_ordering, before_limit,
+            query_after = (
+                "SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND topological_ordering > ?"
+                " UNION ALL"
+                " SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
+                " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
             )
-        )
+            after_args = (
+                room_id, token.topological,
+                room_id, token.topological, token.stream,
+                after_limit,
+            )
+        else:
+            query_before = (
+                "SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND %s"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
+            ) % (upper_bound(token, self.database_engine, inclusive=False),)
+
+            before_args = (room_id, before_limit)
+
+            query_after = (
+                "SELECT topological_ordering, stream_ordering, event_id FROM events"
+                " WHERE room_id = ? AND %s"
+                " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
+            ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+            after_args = (room_id, after_limit)
+
+        txn.execute(query_before, before_args)
 
         rows = self.cursor_to_dict(txn)
         events_before = [r["event_id"] for r in rows]
@@ -623,17 +674,11 @@ class StreamStore(SQLBaseStore):
             ))
         else:
             start_token = str(RoomStreamToken(
-                topological_ordering,
-                stream_ordering - 1,
+                token.topological,
+                token.stream - 1,
             ))
 
-        txn.execute(
-            query_after,
-            (
-                room_id, topological_ordering, topological_ordering,
-                stream_ordering, after_limit,
-            )
-        )
+        txn.execute(query_after, after_args)
 
         rows = self.cursor_to_dict(txn)
         events_after = [r["event_id"] for r in rows]
@@ -644,10 +689,7 @@ class StreamStore(SQLBaseStore):
                 rows[-1]["stream_ordering"],
             ))
         else:
-            end_token = str(RoomStreamToken(
-                topological_ordering,
-                stream_ordering,
-            ))
+            end_token = str(token)
 
         return {
             "before": {