diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 24141f97ba..85d7fd0910 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -202,6 +202,7 @@ def _retry_on_integrity_error(func):
 class EventsStore(EventsWorkerStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+    EVENT_FIELDS_CHUNK = "event_fields_chunk_id"

     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
@@ -242,6 +243,11 @@ class EventsStore(EventsWorkerStore):
             psql_only=True,
         )

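+        # Handler for the update scheduled by the 49/event_chunks.sql delta
+        # below, which back-populates chunk IDs for existing events.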
+        self.register_background_update_handler(
+            self.EVENT_FIELDS_CHUNK,
+            self._background_compute_chunks,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()
         self._state_resolution_handler = hs.get_state_resolution_handler()
@@ -1834,6 +1840,69 @@ class EventsStore(EventsWorkerStore):
         defer.returnValue(result)

+    @defer.inlineCallbacks
+    def _background_compute_chunks(self, progress, batch_size):
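+        """Background update handler which populates chunk_id and
+        topological_ordering for events persisted before chunks were
+        introduced, working backwards through the events table.
+        """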
+        up_to_stream_id = progress.get("up_to_stream_id")
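+        # On the first run there is no saved progress, so start from just
+        # above the current maximum stream ordering and work backwards.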
+        if up_to_stream_id is None:
+            up_to_stream_id = self.get_current_events_token() + 1
+
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        def reindex_chunks_txn(txn):
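+            # Grab a batch of non-outlier events that have not yet been
+            # assigned a chunk, working backwards from the newest.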
+            txn.execute("""
+                SELECT stream_ordering, room_id, event_id FROM events
+                WHERE stream_ordering < ? AND outlier = ? AND chunk_id IS NULL
+                ORDER BY stream_ordering DESC
+                LIMIT ?
+            """, (up_to_stream_id, False, batch_size))
+
+            rows = txn.fetchall()
+
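+            # Default stream_ordering to the current high-water mark so the
+            # progress written below stays valid even when no rows matched.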
+            stream_ordering = up_to_stream_id
+            for stream_ordering, room_id, event_id in rows:
+                prev_events = self._simple_select_onecol_txn(
+                    txn,
+                    table="event_edges",
+                    keyvalues={
+                        "event_id": event_id,
+                    },
+                    retcol="prev_event_id",
+                )
+
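+                # Work out which chunk the event belongs to, and its
+                # topological ordering within that chunk, based on its
+                # prev_events.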
+                chunk_id, topo = self._compute_chunk_id_txn(
+                    txn, room_id, event_id, prev_events,
+                )
+
+                self._simple_update_txn(
+                    txn,
+                    table="events",
+                    keyvalues={"event_id": event_id},
+                    updatevalues={
+                        "chunk_id": chunk_id,
+                        "topological_ordering": topo,
+                    },
+                )
+
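+            # The SELECT returns rows in descending stream order, so
+            # stream_ordering is now the lowest stream ordering in the
+            # batch and the next batch resumes from just below it.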
+            progress = {
+                "up_to_stream_id": stream_ordering,
+                "rows_inserted": rows_inserted + len(rows),
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_FIELDS_CHUNK, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_FIELDS_CHUNK, reindex_chunks_txn
+        )
+
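+        # An empty batch means all events have been processed, so the
+        # background update can be marked as complete.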
+        if not result:
+            yield self._end_background_update(self.EVENT_FIELDS_CHUNK)
+
+        defer.returnValue(result)
+
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()
diff --git a/synapse/storage/schema/delta/49/event_chunks.sql b/synapse/storage/schema/delta/49/event_chunks.sql
index e995a949ce..65ec19180b 100644
--- a/synapse/storage/schema/delta/49/event_chunks.sql
+++ b/synapse/storage/schema/delta/49/event_chunks.sql
@@ -15,6 +15,8 @@
 ALTER TABLE events ADD COLUMN chunk_id BIGINT;

+-- FIXME: Add index on contains_url
+
 INSERT INTO background_updates (update_name, progress_json) VALUES
     ('events_chunk_index', '{}');
@@ -80,3 +82,7 @@ INSERT INTO chunk_linearized (chunk_id, room_id, ordering)
     SELECT chunk_id, room_id, stream_ordering
     FROM event_forward_extremities
     INNER JOIN events USING (room_id, event_id);
+
+
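+-- Schedule the background update which back-populates chunk_id for events
+-- persisted before this schema change.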
+INSERT INTO background_updates (update_name, progress_json)
+    VALUES ('event_fields_chunk_id', '{}');