summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/13215.misc1
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py99
-rw-r--r--synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py47
3 files changed, 147 insertions, 0 deletions
diff --git a/changelog.d/13215.misc b/changelog.d/13215.misc
new file mode 100644
index 0000000000..3da35addb3
--- /dev/null
+++ b/changelog.d/13215.misc
@@ -0,0 +1 @@
+Preparation for database schema simplifications: populate `state_key` and `rejection_reason` for existing rows in the `events` table.
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index eeca85fc94..7d951d85d6 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -67,6 +67,8 @@ class _BackgroundUpdates:
     EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
     EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
 
+    EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
+
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _CalculateChainCover:
@@ -253,6 +255,11 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             replaces_index="ev_edges_id",
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
+            self._background_events_populate_state_key_rejections,
+        )
+
     async def _background_reindex_fields_sender(
         self, progress: JsonDict, batch_size: int
     ) -> int:
@@ -1399,3 +1406,95 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             )
 
         return batch_size
+
+    async def _background_events_populate_state_key_rejections(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Back-populate `events.state_key` and `events.rejection_reason"""
+
+        min_stream_ordering_exclusive = progress["min_stream_ordering_exclusive"]
+        max_stream_ordering_inclusive = progress["max_stream_ordering_inclusive"]
+
+        def _populate_txn(txn: LoggingTransaction) -> bool:
+            """Returns True if we're done."""
+
+            # first we need to find an endpoint.
+            # we need to find the final row in the batch of batch_size, which means
+            # we need to skip over (batch_size-1) rows and get the next row.
+            txn.execute(
+                """
+                SELECT stream_ordering FROM events
+                WHERE stream_ordering > ? AND stream_ordering <= ?
+                ORDER BY stream_ordering
+                LIMIT 1 OFFSET ?
+                """,
+                (
+                    min_stream_ordering_exclusive,
+                    max_stream_ordering_inclusive,
+                    batch_size - 1,
+                ),
+            )
+
+            endpoint = None
+            row = txn.fetchone()
+            if row:
+                endpoint = row[0]
+
+            where_clause = "e.stream_ordering > ?"
+            args = [min_stream_ordering_exclusive]
+            if endpoint:
+                where_clause += " AND e.stream_ordering <= ?"
+                args.append(endpoint)
+
+            # now do the updates. We consider rows within our range of stream orderings,
+            # but only those with a non-null rejection reason or state_key (since there
+            # is nothing to update for rows where rejection reason and state_key are
+            # both null.
+            txn.execute(
+                f"""
+                WITH t AS (
+                   SELECT e.event_id, r.reason, se.state_key
+                   FROM events e
+                   LEFT JOIN rejections r USING (event_id)
+                   LEFT JOIN state_events se USING (event_id)
+                   WHERE ({where_clause}) AND (
+                       r.reason IS NOT NULL OR se.state_key IS NOT NULL
+                   )
+                )
+                UPDATE events
+                SET rejection_reason=t.reason, state_key=t.state_key
+                FROM t WHERE events.event_id = t.event_id
+                """,
+                args,
+            )
+
+            logger.info(
+                "populated new `events` columns up to %s/%i: updated %i/%i rows",
+                endpoint,
+                max_stream_ordering_inclusive,
+                txn.rowcount,
+                batch_size,
+            )
+
+            if endpoint is None:
+                # we're done
+                return True
+
+            progress["min_stream_ordering_exclusive"] = endpoint
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
+                progress,
+            )
+            return False
+
+        done = await self.db_pool.runInteraction(
+            desc="events_populate_state_key_rejections", func=_populate_txn
+        )
+
+        if done:
+            await self.db_pool.updates._end_background_update(
+                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS
+            )
+
+        return batch_size
diff --git a/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py b/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py
new file mode 100644
index 0000000000..55a5d092cc
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py
@@ -0,0 +1,47 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from synapse.storage.types import Cursor
+
+
+def run_create(cur: Cursor, database_engine, *args, **kwargs):
+    """Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`"""
+
+    # we know that any new events will have the columns populated (and that has been
+    # the case since schema_version 68, so there is no chance of rolling back now).
+    #
+    # So, we only need to make sure that existing rows are updated. We read the
+    # current min and max stream orderings, since that is guaranteed to include all
+    # the events that were stored before the new columns were added.
+    cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
+    (min_stream_ordering, max_stream_ordering) = cur.fetchone()
+
+    if min_stream_ordering is None:
+        # no rows, nothing to do.
+        return
+
+    cur.execute(
+        "INSERT into background_updates (ordering, update_name, progress_json)"
+        " VALUES (7203, 'events_populate_state_key_rejections', ?)",
+        (
+            json.dumps(
+                {
+                    "min_stream_ordering_exclusive": min_stream_ordering - 1,
+                    "max_stream_ordering_inclusive": max_stream_ordering,
+                }
+            ),
+        ),
+    )