diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index f4451e5dfb..8928786fd6 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -14,12 +14,12 @@
# limitations under the License.
import logging
-
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.storage.roommember import ProfileInfo
from synapse.util.metrics import Measure
+from synapse.util.async import sleep
logger = logging.getLogger(__name__)
@@ -41,12 +41,15 @@ class UserDirectoyHandler(object):
one public room.
"""
+ INITIAL_SLEEP_MS = 50
+
def __init__(self, hs):
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
+ self.is_mine_id = hs.is_mine_id
self.notifier.add_replication_callback(self.notify_new_event)
@@ -55,6 +58,9 @@ class UserDirectoyHandler(object):
self.initially_handled_users = set()
self.initially_handled_users_in_public = set()
+ self.initially_handled_users_share = set()
+ self.initially_handled_users_share_private_room = set()
+
# The current position in the current_state_delta stream
self.pos = None
@@ -65,7 +71,7 @@ class UserDirectoyHandler(object):
# we start populating the user directory
self.clock.call_later(0, self.notify_new_event)
- def search_users(self, search_term, limit):
+ def search_users(self, user_id, search_term, limit):
"""Searches for users in directory
Returns:
@@ -82,7 +88,7 @@ class UserDirectoyHandler(object):
]
}
"""
- return self.store.search_user_dir(search_term, limit)
+ return self.store.search_user_dir(user_id, search_term, limit)
@defer.inlineCallbacks
def notify_new_event(self):
@@ -140,10 +146,14 @@ class UserDirectoyHandler(object):
logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
yield self._handle_intial_room(room_id)
num_processed_rooms += 1
+ yield sleep(self.INITIAL_SLEEP_MS / 1000.)
logger.info("Processed all rooms.")
self.initially_handled_users = None
+ self.initially_handled_users_in_public = None
+ self.initially_handled_users_share = None
+ self.initially_handled_users_share_private_room = None
yield self.store.update_user_directory_stream_pos(new_pos)
@@ -158,7 +168,8 @@ class UserDirectoyHandler(object):
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
users_with_profile = yield self.state.get_current_user_in_room(room_id)
- unhandled_users = set(users_with_profile) - self.initially_handled_users
+ user_ids = set(users_with_profile)
+ unhandled_users = user_ids - self.initially_handled_users
yield self.store.add_profiles_to_user_dir(
room_id, {
@@ -175,6 +186,69 @@ class UserDirectoyHandler(object):
)
self.initially_handled_users_in_public != unhandled_users
+ # We now go and figure out the new users who share rooms with user entries
+ # We sleep aggressively here as otherwise it can starve resources.
+ # We also batch up inserts/updates, but try to avoid too many at once.
+ to_insert = set()
+ to_update = set()
+ count = 0
+ for user_id in user_ids:
+ if count % 100 == 0:
+ yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+
+ if not self.is_mine_id(user_id):
+ count += 1
+ continue
+
+ for other_user_id in user_ids:
+ if user_id == other_user_id:
+ continue
+
+ if count % 100 == 0:
+ yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+ count += 1
+
+ user_set = (user_id, other_user_id)
+
+ if user_set in self.initially_handled_users_share_private_room:
+ continue
+
+ if user_set in self.initially_handled_users_share:
+ if is_public:
+ continue
+ to_update.add(user_set)
+ else:
+ to_insert.add(user_set)
+
+ if is_public:
+ self.initially_handled_users_share.add(user_set)
+ else:
+ self.initially_handled_users_share_private_room.add(user_set)
+
+ if len(to_insert) > 100:
+ yield self.store.add_users_who_share_room(
+ room_id, not is_public, to_insert,
+ )
+ to_insert.clear()
+
+ if len(to_update) > 100:
+ yield self.store.update_users_who_share_room(
+ room_id, not is_public, to_update,
+ )
+ to_update.clear()
+
+ if to_insert:
+ yield self.store.add_users_who_share_room(
+ room_id, not is_public, to_insert,
+ )
+ to_insert.clear()
+
+ if to_update:
+ yield self.store.update_users_who_share_room(
+ room_id, not is_public, to_update,
+ )
+ to_update.clear()
+
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
@@ -316,12 +390,77 @@ class UserDirectoyHandler(object):
room_id
)
- if not is_public:
- return
+ if is_public:
+ row = yield self.store.get_user_in_public_room(user_id)
+ if not row:
+ yield self.store.add_users_to_public_room(room_id, [user_id])
- row = yield self.store.get_user_in_public_room(user_id)
- if not row:
- yield self.store.add_users_to_public_room(room_id, [user_id])
+ # Now we update users who share rooms with users. We do this by getting
+ # all the current users in the room and seeing which aren't already
+ # marked in the database as sharing with `user_id`
+
+ users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+ to_insert = set()
+ to_update = set()
+
+ # First, if they're our user then we need to update for every user
+ if self.is_mine_id(user_id):
+ # Returns a map of other_user_id -> shared_private. We only need
+ # to update mappings if for users that either don't share a room
+ # already (aren't in the map) or, if the room is private, those that
+ # only share a public room.
+ user_ids_shared = yield self.store.get_users_who_share_room_from_dir(
+ user_id
+ )
+
+ for other_user_id in users_with_profile:
+ if user_id == other_user_id:
+ continue
+
+ shared_is_private = user_ids_shared.get(other_user_id)
+ if shared_is_private is True:
+ # We've already marked in the database they share a private room
+ continue
+ elif shared_is_private is False:
+ # They already share a public room, so only update if this is
+ # a private room
+ if not is_public:
+ to_update.add((user_id, other_user_id))
+ elif shared_is_private is None:
+ # This is the first time they both share a room
+ to_insert.add((user_id, other_user_id))
+
+ # Next we need to update for every local user in the room
+ for other_user_id in users_with_profile:
+ if user_id == other_user_id:
+ continue
+
+ if self.is_mine_id(other_user_id):
+ shared_is_private = yield self.store.get_if_users_share_a_room(
+ other_user_id, user_id,
+ )
+ if shared_is_private is True:
+ # We've already marked in the database they share a private room
+ continue
+ elif shared_is_private is False:
+ # They already share a public room, so only update if this is
+ # a private room
+ if not is_public:
+ to_update.add((other_user_id, user_id))
+ elif shared_is_private is None:
+ # This is the first time they both share a room
+ to_insert.add((other_user_id, user_id))
+
+ if to_insert:
+ yield self.store.add_users_who_share_room(
+ room_id, not is_public, to_insert,
+ )
+
+ if to_update:
+ yield self.store.update_users_who_share_room(
+ room_id, not is_public, to_update,
+ )
@defer.inlineCallbacks
def _handle_remove_user(self, room_id, user_id):
@@ -339,32 +478,29 @@ class UserDirectoyHandler(object):
row = yield self.store.get_user_in_public_room(user_id)
update_user_in_public = row and row["room_id"] == room_id
- if not update_user_in_public and not update_user_dir:
- return
-
- # XXX: Make this faster?
- rooms = yield self.store.get_rooms_for_user(user_id)
- for j_room_id in rooms:
- if not update_user_in_public and not update_user_dir:
- break
+ if (update_user_in_public or update_user_dir):
+ # XXX: Make this faster?
+ rooms = yield self.store.get_rooms_for_user(user_id)
+ for j_room_id in rooms:
+ if (not update_user_in_public and not update_user_dir):
+ break
- is_in_room = yield self.store.is_host_joined(
- j_room_id, self.server_name,
- )
+ is_in_room = yield self.store.is_host_joined(
+ j_room_id, self.server_name,
+ )
- if not is_in_room:
- continue
+ if not is_in_room:
+ continue
- if update_user_dir:
- update_user_dir = False
- yield self.store.update_user_in_user_dir(user_id, j_room_id)
+ if update_user_dir:
+ update_user_dir = False
+ yield self.store.update_user_in_user_dir(user_id, j_room_id)
- if update_user_in_public:
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
j_room_id
)
- if is_public:
+ if update_user_in_public and is_public:
yield self.store.update_user_in_public_user_list(user_id, j_room_id)
update_user_in_public = False
@@ -373,6 +509,46 @@ class UserDirectoyHandler(object):
elif update_user_in_public:
yield self.store.remove_from_user_in_public_room(user_id)
+ # Now handle users_who_share_rooms.
+
+ # Get a list of user tuples that were in the DB due to this room and
+ # users (this includes tuples where the other user matches `user_id`)
+ user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
+ user_id, room_id,
+ )
+
+ for user_id, other_user_id in user_tuples:
+ # For each user tuple get a list of rooms that they still share,
+ # trying to find a private room, and update the entry in the DB
+ rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id)
+
+ # If they dont share a room anymore, remove the mapping
+ if not rooms:
+ yield self.store.remove_user_who_share_room(
+ user_id, other_user_id,
+ )
+ continue
+
+ found_public_share = None
+ for j_room_id in rooms:
+ is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+ j_room_id
+ )
+
+ if is_public:
+ found_public_share = j_room_id
+ else:
+ found_public_share = None
+ yield self.store.update_users_who_share_room(
+ room_id, not is_public, [(user_id, other_user_id)],
+ )
+ break
+
+ if found_public_share:
+ yield self.store.update_users_who_share_room(
+ room_id, not is_public, [(user_id, other_user_id)],
+ )
+
@defer.inlineCallbacks
def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
"""Check member event changes for any profile changes and update the
|