diff options
Diffstat (limited to 'synapse/handlers/user_directory.py')
-rw-r--r-- | synapse/handlers/user_directory.py | 230 |
1 files changed, 203 insertions, 27 deletions
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index f4451e5dfb..8928786fd6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -14,12 +14,12 @@ # limitations under the License. import logging - from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.storage.roommember import ProfileInfo from synapse.util.metrics import Measure +from synapse.util.async import sleep logger = logging.getLogger(__name__) @@ -41,12 +41,15 @@ class UserDirectoyHandler(object): one public room. """ + INITIAL_SLEEP_MS = 50 + def __init__(self, hs): self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id self.notifier.add_replication_callback(self.notify_new_event) @@ -55,6 +58,9 @@ class UserDirectoyHandler(object): self.initially_handled_users = set() self.initially_handled_users_in_public = set() + self.initially_handled_users_share = set() + self.initially_handled_users_share_private_room = set() + # The current position in the current_state_delta stream self.pos = None @@ -65,7 +71,7 @@ class UserDirectoyHandler(object): # we start populating the user directory self.clock.call_later(0, self.notify_new_event) - def search_users(self, search_term, limit): + def search_users(self, user_id, search_term, limit): """Searches for users in directory Returns: @@ -82,7 +88,7 @@ class UserDirectoyHandler(object): ] } """ - return self.store.search_user_dir(search_term, limit) + return self.store.search_user_dir(user_id, search_term, limit) @defer.inlineCallbacks def notify_new_event(self): @@ -140,10 +146,14 @@ class UserDirectoyHandler(object): logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids)) yield self._handle_intial_room(room_id) num_processed_rooms += 1 + yield sleep(self.INITIAL_SLEEP_MS / 1000.) logger.info("Processed all rooms.") self.initially_handled_users = None + self.initially_handled_users_in_public = None + self.initially_handled_users_share = None + self.initially_handled_users_share_private_room = None yield self.store.update_user_directory_stream_pos(new_pos) @@ -158,7 +168,8 @@ class UserDirectoyHandler(object): is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) users_with_profile = yield self.state.get_current_user_in_room(room_id) - unhandled_users = set(users_with_profile) - self.initially_handled_users + user_ids = set(users_with_profile) + unhandled_users = user_ids - self.initially_handled_users yield self.store.add_profiles_to_user_dir( room_id, { @@ -175,6 +186,69 @@ class UserDirectoyHandler(object): ) self.initially_handled_users_in_public != unhandled_users + # We now go and figure out the new users who share rooms with user entries + # We sleep aggressively here as otherwise it can starve resources. + # We also batch up inserts/updates, but try to avoid too many at once. + to_insert = set() + to_update = set() + count = 0 + for user_id in user_ids: + if count % 100 == 0: + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + + if not self.is_mine_id(user_id): + count += 1 + continue + + for other_user_id in user_ids: + if user_id == other_user_id: + continue + + if count % 100 == 0: + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + count += 1 + + user_set = (user_id, other_user_id) + + if user_set in self.initially_handled_users_share_private_room: + continue + + if user_set in self.initially_handled_users_share: + if is_public: + continue + to_update.add(user_set) + else: + to_insert.add(user_set) + + if is_public: + self.initially_handled_users_share.add(user_set) + else: + self.initially_handled_users_share_private_room.add(user_set) + + if len(to_insert) > 100: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + to_insert.clear() + + if len(to_update) > 100: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) + to_update.clear() + + if to_insert: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + to_insert.clear() + + if to_update: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) + to_update.clear() + @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process @@ -316,12 +390,77 @@ class UserDirectoyHandler(object): room_id ) - if not is_public: - return + if is_public: + row = yield self.store.get_user_in_public_room(user_id) + if not row: + yield self.store.add_users_to_public_room(room_id, [user_id]) - row = yield self.store.get_user_in_public_room(user_id) - if not row: - yield self.store.add_users_to_public_room(room_id, [user_id]) + # Now we update users who share rooms with users. We do this by getting + # all the current users in the room and seeing which aren't already + # marked in the database as sharing with `user_id` + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + to_insert = set() + to_update = set() + + # First, if they're our user then we need to update for every user + if self.is_mine_id(user_id): + # Returns a map of other_user_id -> shared_private. We only need + # to update mappings if for users that either don't share a room + # already (aren't in the map) or, if the room is private, those that + # only share a public room. + user_ids_shared = yield self.store.get_users_who_share_room_from_dir( + user_id + ) + + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue + + shared_is_private = user_ids_shared.get(other_user_id) + if shared_is_private is True: + # We've already marked in the database they share a private room + continue + elif shared_is_private is False: + # They already share a public room, so only update if this is + # a private room + if not is_public: + to_update.add((user_id, other_user_id)) + elif shared_is_private is None: + # This is the first time they both share a room + to_insert.add((user_id, other_user_id)) + + # Next we need to update for every local user in the room + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue + + if self.is_mine_id(other_user_id): + shared_is_private = yield self.store.get_if_users_share_a_room( + other_user_id, user_id, + ) + if shared_is_private is True: + # We've already marked in the database they share a private room + continue + elif shared_is_private is False: + # They already share a public room, so only update if this is + # a private room + if not is_public: + to_update.add((other_user_id, user_id)) + elif shared_is_private is None: + # This is the first time they both share a room + to_insert.add((other_user_id, user_id)) + + if to_insert: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + + if to_update: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): @@ -339,32 +478,29 @@ class UserDirectoyHandler(object): row = yield self.store.get_user_in_public_room(user_id) update_user_in_public = row and row["room_id"] == room_id - if not update_user_in_public and not update_user_dir: - return - - # XXX: Make this faster? - rooms = yield self.store.get_rooms_for_user(user_id) - for j_room_id in rooms: - if not update_user_in_public and not update_user_dir: - break + if (update_user_in_public or update_user_dir): + # XXX: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for j_room_id in rooms: + if (not update_user_in_public and not update_user_dir): + break - is_in_room = yield self.store.is_host_joined( - j_room_id, self.server_name, - ) + is_in_room = yield self.store.is_host_joined( + j_room_id, self.server_name, + ) - if not is_in_room: - continue + if not is_in_room: + continue - if update_user_dir: - update_user_dir = False - yield self.store.update_user_in_user_dir(user_id, j_room_id) + if update_user_dir: + update_user_dir = False + yield self.store.update_user_in_user_dir(user_id, j_room_id) - if update_user_in_public: is_public = yield self.store.is_room_world_readable_or_publicly_joinable( j_room_id ) - if is_public: + if update_user_in_public and is_public: yield self.store.update_user_in_public_user_list(user_id, j_room_id) update_user_in_public = False @@ -373,6 +509,46 @@ class UserDirectoyHandler(object): elif update_user_in_public: yield self.store.remove_from_user_in_public_room(user_id) + # Now handle users_who_share_rooms. + + # Get a list of user tuples that were in the DB due to this room and + # users (this includes tuples where the other user matches `user_id`) + user_tuples = yield self.store.get_users_in_share_dir_with_room_id( + user_id, room_id, + ) + + for user_id, other_user_id in user_tuples: + # For each user tuple get a list of rooms that they still share, + # trying to find a private room, and update the entry in the DB + rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id) + + # If they dont share a room anymore, remove the mapping + if not rooms: + yield self.store.remove_user_who_share_room( + user_id, other_user_id, + ) + continue + + found_public_share = None + for j_room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + found_public_share = j_room_id + else: + found_public_share = None + yield self.store.update_users_who_share_room( + room_id, not is_public, [(user_id, other_user_id)], + ) + break + + if found_public_share: + yield self.store.update_users_who_share_room( + room_id, not is_public, [(user_id, other_user_id)], + ) + @defer.inlineCallbacks def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): """Check member event changes for any profile changes and update the |