diff options
author | Erik Johnston <erik@matrix.org> | 2019-07-23 09:49:26 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-07-23 10:01:30 +0100 |
commit | b2a629ef498df2b0585c9474613dda778bec7be0 (patch) | |
tree | 8797d92147bb28fe1df242b9c4d2d84df1789777 /synapse | |
parent | Fix logging in workers (#5729) (diff) | |
download | synapse-b2a629ef498df2b0585c9474613dda778bec7be0.tar.xz |
Speed up current state background update.
Turns out that storing huge JSON arrays in the progress JSON isn't something that postgres particularly likes.
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/roommember.py | 48 |
1 files changed, 30 insertions, 18 deletions
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 257bcdb2f8..b3c002b9eb 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -852,22 +852,25 @@ class RoomMemberStore(RoomMemberWorkerStore): @defer.inlineCallbacks def _background_current_state_membership(self, progress, batch_size): """Update the new membership column on current_state_events. + + This works by iterating over all rooms in alphebetical order. """ - if "rooms" not in progress: - rooms = yield self._simple_select_onecol( - table="current_state_events", - keyvalues={}, - retcol="DISTINCT room_id", - desc="_background_current_state_membership_get_rooms", - ) - progress["rooms"] = rooms + def _background_current_state_membership_txn(txn, last_processed_room): + processed = 0 + while processed < batch_size: + txn.execute( + """ + SELECT MIN(room_id) FROM rooms WHERE room_id > ? + """, + (last_processed_room,), + ) + row = txn.fetchone() + if not row or not row[0]: + return processed, True - rooms = progress["rooms"] + next_room, = row - def _background_current_state_membership_txn(txn): - processed = 0 - while rooms and processed < batch_size: sql = """ UPDATE current_state_events AS c SET membership = ( @@ -876,24 +879,33 @@ class RoomMemberStore(RoomMemberWorkerStore): ) WHERE room_id = ? """ - txn.execute(sql, (rooms.pop(),)) + txn.execute(sql, (next_room,)) processed += txn.rowcount + last_processed_room = next_room + self._background_update_progress_txn( - txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress + txn, + _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, + {"last_processed_room": last_processed_room}, ) - return processed + return processed, False - result = yield self.runInteraction( + # If we haven't got a last processed room then just use the empty + # string, which will compare before all room IDs correctly. + last_processed_room = progress.get("last_processed_room", "") + + row_count, finished = yield self.runInteraction( "_background_current_state_membership_update", _background_current_state_membership_txn, + last_processed_room, ) - if not rooms: + if finished: yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME) - defer.returnValue(result) + defer.returnValue(row_count) class _JoinedHostsCache(object): |