summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/4864.feature1
-rw-r--r--synapse/handlers/user_directory.py173
-rw-r--r--synapse/storage/background_updates.py8
-rw-r--r--synapse/storage/schema/delta/53/user_dir_populate.sql30
-rw-r--r--synapse/storage/user_directory.py370
-rw-r--r--tests/handlers/test_user_directory.py109
-rw-r--r--tests/storage/test_user_directory.py11
-rw-r--r--tests/unittest.py4
8 files changed, 405 insertions, 301 deletions
diff --git a/changelog.d/4864.feature b/changelog.d/4864.feature
new file mode 100644
index 0000000000..57927f2620
--- /dev/null
+++ b/changelog.d/4864.feature
@@ -0,0 +1 @@
+The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
\ No newline at end of file
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d92f8c529c..7dc0e236e7 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -38,18 +38,8 @@ class UserDirectoryHandler(object):
     world_readable or publically joinable room. We keep a database table up to date
     by streaming changes of the current state and recalculating whether users should
     be in the directory or not when necessary.
-
-    For each user in the directory we also store a room_id which is public and that the
-    user is joined to. This allows us to ignore history_visibility and join_rules changes
-    for that user in all other public rooms, as we know they'll still be in at least
-    one public room.
     """
 
-    INITIAL_ROOM_SLEEP_MS = 50
-    INITIAL_ROOM_SLEEP_COUNT = 100
-    INITIAL_ROOM_BATCH_SIZE = 100
-    INITIAL_USER_SLEEP_MS = 10
-
     def __init__(self, hs):
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
@@ -59,17 +49,6 @@ class UserDirectoryHandler(object):
         self.is_mine_id = hs.is_mine_id
         self.update_user_directory = hs.config.update_user_directory
         self.search_all_users = hs.config.user_directory_search_all_users
-
-        # If we're a worker, don't sleep when doing the initial room work, as it
-        # won't monopolise the master's CPU.
-        if hs.config.worker_app:
-            self.INITIAL_ROOM_SLEEP_MS = 0
-            self.INITIAL_USER_SLEEP_MS = 0
-
-        # When start up for the first time we need to populate the user_directory.
-        # This is a set of user_id's we've inserted already
-        self.initially_handled_users = set()
-
         # The current position in the current_state_delta stream
         self.pos = None
 
@@ -132,7 +111,7 @@ class UserDirectoryHandler(object):
         # Support users are for diagnostics and should not appear in the user directory.
         if not is_support:
             yield self.store.update_profile_in_user_dir(
-                user_id, profile.display_name, profile.avatar_url, None
+                user_id, profile.display_name, profile.avatar_url
             )
 
     @defer.inlineCallbacks
@@ -149,10 +128,9 @@ class UserDirectoryHandler(object):
         if self.pos is None:
             self.pos = yield self.store.get_user_directory_stream_pos()
 
-        # If still None then we need to do the initial fill of directory
+        # If still None then the initial background update hasn't happened yet
         if self.pos is None:
-            yield self._do_initial_spam()
-            self.pos = yield self.store.get_user_directory_stream_pos()
+            defer.returnValue(None)
 
         # Loop round handling deltas until we're up to date
         while True:
@@ -174,133 +152,6 @@ class UserDirectoryHandler(object):
                 yield self.store.update_user_directory_stream_pos(self.pos)
 
     @defer.inlineCallbacks
-    def _do_initial_spam(self):
-        """Populates the user_directory from the current state of the DB, used
-        when synapse first starts with user_directory support
-        """
-        new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
-
-        # Delete any existing entries just in case there are any
-        yield self.store.delete_all_from_user_dir()
-
-        # We process by going through each existing room at a time.
-        room_ids = yield self.store.get_all_rooms()
-
-        logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
-        num_processed_rooms = 0
-
-        for room_id in room_ids:
-            logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
-            yield self._handle_initial_room(room_id)
-            num_processed_rooms += 1
-            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-        logger.info("Processed all rooms.")
-
-        if self.search_all_users:
-            num_processed_users = 0
-            user_ids = yield self.store.get_all_local_users()
-            logger.info(
-                "Doing initial update of user directory. %d users", len(user_ids)
-            )
-            for user_id in user_ids:
-                # We add profiles for all users even if they don't match the
-                # include pattern, just in case we want to change it in future
-                logger.info(
-                    "Handling user %d/%d", num_processed_users + 1, len(user_ids)
-                )
-                yield self._handle_local_user(user_id)
-                num_processed_users += 1
-                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
-
-            logger.info("Processed all users")
-
-        self.initially_handled_users = None
-
-        yield self.store.update_user_directory_stream_pos(new_pos)
-
-    @defer.inlineCallbacks
-    def _handle_initial_room(self, room_id):
-        """
-        Called when we initially fill out user_directory one room at a time
-        """
-        is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
-        if not is_in_room:
-            return
-
-        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-            room_id
-        )
-
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
-        user_ids = set(users_with_profile)
-        unhandled_users = user_ids - self.initially_handled_users
-
-        yield self.store.add_profiles_to_user_dir(
-            {user_id: users_with_profile[user_id] for user_id in unhandled_users}
-        )
-
-        self.initially_handled_users |= unhandled_users
-
-        # We now go and figure out the new users who share rooms with user entries
-        # We sleep aggressively here as otherwise it can starve resources.
-        # We also batch up inserts/updates, but try to avoid too many at once.
-        to_insert = set()
-        count = 0
-
-        if is_public:
-            for user_id in user_ids:
-                if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-                if self.store.get_if_app_services_interested_in_user(user_id):
-                    count += 1
-                    continue
-
-                to_insert.add(user_id)
-                if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
-                    yield self.store.add_users_in_public_rooms(room_id, to_insert)
-                    to_insert.clear()
-
-            if to_insert:
-                yield self.store.add_users_in_public_rooms(room_id, to_insert)
-                to_insert.clear()
-        else:
-
-            for user_id in user_ids:
-                if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-                if not self.is_mine_id(user_id):
-                    count += 1
-                    continue
-
-                if self.store.get_if_app_services_interested_in_user(user_id):
-                    count += 1
-                    continue
-
-                for other_user_id in user_ids:
-                    if user_id == other_user_id:
-                        continue
-
-                    if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                        yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-                    count += 1
-
-                    user_set = (user_id, other_user_id)
-                    to_insert.add(user_set)
-
-                    if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
-                        yield self.store.add_users_who_share_private_room(
-                            room_id, not is_public, to_insert
-                        )
-                        to_insert.clear()
-
-            if to_insert:
-                yield self.store.add_users_who_share_private_room(room_id, to_insert)
-                to_insert.clear()
-
-    @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
         """Called with the state deltas to process
         """
@@ -449,7 +300,9 @@ class UserDirectoryHandler(object):
 
         row = yield self.store.get_user_in_directory(user_id)
         if not row:
-            yield self.store.add_profiles_to_user_dir({user_id: profile})
+            yield self.store.update_profile_in_user_dir(
+                user_id, profile.display_name, profile.avatar_url
+            )
 
     @defer.inlineCallbacks
     def _handle_new_user(self, room_id, user_id, profile):
@@ -461,9 +314,9 @@ class UserDirectoryHandler(object):
         """
         logger.debug("Adding new user to dir, %r", user_id)
 
