diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index fc49112063..f92d824876 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -17,11 +17,15 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
import attr
-from synapse.api.constants import EventContentFields
+from synapse.api.constants import EventContentFields, RelationTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
-from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
+from synapse.storage.database import (
+ DatabasePool,
+ LoggingTransaction,
+ make_tuple_comparison_clause,
+)
from synapse.storage.databases.main.events import PersistEventsStore
from synapse.storage.types import Cursor
from synapse.types import JsonDict
@@ -167,6 +171,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
self._purged_chain_cover_index,
)
+ self.db_pool.updates.register_background_update_handler(
+ "event_thread_relation", self._event_thread_relation
+ )
+
################################################################################
# bg updates for replacing stream_ordering with a BIGINT
@@ -1091,6 +1099,79 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
return result
+ async def _event_thread_relation(self, progress: JsonDict, batch_size: int) -> int:
+ """Background update handler which will store thread relations for existing events."""
+ last_event_id = progress.get("last_event_id", "")
+
+ def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
+ txn.execute(
+ """
+ SELECT event_id, json FROM event_json
+ LEFT JOIN event_relations USING (event_id)
+ WHERE event_id > ? AND relates_to_id IS NULL
+ ORDER BY event_id LIMIT ?
+ """,
+ (last_event_id, batch_size),
+ )
+
+ results = list(txn)
+ missing_thread_relations = []
+ for (event_id, event_json_raw) in results:
+ try:
+ event_json = db_to_json(event_json_raw)
+ except Exception as e:
+ logger.warning(
+ "Unable to load event %s (no relations will be updated): %s",
+ event_id,
+ e,
+ )
+ continue
+
+ # If there's no relation (or it is not a thread), skip!
+ relates_to = event_json["content"].get("m.relates_to")
+ if not relates_to or not isinstance(relates_to, dict):
+ continue
+ if relates_to.get("rel_type") != RelationTypes.THREAD:
+ continue
+
+ # Get the parent ID.
+ parent_id = relates_to.get("event_id")
+ if not isinstance(parent_id, str):
+ continue
+
+ missing_thread_relations.append((event_id, parent_id))
+
+ # Insert the missing data.
+ self.db_pool.simple_insert_many_txn(
+ txn=txn,
+ table="event_relations",
+ values=[
+ {
+ "event_id": event_id,
+ "relates_to_Id": parent_id,
+ "relation_type": RelationTypes.THREAD,
+ }
+ for event_id, parent_id in missing_thread_relations
+ ],
+ )
+
+ if results:
+ latest_event_id = results[-1][0]
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "event_thread_relation", {"last_event_id": latest_event_id}
+ )
+
+ return len(results)
+
+ num_rows = await self.db_pool.runInteraction(
+ desc="event_thread_relation", func=_event_thread_relation_txn
+ )
+
+ if not num_rows:
+ await self.db_pool.updates._end_background_update("event_thread_relation")
+
+ return num_rows
+
async def _background_populate_stream_ordering2(
self, progress: JsonDict, batch_size: int
) -> int:
diff --git a/synapse/storage/schema/main/delta/65/02_thread_relations.sql b/synapse/storage/schema/main/delta/65/02_thread_relations.sql
new file mode 100644
index 0000000000..d60517f7b4
--- /dev/null
+++ b/synapse/storage/schema/main/delta/65/02_thread_relations.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Check old events for thread relations.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (6502, 'event_thread_relation', '{}');
|