Fix `populate_stream_ordering2` background job (#10267)
It was possible for us not to find any rows in a batch, and hence conclude that
we had finished. Let's not do that.
1 file changed, 12 insertions(+), 16 deletions(-)
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 39aaee743c..da3a7df27b 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1055,32 +1055,28 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
batch_size = max(batch_size, 1)
def process(txn: Cursor) -> int:
- # if this is the first pass, find the minimum stream ordering
- last_stream = progress.get("last_stream")
- if last_stream is None:
- txn.execute(
- """
- SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1
- """
- )
- rows = txn.fetchall()
- if not rows:
- return 0
- last_stream = rows[0][0] - 1
-
+ last_stream = progress.get("last_stream", -(1 << 31))
txn.execute(
"""
UPDATE events SET stream_ordering2=stream_ordering
- WHERE stream_ordering > ? AND stream_ordering <= ?
+ WHERE stream_ordering IN (
+ SELECT stream_ordering FROM events WHERE stream_ordering > ?
+ ORDER BY stream_ordering LIMIT ?
+ )
+ RETURNING stream_ordering;
""",
- (last_stream, last_stream + batch_size),
+ (last_stream, batch_size),
)
row_count = txn.rowcount
+ if row_count == 0:
+ return 0
+ last_stream = max(row[0] for row in txn)
+ logger.info("populated stream_ordering2 up to %i", last_stream)
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.POPULATE_STREAM_ORDERING2,
- {"last_stream": last_stream + batch_size},
+ {"last_stream": last_stream},
)
return row_count
|