-        row = yield self.store.get_user_in_directory(user_id)
-        if not row:
-            yield self.store.add_profiles_to_user_dir({user_id: profile})
+        yield self.store.update_profile_in_user_dir(
+            user_id, profile.display_name, profile.avatar_url
+        )
 
         is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
             room_id
@@ -479,7 +332,9 @@ class UserDirectoryHandler(object):
             # First, if they're our user then we need to update for every user
             if self.is_mine_id(user_id):
 
-                is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+                is_appservice = self.store.get_if_app_services_interested_in_user(
+                    user_id
+                )
 
                 # We don't care about appservice users.
                 if not is_appservice:
@@ -546,9 +401,7 @@ class UserDirectoryHandler(object):
         new_avatar = event.content.get("avatar_url")
 
         if prev_name != new_name or prev_avatar != new_avatar:
-            yield self.store.update_profile_in_user_dir(
-                user_id, new_name, new_avatar, room_id
-            )
+            yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
 
     @defer.inlineCallbacks
     def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 60cdc884e6..a2f8c23a65 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -52,7 +52,9 @@ class BackgroundUpdatePerformance(object):
         Returns:
             A duration in ms as a float
         """
-        if self.total_item_count == 0:
+        if self.avg_duration_ms == 0:
+            return 0
+        elif self.total_item_count == 0:
             return None
         else:
             # Use the exponential moving average so that we can adapt to
@@ -64,7 +66,9 @@ class BackgroundUpdatePerformance(object):
         Returns:
             A duration in ms as a float
         """
