diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index c21da8343a..7dc0e236e7 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -38,18 +38,8 @@ class UserDirectoryHandler(object):
world_readable or publically joinable room. We keep a database table up to date
by streaming changes of the current state and recalculating whether users should
be in the directory or not when necessary.
-
- For each user in the directory we also store a room_id which is public and that the
- user is joined to. This allows us to ignore history_visibility and join_rules changes
- for that user in all other public rooms, as we know they'll still be in at least
- one public room.
"""
- INITIAL_ROOM_SLEEP_MS = 50
- INITIAL_ROOM_SLEEP_COUNT = 100
- INITIAL_ROOM_BATCH_SIZE = 100
- INITIAL_USER_SLEEP_MS = 10
-
def __init__(self, hs):
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
@@ -59,11 +49,6 @@ class UserDirectoryHandler(object):
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users
-
- # When start up for the first time we need to populate the user_directory.
- # This is a set of user_id's we've inserted already
- self.initially_handled_users = set()
-
# The current position in the current_state_delta stream
self.pos = None
@@ -126,7 +111,7 @@ class UserDirectoryHandler(object):
# Support users are for diagnostics and should not appear in the user directory.
if not is_support:
yield self.store.update_profile_in_user_dir(
- user_id, profile.display_name, profile.avatar_url, None
+ user_id, profile.display_name, profile.avatar_url
)
@defer.inlineCallbacks
@@ -143,10 +128,9 @@ class UserDirectoryHandler(object):
if self.pos is None:
self.pos = yield self.store.get_user_directory_stream_pos()
- # If still None then we need to do the initial fill of directory
+ # If still None then the initial background update hasn't happened yet
if self.pos is None:
- yield self._do_initial_spam()
- self.pos = yield self.store.get_user_directory_stream_pos()
+ defer.returnValue(None)
# Loop round handling deltas until we're up to date
while True:
@@ -168,113 +152,6 @@ class UserDirectoryHandler(object):
yield self.store.update_user_directory_stream_pos(self.pos)
@defer.inlineCallbacks
- def _do_initial_spam(self):
- """Populates the user_directory from the current state of the DB, used
- when synapse first starts with user_directory support
- """
- new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
-
- # Delete any existing entries just in case there are any
- yield self.store.delete_all_from_user_dir()
-
- # We process by going through each existing room at a time.
- room_ids = yield self.store.get_all_rooms()
-
- logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
- num_processed_rooms = 0
-
- for room_id in room_ids:
- logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
- yield self._handle_initial_room(room_id)
- num_processed_rooms += 1
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
- logger.info("Processed all rooms.")
-
- if self.search_all_users:
- num_processed_users = 0
- user_ids = yield self.store.get_all_local_users()
- logger.info(
- "Doing initial update of user directory. %d users", len(user_ids)
- )
- for user_id in user_ids:
- # We add profiles for all users even if they don't match the
- # include pattern, just in case we want to change it in future
- logger.info(
- "Handling user %d/%d", num_processed_users + 1, len(user_ids)
- )
- yield self._handle_local_user(user_id)
- num_processed_users += 1
- yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
-
- logger.info("Processed all users")
-
- self.initially_handled_users = None
-
- yield self.store.update_user_directory_stream_pos(new_pos)
-
- @defer.inlineCallbacks
- def _handle_initial_room(self, room_id):
- """
- Called when we initially fill out user_directory one room at a time
- """
- is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
- if not is_in_room:
- return
-
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
- room_id
- )
-
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
- user_ids = set(users_with_profile)
- unhandled_users = user_ids - self.initially_handled_users
-
- yield self.store.add_profiles_to_user_dir(
- {user_id: users_with_profile[user_id] for user_id in unhandled_users},
- )
-
- self.initially_handled_users |= unhandled_users
-
- # We now go and figure out the new users who share rooms with user entries
- # We sleep aggressively here as otherwise it can starve resources.
- # We also batch up inserts/updates, but try to avoid too many at once.
- to_insert = set()
- count = 0
- for user_id in user_ids:
- if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
- if not self.is_mine_id(user_id):
- count += 1
- continue
-
- if self.store.get_if_app_services_interested_in_user(user_id):
- count += 1
- continue
-
- for other_user_id in user_ids:
- if user_id == other_user_id:
- continue
-
- if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
- count += 1
-
- user_set = (user_id, other_user_id)
- to_insert.add(user_set)
-
- if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
- yield self.store.add_users_who_share_room(
- room_id, not is_public, to_insert
- )
- to_insert.clear()
-
- if to_insert:
- yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
- to_insert.clear()
-
- @defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
"""
@@ -423,7 +300,9 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_directory(user_id)
if not row:
- yield self.store.add_profiles_to_user_dir({user_id: profile})
+ yield self.store.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
@defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile):
@@ -435,9 +314,9 @@ class UserDirectoryHandler(object):
"""
logger.debug("Adding new user to dir, %r", user_id)
- row = yield self.store.get_user_in_directory(user_id)
- if not row:
- yield self.store.add_profiles_to_user_dir({user_id: profile})
+ yield self.store.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id
@@ -445,34 +324,39 @@ class UserDirectoryHandler(object):
# Now we update users who share rooms with users.
users_with_profile = yield self.state.get_current_user_in_room(room_id)
- to_insert = set()
+ if is_public:
+ yield self.store.add_users_in_public_rooms(room_id, (user_id,))
+ else:
+ to_insert = set()
- # First, if they're our user then we need to update for every user
- if self.is_mine_id(user_id):
+ # First, if they're our user then we need to update for every user
+ if self.is_mine_id(user_id):
- is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+ is_appservice = self.store.get_if_app_services_interested_in_user(
+ user_id
+ )
- # We don't care about appservice users.
- if not is_appservice:
- for other_user_id in users_with_profile:
- if user_id == other_user_id:
- continue
+ # We don't care about appservice users.
+ if not is_appservice:
+ for other_user_id in users_with_profile:
+ if user_id == other_user_id:
+ continue
- to_insert.add((user_id, other_user_id))
+ to_insert.add((user_id, other_user_id))
- # Next we need to update for every local user in the room
- for other_user_id in users_with_profile:
- if user_id == other_user_id:
- continue
+ # Next we need to update for every local user in the room
+ for other_user_id in users_with_profile:
+ if user_id == other_user_id:
+ continue
- is_appservice = self.store.get_if_app_services_interested_in_user(
- other_user_id
- )
- if self.is_mine_id(other_user_id) and not is_appservice:
- to_insert.add((other_user_id, user_id))
+ is_appservice = self.store.get_if_app_services_interested_in_user(
+ other_user_id
+ )
+ if self.is_mine_id(other_user_id) and not is_appservice:
+ to_insert.add((other_user_id, user_id))
- if to_insert:
- yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
+ if to_insert:
+ yield self.store.add_users_who_share_private_room(room_id, to_insert)
@defer.inlineCallbacks
def _handle_remove_user(self, room_id, user_id):
@@ -487,10 +371,10 @@ class UserDirectoryHandler(object):
# Remove user from sharing tables
yield self.store.remove_user_who_share_room(user_id, room_id)
- # Are they still in a room with members? If not, remove them entirely.
- users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id)
+ # Are they still in any rooms? If not, remove them entirely.
+ rooms_user_is_in = yield self.store.get_user_dir_rooms_user_is_in(user_id)
- if len(users_in_room_with) == 0:
+ if len(rooms_user_is_in) == 0:
yield self.store.remove_from_user_dir(user_id)
@defer.inlineCallbacks
@@ -517,9 +401,7 @@ class UserDirectoryHandler(object):
new_avatar = event.content.get("avatar_url")
if prev_name != new_name or prev_avatar != new_avatar:
- yield self.store.update_profile_in_user_dir(
- user_id, new_name, new_avatar, room_id
- )
+ yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
@defer.inlineCallbacks
def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
|