diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/user_directory.py | 117 | ||||
-rw-r--r-- | synapse/storage/schema/delta/53/users_in_public_rooms.sql | 17 | ||||
-rw-r--r-- | synapse/storage/user_directory.py | 165 |
3 files changed, 139 insertions, 160 deletions
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 20a026e776..f9f7b8abd0 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -247,38 +247,58 @@ class UserDirectoryHandler(object): # We also batch up inserts/updates, but try to avoid too many at once. to_insert = set() count = 0 - for user_id in user_ids: - if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0) - if not self.is_mine_id(user_id): - count += 1 - continue - - if self.store.get_if_app_services_interested_in_user(user_id): - count += 1 - continue + if is_public: + for user_id in user_ids: + if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0) - for other_user_id in user_ids: - if user_id == other_user_id: + if self.store.get_if_app_services_interested_in_user(user_id): + count += 1 continue + to_insert.add(user_id) + if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE: + yield self.store.add_users_in_public_rooms(room_id, to_insert) + to_insert.clear() + + if to_insert: + yield self.store.add_users_in_public_rooms(room_id, to_insert) + to_insert.clear() + else: + + for user_id in user_ids: if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0) - count += 1 - user_set = (user_id, other_user_id) - to_insert.add(user_set) + if not self.is_mine_id(user_id): + count += 1 + continue - if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE: - yield self.store.add_users_who_share_room( - room_id, not is_public, to_insert - ) - to_insert.clear() + if self.store.get_if_app_services_interested_in_user(user_id): + count += 1 + continue - if to_insert: - yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) - to_insert.clear() + for other_user_id in user_ids: + if user_id == other_user_id: + continue + + if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0) + count += 1 + + user_set = (user_id, other_user_id) + to_insert.add(user_set) + + if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE: + yield self.store.add_users_who_share_private_room( + room_id, not is_public, to_insert + ) + to_insert.clear() + + if to_insert: + yield self.store.add_users_who_share_private_room(room_id, to_insert) + to_insert.clear() @defer.inlineCallbacks def _handle_deltas(self, deltas): @@ -451,34 +471,37 @@ class UserDirectoryHandler(object): # Now we update users who share rooms with users. users_with_profile = yield self.state.get_current_user_in_room(room_id) - to_insert = set() + if is_public: + yield self.store.add_users_in_public_rooms(room_id, (user_id,)) + else: + to_insert = set() - # First, if they're our user then we need to update for every user - if self.is_mine_id(user_id): + # First, if they're our user then we need to update for every user + if self.is_mine_id(user_id): - is_appservice = self.store.get_if_app_services_interested_in_user(user_id) + is_appservice = self.store.get_if_app_services_interested_in_user(user_id) - # We don't care about appservice users. - if not is_appservice: - for other_user_id in users_with_profile: - if user_id == other_user_id: - continue + # We don't care about appservice users. + if not is_appservice: + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue - to_insert.add((user_id, other_user_id)) + to_insert.add((user_id, other_user_id)) - # Next we need to update for every local user in the room - for other_user_id in users_with_profile: - if user_id == other_user_id: - continue + # Next we need to update for every local user in the room + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue - is_appservice = self.store.get_if_app_services_interested_in_user( - other_user_id - ) - if self.is_mine_id(other_user_id) and not is_appservice: - to_insert.add((other_user_id, user_id)) + is_appservice = self.store.get_if_app_services_interested_in_user( + other_user_id + ) + if self.is_mine_id(other_user_id) and not is_appservice: + to_insert.add((other_user_id, user_id)) - if to_insert: - yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) + if to_insert: + yield self.store.add_users_who_share_private_room(room_id, to_insert) @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): @@ -493,10 +516,10 @@ class UserDirectoryHandler(object): # Remove user from sharing tables yield self.store.remove_user_who_share_room(user_id, room_id) - # Are they still in a room with members? If not, remove them entirely. - users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id) + # Are they still in any rooms? If not, remove them entirely. + rooms_user_is_in = yield self.store.get_rooms_user_is_in(user_id) - if len(users_in_room_with) == 0: + if len(rooms_user_is_in) == 0: yield self.store.remove_from_user_dir(user_id) @defer.inlineCallbacks diff --git a/synapse/storage/schema/delta/53/users_in_public_rooms.sql b/synapse/storage/schema/delta/53/users_in_public_rooms.sql index bd57fd778b..40adc98387 100644 --- a/synapse/storage/schema/delta/53/users_in_public_rooms.sql +++ b/synapse/storage/schema/delta/53/users_in_public_rooms.sql @@ -16,13 +16,20 @@ -- We don't need the old version of this table. DROP TABLE IF EXISTS users_in_public_rooms; +-- Old version of users_in_public_rooms +DROP TABLE IF EXISTS users_who_share_public_rooms; + -- Track what users are in public rooms. CREATE TABLE IF NOT EXISTS users_in_public_rooms ( - user_id TEXT NOT NULL + user_id TEXT NOT NULL, + room_id TEXT NOT NULL ); -CREATE UNIQUE INDEX users_in_public_rooms_u_idx ON users_in_public_rooms(user_id); +CREATE UNIQUE INDEX users_in_public_rooms_u_idx ON users_in_public_rooms(user_id, room_id); + +-- Track what users are publicly visible +CREATE TABLE IF NOT EXISTS publicly_visible_users ( + user_id TEXT NOT NULL +); --- Fill the table. -INSERT INTO background_updates (update_name, progress_json) VALUES - ('users_in_public_rooms_initial', '{}'); +CREATE UNIQUE INDEX publicly_visible_users_u_idx ON publicly_visible_users(user_id); diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 4de552c1bb..af4260bc61 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -21,57 +21,15 @@ from six import iteritems from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules -from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.state import StateFilter from synapse.types import get_domain_from_id, get_localpart_from_id -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) -class UserDirectoryStore(BackgroundUpdateStore): - def __init__(self, dbconn, hs): - super(UserDirectoryStore, self).__init__(dbconn, hs) - - self.register_background_update_handler( - "users_in_public_rooms_initial", self._populate_users_in_public_rooms - ) - - @defer.inlineCallbacks - def _populate_users_in_public_rooms(self, progress, batch_size): - """ - Populate the users_in_public_rooms table with the contents of the - users_who_share_public_rooms table. - """ - - def _fetch(txn): - sql = "SELECT DISTINCT other_user_id FROM users_who_share_public_rooms" - txn.execute(sql) - return txn.fetchall() - - users = yield self.runInteraction( - "populate_users_in_public_rooms_fetch", _fetch - ) - - if users: - def _fill(txn): - self._simple_upsert_many_txn( - txn, - table="users_in_public_rooms", - key_names=["user_id"], - key_values=users, - value_names=(), - value_values=None, - ) - - users = yield self.runInteraction( - "populate_users_in_public_rooms_fill", _fill - ) - - yield self._end_background_update("users_in_public_rooms_initial") - defer.returnValue(1) - +class UserDirectoryStore(object): @defer.inlineCallbacks def is_room_world_readable_or_publicly_joinable(self, room_id): """Check if the room is either world_readable or publically joinable @@ -282,17 +240,10 @@ class UserDirectoryStore(BackgroundUpdateStore): txn, table="user_directory_search", keyvalues={"user_id": user_id} ) self._simple_delete_txn( - txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} + txn, table="publicly_visible_users", keyvalues={"user_id": user_id} ) self._simple_delete_txn( - txn, - table="users_who_share_public_rooms", - keyvalues={"user_id": user_id}, - ) - self._simple_delete_txn( - txn, - table="users_who_share_public_rooms", - keyvalues={"other_user_id": user_id}, + txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} ) self._simple_delete_txn( txn, @@ -314,9 +265,9 @@ class UserDirectoryStore(BackgroundUpdateStore): in the given room_id """ user_ids_share_pub = yield self._simple_select_onecol( - table="users_who_share_public_rooms", + table="publicly_visible_users", keyvalues={"room_id": room_id}, - retcol="other_user_id", + retcol="user_id", desc="get_users_in_dir_due_to_room", ) @@ -354,26 +305,19 @@ class UserDirectoryStore(BackgroundUpdateStore): rows = yield self._execute("get_all_local_users", None, sql) defer.returnValue([name for name, in rows]) - def add_users_who_share_room(self, room_id, share_private, user_id_tuples): - """Insert entries into the users_who_share_*_rooms table. The first + def add_users_who_share_private_room(self, room_id, user_id_tuples): + """Insert entries into the users_who_share_private_rooms table. The first user should be a local user. Args: room_id (str) - share_private (bool): Is the room private user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. """ def _add_users_who_share_room_txn(txn): - - if share_private: - tbl = "users_who_share_private_rooms" - else: - tbl = "users_who_share_public_rooms" - self._simple_upsert_many_txn( txn, - table=tbl, + table="users_who_share_private_rooms", key_names=["user_id", "other_user_id", "room_id"], key_values=[ (user_id, other_user_id, room_id) @@ -383,26 +327,44 @@ class UserDirectoryStore(BackgroundUpdateStore): value_values=None, ) - # If it's a public room, also update them in users_in_public_rooms. + return self.runInteraction( + "add_users_who_share_room", _add_users_who_share_room_txn + ) + + def add_users_in_public_rooms(self, room_id, user_ids): + """Insert entries into the users_who_share_private_rooms table. The first + user should be a local user. + + Args: + room_id (str) + user_ids (list[str]) + """ + + def _add_users_in_public_rooms_txn(txn): + + self._simple_upsert_many_txn( + txn, + table="users_in_public_rooms", + key_names=["user_id", "room_id"], + key_values=[(user_id, room_id) for user_id in user_ids], + value_names=(), + value_values=None, + ) + + # If it's a public room, also update them in publicly_visible_users. # We don't look before they're in the table before we do it, as it's # more efficient to simply have Postgres do that (one UPSERT vs one # SELECT and maybe one INSERT). - if not share_private: - for user_id in set([x[1] for x in user_id_tuples]): - self._simple_upsert_txn( - txn, - "users_in_public_rooms", - keyvalues={"user_id": user_id}, - values={}, - ) - - for user_id, other_user_id in user_id_tuples: - txn.call_after( - self.get_users_who_share_room_from_dir.invalidate, (user_id,) + for user_id in user_ids: + self._simple_upsert_txn( + txn, + "publicly_visible_users", + keyvalues={"user_id": user_id}, + values={}, ) return self.runInteraction( - "add_users_who_share_room", _add_users_who_share_room_txn + "add_users_in_public_rooms", _add_users_in_public_rooms_txn ) def remove_user_who_share_room(self, user_id, room_id): @@ -428,40 +390,32 @@ class UserDirectoryStore(BackgroundUpdateStore): ) self._simple_delete_txn( txn, - table="users_who_share_public_rooms", + table="users_in_public_rooms", keyvalues={"user_id": user_id, "room_id": room_id}, ) - self._simple_delete_txn( - txn, - table="users_who_share_public_rooms", - keyvalues={"other_user_id": user_id, "room_id": room_id}, - ) # Are the users still in a public room after we deleted them from this one? still_in_public = self._simple_select_one_onecol_txn( txn, - "users_who_share_public_rooms", - keyvalues={"other_user_id": user_id}, - retcol="other_user_id", + "users_in_public_rooms", + keyvalues={"user_id": user_id}, + retcol="user_id", allow_none=True, ) if still_in_public is None: self._simple_delete_txn( - txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} + txn, table="publicly_visible_users", keyvalues={"user_id": user_id} ) - txn.call_after( - self.get_users_who_share_room_from_dir.invalidate, (user_id,) - ) - return self.runInteraction( "remove_user_who_share_room", _remove_user_who_share_room_txn ) - @cachedInlineCallbacks(max_entries=500000, iterable=True) - def get_users_who_share_room_from_dir(self, user_id): - """Returns the set of users who share a room with `user_id` + @defer.inlineCallbacks + def get_rooms_user_is_in(self, user_id): + """ + Returns the rooms that a user is in. Args: user_id(str): Must be a local user @@ -472,23 +426,19 @@ class UserDirectoryStore(BackgroundUpdateStore): rows = yield self._simple_select_onecol( table="users_who_share_private_rooms", keyvalues={"user_id": user_id}, - retcol="other_user_id", - desc="get_users_who_share_room_with_user", + retcol="room_id", + desc="get_rooms_user_is_in", ) pub_rows = yield self._simple_select_onecol( - table="users_who_share_public_rooms", + table="users_in_public_rooms", keyvalues={"user_id": user_id}, - retcol="other_user_id", - desc="get_users_who_share_room_with_user", + retcol="room_id", + desc="get_rooms_user_is_in", ) users = set(pub_rows) users.update(rows) - - # Remove the user themselves from this list. - users.discard(user_id) - defer.returnValue(list(users)) @defer.inlineCallbacks @@ -525,10 +475,9 @@ class UserDirectoryStore(BackgroundUpdateStore): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") txn.execute("DELETE FROM users_in_public_rooms") - txn.execute("DELETE FROM users_who_share_public_rooms") + txn.execute("DELETE FROM publicly_visible_users") txn.execute("DELETE FROM users_who_share_private_rooms") txn.call_after(self.get_user_in_directory.invalidate_all) - txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -641,7 +590,7 @@ class UserDirectoryStore(BackgroundUpdateStore): where_clause = "1=1" else: join_clause = """ - LEFT JOIN users_in_public_rooms AS p USING (user_id) + LEFT JOIN publicly_visible_users AS p USING (user_id) LEFT JOIN ( SELECT other_user_id AS user_id FROM users_who_share_private_rooms WHERE user_id = ? |