diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 257bcdb2f8..b3c002b9eb 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -852,22 +852,25 @@ class RoomMemberStore(RoomMemberWorkerStore):
@defer.inlineCallbacks
def _background_current_state_membership(self, progress, batch_size):
"""Update the new membership column on current_state_events.
+
+ This works by iterating over all rooms in alphebetical order.
"""
- if "rooms" not in progress:
- rooms = yield self._simple_select_onecol(
- table="current_state_events",
- keyvalues={},
- retcol="DISTINCT room_id",
- desc="_background_current_state_membership_get_rooms",
- )
- progress["rooms"] = rooms
+ def _background_current_state_membership_txn(txn, last_processed_room):
+ processed = 0
+ while processed < batch_size:
+ txn.execute(
+ """
+ SELECT MIN(room_id) FROM rooms WHERE room_id > ?
+ """,
+ (last_processed_room,),
+ )
+ row = txn.fetchone()
+ if not row or not row[0]:
+ return processed, True
- rooms = progress["rooms"]
+ next_room, = row
- def _background_current_state_membership_txn(txn):
- processed = 0
- while rooms and processed < batch_size:
sql = """
UPDATE current_state_events AS c
SET membership = (
@@ -876,24 +879,33 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
WHERE room_id = ?
"""
- txn.execute(sql, (rooms.pop(),))
+ txn.execute(sql, (next_room,))
processed += txn.rowcount
+ last_processed_room = next_room
+
self._background_update_progress_txn(
- txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress
+ txn,
+ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
+ {"last_processed_room": last_processed_room},
)
- return processed
+ return processed, False
- result = yield self.runInteraction(
+ # If we haven't got a last processed room then just use the empty
+ # string, which will compare before all room IDs correctly.
+ last_processed_room = progress.get("last_processed_room", "")
+
+ row_count, finished = yield self.runInteraction(
"_background_current_state_membership_update",
_background_current_state_membership_txn,
+ last_processed_room,
)
- if not rooms:
+ if finished:
yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
- defer.returnValue(result)
+ defer.returnValue(row_count)
class _JoinedHostsCache(object):
|