-        if self.total_item_count == 0:
+        if self.total_duration_ms == 0:
+            return 0
+        elif self.total_item_count == 0:
             return None
         else:
             return float(self.total_item_count) / float(self.total_duration_ms)
diff --git a/synapse/storage/schema/delta/53/user_dir_populate.sql b/synapse/storage/schema/delta/53/user_dir_populate.sql
new file mode 100644
index 0000000000..955b8fdbd6
--- /dev/null
+++ b/synapse/storage/schema/delta/53/user_dir_populate.sql
@@ -0,0 +1,30 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Set up staging tables
+INSERT INTO background_updates (update_name, progress_json) VALUES
+    ('populate_user_directory_createtables', '{}');
+
+-- Run through each room and update the user directory according to who is in it
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables');
+
+-- Insert all users, if search_all_users is on
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_user_directory_process_users', '{}', 'populate_user_directory_rooms');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users');
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 1c00b956e5..4ee653210f 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -16,12 +16,10 @@
 import logging
 import re
 
-from six import iteritems
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules
-from synapse.storage._base import SQLBaseStore
+from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.storage.state import StateFilter
 from synapse.types import get_domain_from_id, get_localpart_from_id
@@ -30,7 +28,276 @@ from synapse.util.caches.descriptors import cached
 logger = logging.getLogger(__name__)
 
 
