diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d38f65b4e6..873bc717bb 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -202,6 +202,7 @@ def _retry_on_integrity_error(func):
class EventsStore(EventsWorkerStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+ EVENT_FIELDS_CHUNK = "event_fields_chunk_id"
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
@@ -242,6 +243,11 @@ class EventsStore(EventsWorkerStore):
psql_only=True,
)
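+        # Handler that backfills chunk IDs for events persisted before the
+        # chunk_id column existed; the update itself is queued by the schema
+        # delta in schema/delta/49/event_chunks.py.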
+ self.register_background_update_handler(
+ self.EVENT_FIELDS_CHUNK,
+ self._background_compute_chunks,
+ )
+
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
@@ -1823,6 +1829,72 @@ class EventsStore(EventsWorkerStore):
defer.returnValue(result)
+ @defer.inlineCallbacks
+ def _background_compute_chunks(self, progress, batch_size):
+ """Iterates over events and assigns them chunk IDs
+ """
+
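+        # On the first run there is no stored progress, so start just past
+        # the current maximum stream ordering and work backwards from there.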
+ up_to_stream_id = progress.get("up_to_stream_id")
+ if up_to_stream_id is None:
+ up_to_stream_id = self.get_current_events_token() + 1
+
+ rows_inserted = progress.get("rows_inserted", 0)
+
+ def reindex_chunks_txn(txn):
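+            # Fetch the next batch of non-outlier events that have not yet
+            # been assigned a chunk, newest first, so that chunks are
+            # extended backwards from the most recent events.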
+ txn.execute("""
+ SELECT stream_ordering, room_id, event_id FROM events
+ WHERE stream_ordering < ? AND outlier = ? AND chunk_id IS NULL
+ ORDER BY stream_ordering DESC
+ LIMIT ?
+ """, (up_to_stream_id, False, batch_size))
+
+ rows = txn.fetchall()
+
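+            # Default to the previous token so that an empty batch leaves
+            # the stored progress unchanged.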
+ stream_ordering = up_to_stream_id
+ for stream_ordering, room_id, event_id in rows:
+ prev_events = self._simple_select_onecol_txn(
+ txn,
+ table="event_edges",
+ keyvalues={
+ "event_id": event_id,
+ },
+ retcol="prev_event_id",
+ )
+
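+                # Pick (or create) a chunk for this event based on its
+                # prev_events; this also yields the event's topological
+                # ordering within the room.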
+ chunk_id, topo = self._insert_into_chunk_txn(
+ txn, room_id, event_id, prev_events,
+ )
+
+ self._simple_update_txn(
+ txn,
+ table="events",
+ keyvalues={"event_id": event_id},
+ updatevalues={
+ "chunk_id": chunk_id,
+ "topological_ordering": topo,
+ },
+ )
+
+ progress = {
+ "up_to_stream_id": stream_ordering,
+ "rows_inserted": rows_inserted + len(rows)
+ }
+
+ self._background_update_progress_txn(
+ txn, self.EVENT_FIELDS_CHUNK, progress
+ )
+
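+            # The row count is used by the background updater to tune the
+            # batch size; a count of zero means there is nothing left to do.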
+ return len(rows)
+
+ result = yield self.runInteraction(
+ self.EVENT_FIELDS_CHUNK, reindex_chunks_txn
+ )
+
+ if not result:
+ yield self._end_background_update(self.EVENT_FIELDS_CHUNK)
+
+ defer.returnValue(result)
+
def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached"""
return -self._backfill_id_gen.get_current_token()
diff --git a/synapse/storage/schema/delta/49/event_chunks.py b/synapse/storage/schema/delta/49/event_chunks.py
index ee6a91e78b..7d8d711600 100644
--- a/synapse/storage/schema/delta/49/event_chunks.py
+++ b/synapse/storage/schema/delta/49/event_chunks.py
@@ -58,6 +58,10 @@ CREATE TABLE chunk_linearized (
CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
CREATE INDEX chunk_linearized_ordering ON chunk_linearized (room_id, ordering);
+
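+-- Queue the background update that assigns chunk IDs to existing events.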
+INSERT INTO background_updates (update_name, progress_json)
+    VALUES ('event_fields_chunk_id', '{}');
+
"""