summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-06-05 10:49:43 +0100
committerErik Johnston <erik@matrix.org>2018-06-05 14:53:01 +0100
commit2d97fb674004a40d78fca2b22a7deb41fe2c9ec8 (patch)
treeab7e27d64c28e06e82a08ec517e837b954a2f95c
parentAssign chunks to forward extremities (diff)
downloadsynapse-2d97fb674004a40d78fca2b22a7deb41fe2c9ec8.tar.xz
Implement backgroud update for chunks github/erikj/chunks_bg_update erikj/chunks_bg_update
-rw-r--r--synapse/storage/events.py72
-rw-r--r--synapse/storage/schema/delta/49/event_chunks.py4
2 files changed, 76 insertions, 0 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d38f65b4e6..873bc717bb 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -202,6 +202,7 @@ def _retry_on_integrity_error(func):
 class EventsStore(EventsWorkerStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+    EVENT_FIELDS_CHUNK = "event_fields_chunk_id"
 
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
@@ -242,6 +243,11 @@ class EventsStore(EventsWorkerStore):
             psql_only=True,
         )
 
+        self.register_background_update_handler(
+            self.EVENT_FIELDS_CHUNK,
+            self._background_compute_chunks,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()
 
         self._state_resolution_handler = hs.get_state_resolution_handler()
@@ -1823,6 +1829,72 @@ class EventsStore(EventsWorkerStore):
 
         defer.returnValue(result)
 
+    @defer.inlineCallbacks
+    def _background_compute_chunks(self, progress, batch_size):
+        """Iterates over events and assigns them chunk IDs
+        """
+
+        up_to_stream_id = progress.get("up_to_stream_id")
+        if up_to_stream_id is None:
+            up_to_stream_id = self.get_current_events_token() + 1
+
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        def reindex_chunks_txn(txn):
+            txn.execute("""
+                SELECT stream_ordering, room_id, event_id FROM events
+                WHERE stream_ordering < ? AND outlier = ? AND chunk_id IS NULL
+                ORDER BY stream_ordering DESC
+                LIMIT ?
+            """, (up_to_stream_id, False, batch_size))
+
+            rows = txn.fetchall()
+
+            stream_ordering = up_to_stream_id
+            for stream_ordering, room_id, event_id in rows:
+                prev_events = self._simple_select_onecol_txn(
+                    txn,
+                    table="event_edges",
+                    keyvalues={
+                        "event_id": event_id,
+                    },
+                    retcol="prev_event_id",
+                )
+
+                chunk_id, topo = self._insert_into_chunk_txn(
+                    txn, room_id, event_id, prev_events,
+                )
+
+                self._simple_update_txn(
+                    txn,
+                    table="events",
+                    keyvalues={"event_id": event_id},
+                    updatevalues={
+                        "chunk_id": chunk_id,
+                        "topological_ordering": topo,
+                    },
+                )
+
+            progress = {
+                "up_to_stream_id": stream_ordering,
+                "rows_inserted": rows_inserted + len(rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_FIELDS_CHUNK, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_FIELDS_CHUNK, reindex_chunks_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_FIELDS_CHUNK)
+
+        defer.returnValue(result)
+
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()
diff --git a/synapse/storage/schema/delta/49/event_chunks.py b/synapse/storage/schema/delta/49/event_chunks.py
index ee6a91e78b..7d8d711600 100644
--- a/synapse/storage/schema/delta/49/event_chunks.py
+++ b/synapse/storage/schema/delta/49/event_chunks.py
@@ -58,6 +58,10 @@ CREATE TABLE chunk_linearized (
 
 CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
 CREATE INDEX chunk_linearized_ordering ON chunk_linearized (room_id, ordering);
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('event_fields_chunk_id', '{}');
+
 """