-class UserDirectoryStore(SQLBaseStore):
+TEMP_TABLE = "_temp_populate_user_directory"
+
+
+class UserDirectoryStore(BackgroundUpdateStore):
+    def __init__(self, db_conn, hs):
+        super(UserDirectoryStore, self).__init__(db_conn, hs)
+
+        self.server_name = hs.hostname
+
+        self.register_background_update_handler(
+            "populate_user_directory_createtables",
+            self._populate_user_directory_createtables,
+        )
+        self.register_background_update_handler(
+            "populate_user_directory_process_rooms",
+            self._populate_user_directory_process_rooms,
+        )
+        self.register_background_update_handler(
+            "populate_user_directory_process_users",
+            self._populate_user_directory_process_users,
+        )
+        self.register_background_update_handler(
+            "populate_user_directory_cleanup", self._populate_user_directory_cleanup
+        )
+
+    @defer.inlineCallbacks
+    def _populate_user_directory_createtables(self, progress, batch_size):
+
+        # Get all the rooms that we want to process.
+        def _make_staging_area(txn):
+            sql = (
+                "CREATE TABLE IF NOT EXISTS "
+                + TEMP_TABLE
+                + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
+            )
+            txn.execute(sql)
+
+            sql = (
+                "CREATE TABLE IF NOT EXISTS "
+                + TEMP_TABLE
+                + "_position(position TEXT NOT NULL)"
+            )
+            txn.execute(sql)
+
+            # Get rooms we want to process from the database
+            sql = """
+                SELECT room_id, count(*) FROM current_state_events
+                GROUP BY room_id
+            """
+            txn.execute(sql)
+            rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
+            self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
+            del rooms
+
+            # If search all users is on, get all the users we want to add.
+            if self.hs.config.user_directory_search_all_users:
+                sql = (
+                    "CREATE TABLE IF NOT EXISTS "
+                    + TEMP_TABLE
+                    + "_users(user_id TEXT NOT NULL)"
+                )
+                txn.execute(sql)
+
+                txn.execute("SELECT name FROM users")
+                users = [{"user_id": x[0]} for x in txn.fetchall()]
+
+                self._simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
+
+        new_pos = yield self.get_max_stream_id_in_current_state_deltas()
+        yield self.runInteraction(
+            "populate_user_directory_temp_build", _make_staging_area
+        )
+        yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
+
+        yield self._end_background_update("populate_user_directory_createtables")
+        defer.returnValue(1)
+
+    @defer.inlineCallbacks
+    def _populate_user_directory_cleanup(self, progress, batch_size):
+        """
+        Update the user directory stream position, then clean up the old tables.
+        """
+        position = yield self._simple_select_one_onecol(
+            TEMP_TABLE + "_position", None, "position"
+        )
+        yield self.update_user_directory_stream_pos(position)
+
+        def _delete_staging_area(txn):
+            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
+            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
+            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+
+        yield self.runInteraction(
+            "populate_user_directory_cleanup", _delete_staging_area
+        )
+
+        yield self._end_background_update("populate_user_directory_cleanup")
+        defer.returnValue(1)
+
+    @defer.inlineCallbacks
+    def _populate_user_directory_process_rooms(self, progress, batch_size):
+
+        state = self.hs.get_state_handler()
+
+        # If we don't have progress filed, delete everything.
+        if not progress:
+            yield self.delete_all_from_user_dir()
+
+        def _get_next_batch(txn):
+            sql = """
+                SELECT room_id FROM %s
+                ORDER BY events DESC
+                LIMIT %s
+            """ % (
+                TEMP_TABLE + "_rooms",
+                str(batch_size),
+            )
+            txn.execute(sql)
+            rooms_to_work_on = txn.fetchall()
+
+            if not rooms_to_work_on:
+                return None
+
+            rooms_to_work_on = [x[0] for x in rooms_to_work_on]
+
+            # Get how many are left to process, so we can give status on how
+            # far we are in processing
+            txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+            progress["remaining"] = txn.fetchone()[0]
+
+            return rooms_to_work_on
+
+        rooms_to_work_on = yield self.runInteraction(
+            "populate_user_directory_temp_read", _get_next_batch
+        )
+
+        # No more rooms -- complete the transaction.
+        if not rooms_to_work_on:
+            yield self._end_background_update("populate_user_directory_process_rooms")
+            defer.returnValue(1)
+
+        logger.info(
+            "Processing the next %d rooms of %d remaining"
+            % (len(rooms_to_work_on), progress["remaining"])
+        )
+
+        for room_id in rooms_to_work_on:
+            is_in_room = yield self.is_host_joined(room_id, self.server_name)
+
+            if is_in_room:
+                is_public = yield self.is_room_world_readable_or_publicly_joinable(
+                    room_id
+                )
+
+                users_with_profile = yield state.get_current_user_in_room(room_id)
+                user_ids = set(users_with_profile)
+
+                # Update each user in the user directory.
+                for user_id, profile in users_with_profile.items():
+                    yield self.update_profile_in_user_dir(
+                        user_id, profile.display_name, profile.avatar_url
+                    )
+
+                to_insert = set()
+
+                if is_public:
+                    for user_id in user_ids:
+                        if self.get_if_app_services_interested_in_user(user_id):
+                            continue
+
+                        to_insert.add(user_id)
+
+                    if to_insert:
+                        yield self.add_users_in_public_rooms(room_id, to_insert)
+                        to_insert.clear()
+                else:
+                    for user_id in user_ids:
+                        if not self.hs.is_mine_id(user_id):
+                            continue
+
+                        if self.get_if_app_services_interested_in_user(user_id):
+                            continue
+
+                        for other_user_id in user_ids:
+                            if user_id == other_user_id:
+                                continue
+
+                            user_set = (user_id, other_user_id)
+                            to_insert.add(user_set)
+
+                    if to_insert:
+                        yield self.add_users_who_share_private_room(room_id, to_insert)
+                        to_insert.clear()
+
+            # We've finished a room. Delete it from the table.
+            yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
+            # Update the remaining counter.
+            progress["remaining"] -= 1
+            yield self.runInteraction(
+                "populate_user_directory",
+                self._background_update_progress_txn,
+                "populate_user_directory_process_rooms",
+                progress,
+            )
+
+        defer.returnValue(len(rooms_to_work_on))
+
+    @defer.inlineCallbacks
+    def _populate_user_directory_process_users(self, progress, batch_size):
+        """
+        If search_all_users is enabled, add all of the users to the user directory.
+        """
+        if not self.hs.config.user_directory_search_all_users:
+            yield self._end_background_update("populate_user_directory_process_users")
+            defer.returnValue(1)
+
+        def _get_next_batch(txn):
+            sql = "SELECT user_id FROM %s LIMIT %s" % (
+                TEMP_TABLE + "_users",
+                str(batch_size),
+            )
+            txn.execute(sql)
+            users_to_work_on = txn.fetchall()
+
+            if not users_to_work_on:
+                return None
+
+            users_to_work_on = [x[0] for x in users_to_work_on]
+
+            # Get how many are left to process, so we can give status on how
+            # far we are in processing
+            sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
+            txn.execute(sql)
+            progress["remaining"] = txn.fetchone()[0]
+
+            return users_to_work_on
+
+        users_to_work_on = yield self.runInteraction(
+            "populate_user_directory_temp_read", _get_next_batch
+        )
+
+        # No more users -- complete the transaction.
+        if not users_to_work_on:
+            yield self._end_background_update("populate_user_directory_process_users")
+            defer.returnValue(1)
+
+        logger.info(
+            "Processing the next %d users of %d remaining"
+            % (len(users_to_work_on), progress["remaining"])
+        )
+
+        for user_id in users_to_work_on:
+            profile = yield self.get_profileinfo(get_localpart_from_id(user_id))
+            yield self.update_profile_in_user_dir(
+                user_id, profile.display_name, profile.avatar_url
+            )
+
+            # We've finished processing a user. Delete it from the table.
+            yield self._simple_delete_one(TEMP_TABLE + "_users", {"user_id": user_id})
+            # Update the remaining counter.
+            progress["remaining"] -= 1
+            yield self.runInteraction(
+                "populate_user_directory",
+                self._background_update_progress_txn,
+                "populate_user_directory_process_users",
+                progress,
+            )
+
+        defer.returnValue(len(users_to_work_on))
+
     @defer.inlineCallbacks
     def is_room_world_readable_or_publicly_joinable(self, room_id):
         """Check if the room is either world_readable or publically joinable
@@ -62,89 +329,16 @@ class UserDirectoryStore(SQLBaseStore):
 
         defer.returnValue(False)
 
-    def add_profiles_to_user_dir(self, users_with_profile):
-        """Add profiles to the user directory
-
-        Args:
-            users_with_profile (dict): Users to add to directory in the form of
-                mapping of user_id -> ProfileInfo
+    def update_profile_in_user_dir(self, user_id, display_name, avatar_url):
+        """
+        Update or add a user's profile in the user directory.
         """
 
