diff options
author | Erik Johnston <erik@matrix.org> | 2018-06-05 10:49:43 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-06-05 14:53:01 +0100 |
commit | 2d97fb674004a40d78fca2b22a7deb41fe2c9ec8 (patch) | |
tree | ab7e27d64c28e06e82a08ec517e837b954a2f95c | |
parent | Assign chunks to forward extremities (diff) | |
download | synapse-2d97fb674004a40d78fca2b22a7deb41fe2c9ec8.tar.xz |
Implement backgroud update for chunks github/erikj/chunks_bg_update erikj/chunks_bg_update
-rw-r--r-- | synapse/storage/events.py | 72 | ||||
-rw-r--r-- | synapse/storage/schema/delta/49/event_chunks.py | 4 |
2 files changed, 76 insertions, 0 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d38f65b4e6..873bc717bb 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -202,6 +202,7 @@ def _retry_on_integrity_error(func): class EventsStore(EventsWorkerStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" + EVENT_FIELDS_CHUNK = "event_fields_chunk_id" def __init__(self, db_conn, hs): super(EventsStore, self).__init__(db_conn, hs) @@ -242,6 +243,11 @@ class EventsStore(EventsWorkerStore): psql_only=True, ) + self.register_background_update_handler( + self.EVENT_FIELDS_CHUNK, + self._background_compute_chunks, + ) + self._event_persist_queue = _EventPeristenceQueue() self._state_resolution_handler = hs.get_state_resolution_handler() @@ -1823,6 +1829,72 @@ class EventsStore(EventsWorkerStore): defer.returnValue(result) + @defer.inlineCallbacks + def _background_compute_chunks(self, progress, batch_size): + """Iterates over events and assigns them chunk IDs + """ + + up_to_stream_id = progress.get("up_to_stream_id") + if up_to_stream_id is None: + up_to_stream_id = self.get_current_events_token() + 1 + + rows_inserted = progress.get("rows_inserted", 0) + + def reindex_chunks_txn(txn): + txn.execute(""" + SELECT stream_ordering, room_id, event_id FROM events + WHERE stream_ordering < ? AND outlier = ? AND chunk_id IS NULL + ORDER BY stream_ordering DESC + LIMIT ? + """, (up_to_stream_id, False, batch_size)) + + rows = txn.fetchall() + + stream_ordering = up_to_stream_id + for stream_ordering, room_id, event_id in rows: + prev_events = self._simple_select_onecol_txn( + txn, + table="event_edges", + keyvalues={ + "event_id": event_id, + }, + retcol="prev_event_id", + ) + + chunk_id, topo = self._insert_into_chunk_txn( + txn, room_id, event_id, prev_events, + ) + + self._simple_update_txn( + txn, + table="events", + keyvalues={"event_id": event_id}, + updatevalues={ + "chunk_id": chunk_id, + "topological_ordering": topo, + }, + ) + + progress = { + "up_to_stream_id": stream_ordering, + "rows_inserted": rows_inserted + len(rows) + } + + self._background_update_progress_txn( + txn, self.EVENT_FIELDS_CHUNK, progress + ) + + return len(rows) + + result = yield self.runInteraction( + self.EVENT_FIELDS_CHUNK, reindex_chunks_txn + ) + + if not result: + yield self._end_background_update(self.EVENT_FIELDS_CHUNK) + + defer.returnValue(result) + def get_current_backfill_token(self): """The current minimum token that backfilled events have reached""" return -self._backfill_id_gen.get_current_token() diff --git a/synapse/storage/schema/delta/49/event_chunks.py b/synapse/storage/schema/delta/49/event_chunks.py index ee6a91e78b..7d8d711600 100644 --- a/synapse/storage/schema/delta/49/event_chunks.py +++ b/synapse/storage/schema/delta/49/event_chunks.py @@ -58,6 +58,10 @@ CREATE TABLE chunk_linearized ( CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id); CREATE INDEX chunk_linearized_ordering ON chunk_linearized (room_id, ordering); + +INSERT into background_updates (update_name, progress_json) + VALUES ('event_fields_chunk_id', '{}'); + """ |