diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index eeca85fc94..6e8aeed7b4 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -67,6 +67,8 @@ class _BackgroundUpdates:
EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
+ EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
+
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
@@ -253,6 +255,11 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
replaces_index="ev_edges_id",
)
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
+ self._background_events_populate_state_key_rejections,
+ )
+
async def _background_reindex_fields_sender(
self, progress: JsonDict, batch_size: int
) -> int:
@@ -1399,3 +1406,83 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
return batch_size
+
+    async def _background_events_populate_state_key_rejections(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Back-populate `events.state_key` and `events.rejection_reason`"""
+
+        min_stream_ordering_exclusive = progress["min_stream_ordering_exclusive"]
+        max_stream_ordering_inclusive = progress["max_stream_ordering_inclusive"]
+
+        def _populate_txn(txn: LoggingTransaction) -> bool:
+            """Returns True if we're done."""
+
+            # first we need to find an endpoint.
+            # we need to find the final row in the batch of batch_size, which means
+            # we need to skip over (batch_size-1) rows and get the next row.
+            txn.execute(
+                """
+                SELECT stream_ordering FROM events
+                WHERE stream_ordering > ? AND stream_ordering <= ?
+                ORDER BY stream_ordering
+                LIMIT 1 OFFSET ?
+                """,
+                (
+                    min_stream_ordering_exclusive,
+                    max_stream_ordering_inclusive,
+                    batch_size - 1,
+                ),
+            )
+
+            endpoint = None
+            row = txn.fetchone()
+            if row:
+                endpoint = row[0]
+
+            where_clause = "stream_ordering > ?"
+            args = [min_stream_ordering_exclusive]
+            # NB: explicit `is not None` check — stream orderings can be zero or
+            # negative (backfilled events), so a falsy endpoint must not be
+            # confused with "no more rows" (which is `None`, as checked below).
+            if endpoint is not None:
+                where_clause += " AND stream_ordering <= ?"
+                args.append(endpoint)
+
+            # now do the updates.
+            txn.execute(
+                f"""
+                UPDATE events
+                SET state_key = (SELECT state_key FROM state_events se WHERE se.event_id = events.event_id),
+                    rejection_reason = (SELECT reason FROM rejections rej WHERE rej.event_id = events.event_id)
+                WHERE ({where_clause})
+                """,
+                args,
+            )
+
+            logger.info(
+                "populated new `events` columns up to %s/%i: updated %i rows",
+                endpoint,
+                max_stream_ordering_inclusive,
+                txn.rowcount,
+            )
+
+            if endpoint is None:
+                # we're done
+                return True
+
+            progress["min_stream_ordering_exclusive"] = endpoint
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
+                progress,
+            )
+            return False
+
+        done = await self.db_pool.runInteraction(
+            desc="events_populate_state_key_rejections", func=_populate_txn
+        )
+
+        if done:
+            await self.db_pool.updates._end_background_update(
+                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS
+            )
+
+        return batch_size
diff --git a/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py b/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py
new file mode 100644
index 0000000000..55a5d092cc
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py
@@ -0,0 +1,47 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from synapse.storage.types import Cursor
+
+
+def run_create(cur: Cursor, database_engine, *args, **kwargs) -> None:
+    """Schedule a bg update to populate the `state_key` and `rejection_reason` columns of `events`."""
+
+    # we know that any new events will have the columns populated (and that has been
+    # the case since schema_version 68, so there is no chance of rolling back now).
+    #
+    # So, we only need to make sure that existing rows are updated. We read the
+    # current min and max stream orderings, since that is guaranteed to include all
+    # the events that were stored before the new columns were added.
+    cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
+    (min_stream_ordering, max_stream_ordering) = cur.fetchone()
+
+    if min_stream_ordering is None:
+        # no rows, nothing to do.
+        return
+
+    cur.execute(
+        "INSERT into background_updates (ordering, update_name, progress_json)"
+        " VALUES (7203, 'events_populate_state_key_rejections', ?)",
+        (
+            json.dumps(
+                {
+                    "min_stream_ordering_exclusive": min_stream_ordering - 1,
+                    "max_stream_ordering_inclusive": max_stream_ordering,
+                }
+            ),
+        ),
+    )
|