diff options
author | Amber Brown <hawkowl@atleastfornow.net> | 2019-03-19 04:50:24 +1100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-19 04:50:24 +1100 |
commit | 282c97327f150a37d53f90ab6207bc1f98e70da3 (patch) | |
tree | 927982cf2c866ec4de99af633f507f892d28bfd9 /synapse/handlers/user_directory.py | |
parent | Add ratelimiting on failed login attempts (#4865) (diff) | |
download | synapse-282c97327f150a37d53f90ab6207bc1f98e70da3.tar.xz |
Migrate the user directory initial population to a background task (#4864)
Diffstat (limited to 'synapse/handlers/user_directory.py')
-rw-r--r-- | synapse/handlers/user_directory.py | 173 |
1 files changed, 13 insertions, 160 deletions
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index d92f8c529c..7dc0e236e7 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -38,18 +38,8 @@ class UserDirectoryHandler(object): world_readable or publically joinable room. We keep a database table up to date by streaming changes of the current state and recalculating whether users should be in the directory or not when necessary. - - For each user in the directory we also store a room_id which is public and that the - user is joined to. This allows us to ignore history_visibility and join_rules changes - for that user in all other public rooms, as we know they'll still be in at least - one public room. """ - INITIAL_ROOM_SLEEP_MS = 50 - INITIAL_ROOM_SLEEP_COUNT = 100 - INITIAL_ROOM_BATCH_SIZE = 100 - INITIAL_USER_SLEEP_MS = 10 - def __init__(self, hs): self.store = hs.get_datastore() self.state = hs.get_state_handler() @@ -59,17 +49,6 @@ class UserDirectoryHandler(object): self.is_mine_id = hs.is_mine_id self.update_user_directory = hs.config.update_user_directory self.search_all_users = hs.config.user_directory_search_all_users - - # If we're a worker, don't sleep when doing the initial room work, as it - # won't monopolise the master's CPU. - if hs.config.worker_app: - self.INITIAL_ROOM_SLEEP_MS = 0 - self.INITIAL_USER_SLEEP_MS = 0 - - # When start up for the first time we need to populate the user_directory. - # This is a set of user_id's we've inserted already - self.initially_handled_users = set() - # The current position in the current_state_delta stream self.pos = None @@ -132,7 +111,7 @@ class UserDirectoryHandler(object): # Support users are for diagnostics and should not appear in the user directory. if not is_support: yield self.store.update_profile_in_user_dir( - user_id, profile.display_name, profile.avatar_url, None + user_id, profile.display_name, profile.avatar_url ) @defer.inlineCallbacks @@ -149,10 +128,9 @@ class UserDirectoryHandler(object): if self.pos is None: self.pos = yield self.store.get_user_directory_stream_pos() - # If still None then we need to do the initial fill of directory + # If still None then the initial background update hasn't happened yet if self.pos is None: - yield self._do_initial_spam() - self.pos = yield self.store.get_user_directory_stream_pos() + defer.returnValue(None) # Loop round handling deltas until we're up to date while True: @@ -174,133 +152,6 @@ class UserDirectoryHandler(object): yield self.store.update_user_directory_stream_pos(self.pos) @defer.inlineCallbacks - def _do_initial_spam(self): - """Populates the user_directory from the current state of the DB, used - when synapse first starts with user_directory support - """ - new_pos = yield self.store.get_max_stream_id_in_current_state_deltas() - - # Delete any existing entries just in case there are any - yield self.store.delete_all_from_user_dir() - - # We process by going through each existing room at a time. - room_ids = yield self.store.get_all_rooms() - - logger.info("Doing initial update of user directory. %d rooms", len(room_ids)) - num_processed_rooms = 0 - - for room_id in room_ids: - logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids)) - yield self._handle_initial_room(room_id) - num_processed_rooms += 1 - yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0) - - logger.info("Processed all rooms.") - - if self.search_all_users: - num_processed_users = 0 - user_ids = yield self.store.get_all_local_users() - logger.info( - "Doing initial update of user directory. %d users", len(user_ids) - ) - for user_id in user_ids: - # We add profiles for all users even if they don't match the - # include pattern, just in case we want to change it in future - logger.info( - "Handling user %d/%d", num_processed_users + 1, len(user_ids) - ) - yield self._handle_local_user(user_id) - num_processed_users += 1 - yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0) - - logger.info("Processed all users") - - self.initially_handled_users = None - - yield self.store.update_user_directory_stream_pos(new_pos) - - @defer.inlineCallbacks - def _handle_initial_room(self, room_id): - """ - Called when we initially fill out user_directory one room at a time - """ - is_in_room = yield self.store.is_host_joined(room_id, self.server_name) - if not is_in_room: - return - - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - room_id - ) - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - user_ids = set(users_with_profile) - unhandled_users = user_ids - self.initially_handled_users - - yield self.store.add_profiles_to_user_dir( - {user_id: users_with_profile[user_id] for user_id in unhandled_users} - ) - - self.initially_handled_users |= unhandled_users - - # We now go and figure out the new users who share rooms with user entries - # We sleep aggressively here as otherwise it can starve resources. - # We also batch up inserts/updates, but try to avoid too many at once. - to_insert = set() - count = 0 - - if is_public: - for user_id in user_ids: - if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0) - - if self.store.get_if_app_services_interested_in_user(user_id): - count += 1 - continue - - to_insert.add(user_id) - if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE: - yield self.store.add_users_in_public_rooms(room_id, to_insert) - to_insert.clear() - - if to_insert: - yield self.store.add_users_in_public_rooms(room_id, to_insert) - to_insert.clear() - else: - - for user_id in user_ids: - if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0) - - if not self.is_mine_id(user_id): - count += 1 - continue - - if self.store.get_if_app_services_interested_in_user(user_id): - count += 1 - continue - - for other_user_id in user_ids: - if user_id == other_user_id: - continue - - if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0) - count += 1 - - user_set = (user_id, other_user_id) - to_insert.add(user_set) - - if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE: - yield self.store.add_users_who_share_private_room( - room_id, not is_public, to_insert - ) - to_insert.clear() - - if to_insert: - yield self.store.add_users_who_share_private_room(room_id, to_insert) - to_insert.clear() - - @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process """ @@ -449,7 +300,9 @@ class UserDirectoryHandler(object): row = yield self.store.get_user_in_directory(user_id) if not row: - yield self.store.add_profiles_to_user_dir({user_id: profile}) + yield self.store.update_profile_in_user_dir( + user_id, profile.display_name, profile.avatar_url + ) @defer.inlineCallbacks def _handle_new_user(self, room_id, user_id, profile): @@ -461,9 +314,9 @@ class UserDirectoryHandler(object): """ logger.debug("Adding new user to dir, %r", user_id) - row = yield self.store.get_user_in_directory(user_id) - if not row: - yield self.store.add_profiles_to_user_dir({user_id: profile}) + yield self.store.update_profile_in_user_dir( + user_id, profile.display_name, profile.avatar_url + ) is_public = yield self.store.is_room_world_readable_or_publicly_joinable( room_id @@ -479,7 +332,9 @@ class UserDirectoryHandler(object): # First, if they're our user then we need to update for every user if self.is_mine_id(user_id): - is_appservice = self.store.get_if_app_services_interested_in_user(user_id) + is_appservice = self.store.get_if_app_services_interested_in_user( + user_id + ) # We don't care about appservice users. if not is_appservice: @@ -546,9 +401,7 @@ class UserDirectoryHandler(object): new_avatar = event.content.get("avatar_url") if prev_name != new_name or prev_avatar != new_avatar: - yield self.store.update_profile_in_user_dir( - user_id, new_name, new_avatar, room_id - ) + yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar) @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): |