summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2021-06-29 12:43:36 +0100
committerGitHub <noreply@github.com>2021-06-29 12:43:36 +0100
commit7647b0337fb5d936c88c5949fa92c07bf2137ad0 (patch)
treef53a00b85e25e1c60d3ef9327b4e7dbd1ffbd130 /synapse/storage/databases
parentMigrate stream_ordering to a bigint (#10264) (diff)
downloadsynapse-7647b0337fb5d936c88c5949fa92c07bf2137ad0.tar.xz
Fix `populate_stream_ordering2` background job (#10267)
It was possible for us not to find any rows in a batch, and hence conclude that
we had finished. Let's not do that.
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py28
1 files changed, 12 insertions, 16 deletions
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 39aaee743c..da3a7df27b 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1055,32 +1055,28 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         batch_size = max(batch_size, 1)
 
         def process(txn: Cursor) -> int:
-            # if this is the first pass, find the minimum stream ordering
-            last_stream = progress.get("last_stream")
-            if last_stream is None:
-                txn.execute(
-                    """
-                    SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1
-                    """
-                )
-                rows = txn.fetchall()
-                if not rows:
-                    return 0
-                last_stream = rows[0][0] - 1
-
+            last_stream = progress.get("last_stream", -(1 << 31))
             txn.execute(
                 """
                 UPDATE events SET stream_ordering2=stream_ordering
-                WHERE stream_ordering > ? AND stream_ordering <= ?
+                WHERE stream_ordering IN (
+                   SELECT stream_ordering FROM events WHERE stream_ordering > ?
+                   ORDER BY stream_ordering LIMIT ?
+                )
+                RETURNING stream_ordering;
                 """,
-                (last_stream, last_stream + batch_size),
+                (last_stream, batch_size),
             )
             row_count = txn.rowcount
+            if row_count == 0:
+                return 0
+            last_stream = max(row[0] for row in txn)
+            logger.info("populated stream_ordering2 up to %i", last_stream)
 
             self.db_pool.updates._background_update_progress_txn(
                 txn,
                 _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
-                {"last_stream": last_stream + batch_size},
+                {"last_stream": last_stream},
             )
             return row_count