diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index c21da8343a..d92f8c529c 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -60,6 +60,12 @@ class UserDirectoryHandler(object):
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users
+ # If we're a worker, don't sleep when doing the initial room work, as it
+ # won't monopolise the master's CPU.
+ if hs.config.worker_app:
+ self.INITIAL_ROOM_SLEEP_MS = 0
+ self.INITIAL_USER_SLEEP_MS = 0
+
# When start up for the first time we need to populate the user_directory.
# This is a set of user_id's we've inserted already
self.initially_handled_users = set()
@@ -231,7 +237,7 @@ class UserDirectoryHandler(object):
unhandled_users = user_ids - self.initially_handled_users
yield self.store.add_profiles_to_user_dir(
- {user_id: users_with_profile[user_id] for user_id in unhandled_users},
+ {user_id: users_with_profile[user_id] for user_id in unhandled_users}
)
self.initially_handled_users |= unhandled_users
@@ -241,38 +247,58 @@ class UserDirectoryHandler(object):
# We also batch up inserts/updates, but try to avoid too many at once.
to_insert = set()
count = 0
- for user_id in user_ids:
- if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
- if not self.is_mine_id(user_id):
- count += 1
- continue
- if self.store.get_if_app_services_interested_in_user(user_id):
- count += 1
- continue
+ if is_public:
+ for user_id in user_ids:
+ if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
- for other_user_id in user_ids:
- if user_id == other_user_id:
+ if self.store.get_if_app_services_interested_in_user(user_id):
+ count += 1
continue
+ to_insert.add(user_id)
+ if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
+ yield self.store.add_users_in_public_rooms(room_id, to_insert)
+ to_insert.clear()
+
+ if to_insert:
+ yield self.store.add_users_in_public_rooms(room_id, to_insert)
+ to_insert.clear()
+ else:
+
+ for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
- count += 1
- user_set = (user_id, other_user_id)
- to_insert.add(user_set)
+ if not self.is_mine_id(user_id):
+ count += 1
+ continue
- if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
- yield self.store.add_users_who_share_room(
- room_id, not is_public, to_insert
- )
- to_insert.clear()
+ if self.store.get_if_app_services_interested_in_user(user_id):
+ count += 1
+ continue
+
+ for other_user_id in user_ids:
+ if user_id == other_user_id:
+ continue
+
+ if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
+ count += 1
+
+ user_set = (user_id, other_user_id)
+ to_insert.add(user_set)
+
+ if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
+ yield self.store.add_users_who_share_private_room(
+ room_id, not is_public, to_insert
+ )
+ to_insert.clear()
- if to_insert:
- yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
- to_insert.clear()
+ if to_insert:
+ yield self.store.add_users_who_share_private_room(room_id, to_insert)
+ to_insert.clear()
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
@@ -445,34 +471,37 @@ class UserDirectoryHandler(object):
# Now we update users who share rooms with users.
users_with_profile = yield self.state.get_current_user_in_room(room_id)
- to_insert = set()
+ if is_public:
+ yield self.store.add_users_in_public_rooms(room_id, (user_id,))
+ else:
+ to_insert = set()
- # First, if they're our user then we need to update for every user
- if self.is_mine_id(user_id):
+ # First, if they're our user then we need to update for every user
+ if self.is_mine_id(user_id):
- is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+ is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
- # We don't care about appservice users.
- if not is_appservice:
- for other_user_id in users_with_profile:
- if user_id == other_user_id:
- continue
+ # We don't care about appservice users.
+ if not is_appservice:
+ for other_user_id in users_with_profile:
+ if user_id == other_user_id:
+ continue
- to_insert.add((user_id, other_user_id))
+ to_insert.add((user_id, other_user_id))
- # Next we need to update for every local user in the room
- for other_user_id in users_with_profile:
- if user_id == other_user_id:
- continue
+ # Next we need to update for every local user in the room
+ for other_user_id in users_with_profile:
+ if user_id == other_user_id:
+ continue
- is_appservice = self.store.get_if_app_services_interested_in_user(
- other_user_id
- )
- if self.is_mine_id(other_user_id) and not is_appservice:
- to_insert.add((other_user_id, user_id))
+ is_appservice = self.store.get_if_app_services_interested_in_user(
+ other_user_id
+ )
+ if self.is_mine_id(other_user_id) and not is_appservice:
+ to_insert.add((other_user_id, user_id))
- if to_insert:
- yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
+ if to_insert:
+ yield self.store.add_users_who_share_private_room(room_id, to_insert)
@defer.inlineCallbacks
def _handle_remove_user(self, room_id, user_id):
@@ -487,10 +516,10 @@ class UserDirectoryHandler(object):
# Remove user from sharing tables
yield self.store.remove_user_who_share_room(user_id, room_id)
- # Are they still in a room with members? If not, remove them entirely.
- users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id)
+ # Are they still in any rooms? If not, remove them entirely.
+ rooms_user_is_in = yield self.store.get_user_dir_rooms_user_is_in(user_id)
- if len(users_in_room_with) == 0:
+ if len(rooms_user_is_in) == 0:
yield self.store.remove_from_user_dir(user_id)
@defer.inlineCallbacks
|