-        if isinstance(self.database_engine, PostgresEngine):
-            # We weight the loclpart most highly, then display name and finally
-            # server name
-            sql = """
-                INSERT INTO user_directory_search(user_id, vector)
-                VALUES (?,
-                    setweight(to_tsvector('english', ?), 'A')
-                    || setweight(to_tsvector('english', ?), 'D')
-                    || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
-                )
-            """
-            args = (
-                (
-                    user_id,
-                    get_localpart_from_id(user_id),
-                    get_domain_from_id(user_id),
-                    profile.display_name,
-                )
-                for user_id, profile in iteritems(users_with_profile)
-            )
-        elif isinstance(self.database_engine, Sqlite3Engine):
-            sql = """
-                INSERT INTO user_directory_search(user_id, value)
-                VALUES (?,?)
-            """
-            args = tuple(
-                (
-                    user_id,
-                    "%s %s" % (user_id, p.display_name) if p.display_name else user_id,
-                )
-                for user_id, p in iteritems(users_with_profile)
-            )
-        else:
-            # This should be unreachable.
-            raise Exception("Unrecognized database engine")
-
-        def _add_profiles_to_user_dir_txn(txn):
-            txn.executemany(sql, args)
-            self._simple_insert_many_txn(
-                txn,
-                table="user_directory",
-                values=[
-                    {
-                        "user_id": user_id,
-                        "room_id": None,
-                        "display_name": profile.display_name,
-                        "avatar_url": profile.avatar_url,
-                    }
-                    for user_id, profile in iteritems(users_with_profile)
-                ],
-            )
-            for user_id in users_with_profile:
-                txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
-
-        return self.runInteraction(
-            "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn
-        )
-
-    @defer.inlineCallbacks
-    def update_user_in_user_dir(self, user_id, room_id):
-        yield self._simple_update_one(
-            table="user_directory",
-            keyvalues={"user_id": user_id},
-            updatevalues={"room_id": room_id},
-            desc="update_user_in_user_dir",
-        )
-        self.get_user_in_directory.invalidate((user_id,))
-
-    def update_profile_in_user_dir(self, user_id, display_name, avatar_url, room_id):
         def _update_profile_in_user_dir_txn(txn):
             new_entry = self._simple_upsert_txn(
                 txn,
                 table="user_directory",
                 keyvalues={"user_id": user_id},
-                insertion_values={"room_id": room_id},
                 values={"display_name": display_name, "avatar_url": avatar_url},
                 lock=False,  # We're only inserter
             )
