From eeb2f9e546060ca9f2ef7260220b51d85d9b0d92 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 11:51:01 +0100 Subject: Add user_directory to database --- synapse/storage/schema/delta/42/user_dir.py | 69 +++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 synapse/storage/schema/delta/42/user_dir.py (limited to 'synapse/storage/schema/delta/42/user_dir.py') diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py new file mode 100644 index 0000000000..38538960a4 --- /dev/null +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -0,0 +1,69 @@ +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.storage.prepare_database import get_statements +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + +logger = logging.getLogger(__name__) + + +BOTH_TABLES = """ +CREATE TABLE user_directory_stream_pos ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT, + CHECK (Lock='X') +); + +INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); +""" + + +POSTGRES_TABLE = """ +CREATE TABLE user_directory ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + display_name TEXT, + avatar_url TEXT, + vector tsvector +); + +CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); +CREATE INDEX user_directory_user_idx ON user_directory(user_id); +""" + + +SQLITE_TABLE = """ +CREATE VIRTUAL TABLE user_directory + USING fts4 ( user_id, room_id, display_name, avatar_url, value ); +""" + + +def run_create(cur, database_engine, *args, **kwargs): + for statement in get_statements(BOTH_TABLES.splitlines()): + cur.execute(statement) + + if isinstance(database_engine, PostgresEngine): + for statement in get_statements(POSTGRES_TABLE.splitlines()): + cur.execute(statement) + elif isinstance(database_engine, Sqlite3Engine): + for statement in get_statements(SQLITE_TABLE.splitlines()): + cur.execute(statement) + else: + raise Exception("Unrecognized database engine") + + +def run_upgrade(*args, **kwargs): + pass -- cgit 1.5.1 From 63fda37e20015f0fe56aed86f907035d42fdc2ca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:00:29 +0100 Subject: Add comments --- synapse/handlers/user_directory.py | 161 ++++++++++++++++++------- synapse/rest/client/v2_alpha/user_directory.py | 16 +++ synapse/storage/schema/delta/42/user_dir.py | 2 +- synapse/storage/user_directory.py | 39 +++++- 4 files changed, 173 insertions(+), 45 deletions(-) (limited to 'synapse/storage/schema/delta/42/user_dir.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 4a9565df93..88b79e3325 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -26,25 +26,54 @@ logger = logging.getLogger(__name__) class UserDirectoyHandler(object): + """Handles querying of and keeping updated the user_directory. + + N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY + """ + def __init__(self, hs): self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() + # When start up for the first time we need to populate the user_directory. + # This is a set of user_id's we've inserted already self.initially_handled_users = set() + # The current position in the current_state_delta stream self.pos = None + # Guard to ensure we only process deltas one at a time self._is_processing = False + # We kick this off so that we don't have to wait for a change before + # we start populating the user directory self.clock.call_later(0, self.notify_new_event) def search_users(self, search_term, limit): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ return self.store.search_user_dir(search_term, limit) @defer.inlineCallbacks def notify_new_event(self): + """Called when there may be more deltas to process + """ if self._is_processing: return @@ -56,13 +85,16 @@ class UserDirectoyHandler(object): @defer.inlineCallbacks def _unsafe_process(self): + # If self.pos is None then means we haven't fetched it from DB if self.pos is None: self.pos = yield self.store.get_user_directory_stream_pos() + # If still None then we need to do the initial fill of directory if self.pos is None: yield self._do_initial_spam() self.pos = yield self.store.get_user_directory_stream_pos() + # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "user_dir_delta"): deltas = yield self.store.get_current_state_deltas(self.pos) @@ -74,69 +106,53 @@ class UserDirectoyHandler(object): self.pos = deltas[-1]["stream_id"] yield self.store.update_user_directory_stream_pos(self.pos) - @defer.inlineCallbacks - def _handle_room(self, room_id): - # TODO: Check we're still joined to room - - is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) - if not is_public: - return - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - unhandled_users = set(users_with_profile) - self.initially_handled_users - - yield self.store.add_profiles_to_user_dir( - room_id, { - user_id: users_with_profile[user_id] for user_id in unhandled_users - } - ) - - self.initially_handled_users |= unhandled_users - @defer.inlineCallbacks def _do_initial_spam(self): + """Populates the user_directory from the current state of the DB, used + when synapse first starts with user_directory support + """ + # TODO: pull from current delta stream_id new_pos = self.store.get_room_max_stream_ordering() + # Delete any existing entries just in case there are any yield self.store.delete_all_from_user_dir() + # We process by going through each existing room at a time. room_ids = yield self.store.get_all_rooms() for room_id in room_ids: - yield self._handle_room(room_id) + yield self._handle_intial_room(room_id) self.initially_handled_users = None yield self.store.update_user_directory_stream_pos(new_pos) @defer.inlineCallbacks - def _handle_new_user(self, room_id, user_id, profile): - row = yield self.store.get_user_in_directory(user_id) - if row: - return - - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + def _handle_intial_room(self, room_id): + """Called when we initially fill out user_directory one room at a time + """ + # TODO: Check we're still joined to room - def _handle_remove_user(self, room_id, user_id): - row = yield self.store.get_user_in_directory(user_id) - if not row or row["room_id"] != room_id: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) + if not is_public: return - # TODO: Make this faster? - rooms = yield self.store.get_rooms_for_user(user_id) - for j_room_id in rooms: - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) + users_with_profile = yield self.state.get_current_user_in_room(room_id) + unhandled_users = set(users_with_profile) - self.initially_handled_users - if is_public: - yield self.store.update_user_in_user_dir(user_id, j_room_id) - return + yield self.store.add_profiles_to_user_dir( + room_id, { + user_id: users_with_profile[user_id] for user_id in unhandled_users + } + ) - yield self.store.remove_from_user_dir(user_id) + self.initially_handled_users |= unhandled_users @defer.inlineCallbacks def _handle_deltas(self, deltas): + """Called with the state deltas to process + """ for delta in deltas: typ = delta["type"] state_key = delta["state_key"] @@ -144,22 +160,33 @@ class UserDirectoyHandler(object): event_id = delta["event_id"] prev_event_id = delta["prev_event_id"] + # For join rule and visibility changes we need to check if the room + # may have become public or not and add/remove the users in said room if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( prev_event_id, event_id, key_name="history_visibility", public_value="world_readable", ) + + # If change is None, no change. True => become world readable, + # False => was world readable if change is None: continue + # There's been a change to or from being world readable. + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( room_id ) - if change and is_public: + if change and not is_public: + # If we became world readable but room isn't currently public then + # we ignore the change continue - elif not change and not is_public: + elif not change and is_public: + # If we stopped being world readable but are still public, + # ignore the change continue users_with_profile = yield self.state.get_current_user_in_room(room_id) @@ -213,8 +240,60 @@ class UserDirectoyHandler(object): else: yield self._handle_remove_user(room_id, state_key) + @defer.inlineCallbacks + def _handle_new_user(self, room_id, user_id, profile): + """Called when we might need to add user to directory + + Args: + room_id (str): room_id that user joined or started being public that + user_id (str) + """ + row = yield self.store.get_user_in_directory(user_id) + if row: + return + + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + def _handle_remove_user(self, room_id, user_id): + """Called when we might need to remove user to directory + + Args: + room_id (str): room_id that user left or stopped being public that + user_id (str) + """ + row = yield self.store.get_user_in_directory(user_id) + if not row or row["room_id"] != room_id: + # Either the user wasn't in directory or we're still in a room that + # is public (i.e. the room_id in the database) + return + + # TODO: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for j_room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + yield self.store.update_user_in_user_dir(user_id, j_room_id) + return + + yield self.store.remove_from_user_dir(user_id) + @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + """Given two events check if the `key_name` field in content changed + from not matching `public_value` to doing so. + + For example, check if `history_visibility` (`key_name`) changed from + `shared` to `world_readable` (`public_value`). + + Returns: + None if the field in the events either both match `public_value` o + neither do, i.e. there has been no change. + True if it didnt match `public_value` but now does + Falsse if it did match `public_value` but now doesn't + """ prev_event = None event = None if prev_event_id: diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py index fe91207195..17d3dffc8f 100644 --- a/synapse/rest/client/v2_alpha/user_directory.py +++ b/synapse/rest/client/v2_alpha/user_directory.py @@ -39,6 +39,22 @@ class UserDirectorySearchRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ yield self.auth.get_user_by_req(request, allow_guest=False) body = parse_json_object_from_request(request) diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 38538960a4..57b89ba552 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -34,7 +34,7 @@ INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); POSTGRES_TABLE = """ CREATE TABLE user_directory ( user_id TEXT NOT NULL, - room_id TEXT NOT NULL, + room_id TEXT NOT NULL, -- A room_id that we know is public display_name TEXT, avatar_url TEXT, vector tsvector diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index ebcc8b9633..83812bf092 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -26,6 +26,8 @@ class UserDirectoryStore(SQLBaseStore): @cachedInlineCallbacks(cache_context=True) def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): + """Check if the room is either world_readable or publically joinable + """ current_state_ids = yield self.get_current_state_ids( room_id, on_invalidate=cache_context.invalidate ) @@ -47,14 +49,24 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(False) def add_profiles_to_user_dir(self, room_id, users_with_profile): + """Add profiles to the user directory + + Args: + room_id (str): A room_id that all users are in that is world_readable + or publically joinable + users_with_profile (dict): Users to add to directory in the form of + mapping of user_id -> ProfileInfo + """ if isinstance(self.database_engine, PostgresEngine): + # We weight the loclpart most highly, then display name and finally + # server name sql = """ INSERT INTO user_directory (user_id, room_id, display_name, avatar_url, vector) VALUES (?,?,?,?, setweight(to_tsvector('english', ?), 'A') - || to_tsvector('english', ?) - || to_tsvector('english', COALESCE(?, '')) + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') ) """ args = ( @@ -113,6 +125,8 @@ class UserDirectoryStore(SQLBaseStore): self.get_user_in_directory.invalidate((user_id,)) def get_all_rooms(self): + """Get all room_ids we've ever known about + """ return self._simple_select_onecol( table="current_state_events", keyvalues={}, @@ -121,6 +135,8 @@ class UserDirectoryStore(SQLBaseStore): ) def delete_all_from_user_dir(self): + """Delete the entire user directory + """ def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.call_after(self.get_user_in_directory.invalidate_all) @@ -170,12 +186,29 @@ class UserDirectoryStore(SQLBaseStore): @defer.inlineCallbacks def search_user_dir(self, search_term, limit): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ + if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url FROM user_directory WHERE vector @@ plainto_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC + ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? """ args = (search_term, search_term, limit + 1,) -- cgit 1.5.1 From 350622a107c356da630eba09b63ed4b6de94b198 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:11:36 +0100 Subject: Handle the server leaving a public room --- synapse/handlers/user_directory.py | 23 ++++++++++++++++++++--- synapse/state.py | 11 +++++++++++ synapse/storage/schema/delta/42/user_dir.py | 4 ++++ synapse/storage/user_directory.py | 11 +++++++++++ 4 files changed, 46 insertions(+), 3 deletions(-) (limited to 'synapse/storage/schema/delta/42/user_dir.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 88b79e3325..4e491a43e6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -132,7 +132,9 @@ class UserDirectoyHandler(object): def _handle_intial_room(self, room_id): """Called when we initially fill out user_directory one room at a time """ - # TODO: Check we're still joined to room + is_in_room = yield self.store.get_is_host_in_room(room_id, self.server_name) + if not is_in_room: + return is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) if not is_public: @@ -229,7 +231,22 @@ class UserDirectoyHandler(object): if change is None: continue - if change: + if not change: + # Need to check if the server left the room entirely, if so + # we might need to remove all the users in that room + is_in_room = yield self.store.get_is_host_in_room( + room_id, self.server_name, + ) + if not is_in_room: + # Fetch all the users that we marked as being in user + # directory due to being in the room and then check if + # need to remove those users or not + user_ids = yield self.store.get_users_in_dir_due_to_room(room_id) + for user_id in user_ids: + yield self._handle_remove_user(room_id, user_id) + return + + if change: # The user joined event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -237,7 +254,7 @@ class UserDirectoyHandler(object): ) yield self._handle_new_user(room_id, state_key, profile) - else: + else: # The user left yield self._handle_remove_user(room_id, state_key) @defer.inlineCallbacks diff --git a/synapse/state.py b/synapse/state.py index 02fee47f39..dffa79e4c9 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -186,6 +186,17 @@ class StateHandler(object): ) defer.returnValue(joined_hosts) + @defer.inlineCallbacks + def get_is_host_in_room(self, room_id, host, latest_event_ids=None): + if not latest_event_ids: + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + logger.debug("calling resolve_state_groups from get_is_host_in_room") + entry = yield self.resolve_state_groups(room_id, latest_event_ids) + is_host_joined = yield self.store.is_host_joined( + room_id, host, entry.state_id, entry.state + ) + defer.returnValue(is_host_joined) + @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): """Build an EventContext structure for the event. diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 57b89ba552..95a7a79fd3 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -41,6 +41,7 @@ CREATE TABLE user_directory ( ); CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); +CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ @@ -48,6 +49,9 @@ CREATE INDEX user_directory_user_idx ON user_directory(user_id); SQLITE_TABLE = """ CREATE VIRTUAL TABLE user_directory USING fts4 ( user_id, room_id, display_name, avatar_url, value ); + +CREATE INDEX user_directory_room_idx ON user_directory(room_id); +CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 83812bf092..0df979cb01 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -124,6 +124,17 @@ class UserDirectoryStore(SQLBaseStore): ) self.get_user_in_directory.invalidate((user_id,)) + def get_users_in_dir_due_to_room(self, room_id): + """Get all user_ids that are in the room directory becuase they're + in the given room_id + """ + return self._simple_select_onecol( + table="user_directory", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_dir_due_to_room", + ) + def get_all_rooms(self): """Get all room_ids we've ever known about """ -- cgit 1.5.1 From 5d79d728f5f38463171f0d063713905f8cb9faec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:23:49 +0100 Subject: Split out directory and search tables --- synapse/storage/schema/delta/42/user_dir.py | 25 ++++++------ synapse/storage/user_directory.py | 60 ++++++++++++++++++++--------- 2 files changed, 56 insertions(+), 29 deletions(-) (limited to 'synapse/storage/schema/delta/42/user_dir.py') diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 95a7a79fd3..7e32662928 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -28,30 +28,33 @@ CREATE TABLE user_directory_stream_pos ( ); INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); -""" - -POSTGRES_TABLE = """ CREATE TABLE user_directory ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, -- A room_id that we know is public display_name TEXT, - avatar_url TEXT, - vector tsvector + avatar_url TEXT ); -CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ -SQLITE_TABLE = """ -CREATE VIRTUAL TABLE user_directory - USING fts4 ( user_id, room_id, display_name, avatar_url, value ); +POSTGRES_TABLE = """ +CREATE TABLE user_directory_search ( + user_id TEXT NOT NULL, + vector tsvector +); -CREATE INDEX user_directory_room_idx ON user_directory(room_id); -CREATE INDEX user_directory_user_idx ON user_directory(user_id); +CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin(vector); +CREATE INDEX user_directory_search_user_idx ON user_directory_search(user_id); +""" + + +SQLITE_TABLE = """ +CREATE VIRTUAL TABLE user_directory_search + USING fts4 ( user_id, value ); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 011c711ec1..b1957cb873 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -61,9 +61,8 @@ class UserDirectoryStore(SQLBaseStore): # We weight the loclpart most highly, then display name and finally # server name sql = """ - INSERT INTO user_directory - (user_id, room_id, display_name, avatar_url, vector) - VALUES (?,?,?,?, + INSERT INTO user_directory_search(user_id, vector) + VALUES (?, setweight(to_tsvector('english', ?), 'A') || setweight(to_tsvector('english', ?), 'D') || setweight(to_tsvector('english', COALESCE(?, '')), 'B') @@ -71,21 +70,19 @@ class UserDirectoryStore(SQLBaseStore): """ args = ( ( - user_id, room_id, p.display_name, p.avatar_url, - get_localpart_from_id(user_id), get_domain_from_id(user_id), - p.display_name, + user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id), + profile.display_name, ) - for user_id, p in users_with_profile.iteritems() + for user_id, profile in users_with_profile.iteritems() ) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ - INSERT INTO user_directory - (user_id, room_id, display_name, avatar_url, value) - VALUES (?,?,?,?,?) + INSERT INTO user_directory_search(user_id, value) + VALUES (?,?) """ args = ( ( - user_id, room_id, p.display_name, p.avatar_url, + user_id, "%s %s" % (user_id, p.display_name,) if p.display_name else user_id ) for user_id, p in users_with_profile.iteritems() @@ -96,6 +93,19 @@ class UserDirectoryStore(SQLBaseStore): def _add_profiles_to_user_dir_txn(txn): txn.executemany(sql, args) + self._simple_insert_many_txn( + txn, + table="user_directory", + values=[ + { + "user_id": user_id, + "room_id": room_id, + "display_name": profile.display_name, + "avatar_url": profile.avatar_url, + } + for user_id, profile in users_with_profile.iteritems() + ] + ) for user_id in users_with_profile: txn.call_after( self.get_user_in_directory.invalidate, (user_id,) @@ -117,12 +127,23 @@ class UserDirectoryStore(SQLBaseStore): @defer.inlineCallbacks def remove_from_user_dir(self, user_id): - yield self._simple_delete( - table="user_directory", - keyvalues={"user_id": user_id}, - desc="remove_from_user_dir", + def _remove_from_user_dir_txn(txn): + self._simple_delete_txn( + txn, + table="user_directory", + keyvalues={"user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="user_directory_search", + keyvalues={"user_id": user_id}, + ) + txn.call_after( + self.get_user_in_directory.invalidate, (user_id,) + ) + return self.runInteraction( + "remove_from_user_dir", _remove_from_user_dir_txn, ) - self.get_user_in_directory.invalidate((user_id,)) def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're @@ -150,6 +171,7 @@ class UserDirectoryStore(SQLBaseStore): """ def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") + txn.execute("DELETE FROM user_directory_search") txn.call_after(self.get_user_in_directory.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -225,7 +247,8 @@ class UserDirectoryStore(SQLBaseStore): if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url - FROM user_directory + FROM user_directory_search + INNER JOIN user_directory USING (user_id) WHERE vector @@ plainto_tsquery('english', ?) ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? @@ -234,7 +257,8 @@ class UserDirectoryStore(SQLBaseStore): elif isinstance(self.database_engine, Sqlite3Engine): sql = """ SELECT user_id, display_name, avatar_url - FROM user_directory + FROM user_directory_search + INNER JOIN user_directory USING (user_id) WHERE value MATCH ? ORDER BY rank(matchinfo(user_directory)) DESC LIMIT ? -- cgit 1.5.1 From 5dd1b2c525cb786614d1503757bf52a7086f3bf9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:29:12 +0100 Subject: Use unique indices --- synapse/storage/schema/delta/42/user_dir.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/schema/delta/42/user_dir.py') diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 7e32662928..c34aa5e7d2 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -37,7 +37,7 @@ CREATE TABLE user_directory ( ); CREATE INDEX user_directory_room_idx ON user_directory(room_id); -CREATE INDEX user_directory_user_idx ON user_directory(user_id); +CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id); """ @@ -48,7 +48,7 @@ CREATE TABLE user_directory_search ( ); CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin(vector); -CREATE INDEX user_directory_search_user_idx ON user_directory_search(user_id); +CREATE UNIQUE INDEX user_directory_search_user_idx ON user_directory_search(user_id); """ -- cgit 1.5.1 From 21e255a8f1948c2fd298ce2e037d20bdd25f2f69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 14:50:46 +0100 Subject: Split the table in two --- synapse/handlers/user_directory.py | 77 +++++++++++++++++++---------- synapse/storage/_base.py | 5 ++ synapse/storage/schema/delta/42/user_dir.py | 10 +++- synapse/storage/user_directory.py | 77 +++++++++++++++++++++++++++-- 4 files changed, 138 insertions(+), 31 deletions(-) (limited to 'synapse/storage/schema/delta/42/user_dir.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index a8525fc86a..d795a9f8d5 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -50,6 +50,7 @@ class UserDirectoyHandler(object): # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already self.initially_handled_users = set() + self.initially_handled_users_in_public = set() # The current position in the current_state_delta stream self.pos = None @@ -145,8 +146,6 @@ class UserDirectoyHandler(object): return is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) - if not is_public: - return users_with_profile = yield self.state.get_current_user_in_room(room_id) unhandled_users = set(users_with_profile) - self.initially_handled_users @@ -159,6 +158,13 @@ class UserDirectoyHandler(object): self.initially_handled_users |= unhandled_users + if is_public: + yield self.store.add_users_to_public_room( + room_id, + user_ids=unhandled_users - self.initially_handled_users_in_public + ) + self.initially_handled_users_in_public != unhandled_users + @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process @@ -206,14 +212,7 @@ class UserDirectoyHandler(object): else: logger.debug("Server is still in room: %r", room_id) - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - room_id - ) - if change: # The user joined - if not is_public: - return - event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -276,11 +275,13 @@ class UserDirectoyHandler(object): # ignore the change return - users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): - if change: + if change: + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): yield self._handle_new_user(room_id, user_id, profile) - else: + else: + users = yield self.store.get_users_in_public_due_to_room(room_id) + for user_id in users: yield self._handle_remove_user(room_id, user_id) @defer.inlineCallbacks @@ -292,11 +293,21 @@ class UserDirectoyHandler(object): user_id (str) """ logger.debug("Adding user to dir, %r", user_id) + row = yield self.store.get_user_in_directory(user_id) - if row: + if not row: + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if not is_public: return - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + row = yield self.store.get_user_in_public_room(user_id) + if not row: + yield self.store.add_users_to_public_room(room_id, [user_id]) @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): @@ -309,15 +320,20 @@ class UserDirectoyHandler(object): logger.debug("Maybe removing user %r", user_id) row = yield self.store.get_user_in_directory(user_id) - if not row or row["room_id"] != room_id: - # Either the user wasn't in directory or we're still in a room that - # is public (i.e. the room_id in the database) - logger.debug("Not removing as row: %r", row) + update_user_dir = row and row["room_id"] == room_id + + row = yield self.store.get_user_in_public_room(user_id) + update_user_in_public = row and row["room_id"] == room_id + + if not update_user_in_public and not update_user_dir: return # XXX: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) for j_room_id in rooms: + if not update_user_in_public and not update_user_dir: + break + is_in_room = yield self.state.get_is_host_in_room( j_room_id, self.server_name, ) @@ -325,16 +341,23 @@ class UserDirectoyHandler(object): if not is_in_room: continue - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) - - if is_public: + if update_user_dir: + update_user_dir = False yield self.store.update_user_in_user_dir(user_id, j_room_id) - logger.debug("Not removing as found other public room: %r", j_room_id) - return - yield self.store.remove_from_user_dir(user_id) + if update_user_in_public: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + yield self.store.update_user_in_public_user_list(user_id, j_room_id) + update_user_in_public = False + + if update_user_dir: + yield self.store.remove_from_user_dir(user_id) + elif update_user_in_public: + yield self.store.remove_from_user_in_public_room(user_id) @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 58b73af7d2..db816346f5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -425,6 +425,11 @@ class SQLBaseStore(object): txn.execute(sql, vals) + def _simple_insert_many(self, table, values, desc): + return self.runInteraction( + desc, self._simple_insert_many_txn, table, values + ) + @staticmethod def _simple_insert_many_txn(txn, table, values): if not values: diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index c34aa5e7d2..ea6a18196d 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -31,13 +31,21 @@ INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); CREATE TABLE user_directory ( user_id TEXT NOT NULL, - room_id TEXT NOT NULL, -- A room_id that we know is public + room_id TEXT NOT NULL, -- A room_id that we know the user is joined to display_name TEXT, avatar_url TEXT ); CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id); + +CREATE TABLE users_in_pubic_room ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL -- A room_id that we know is public +); + +CREATE INDEX users_in_pubic_room_room_idx ON users_in_pubic_room(room_id); +CREATE UNIQUE INDEX users_in_pubic_room_user_idx ON users_in_pubic_room(user_id); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 4fe30ce72e..cab0afc5c3 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -50,12 +50,34 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(False) - def add_profiles_to_user_dir(self, room_id, users_with_profile): - """Add profiles to the user directory + @defer.inlineCallbacks + def add_users_to_public_room(self, room_id, user_ids): + """Add user to the list of users in public rooms Args: room_id (str): A room_id that all users are in that is world_readable or publically joinable + user_ids (list(str)): Users to add + """ + yield self._simple_insert_many( + table="users_in_pubic_room", + values=[ + { + "user_id": user_id, + "room_id": room_id, + } + for user_id in user_ids + ], + desc="add_users_to_public_room" + ) + for user_id in user_ids: + self.get_user_in_public_room.invalidate((user_id,)) + + def add_profiles_to_user_dir(self, room_id, users_with_profile): + """Add profiles to the user directory + + Args: + room_id (str): A room_id that all users are joined to users_with_profile (dict): Users to add to directory in the form of mapping of user_id -> ProfileInfo """ @@ -125,7 +147,15 @@ class UserDirectoryStore(SQLBaseStore): updatevalues={"room_id": room_id}, desc="update_user_in_user_dir", ) - self.get_user_in_directory.invalidate((user_id,)) + + @defer.inlineCallbacks + def update_user_in_public_user_list(self, user_id, room_id): + yield self._simple_update_one( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + updatevalues={"room_id": room_id}, + desc="update_user_in_public_user_list", + ) def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): @@ -139,13 +169,41 @@ class UserDirectoryStore(SQLBaseStore): table="user_directory_search", keyvalues={"user_id": user_id}, ) + self._simple_delete_txn( + txn, + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + ) txn.call_after( self.get_user_in_directory.invalidate, (user_id,) ) + txn.call_after( + self.get_user_in_public_room.invalidate, (user_id,) + ) return self.runInteraction( "remove_from_user_dir", _remove_from_user_dir_txn, ) + @defer.inlineCallbacks + def remove_from_user_in_public_room(self, user_id): + yield self._simple_delete( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + desc="remove_from_user_in_public_room", + ) + self.get_user_in_public_room.invalidate((user_id,)) + + def get_users_in_public_due_to_room(self, room_id): + """Get all user_ids that are in the room directory becuase they're + in the given room_id + """ + return self._simple_select_onecol( + table="users_in_pubic_room", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_public_due_to_room", + ) + def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're in the given room_id @@ -173,6 +231,7 @@ class UserDirectoryStore(SQLBaseStore): def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") + txn.execute("DELETE FROM users_in_pubic_room") txn.call_after(self.get_user_in_directory.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -188,6 +247,16 @@ class UserDirectoryStore(SQLBaseStore): desc="get_user_in_directory", ) + @cached() + def get_user_in_public_room(self, user_id): + return self._simple_select_one( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + retcols=("room_id",), + allow_none=True, + desc="get_user_in_public_room", + ) + def get_user_directory_stream_pos(self): return self._simple_select_one_onecol( table="user_directory_stream_pos", @@ -282,6 +351,7 @@ class UserDirectoryStore(SQLBaseStore): SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) + INNER JOIN users_in_pubic_room USING (user_id) WHERE vector @@ to_tsquery('english', ?) ORDER BY ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC, @@ -295,6 +365,7 @@ class UserDirectoryStore(SQLBaseStore): SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) + INNER JOIN users_in_pubic_room USING (user_id) WHERE value MATCH ? ORDER BY rank(matchinfo(user_directory)) DESC, -- cgit 1.5.1