diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2021-06-29 12:43:36 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-29 12:43:36 +0100 |
commit | 7647b0337fb5d936c88c5949fa92c07bf2137ad0 (patch) | |
tree | f53a00b85e25e1c60d3ef9327b4e7dbd1ffbd130 /synapse/storage/databases | |
parent | Migrate stream_ordering to a bigint (#10264) (diff) | |
download | synapse-7647b0337fb5d936c88c5949fa92c07bf2137ad0.tar.xz |
Fix `populate_stream_ordering2` background job (#10267)
It was possible for us not to find any rows in a batch, and hence conclude that we had finished. Let's not do that.
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/events_bg_updates.py | 28 |
1 files changed, 12 insertions, 16 deletions
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 39aaee743c..da3a7df27b 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1055,32 +1055,28 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): batch_size = max(batch_size, 1) def process(txn: Cursor) -> int: - # if this is the first pass, find the minimum stream ordering - last_stream = progress.get("last_stream") - if last_stream is None: - txn.execute( - """ - SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1 - """ - ) - rows = txn.fetchall() - if not rows: - return 0 - last_stream = rows[0][0] - 1 - + last_stream = progress.get("last_stream", -(1 << 31)) txn.execute( """ UPDATE events SET stream_ordering2=stream_ordering - WHERE stream_ordering > ? AND stream_ordering <= ? + WHERE stream_ordering IN ( + SELECT stream_ordering FROM events WHERE stream_ordering > ? + ORDER BY stream_ordering LIMIT ? + ) + RETURNING stream_ordering; """, - (last_stream, last_stream + batch_size), + (last_stream, batch_size), ) row_count = txn.rowcount + if row_count == 0: + return 0 + last_stream = max(row[0] for row in txn) + logger.info("populated stream_ordering2 up to %i", last_stream) self.db_pool.updates._background_update_progress_txn( txn, _BackgroundUpdates.POPULATE_STREAM_ORDERING2, - {"last_stream": last_stream + batch_size}, + {"last_stream": last_stream}, ) return row_count |