@@ -282,18 +476,6 @@ class UserDirectoryStore(SQLBaseStore):
         defer.returnValue(user_ids)
 
     @defer.inlineCallbacks
-    def get_all_rooms(self):
-        """Get all room_ids we've ever known about, in ascending order of "size"
-        """
-        sql = """
-            SELECT room_id FROM current_state_events
-            GROUP BY room_id
-            ORDER BY count(*) ASC
-        """
-        rows = yield self._execute("get_all_rooms", None, sql)
-        defer.returnValue([room_id for room_id, in rows])
-
-    @defer.inlineCallbacks
     def get_all_local_users(self):
         """Get all local users
         """
@@ -553,8 +735,8 @@ class UserDirectoryStore(SQLBaseStore):
         """
 
         if self.hs.config.user_directory_search_all_users:
-            join_args = ()
-            where_clause = "1=1"
+            join_args = (user_id,)
+            where_clause = "user_id != ?"
         else:
             join_args = (user_id,)
             where_clause = """
diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py
index 114807efc1..aefe11ac28 100644
--- a/tests/handlers/test_user_directory.py
+++ b/tests/handlers/test_user_directory.py
@@ -163,9 +163,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
     def get_users_in_public_rooms(self):
         r = self.get_success(
             self.store._simple_select_list(
-                "users_in_public_rooms",
-                None,
-                ("user_id", "room_id"),
+                "users_in_public_rooms", None, ("user_id", "room_id")
             )
         )
         retval = []
@@ -182,6 +180,53 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
             )
         )
 
+    def _add_background_updates(self):
+        """
+        Add the background updates we need to run.
+        """
+        # Ugh, have to reset this flag
+        self.store._all_done = False
+
+        self.get_success(
+            self.store._simple_insert(
+                "background_updates",
+                {
+                    "update_name": "populate_user_directory_createtables",
+                    "progress_json": "{}",
+                },
+            )
+        )
+        self.get_success(
+            self.store._simple_insert(
+                "background_updates",
+                {
+                    "update_name": "populate_user_directory_process_rooms",
+                    "progress_json": "{}",
+                    "depends_on": "populate_user_directory_createtables",
+                },
+            )
+        )
+        self.get_success(
+            self.store._simple_insert(
+                "background_updates",
+                {
+                    "update_name": "populate_user_directory_process_users",
+                    "progress_json": "{}",
+                    "depends_on": "populate_user_directory_process_rooms",
+                },
+            )
+        )
+        self.get_success(
+            self.store._simple_insert(
+                "background_updates",
+                {
+                    "update_name": "populate_user_directory_cleanup",
+                    "progress_json": "{}",
+                    "depends_on": "populate_user_directory_process_users",
+                },
+            )
+        )
+
     def test_initial(self):
         """
         The user directory's initial handler correctly updates the search tables.
@@ -211,26 +256,17 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
         self.assertEqual(shares_private, [])
         self.assertEqual(public_users, [])
 
-        # Reset the handled users caches
-        self.handler.initially_handled_users = set()
+        # Do the initial population of the user directory via the background update
+        self._add_background_updates()
 
-        # Do the initial population
-        d = self.handler._do_initial_spam()
-
-        # This takes a while, so pump it a bunch of times to get through the
-        # sleep delays
-        for i in range(10):
-            self.pump(1)
-
-        self.get_success(d)
+        while not self.get_success(self.store.has_completed_background_updates()):
+            self.get_success(self.store.do_next_background_update(100), by=0.1)
 
         shares_private = self.get_users_who_share_private_rooms()
         public_users = self.get_users_in_public_rooms()
 
         # User 1 and User 2 are in the same public room
-        self.assertEqual(
-            set(public_users), set([(u1, room), (u2, room)])
-        )
+        self.assertEqual(set(public_users), set([(u1, room), (u2, room)]))
 
         # User 1 and User 3 share private rooms
         self.assertEqual(
@@ -238,7 +274,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
             set([(u1, u3, private_room), (u3, u1, private_room)]),
         )
 
-    def test_search_all_users(self):
+    def test_initial_share_all_users(self):
         """
         Search all users = True means that a user does not have to share a
         private room with the searching user or be in a public room to be search
