diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 7f7bcb7094..72cf91eb39 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -269,11 +269,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
event_push_actions_done = progress.get("event_push_actions_done", False)
def add_thread_id_txn(
- txn: LoggingTransaction, table_name: str, start_stream_ordering: int
+ txn: LoggingTransaction, start_stream_ordering: int
) -> int:
- sql = f"""
+ sql = """
SELECT stream_ordering
- FROM {table_name}
+ FROM event_push_actions
WHERE
thread_id IS NULL
AND stream_ordering > ?
@@ -285,7 +285,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# No more rows to process.
rows = txn.fetchall()
if not rows:
- progress[f"{table_name}_done"] = True
+ progress["event_push_actions_done"] = True
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
@@ -294,8 +294,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# Update the thread ID for any of those rows.
max_stream_ordering = rows[-1][0]
- sql = f"""
- UPDATE {table_name}
+ sql = """
+ UPDATE event_push_actions
SET thread_id = 'main'
WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
"""
@@ -309,7 +309,50 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# Update progress.
processed_rows = txn.rowcount
- progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
+ progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "event_push_backfill_thread_id", progress
+ )
+
+ return processed_rows
+
+ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
+ min_user_id = progress.get("max_summary_user_id", "")
+ min_room_id = progress.get("max_summary_room_id", "")
+
+ # Slightly overcomplicated query for getting the Nth user ID / room
+ # ID tuple, or the last if there are less than N remaining.
+ sql = """
+ SELECT user_id, room_id FROM (
+ SELECT user_id, room_id FROM event_push_summary
+ WHERE (user_id, room_id) > (?, ?)
+ AND thread_id IS NULL
+ ORDER BY user_id, room_id
+ LIMIT ?
+ ) AS e
+ ORDER BY user_id DESC, room_id DESC
+ LIMIT 1
+ """
+
+ txn.execute(sql, (min_user_id, min_room_id, batch_size))
+ row = txn.fetchone()
+ if not row:
+ return 0
+
+ max_user_id, max_room_id = row
+
+ sql = """
+ UPDATE event_push_summary
+ SET thread_id = 'main'
+ WHERE
+ (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
+ AND thread_id IS NULL
+ """
+ txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
+ processed_rows = txn.rowcount
+
+ progress["max_summary_user_id"] = max_user_id
+ progress["max_summary_room_id"] = max_room_id
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
@@ -325,15 +368,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
add_thread_id_txn,
- "event_push_actions",
progress.get("max_event_push_actions_stream_ordering", 0),
)
else:
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
- add_thread_id_txn,
- "event_push_summary",
- progress.get("max_event_push_summary_stream_ordering", 0),
+ add_thread_id_summary_txn,
)
# Only done after the event_push_summary table is done.
|