author    Richard van der Hoff <richard@matrix.org>  2021-07-01 08:59:54 +0100
committer Richard van der Hoff <richard@matrix.org>  2021-07-01 08:59:54 +0100
commit    7eea8de9de653bc160dada0bf0f87eb6d1256acc (patch)
tree      e363a57eb7a28a2b32fcc1e2b349e402728c87ab /synapse/storage
parent    bump background update rate (diff)
parent    Fix the homeserver config example in presence router docs (#10288) (diff)
download  synapse-7eea8de9de653bc160dada0bf0f87eb6d1256acc.tar.xz

Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage')
 synapse/storage/databases/main/event_federation.py                                 | 66
 synapse/storage/databases/main/events_bg_updates.py                                | 50
 synapse/storage/engines/_base.py                                                   |  6
 synapse/storage/engines/postgres.py                                                |  5
 synapse/storage/engines/sqlite.py                                                  |  5
 synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres       | 11
 synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres | 30
 7 files changed, 157 insertions(+), 16 deletions(-)
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f23f8c6ecf..f2d27ee893 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1075,16 +1075,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         self,
         origin: str,
         event_id: str,
-    ) -> None:
-        """Remove the given event from the staging area"""
-        await self.db_pool.simple_delete(
-            table="federation_inbound_events_staging",
-            keyvalues={
-                "origin": origin,
-                "event_id": event_id,
-            },
-            desc="remove_received_event_from_staging",
-        )
+    ) -> Optional[int]:
+        """Remove the given event from the staging area.
+
+        Returns:
+            The received_ts of the row that was deleted, if any.
+        """
+        if self.db_pool.engine.supports_returning:
+
+            def _remove_received_event_from_staging_txn(txn):
+                sql = """
+                    DELETE FROM federation_inbound_events_staging
+                    WHERE origin = ? AND event_id = ?
+                    RETURNING received_ts
+                """
+
+                txn.execute(sql, (origin, event_id))
+                return txn.fetchone()
+
+            row = await self.db_pool.runInteraction(
+                "remove_received_event_from_staging",
+                _remove_received_event_from_staging_txn,
+                db_autocommit=True,
+            )
+            if row is None:
+                return None
+
+            return row[0]
+
+        else:
+
+            def _remove_received_event_from_staging_txn(txn):
+                received_ts = self.db_pool.simple_select_one_onecol_txn(
+                    txn,
+                    table="federation_inbound_events_staging",
+                    keyvalues={
+                        "origin": origin,
+                        "event_id": event_id,
+                    },
+                    retcol="received_ts",
+                    allow_none=True,
+                )
+                self.db_pool.simple_delete_txn(
+                    txn,
+                    table="federation_inbound_events_staging",
+                    keyvalues={
+                        "origin": origin,
+                        "event_id": event_id,
+                    },
+                )
+
+                return received_ts
+
+            return await self.db_pool.runInteraction(
+                "remove_received_event_from_staging",
+                _remove_received_event_from_staging_txn,
+            )
 
     async def get_next_staged_event_id_for_room(
         self,
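
The change above gives remove_received_event_from_staging two paths: on engines with RETURNING support, a single DELETE ... RETURNING removes the row and reads back its received_ts in one round trip; otherwise a SELECT-then-DELETE pair runs inside one transaction. A minimal standalone sketch of the same two-path pattern, using the stdlib sqlite3 module directly (the function and connection handling below are illustrative, not Synapse's real database API):

import sqlite3

# RETURNING landed in SQLite 3.35.0; older engines need the two-step fallback.
SUPPORTS_RETURNING = sqlite3.sqlite_version_info >= (3, 35, 0)

def remove_received_event_from_staging(conn, origin, event_id):
    """Delete a staged event and return its received_ts, or None if absent."""
    cur = conn.cursor()
    if SUPPORTS_RETURNING:
        # One statement: delete the row and read back the old column value.
        cur.execute(
            "DELETE FROM federation_inbound_events_staging"
            " WHERE origin = ? AND event_id = ? RETURNING received_ts",
            (origin, event_id),
        )
        row = cur.fetchone()
    else:
        # Fallback: read the value first, then delete, in one transaction.
        cur.execute(
            "SELECT received_ts FROM federation_inbound_events_staging"
            " WHERE origin = ? AND event_id = ?",
            (origin, event_id),
        )
        row = cur.fetchone()
        cur.execute(
            "DELETE FROM federation_inbound_events_staging"
            " WHERE origin = ? AND event_id = ?",
            (origin, event_id),
        )
    conn.commit()
    return row[0] if row else None
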
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index da3a7df27b..1c95c66648 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -29,13 +29,18 @@ from synapse.types import JsonDict
 logger = logging.getLogger(__name__)
 
 
-_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
+_REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
     # there should be no leftover rows without a stream_ordering2, but just in case...
     "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
-    # finally, we can drop the rule and switch the columns
+    # now we can drop the rule and switch the columns
     "DROP RULE populate_stream_ordering2 ON events",
     "ALTER TABLE events DROP COLUMN stream_ordering",
     "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
+    # ... and finally, rename the indexes into place for consistency with sqlite
+    "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index",
+    "ALTER INDEX events_order_room2 RENAME TO events_order_room",
+    "ALTER INDEX events_room_stream2 RENAME TO events_room_stream",
+    "ALTER INDEX events_ts2 RENAME TO events_ts",
 )
 
 
@@ -45,6 +50,10 @@ class _BackgroundUpdates:
     DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
     POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
     INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
+    INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
+    INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
+    INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
+    INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
     REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
 
 
@@ -155,12 +164,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             self._purged_chain_cover_index,
         )
 
+        ################################################################################
+
         # bg updates for replacing stream_ordering with a BIGINT
         # (these only run on postgres.)
+
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
             self._background_populate_stream_ordering2,
         )
+        # CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2);
         self.db_pool.updates.register_background_index_update(
             _BackgroundUpdates.INDEX_STREAM_ORDERING2,
             index_name="events_stream_ordering",
@@ -168,11 +181,42 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             columns=["stream_ordering2"],
             unique=True,
         )
+        # CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false;
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL,
+            index_name="event_contains_url_index2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+            where_clause="contains_url = true AND outlier = false",
+        )
+        # CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER,
+            index_name="events_order_room2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+        )
+        # CREATE INDEX events_room_stream ON events(room_id, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM,
+            index_name="events_room_stream2",
+            table="events",
+            columns=["room_id", "stream_ordering2"],
+        )
+        # CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_TS,
+            index_name="events_ts2",
+            table="events",
+            columns=["origin_server_ts", "stream_ordering2"],
+        )
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
             self._background_replace_stream_ordering_column,
         )
 
+        ################################################################################
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -1098,7 +1142,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""
 
         def process(txn: Cursor) -> None:
-            for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
+            for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS:
                 logger.info("completing stream_ordering migration: %s", sql)
                 txn.execute(sql)
 
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 1882bfd9cf..20cd63c330 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
         """
         ...
 
+    @property
+    @abc.abstractmethod
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        ...
+
     @abc.abstractmethod
     def check_database(
         self, db_conn: ConnectionType, allow_outdated_version: bool = False
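
supports_returning follows the same shape as the existing supports_using_any_list flag: an abstract property that every engine must override. Stacking @property on @abc.abstractmethod means a subclass that forgets the override cannot even be instantiated. A tiny illustration of the pattern (the class names here are made up):

import abc

class Engine(abc.ABC):
    @property
    @abc.abstractmethod
    def supports_returning(self) -> bool:
        """Do we support the `RETURNING` clause in insert/update/delete?"""
        ...

class GoodEngine(Engine):
    @property
    def supports_returning(self) -> bool:
        return True

class BadEngine(Engine):
    pass  # no override

print(GoodEngine().supports_returning)  # -> True
# BadEngine()  # would raise TypeError: Can't instantiate abstract class
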
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 21411c5fea..30f948a0f7 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine):
         """Do we support using `a = ANY(?)` and passing a list"""
         return True
 
+    @property
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        return True
+
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
             # https://www.postgresql.org/docs/current/static/errcodes-appendix.html

diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 5fe1b205e1..70d17d4f2c 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
         """Do we support using `a = ANY(?)` and passing a list"""
         return False
 
+    @property
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        return self.module.sqlite_version_info >= (3, 35, 0)
+
     def check_database(self, db_conn, allow_outdated_version: bool = False):
         if not allow_outdated_version:
             version = self.module.sqlite_version_info
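
On SQLite the flag is a runtime version check: RETURNING was added in SQLite 3.35.0 (March 2021), and sqlite_version_info reports the version of the linked C library, which is what matters here, not the version of the Python binding. A quick check, assuming the stdlib sqlite3 module:

import sqlite3

print(sqlite3.sqlite_version_info)  # version of the linked SQLite library
# True iff DELETE/INSERT/UPDATE ... RETURNING will parse on this build:
print(sqlite3.sqlite_version_info >= (3, 35, 0))
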
diff --git a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
index 88c9f8bd0d..b5fb763ddd 100644
--- a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
+++ b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
@@ -31,10 +31,15 @@ CREATE OR REPLACE RULE "populate_stream_ordering2" AS
 INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
   (6001, 'populate_stream_ordering2', '{}');
 
--- ... and another to build an index on it
+-- ... and some more to build indexes on it. These aren't really interdependent
+-- but the background_updates manager can only handle a single dependency per update.
 INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
-  (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2');
+  (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'),
+  (6001, 'index_stream_ordering2_room_order', '{}', 'index_stream_ordering2'),
+  (6001, 'index_stream_ordering2_contains_url', '{}', 'index_stream_ordering2_room_order'),
+  (6001, 'index_stream_ordering2_room_stream', '{}', 'index_stream_ordering2_contains_url'),
+  (6001, 'index_stream_ordering2_ts', '{}', 'index_stream_ordering2_room_stream');
 
 -- ... and another to do the switcheroo
 INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
-  (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');
+  (6003, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2_ts');
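
Note how the five new updates are chained: each depends_on names the previous update even though the index builds are independent of one another. Because the background-updates manager supports only a single dependency per update, an artificial linear chain is the simplest way to stop them all running at once. A toy model of how such a chain serialises into an execution order (the walker below is illustrative, not Synapse code):

updates = {
    # update_name: depends_on
    "index_stream_ordering2": "populate_stream_ordering2",
    "index_stream_ordering2_room_order": "index_stream_ordering2",
    "index_stream_ordering2_contains_url": "index_stream_ordering2_room_order",
    "index_stream_ordering2_room_stream": "index_stream_ordering2_contains_url",
    "index_stream_ordering2_ts": "index_stream_ordering2_room_stream",
    "replace_stream_ordering_column": "index_stream_ordering2_ts",
}

def run_order(updates, root):
    """Walk the dependency chain from the root update to the last dependent."""
    children = {dep: name for name, dep in updates.items()}
    order, current = [], root
    while current in children:
        current = children[current]
        order.append(current)
    return order

print(run_order(updates, "populate_stream_ordering2"))
# -> the six updates, strictly one after another
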
diff --git a/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres
new file mode 100644
index 0000000000..630c24fd9e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This migration is closely related to '01recreate_stream_ordering.sql.postgres'.
+--
+-- It updates the other tables which use an INTEGER to refer to a stream ordering.
+-- These tables are all small enough that a re-create is tractable.
+ALTER TABLE pushers ALTER COLUMN last_stream_ordering SET DATA TYPE BIGINT;
+ALTER TABLE federation_stream_position ALTER COLUMN stream_id SET DATA TYPE BIGINT;
+
+-- These aren't actually event stream orderings, but they are counters for which a
+-- 2-billion ceiling is limiting; application_services_state is tiny, and I don't
+-- want to ever have to do this again.
+ALTER TABLE application_services_state ALTER COLUMN last_txn SET DATA TYPE BIGINT;
+ALTER TABLE application_services_state ALTER COLUMN read_receipt_stream_id SET DATA TYPE BIGINT;
+ALTER TABLE application_services_state ALTER COLUMN presence_stream_id SET DATA TYPE BIGINT;
+
+
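
For context on why the widening matters: in PostgreSQL these columns were 32-bit signed INTEGERs, whose maximum is about 2.1 billion, a ceiling a long-lived homeserver's counters can plausibly reach; BIGINT is 64-bit. The arithmetic:

# Signed column ceilings: INTEGER is 32-bit, BIGINT is 64-bit in PostgreSQL.
print(2**31 - 1)  # 2147483647           -- max INTEGER
print(2**63 - 1)  # 9223372036854775807  -- max BIGINT
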