summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/roommember.py48
1 files changed, 30 insertions, 18 deletions
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 257bcdb2f8..b3c002b9eb 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -852,22 +852,25 @@ class RoomMemberStore(RoomMemberWorkerStore):
     @defer.inlineCallbacks
     def _background_current_state_membership(self, progress, batch_size):
         """Update the new membership column on current_state_events.
+
+        This works by iterating over all rooms in alphebetical order.
         """
 
-        if "rooms" not in progress:
-            rooms = yield self._simple_select_onecol(
-                table="current_state_events",
-                keyvalues={},
-                retcol="DISTINCT room_id",
-                desc="_background_current_state_membership_get_rooms",
-            )
-            progress["rooms"] = rooms
+        def _background_current_state_membership_txn(txn, last_processed_room):
+            processed = 0
+            while processed < batch_size:
+                txn.execute(
+                    """
+                        SELECT MIN(room_id) FROM rooms WHERE room_id > ?
+                    """,
+                    (last_processed_room,),
+                )
+                row = txn.fetchone()
+                if not row or not row[0]:
+                    return processed, True
 
-        rooms = progress["rooms"]
+                next_room, = row
 
-        def _background_current_state_membership_txn(txn):
-            processed = 0
-            while rooms and processed < batch_size:
                 sql = """
                     UPDATE current_state_events AS c
                     SET membership = (
@@ -876,24 +879,33 @@ class RoomMemberStore(RoomMemberWorkerStore):
                     )
                     WHERE room_id = ?
                 """
-                txn.execute(sql, (rooms.pop(),))
+                txn.execute(sql, (next_room,))
                 processed += txn.rowcount
 
+                last_processed_room = next_room
+
             self._background_update_progress_txn(
-                txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress
+                txn,
+                _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
+                {"last_processed_room": last_processed_room},
             )
 
-            return processed
+            return processed, False
 
-        result = yield self.runInteraction(
+        # If we haven't got a last processed room then just use the empty
+        # string, which will compare before all room IDs correctly.
+        last_processed_room = progress.get("last_processed_room", "")
+
+        row_count, finished = yield self.runInteraction(
             "_background_current_state_membership_update",
             _background_current_state_membership_txn,
+            last_processed_room,
         )
 
-        if not rooms:
+        if finished:
             yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
 
-        defer.returnValue(result)
+        defer.returnValue(row_count)
 
 
 class _JoinedHostsCache(object):