@@ -248,33 +284,36 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
         self.hs.config.user_directory_search_all_users = True
 
         u1 = self.register_user("user1", "pass")
-        u1_token = self.login(u1, "pass")
-        u2 = self.register_user("user2", "pass")
-        u2_token = self.login(u2, "pass")
+        self.register_user("user2", "pass")
         u3 = self.register_user("user3", "pass")
 
-        # User 1 and User 2 join a room. User 3 never does.
-        room = self.helper.create_room_as(u1, is_public=True, tok=u1_token)
-        self.helper.invite(room, src=u1, targ=u2, tok=u1_token)
-        self.helper.join(room, user=u2, tok=u2_token)
-
+        # Wipe the user dir
         self.get_success(self.store.update_user_directory_stream_pos(None))
         self.get_success(self.store.delete_all_from_user_dir())
 
-        # Reset the handled users caches
-        self.handler.initially_handled_users = set()
+        # Do the initial population of the user directory via the background update
+        self._add_background_updates()
 
-        # Do the initial population
-        d = self.handler._do_initial_spam()
+        while not self.get_success(self.store.has_completed_background_updates()):
+            self.get_success(self.store.do_next_background_update(100), by=0.1)
 
-        # This takes a while, so pump it a bunch of times to get through the
-        # sleep delays
-        for i in range(10):
-            self.pump(1)
+        shares_private = self.get_users_who_share_private_rooms()
+        public_users = self.get_users_in_public_rooms()
 
-        self.get_success(d)
+        # No users share rooms
+        self.assertEqual(public_users, [])
+        self.assertEqual(self._compress_shared(shares_private), set([]))
 
         # Despite not sharing a room, search_all_users means we get a search
         # result.
         s = self.get_success(self.handler.search_users(u1, u3, 10))
         self.assertEqual(len(s["results"]), 1)
+
+        # We can find the other two users
+        s = self.get_success(self.handler.search_users(u1, "user", 10))
+        self.assertEqual(len(s["results"]), 2)
+
+        # Registering a user and then searching for them works.
+        u4 = self.register_user("user4", "pass")
+        s = self.get_success(self.handler.search_users(u1, u4, 10))
+        self.assertEqual(len(s["results"]), 1)
diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py
index 512d76e7a3..fd3361404f 100644
--- a/tests/storage/test_user_directory.py
+++ b/tests/storage/test_user_directory.py
@@ -16,7 +16,6 @@
 from twisted.internet import defer
 
 from synapse.storage import UserDirectoryStore
-from synapse.storage.roommember import ProfileInfo
 
 from tests import unittest
 from tests.utils import setup_test_homeserver
@@ -34,13 +33,9 @@ class UserDirectoryStoreTestCase(unittest.TestCase):
 
         # alice and bob are both in !room_id. bobby is not but shares
         # a homeserver with alice.
-        yield self.store.add_profiles_to_user_dir(
-            {
-                ALICE: ProfileInfo(None, "alice"),
-                BOB: ProfileInfo(None, "bob"),
-                BOBBY: ProfileInfo(None, "bobby"),
-            },
-        )
+        yield self.store.update_profile_in_user_dir(ALICE, "alice", None)
+        yield self.store.update_profile_in_user_dir(BOB, "bob", None)
+        yield self.store.update_profile_in_user_dir(BOBBY, "bobby", None)
         yield self.store.add_users_in_public_rooms(
             "!room:id", (ALICE, BOB)
         )
diff --git a/tests/unittest.py b/tests/unittest.py
index ef31321bc8..7772a47078 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -330,10 +330,10 @@ class HomeserverTestCase(TestCase):
         """
         self.reactor.pump([by] * 100)
 
-    def get_success(self, d):
+    def get_success(self, d, by=0.0):
         if not isinstance(d, Deferred):
             return d
-        self.pump()
+        self.pump(by=by)
         return self.successResultOf(d)
 
     def register_user(self, username, password, admin=False):