summary refs log tree commit diff
path: root/synapse/handlers/user_directory.py
diff options
context:
space:
mode:
authorAmber Brown <hawkowl@atleastfornow.net>2019-03-19 04:50:24 +1100
committerGitHub <noreply@github.com>2019-03-19 04:50:24 +1100
commit282c97327f150a37d53f90ab6207bc1f98e70da3 (patch)
tree927982cf2c866ec4de99af633f507f892d28bfd9 /synapse/handlers/user_directory.py
parentAdd ratelimiting on failed login attempts (#4865) (diff)
downloadsynapse-282c97327f150a37d53f90ab6207bc1f98e70da3.tar.xz
Migrate the user directory initial population to a background task (#4864)
Diffstat (limited to 'synapse/handlers/user_directory.py')
-rw-r--r--synapse/handlers/user_directory.py173
1 files changed, 13 insertions, 160 deletions
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d92f8c529c..7dc0e236e7 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -38,18 +38,8 @@ class UserDirectoryHandler(object):
     world_readable or publically joinable room. We keep a database table up to date
     by streaming changes of the current state and recalculating whether users should
     be in the directory or not when necessary.
-
-    For each user in the directory we also store a room_id which is public and that the
-    user is joined to. This allows us to ignore history_visibility and join_rules changes
-    for that user in all other public rooms, as we know they'll still be in at least
-    one public room.
     """
 
-    INITIAL_ROOM_SLEEP_MS = 50
-    INITIAL_ROOM_SLEEP_COUNT = 100
-    INITIAL_ROOM_BATCH_SIZE = 100
-    INITIAL_USER_SLEEP_MS = 10
-
     def __init__(self, hs):
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
@@ -59,17 +49,6 @@ class UserDirectoryHandler(object):
         self.is_mine_id = hs.is_mine_id
         self.update_user_directory = hs.config.update_user_directory
         self.search_all_users = hs.config.user_directory_search_all_users
-
-        # If we're a worker, don't sleep when doing the initial room work, as it
-        # won't monopolise the master's CPU.
-        if hs.config.worker_app:
-            self.INITIAL_ROOM_SLEEP_MS = 0
-            self.INITIAL_USER_SLEEP_MS = 0
-
-        # When start up for the first time we need to populate the user_directory.
-        # This is a set of user_id's we've inserted already
-        self.initially_handled_users = set()
-
         # The current position in the current_state_delta stream
         self.pos = None
 
@@ -132,7 +111,7 @@ class UserDirectoryHandler(object):
         # Support users are for diagnostics and should not appear in the user directory.
         if not is_support:
             yield self.store.update_profile_in_user_dir(
-                user_id, profile.display_name, profile.avatar_url, None
+                user_id, profile.display_name, profile.avatar_url
             )
 
     @defer.inlineCallbacks
@@ -149,10 +128,9 @@ class UserDirectoryHandler(object):
         if self.pos is None:
             self.pos = yield self.store.get_user_directory_stream_pos()
 
-        # If still None then we need to do the initial fill of directory
+        # If still None then the initial background update hasn't happened yet
         if self.pos is None:
-            yield self._do_initial_spam()
-            self.pos = yield self.store.get_user_directory_stream_pos()
+            defer.returnValue(None)
 
         # Loop round handling deltas until we're up to date
         while True:
@@ -174,133 +152,6 @@ class UserDirectoryHandler(object):
                 yield self.store.update_user_directory_stream_pos(self.pos)
 
     @defer.inlineCallbacks
-    def _do_initial_spam(self):
-        """Populates the user_directory from the current state of the DB, used
-        when synapse first starts with user_directory support
-        """
-        new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
-
-        # Delete any existing entries just in case there are any
-        yield self.store.delete_all_from_user_dir()
-
-        # We process by going through each existing room at a time.
-        room_ids = yield self.store.get_all_rooms()
-
-        logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
-        num_processed_rooms = 0
-
-        for room_id in room_ids:
-            logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
-            yield self._handle_initial_room(room_id)
-            num_processed_rooms += 1
-            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-        logger.info("Processed all rooms.")
-
-        if self.search_all_users:
-            num_processed_users = 0
-            user_ids = yield self.store.get_all_local_users()
-            logger.info(
-                "Doing initial update of user directory. %d users", len(user_ids)
-            )
-            for user_id in user_ids:
-                # We add profiles for all users even if they don't match the
-                # include pattern, just in case we want to change it in future
-                logger.info(
-                    "Handling user %d/%d", num_processed_users + 1, len(user_ids)
-                )
-                yield self._handle_local_user(user_id)
-                num_processed_users += 1
-                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
-
-            logger.info("Processed all users")
-
-        self.initially_handled_users = None
-
-        yield self.store.update_user_directory_stream_pos(new_pos)
-
-    @defer.inlineCallbacks
-    def _handle_initial_room(self, room_id):
-        """
-        Called when we initially fill out user_directory one room at a time
-        """
-        is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
-        if not is_in_room:
-            return
-
-        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-            room_id
-        )
-
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
-        user_ids = set(users_with_profile)
-        unhandled_users = user_ids - self.initially_handled_users
-
-        yield self.store.add_profiles_to_user_dir(
-            {user_id: users_with_profile[user_id] for user_id in unhandled_users}
-        )
-
-        self.initially_handled_users |= unhandled_users
-
-        # We now go and figure out the new users who share rooms with user entries
-        # We sleep aggressively here as otherwise it can starve resources.
-        # We also batch up inserts/updates, but try to avoid too many at once.
-        to_insert = set()
-        count = 0
-
-        if is_public:
-            for user_id in user_ids:
-                if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-                if self.store.get_if_app_services_interested_in_user(user_id):
-                    count += 1
-                    continue
-
-                to_insert.add(user_id)
-                if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
-                    yield self.store.add_users_in_public_rooms(room_id, to_insert)
-                    to_insert.clear()
-
-            if to_insert:
-                yield self.store.add_users_in_public_rooms(room_id, to_insert)
-                to_insert.clear()
-        else:
-
-            for user_id in user_ids:
-                if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-                if not self.is_mine_id(user_id):
-                    count += 1
-                    continue
-
-                if self.store.get_if_app_services_interested_in_user(user_id):
-                    count += 1
-                    continue
-
-                for other_user_id in user_ids:
-                    if user_id == other_user_id:
-                        continue
-
-                    if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                        yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-                    count += 1
-
-                    user_set = (user_id, other_user_id)
-                    to_insert.add(user_set)
-
-                    if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
-                        yield self.store.add_users_who_share_private_room(
-                            room_id, not is_public, to_insert
-                        )
-                        to_insert.clear()
-
-            if to_insert:
-                yield self.store.add_users_who_share_private_room(room_id, to_insert)
-                to_insert.clear()
-
-    @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
         """Called with the state deltas to process
         """
@@ -449,7 +300,9 @@ class UserDirectoryHandler(object):
 
         row = yield self.store.get_user_in_directory(user_id)
         if not row:
-            yield self.store.add_profiles_to_user_dir({user_id: profile})
+            yield self.store.update_profile_in_user_dir(
+                user_id, profile.display_name, profile.avatar_url
+            )
 
     @defer.inlineCallbacks
     def _handle_new_user(self, room_id, user_id, profile):
@@ -461,9 +314,9 @@ class UserDirectoryHandler(object):
         """
         logger.debug("Adding new user to dir, %r", user_id)
 
-        row = yield self.store.get_user_in_directory(user_id)
-        if not row:
-            yield self.store.add_profiles_to_user_dir({user_id: profile})
+        yield self.store.update_profile_in_user_dir(
+            user_id, profile.display_name, profile.avatar_url
+        )
 
         is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
             room_id
@@ -479,7 +332,9 @@ class UserDirectoryHandler(object):
             # First, if they're our user then we need to update for every user
             if self.is_mine_id(user_id):
 
-                is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+                is_appservice = self.store.get_if_app_services_interested_in_user(
+                    user_id
+                )
 
                 # We don't care about appservice users.
                 if not is_appservice:
@@ -546,9 +401,7 @@ class UserDirectoryHandler(object):
         new_avatar = event.content.get("avatar_url")
 
         if prev_name != new_name or prev_avatar != new_avatar:
-            yield self.store.update_profile_in_user_dir(
-                user_id, new_name, new_avatar, room_id
-            )
+            yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
 
     @defer.inlineCallbacks
     def _get_key_change(self, prev_event_id, event_id, key_name, public_value):