summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py136
-rw-r--r--synapse/storage/schema/__init__.py2
-rw-r--r--synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres40
3 files changed, 162 insertions, 16 deletions
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index cbe4be1437..39aaee743c 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -29,6 +29,25 @@ from synapse.types import JsonDict
 logger = logging.getLogger(__name__)
 
 
+_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
+    # there should be no leftover rows without a stream_ordering2, but just in case...
+    "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
+    # finally, we can drop the rule and switch the columns
+    "DROP RULE populate_stream_ordering2 ON events",
+    "ALTER TABLE events DROP COLUMN stream_ordering",
+    "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
+)
+
+
+class _BackgroundUpdates:
+    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+    DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
+    POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
+    INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
+    REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
+
+
 @attr.s(slots=True, frozen=True)
 class _CalculateChainCover:
     """Return value for _calculate_chain_cover_txn."""
@@ -48,19 +67,15 @@ class _CalculateChainCover:
 
 
 class EventsBackgroundUpdatesStore(SQLBaseStore):
-
-    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
-    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
-    DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
-
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
 
         self.db_pool.updates.register_background_update_handler(
-            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
+            _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME,
+            self._background_reindex_origin_server_ts,
         )
         self.db_pool.updates.register_background_update_handler(
-            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
+            _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
             self._background_reindex_fields_sender,
         )
 
@@ -85,7 +100,8 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         )
 
         self.db_pool.updates.register_background_update_handler(
-            self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update
+            _BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES,
+            self._cleanup_extremities_bg_update,
         )
 
         self.db_pool.updates.register_background_update_handler(
@@ -139,6 +155,24 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             self._purged_chain_cover_index,
         )
 
+        # bg updates for replacing stream_ordering with a BIGINT
+        # (these only run on postgres.)
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
+            self._background_populate_stream_ordering2,
+        )
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2,
+            index_name="events_stream_ordering",
+            table="events",
+            columns=["stream_ordering2"],
+            unique=True,
+        )
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
+            self._background_replace_stream_ordering_column,
+        )
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -190,18 +224,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             }
 
             self.db_pool.updates._background_update_progress_txn(
-                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
+                txn, _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
             )
 
             return len(rows)
 
         result = await self.db_pool.runInteraction(
-            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
+            _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
         )
 
         if not result:
             await self.db_pool.updates._end_background_update(
-                self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME
+                _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME
             )
 
         return result
@@ -264,18 +298,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             }
 
             self.db_pool.updates._background_update_progress_txn(
-                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
+                txn, _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, progress
             )
 
             return len(rows_to_update)
 
         result = await self.db_pool.runInteraction(
-            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
+            _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
         )
 
         if not result:
             await self.db_pool.updates._end_background_update(
-                self.EVENT_ORIGIN_SERVER_TS_NAME
+                _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME
             )
 
         return result
@@ -454,7 +488,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
         if not num_handled:
             await self.db_pool.updates._end_background_update(
-                self.DELETE_SOFT_FAILED_EXTREMITIES
+                _BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES
             )
 
             def _drop_table_txn(txn):
@@ -1009,3 +1043,75 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             await self.db_pool.updates._end_background_update("purged_chain_cover")
 
         return result
+
+    async def _background_populate_stream_ordering2(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Populate events.stream_ordering2, then replace stream_ordering
+
+        This is to deal with the fact that stream_ordering was initially created as a
+        32-bit integer field.
+        """
+        batch_size = max(batch_size, 1)
+
+        def process(txn: Cursor) -> int:
+            # if this is the first pass, find the minimum stream ordering
+            last_stream = progress.get("last_stream")
+            if last_stream is None:
+                txn.execute(
+                    """
+                    SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1
+                    """
+                )
+                rows = txn.fetchall()
+                if not rows:
+                    return 0
+                last_stream = rows[0][0] - 1
+
+            txn.execute(
+                """
+                UPDATE events SET stream_ordering2=stream_ordering
+                WHERE stream_ordering > ? AND stream_ordering <= ?
+                """,
+                (last_stream, last_stream + batch_size),
+            )
+            row_count = txn.rowcount
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
+                {"last_stream": last_stream + batch_size},
+            )
+            return row_count
+
+        result = await self.db_pool.runInteraction(
+            "_background_populate_stream_ordering2", process
+        )
+
+        if result != 0:
+            return result
+
+        await self.db_pool.updates._end_background_update(
+            _BackgroundUpdates.POPULATE_STREAM_ORDERING2
+        )
+        return 0
+
+    async def _background_replace_stream_ordering_column(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""
+
+        def process(txn: Cursor) -> None:
+            for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
+                logger.info("completing stream_ordering migration: %s", sql)
+                txn.execute(sql)
+
+        await self.db_pool.runInteraction(
+            "_background_replace_stream_ordering_column", process
+        )
+
+        await self.db_pool.updates._end_background_update(
+            _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN
+        )
+
+        return 0
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index d36ba1d773..0a53b73ccc 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 59
+SCHEMA_VERSION = 60
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
diff --git a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
new file mode 100644
index 0000000000..88c9f8bd0d
--- /dev/null
+++ b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
@@ -0,0 +1,40 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This migration handles the process of changing the type of `stream_ordering` to
+-- a BIGINT.
+--
+-- Note that this is only a problem on postgres as sqlite only has one "integer" type
+-- which can cope with values up to 2^63.
+
+-- First add a new column to contain the bigger stream_ordering
+ALTER TABLE events ADD COLUMN stream_ordering2 BIGINT;
+
+-- Create a rule which will populate it for new rows.
+CREATE OR REPLACE RULE "populate_stream_ordering2" AS
+    ON INSERT TO events
+    DO UPDATE events SET stream_ordering2=NEW.stream_ordering WHERE stream_ordering=NEW.stream_ordering;
+
+-- Start a bg process to populate it for old events
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (6001, 'populate_stream_ordering2', '{}');
+
+-- ... and another to build an index on it
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+  (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2');
+
+-- ... and another to do the switcheroo
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+  (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');