From eeb2f9e546060ca9f2ef7260220b51d85d9b0d92 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 11:51:01 +0100 Subject: Add user_directory to database --- synapse/storage/user_directory.py | 145 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 synapse/storage/user_directory.py (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py new file mode 100644 index 0000000000..6c7c8c4bee --- /dev/null +++ b/synapse/storage/user_directory.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.api.constants import EventTypes, JoinRules +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + + +class UserDirectoryStore(SQLBaseStore): + + @cachedInlineCallbacks(cache_context=True) + def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): + current_state_ids = yield self.get_current_state_ids( + room_id, on_invalidate=cache_context.invalidate + ) + + join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) + if join_rules_id: + join_rule_ev = yield self.get_event(join_rules_id, allow_none=True) + if join_rule_ev: + if join_rule_ev.content.get("join_rules") == JoinRules.PUBLIC: + defer.returnValue(True) + + hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, "")) + if hist_vis_id: + hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True) + if hist_vis_ev: + if hist_vis_ev.content.get("history_visibility") == "world_readable": + defer.returnValue(True) + + defer.returnValue(False) + + def add_profiles_to_user_dir(self, room_id, users_with_profile): + if isinstance(self.database_engine, PostgresEngine): + sql = """ + INSERT INTO user_directory + (user_id, room_id, display_name, avatar_url, vector) + VALUES (?,?,?,?,to_tsvector('english', ?)) + """ + elif isinstance(self.database_engine, Sqlite3Engine): + sql = """ + INSERT INTO user_directory + (user_id, room_id, display_name, avatar_url, value) + VALUES (?,?,?,?,?) + """ + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + def _add_profiles_to_user_dir_txn(txn): + txn.executemany(sql, ( + ( + user_id, room_id, p.display_name, p.avatar_url, + "%s %s" % (user_id, p.display_name,) if p.display_name else user_id + ) + for user_id, p in users_with_profile.iteritems() + )) + for user_id in users_with_profile: + txn.call_after( + self.get_user_in_directory.invalidate, (user_id,) + ) + + return self.runInteraction( + "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn + ) + + @defer.inlineCallbacks + def remove_from_user_dir(self, user_id): + yield self._simple_delete( + table="user_directory", + keyvalues={"user_id": user_id}, + desc="remove_from_user_dir", + ) + self.get_user_in_directory.invalidate((user_id,)) + + def get_all_rooms(self): + return self._simple_select_onecol( + table="current_state_events", + keyvalues={}, + retcol="DISTINCT room_id", + desc="get_all_rooms", + ) + + def delete_all_from_user_dir(self): + def _delete_all_from_user_dir_txn(txn): + txn.execute("DELETE FROM user_directory") + txn.call_after(self.get_user_in_directory.invalidate_all) + return self.runInteraction( + "delete_all_from_user_dir", _delete_all_from_user_dir_txn + ) + + @cached() + def get_user_in_directory(self, user_id): + return self._simple_select_one( + table="user_directory", + keyvalues={"user_id": user_id}, + retcols=("room_id", "display_name", "avatar_url",), + allow_none=True, + desc="get_user_in_directory", + ) + + def get_user_directory_stream_pos(self): + return self._simple_select_one_onecol( + table="user_directory_stream_pos", + keyvalues={}, + retcol="stream_id", + desc="get_user_directory_stream_pos", + ) + + def update_user_directory_stream_pos(self, stream_id): + return self._simple_update_one( + table="user_directory_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="update_user_directory_stream_pos", + ) + + def get_current_state_deltas(self, prev_stream_id): + # TODO: Add stream change cache + # TODO: Add limit + sql = """ + SELECT stream_id, room_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE stream_id > ? + ORDER BY stream_id ASC + """ + + return self._execute( + "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id + ) -- cgit 1.5.1 From b5db4ed5f68ee81557393e94436d768b955b1aa0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 13:30:12 +0100 Subject: Update room column when room becomes unpublic --- synapse/handlers/user_directory.py | 23 +++++++++++++++++++++-- synapse/storage/user_directory.py | 10 ++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index e9488ce554..0cf403f599 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -121,12 +121,13 @@ class UserDirectoyHandler(object): # TODO: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) - for room_id in rooms: + for j_room_id in rooms: is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - room_id + j_room_id ) if is_public: + yield self.store.update_user_in_user_dir(user_id, j_room_id) return yield self.store.remove_from_user_dir(user_id) @@ -149,6 +150,15 @@ class UserDirectoyHandler(object): if change is None: continue + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if change and is_public: + continue + elif not change and not is_public: + continue + users_with_profile = yield self.state.get_current_user_in_room(room_id) for user_id, profile in users_with_profile.iteritems(): if change: @@ -164,6 +174,15 @@ class UserDirectoyHandler(object): if change is None: continue + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if change and is_public: + continue + elif not change and not is_public: + continue + users_with_profile = yield self.state.get_current_user_in_room(room_id) for user_id, profile in users_with_profile.iteritems(): if change: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 6c7c8c4bee..d72b93b585 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -79,6 +79,16 @@ class UserDirectoryStore(SQLBaseStore): "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn ) + @defer.inlineCallbacks + def update_user_in_user_dir(self, user_id, room_id): + yield self._simple_update_one( + table="user_directory", + keyvalues={"user_id": user_id}, + updatevalues={"room_id": room_id}, + desc="update_user_in_user_dir", + ) + self.get_user_in_directory.invalidate((user_id,)) + @defer.inlineCallbacks def remove_from_user_dir(self, user_id): yield self._simple_delete( -- cgit 1.5.1 From 3b5f22ca40303392e45c8407952ecf3ee15785f6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 14:00:01 +0100 Subject: Add search --- synapse/handlers/user_directory.py | 3 +++ synapse/storage/user_directory.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 0cf403f599..4a9565df93 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -40,6 +40,9 @@ class UserDirectoyHandler(object): self.clock.call_later(0, self.notify_new_event) + def search_users(self, search_term, limit): + return self.store.search_user_dir(search_term, limit) + @defer.inlineCallbacks def notify_new_event(self): if self._is_processing: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index d72b93b585..650c49982d 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -153,3 +153,38 @@ class UserDirectoryStore(SQLBaseStore): return self._execute( "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id ) + + @defer.inlineCallbacks + def search_user_dir(self, search_term, limit): + if isinstance(self.database_engine, PostgresEngine): + sql = """ + SELECT user_id, display_name, avatar_url + FROM user_directory + WHERE vector @@ to_tsquery('english', ?) + ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC + LIMIT ? + """ + args = (search_term, search_term, limit + 1,) + elif isinstance(self.database_engine, Sqlite3Engine): + sql = """ + SELECT user_id, display_name, avatar_url + FROM user_directory + WHERE value MATCH ? + ORDER BY rank(matchinfo(user_directory)) DESC + LIMIT ? + """ + args = (search_term, limit + 1) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + results = yield self._execute( + "search_user_dir", self.cursor_to_dict, sql, *args + ) + + limited = len(results) > limit + + defer.returnValue({ + "limited": limited, + "results": results, + }) -- cgit 1.5.1 From 293ef296559fa5bb721592bfa9605f7282df0f6e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 14:29:32 +0100 Subject: Weight differently --- synapse/storage/user_directory.py | 34 ++++++++++++++++++++++++---------- synapse/types.py | 7 +++++++ 2 files changed, 31 insertions(+), 10 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 650c49982d..ebcc8b9633 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -19,6 +19,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.types import get_domain_from_id, get_localpart_from_id class UserDirectoryStore(SQLBaseStore): @@ -50,26 +51,39 @@ class UserDirectoryStore(SQLBaseStore): sql = """ INSERT INTO user_directory (user_id, room_id, display_name, avatar_url, vector) - VALUES (?,?,?,?,to_tsvector('english', ?)) + VALUES (?,?,?,?, + setweight(to_tsvector('english', ?), 'A') + || to_tsvector('english', ?) + || to_tsvector('english', COALESCE(?, '')) + ) """ + args = ( + ( + user_id, room_id, p.display_name, p.avatar_url, + get_localpart_from_id(user_id), get_domain_from_id(user_id), + p.display_name, + ) + for user_id, p in users_with_profile.iteritems() + ) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ INSERT INTO user_directory (user_id, room_id, display_name, avatar_url, value) VALUES (?,?,?,?,?) """ - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") - - def _add_profiles_to_user_dir_txn(txn): - txn.executemany(sql, ( + args = ( ( user_id, room_id, p.display_name, p.avatar_url, "%s %s" % (user_id, p.display_name,) if p.display_name else user_id ) for user_id, p in users_with_profile.iteritems() - )) + ) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + def _add_profiles_to_user_dir_txn(txn): + txn.executemany(sql, args) for user_id in users_with_profile: txn.call_after( self.get_user_in_directory.invalidate, (user_id,) @@ -160,8 +174,8 @@ class UserDirectoryStore(SQLBaseStore): sql = """ SELECT user_id, display_name, avatar_url FROM user_directory - WHERE vector @@ to_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC + WHERE vector @@ plainto_tsquery('english', ?) + ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? """ args = (search_term, search_term, limit + 1,) diff --git a/synapse/types.py b/synapse/types.py index 445bdcb4d7..111948540d 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -62,6 +62,13 @@ def get_domain_from_id(string): return string[idx + 1:] +def get_localpart_from_id(string): + idx = string.find(":") + if idx == -1: + raise SynapseError(400, "Invalid ID: %r" % (string,)) + return string[1:idx] + + class DomainSpecificString( namedtuple("DomainSpecificString", ("localpart", "domain")) ): -- cgit 1.5.1 From 63fda37e20015f0fe56aed86f907035d42fdc2ca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:00:29 +0100 Subject: Add comments --- synapse/handlers/user_directory.py | 161 ++++++++++++++++++------- synapse/rest/client/v2_alpha/user_directory.py | 16 +++ synapse/storage/schema/delta/42/user_dir.py | 2 +- synapse/storage/user_directory.py | 39 +++++- 4 files changed, 173 insertions(+), 45 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 4a9565df93..88b79e3325 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -26,25 +26,54 @@ logger = logging.getLogger(__name__) class UserDirectoyHandler(object): + """Handles querying of and keeping updated the user_directory. + + N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY + """ + def __init__(self, hs): self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() + # When start up for the first time we need to populate the user_directory. + # This is a set of user_id's we've inserted already self.initially_handled_users = set() + # The current position in the current_state_delta stream self.pos = None + # Guard to ensure we only process deltas one at a time self._is_processing = False + # We kick this off so that we don't have to wait for a change before + # we start populating the user directory self.clock.call_later(0, self.notify_new_event) def search_users(self, search_term, limit): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ return self.store.search_user_dir(search_term, limit) @defer.inlineCallbacks def notify_new_event(self): + """Called when there may be more deltas to process + """ if self._is_processing: return @@ -56,13 +85,16 @@ class UserDirectoyHandler(object): @defer.inlineCallbacks def _unsafe_process(self): + # If self.pos is None then means we haven't fetched it from DB if self.pos is None: self.pos = yield self.store.get_user_directory_stream_pos() + # If still None then we need to do the initial fill of directory if self.pos is None: yield self._do_initial_spam() self.pos = yield self.store.get_user_directory_stream_pos() + # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "user_dir_delta"): deltas = yield self.store.get_current_state_deltas(self.pos) @@ -74,69 +106,53 @@ class UserDirectoyHandler(object): self.pos = deltas[-1]["stream_id"] yield self.store.update_user_directory_stream_pos(self.pos) - @defer.inlineCallbacks - def _handle_room(self, room_id): - # TODO: Check we're still joined to room - - is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) - if not is_public: - return - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - unhandled_users = set(users_with_profile) - self.initially_handled_users - - yield self.store.add_profiles_to_user_dir( - room_id, { - user_id: users_with_profile[user_id] for user_id in unhandled_users - } - ) - - self.initially_handled_users |= unhandled_users - @defer.inlineCallbacks def _do_initial_spam(self): + """Populates the user_directory from the current state of the DB, used + when synapse first starts with user_directory support + """ + # TODO: pull from current delta stream_id new_pos = self.store.get_room_max_stream_ordering() + # Delete any existing entries just in case there are any yield self.store.delete_all_from_user_dir() + # We process by going through each existing room at a time. room_ids = yield self.store.get_all_rooms() for room_id in room_ids: - yield self._handle_room(room_id) + yield self._handle_intial_room(room_id) self.initially_handled_users = None yield self.store.update_user_directory_stream_pos(new_pos) @defer.inlineCallbacks - def _handle_new_user(self, room_id, user_id, profile): - row = yield self.store.get_user_in_directory(user_id) - if row: - return - - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + def _handle_intial_room(self, room_id): + """Called when we initially fill out user_directory one room at a time + """ + # TODO: Check we're still joined to room - def _handle_remove_user(self, room_id, user_id): - row = yield self.store.get_user_in_directory(user_id) - if not row or row["room_id"] != room_id: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) + if not is_public: return - # TODO: Make this faster? - rooms = yield self.store.get_rooms_for_user(user_id) - for j_room_id in rooms: - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) + users_with_profile = yield self.state.get_current_user_in_room(room_id) + unhandled_users = set(users_with_profile) - self.initially_handled_users - if is_public: - yield self.store.update_user_in_user_dir(user_id, j_room_id) - return + yield self.store.add_profiles_to_user_dir( + room_id, { + user_id: users_with_profile[user_id] for user_id in unhandled_users + } + ) - yield self.store.remove_from_user_dir(user_id) + self.initially_handled_users |= unhandled_users @defer.inlineCallbacks def _handle_deltas(self, deltas): + """Called with the state deltas to process + """ for delta in deltas: typ = delta["type"] state_key = delta["state_key"] @@ -144,22 +160,33 @@ class UserDirectoyHandler(object): event_id = delta["event_id"] prev_event_id = delta["prev_event_id"] + # For join rule and visibility changes we need to check if the room + # may have become public or not and add/remove the users in said room if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( prev_event_id, event_id, key_name="history_visibility", public_value="world_readable", ) + + # If change is None, no change. True => become world readable, + # False => was world readable if change is None: continue + # There's been a change to or from being world readable. + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( room_id ) - if change and is_public: + if change and not is_public: + # If we became world readable but room isn't currently public then + # we ignore the change continue - elif not change and not is_public: + elif not change and is_public: + # If we stopped being world readable but are still public, + # ignore the change continue users_with_profile = yield self.state.get_current_user_in_room(room_id) @@ -213,8 +240,60 @@ class UserDirectoyHandler(object): else: yield self._handle_remove_user(room_id, state_key) + @defer.inlineCallbacks + def _handle_new_user(self, room_id, user_id, profile): + """Called when we might need to add user to directory + + Args: + room_id (str): room_id that user joined or started being public that + user_id (str) + """ + row = yield self.store.get_user_in_directory(user_id) + if row: + return + + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + def _handle_remove_user(self, room_id, user_id): + """Called when we might need to remove user to directory + + Args: + room_id (str): room_id that user left or stopped being public that + user_id (str) + """ + row = yield self.store.get_user_in_directory(user_id) + if not row or row["room_id"] != room_id: + # Either the user wasn't in directory or we're still in a room that + # is public (i.e. the room_id in the database) + return + + # TODO: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for j_room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + yield self.store.update_user_in_user_dir(user_id, j_room_id) + return + + yield self.store.remove_from_user_dir(user_id) + @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + """Given two events check if the `key_name` field in content changed + from not matching `public_value` to doing so. + + For example, check if `history_visibility` (`key_name`) changed from + `shared` to `world_readable` (`public_value`). + + Returns: + None if the field in the events either both match `public_value` o + neither do, i.e. there has been no change. + True if it didnt match `public_value` but now does + Falsse if it did match `public_value` but now doesn't + """ prev_event = None event = None if prev_event_id: diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py index fe91207195..17d3dffc8f 100644 --- a/synapse/rest/client/v2_alpha/user_directory.py +++ b/synapse/rest/client/v2_alpha/user_directory.py @@ -39,6 +39,22 @@ class UserDirectorySearchRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ yield self.auth.get_user_by_req(request, allow_guest=False) body = parse_json_object_from_request(request) diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 38538960a4..57b89ba552 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -34,7 +34,7 @@ INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); POSTGRES_TABLE = """ CREATE TABLE user_directory ( user_id TEXT NOT NULL, - room_id TEXT NOT NULL, + room_id TEXT NOT NULL, -- A room_id that we know is public display_name TEXT, avatar_url TEXT, vector tsvector diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index ebcc8b9633..83812bf092 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -26,6 +26,8 @@ class UserDirectoryStore(SQLBaseStore): @cachedInlineCallbacks(cache_context=True) def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): + """Check if the room is either world_readable or publically joinable + """ current_state_ids = yield self.get_current_state_ids( room_id, on_invalidate=cache_context.invalidate ) @@ -47,14 +49,24 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(False) def add_profiles_to_user_dir(self, room_id, users_with_profile): + """Add profiles to the user directory + + Args: + room_id (str): A room_id that all users are in that is world_readable + or publically joinable + users_with_profile (dict): Users to add to directory in the form of + mapping of user_id -> ProfileInfo + """ if isinstance(self.database_engine, PostgresEngine): + # We weight the loclpart most highly, then display name and finally + # server name sql = """ INSERT INTO user_directory (user_id, room_id, display_name, avatar_url, vector) VALUES (?,?,?,?, setweight(to_tsvector('english', ?), 'A') - || to_tsvector('english', ?) - || to_tsvector('english', COALESCE(?, '')) + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') ) """ args = ( @@ -113,6 +125,8 @@ class UserDirectoryStore(SQLBaseStore): self.get_user_in_directory.invalidate((user_id,)) def get_all_rooms(self): + """Get all room_ids we've ever known about + """ return self._simple_select_onecol( table="current_state_events", keyvalues={}, @@ -121,6 +135,8 @@ class UserDirectoryStore(SQLBaseStore): ) def delete_all_from_user_dir(self): + """Delete the entire user directory + """ def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.call_after(self.get_user_in_directory.invalidate_all) @@ -170,12 +186,29 @@ class UserDirectoryStore(SQLBaseStore): @defer.inlineCallbacks def search_user_dir(self, search_term, limit): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ + if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url FROM user_directory WHERE vector @@ plainto_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC + ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? """ args = (search_term, search_term, limit + 1,) -- cgit 1.5.1 From 350622a107c356da630eba09b63ed4b6de94b198 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:11:36 +0100 Subject: Handle the server leaving a public room --- synapse/handlers/user_directory.py | 23 ++++++++++++++++++++--- synapse/state.py | 11 +++++++++++ synapse/storage/schema/delta/42/user_dir.py | 4 ++++ synapse/storage/user_directory.py | 11 +++++++++++ 4 files changed, 46 insertions(+), 3 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 88b79e3325..4e491a43e6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -132,7 +132,9 @@ class UserDirectoyHandler(object): def _handle_intial_room(self, room_id): """Called when we initially fill out user_directory one room at a time """ - # TODO: Check we're still joined to room + is_in_room = yield self.store.get_is_host_in_room(room_id, self.server_name) + if not is_in_room: + return is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) if not is_public: @@ -229,7 +231,22 @@ class UserDirectoyHandler(object): if change is None: continue - if change: + if not change: + # Need to check if the server left the room entirely, if so + # we might need to remove all the users in that room + is_in_room = yield self.store.get_is_host_in_room( + room_id, self.server_name, + ) + if not is_in_room: + # Fetch all the users that we marked as being in user + # directory due to being in the room and then check if + # need to remove those users or not + user_ids = yield self.store.get_users_in_dir_due_to_room(room_id) + for user_id in user_ids: + yield self._handle_remove_user(room_id, user_id) + return + + if change: # The user joined event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -237,7 +254,7 @@ class UserDirectoyHandler(object): ) yield self._handle_new_user(room_id, state_key, profile) - else: + else: # The user left yield self._handle_remove_user(room_id, state_key) @defer.inlineCallbacks diff --git a/synapse/state.py b/synapse/state.py index 02fee47f39..dffa79e4c9 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -186,6 +186,17 @@ class StateHandler(object): ) defer.returnValue(joined_hosts) + @defer.inlineCallbacks + def get_is_host_in_room(self, room_id, host, latest_event_ids=None): + if not latest_event_ids: + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + logger.debug("calling resolve_state_groups from get_is_host_in_room") + entry = yield self.resolve_state_groups(room_id, latest_event_ids) + is_host_joined = yield self.store.is_host_joined( + room_id, host, entry.state_id, entry.state + ) + defer.returnValue(is_host_joined) + @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): """Build an EventContext structure for the event. diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 57b89ba552..95a7a79fd3 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -41,6 +41,7 @@ CREATE TABLE user_directory ( ); CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); +CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ @@ -48,6 +49,9 @@ CREATE INDEX user_directory_user_idx ON user_directory(user_id); SQLITE_TABLE = """ CREATE VIRTUAL TABLE user_directory USING fts4 ( user_id, room_id, display_name, avatar_url, value ); + +CREATE INDEX user_directory_room_idx ON user_directory(room_id); +CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 83812bf092..0df979cb01 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -124,6 +124,17 @@ class UserDirectoryStore(SQLBaseStore): ) self.get_user_in_directory.invalidate((user_id,)) + def get_users_in_dir_due_to_room(self, room_id): + """Get all user_ids that are in the room directory becuase they're + in the given room_id + """ + return self._simple_select_onecol( + table="user_directory", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_dir_due_to_room", + ) + def get_all_rooms(self): """Get all room_ids we've ever known about """ -- cgit 1.5.1 From dc51af3d031030fdf553f8478f7930596f2694f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:13:49 +0100 Subject: Pull max id from correct table --- synapse/handlers/user_directory.py | 6 ++---- synapse/storage/user_directory.py | 8 ++++++++ 2 files changed, 10 insertions(+), 4 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 4e491a43e6..8331f6422e 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -111,9 +111,7 @@ class UserDirectoyHandler(object): """Populates the user_directory from the current state of the DB, used when synapse first starts with user_directory support """ - - # TODO: pull from current delta stream_id - new_pos = self.store.get_room_max_stream_ordering() + new_pos = yield self.store.get_max_stream_id_in_current_state_deltas() # Delete any existing entries just in case there are any yield self.store.delete_all_from_user_dir() @@ -284,7 +282,7 @@ class UserDirectoyHandler(object): # is public (i.e. the room_id in the database) return - # TODO: Make this faster? + # XXX: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) for j_room_id in rooms: is_public = yield self.store.is_room_world_readable_or_publicly_joinable( diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 0df979cb01..011c711ec1 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -195,6 +195,14 @@ class UserDirectoryStore(SQLBaseStore): "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id ) + def get_max_stream_id_in_current_state_deltas(self): + return self._simple_select_one_onecol( + table="current_state_delta_stream", + keyvalues={}, + retcol="COALESCE(MAX(stream_id), -1)", + desc="get_max_stream_id_in_current_state_deltas", + ) + @defer.inlineCallbacks def search_user_dir(self, search_term, limit): """Searches for users in directory -- cgit 1.5.1 From 5d79d728f5f38463171f0d063713905f8cb9faec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:23:49 +0100 Subject: Split out directory and search tables --- synapse/storage/schema/delta/42/user_dir.py | 25 ++++++------ synapse/storage/user_directory.py | 60 ++++++++++++++++++++--------- 2 files changed, 56 insertions(+), 29 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 95a7a79fd3..7e32662928 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -28,30 +28,33 @@ CREATE TABLE user_directory_stream_pos ( ); INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); -""" - -POSTGRES_TABLE = """ CREATE TABLE user_directory ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, -- A room_id that we know is public display_name TEXT, - avatar_url TEXT, - vector tsvector + avatar_url TEXT ); -CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ -SQLITE_TABLE = """ -CREATE VIRTUAL TABLE user_directory - USING fts4 ( user_id, room_id, display_name, avatar_url, value ); +POSTGRES_TABLE = """ +CREATE TABLE user_directory_search ( + user_id TEXT NOT NULL, + vector tsvector +); -CREATE INDEX user_directory_room_idx ON user_directory(room_id); -CREATE INDEX user_directory_user_idx ON user_directory(user_id); +CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin(vector); +CREATE INDEX user_directory_search_user_idx ON user_directory_search(user_id); +""" + + +SQLITE_TABLE = """ +CREATE VIRTUAL TABLE user_directory_search + USING fts4 ( user_id, value ); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 011c711ec1..b1957cb873 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -61,9 +61,8 @@ class UserDirectoryStore(SQLBaseStore): # We weight the loclpart most highly, then display name and finally # server name sql = """ - INSERT INTO user_directory - (user_id, room_id, display_name, avatar_url, vector) - VALUES (?,?,?,?, + INSERT INTO user_directory_search(user_id, vector) + VALUES (?, setweight(to_tsvector('english', ?), 'A') || setweight(to_tsvector('english', ?), 'D') || setweight(to_tsvector('english', COALESCE(?, '')), 'B') @@ -71,21 +70,19 @@ class UserDirectoryStore(SQLBaseStore): """ args = ( ( - user_id, room_id, p.display_name, p.avatar_url, - get_localpart_from_id(user_id), get_domain_from_id(user_id), - p.display_name, + user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id), + profile.display_name, ) - for user_id, p in users_with_profile.iteritems() + for user_id, profile in users_with_profile.iteritems() ) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ - INSERT INTO user_directory - (user_id, room_id, display_name, avatar_url, value) - VALUES (?,?,?,?,?) + INSERT INTO user_directory_search(user_id, value) + VALUES (?,?) """ args = ( ( - user_id, room_id, p.display_name, p.avatar_url, + user_id, "%s %s" % (user_id, p.display_name,) if p.display_name else user_id ) for user_id, p in users_with_profile.iteritems() @@ -96,6 +93,19 @@ class UserDirectoryStore(SQLBaseStore): def _add_profiles_to_user_dir_txn(txn): txn.executemany(sql, args) + self._simple_insert_many_txn( + txn, + table="user_directory", + values=[ + { + "user_id": user_id, + "room_id": room_id, + "display_name": profile.display_name, + "avatar_url": profile.avatar_url, + } + for user_id, profile in users_with_profile.iteritems() + ] + ) for user_id in users_with_profile: txn.call_after( self.get_user_in_directory.invalidate, (user_id,) @@ -117,12 +127,23 @@ class UserDirectoryStore(SQLBaseStore): @defer.inlineCallbacks def remove_from_user_dir(self, user_id): - yield self._simple_delete( - table="user_directory", - keyvalues={"user_id": user_id}, - desc="remove_from_user_dir", + def _remove_from_user_dir_txn(txn): + self._simple_delete_txn( + txn, + table="user_directory", + keyvalues={"user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="user_directory_search", + keyvalues={"user_id": user_id}, + ) + txn.call_after( + self.get_user_in_directory.invalidate, (user_id,) + ) + return self.runInteraction( + "remove_from_user_dir", _remove_from_user_dir_txn, ) - self.get_user_in_directory.invalidate((user_id,)) def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're @@ -150,6 +171,7 @@ class UserDirectoryStore(SQLBaseStore): """ def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") + txn.execute("DELETE FROM user_directory_search") txn.call_after(self.get_user_in_directory.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -225,7 +247,8 @@ class UserDirectoryStore(SQLBaseStore): if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url - FROM user_directory + FROM user_directory_search + INNER JOIN user_directory USING (user_id) WHERE vector @@ plainto_tsquery('english', ?) ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? @@ -234,7 +257,8 @@ class UserDirectoryStore(SQLBaseStore): elif isinstance(self.database_engine, Sqlite3Engine): sql = """ SELECT user_id, display_name, avatar_url - FROM user_directory + FROM user_directory_search + INNER JOIN user_directory USING (user_id) WHERE value MATCH ? ORDER BY rank(matchinfo(user_directory)) DESC LIMIT ? -- cgit 1.5.1 From 304880d18545b59a51c5d4b928e563c6d1514fdc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:46:36 +0100 Subject: Add stream change cache --- synapse/storage/__init__.py | 12 ++++++++++++ synapse/storage/events.py | 4 ++++ synapse/storage/user_directory.py | 4 +++- synapse/util/caches/stream_change_cache.py | 15 +++++++++++++++ 4 files changed, 34 insertions(+), 1 deletion(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 11655bf60f..3c88ba9860 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -223,6 +223,18 @@ class DataStore(RoomMemberStore, RoomStore, "DeviceListFederationStreamChangeCache", device_list_max, ) + curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( + db_conn, "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache = StreamChangeCache( + "_curr_state_delta_stream_cache", min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/events.py b/synapse/storage/events.py index dfb57f9d12..77861488d2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -755,6 +755,10 @@ class EventsStore(SQLBaseStore): ] ) + self._curr_state_delta_stream_cache.enttity_has_changed( + room_id, max_stream_order, + ) + # Invalidate the various caches # Figure out the changes of membership to invalidate the diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index b1957cb873..15b8ea0460 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -204,7 +204,9 @@ class UserDirectoryStore(SQLBaseStore): ) def get_current_state_deltas(self, prev_stream_id): - # TODO: Add stream change cache + if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): + return [] + # TODO: Add limit sql = """ SELECT stream_id, room_id, type, state_key, event_id, prev_event_id diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 70fe00ce0b..c498aee46c 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -89,6 +89,21 @@ class StreamChangeCache(object): return result + def has_any_entity_changed(self, stream_pos): + """Returns if any entity has changed + """ + assert type(stream_pos) is int + + if stream_pos >= self._earliest_known_stream_pos: + self.metrics.inc_hits() + if stream_pos >= max(self._cache): + return False + else: + return True + else: + self.metrics.inc_misses() + return True + def get_all_entities_changed(self, stream_pos): """Returns all entites that have had new things since the given position. If the position is too old it will return None. -- cgit 1.5.1 From 63c58c2a3fced42c254da1c1ae5e55a977b7141c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 16:17:58 +0100 Subject: Limit number of things we fetch out of the db --- synapse/storage/user_directory.py | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 15b8ea0460..9137fc24ea 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -207,16 +207,37 @@ class UserDirectoryStore(SQLBaseStore): if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): return [] - # TODO: Add limit - sql = """ - SELECT stream_id, room_id, type, state_key, event_id, prev_event_id - FROM current_state_delta_stream - WHERE stream_id > ? - ORDER BY stream_id ASC - """ + def get_current_state_deltas_txn(txn): + # First we calculate the max stream id that will give us less than + # N results + sql = """ + SELECT stream_id, count(*) + FROM current_state_delta_stream + WHERE stream_id > ? + GROUP BY stream_id + ORDER BY stream_id ASC + LIMIT 100 + """ + txn.execute(sql, (prev_stream_id,)) + + total = 0 + for max_stream_id, count in txn: + total += count + if total > 50: + break - return self._execute( - "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id + # Now actually get the deltas + sql = """ + SELECT stream_id, room_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + """ + txn.execute(sql, (prev_stream_id, max_stream_id,)) + return self.cursor_to_dict(txn) + + return self.runInteraction( + "get_current_state_deltas", get_current_state_deltas_txn ) def get_max_stream_id_in_current_state_deltas(self): -- cgit 1.5.1 From f0910617111fe81b79777a2c597b1e4240d61a9a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 16:34:40 +0100 Subject: Fix tests --- synapse/handlers/user_directory.py | 4 ++-- synapse/storage/user_directory.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 8331f6422e..75f259ee4e 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -130,7 +130,7 @@ class UserDirectoyHandler(object): def _handle_intial_room(self, room_id): """Called when we initially fill out user_directory one room at a time """ - is_in_room = yield self.store.get_is_host_in_room(room_id, self.server_name) + is_in_room = yield self.state.get_is_host_in_room(room_id, self.server_name) if not is_in_room: return @@ -232,7 +232,7 @@ class UserDirectoyHandler(object): if not change: # Need to check if the server left the room entirely, if so # we might need to remove all the users in that room - is_in_room = yield self.store.get_is_host_in_room( + is_in_room = yield self.state.get_is_host_in_room( room_id, self.server_name, ) if not is_in_room: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 9137fc24ea..348064436f 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -221,6 +221,7 @@ class UserDirectoryStore(SQLBaseStore): txn.execute(sql, (prev_stream_id,)) total = 0 + max_stream_id = prev_stream_id for max_stream_id, count in txn: total += count if total > 50: -- cgit 1.5.1 From f1378aef9199390ae0130cc6bda5c7f4fa7a2e33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:03:08 +0100 Subject: Convert to int --- synapse/storage/user_directory.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 348064436f..71b0502646 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -204,6 +204,7 @@ class UserDirectoryStore(SQLBaseStore): ) def get_current_state_deltas(self, prev_stream_id): + prev_stream_id = int(prev_stream_id) if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): return [] -- cgit 1.5.1 From f5cc22bdc63e58857f435227b70d145d07aabb77 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:30:26 +0100 Subject: Comment on why arbitrary comments --- synapse/storage/user_directory.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 71b0502646..2e9175f50a 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -210,7 +210,9 @@ class UserDirectoryStore(SQLBaseStore): def get_current_state_deltas_txn(txn): # First we calculate the max stream id that will give us less than - # N results + # N results. + # We arbitarily limit to 100 stream_id entries to ensure we don't + # select toooo many. sql = """ SELECT stream_id, count(*) FROM current_state_delta_stream @@ -225,7 +227,9 @@ class UserDirectoryStore(SQLBaseStore): max_stream_id = prev_stream_id for max_stream_id, count in txn: total += count - if total > 50: + if total > 100: + # We arbitarily limit to 100 entries to ensure we don't + # select toooo many. break # Now actually get the deltas -- cgit 1.5.1 From a757dd4863d0a467becf5b73ca15eafeb3c2823c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 18:07:12 +0100 Subject: Use prefix matching --- synapse/storage/user_directory.py | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 2e9175f50a..ca2be9daf2 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -21,6 +21,8 @@ from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id, get_localpart_from_id +import re + class UserDirectoryStore(SQLBaseStore): @@ -272,17 +274,17 @@ class UserDirectoryStore(SQLBaseStore): ] } """ - + search_query = _parse_query(self.database_engine, search_term) if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) - WHERE vector @@ plainto_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC + WHERE vector @@ to_tsquery('english', ?) + ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC LIMIT ? """ - args = (search_term, search_term, limit + 1,) + args = (search_query, search_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ SELECT user_id, display_name, avatar_url @@ -292,7 +294,7 @@ class UserDirectoryStore(SQLBaseStore): ORDER BY rank(matchinfo(user_directory)) DESC LIMIT ? """ - args = (search_term, limit + 1) + args = (search_query, limit + 1) else: # This should be unreachable. raise Exception("Unrecognized database engine") @@ -307,3 +309,25 @@ class UserDirectoryStore(SQLBaseStore): "limited": limited, "results": results, }) + + +def _parse_query(database_engine, search_term): + """Takes a plain unicode string from the user and converts it into a form + that can be passed to database. + We use this so that we can add prefix matching, which isn't something + that is supported by default. + + We specifically add both a prefix and non prefix matching term so that + exact matches get ranked higher. + """ + + # Pull out the individual words, discarding any non-word characters. + results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + + if isinstance(database_engine, PostgresEngine): + return " & ".join("%s:* & %s" % (result, result,) for result in results) + elif isinstance(database_engine, Sqlite3Engine): + return " & ".join("%s* & %s" % (result, result,) for result in results) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") -- cgit 1.5.1 From 036362ede6cadc4d6f289dbcabfc5e06d370a587 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 18:17:47 +0100 Subject: Order by if they have profile info --- synapse/storage/user_directory.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index ca2be9daf2..79161f2745 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -274,14 +274,20 @@ class UserDirectoryStore(SQLBaseStore): ] } """ + search_query = _parse_query(self.database_engine, search_term) + if isinstance(self.database_engine, PostgresEngine): + # We order by rank and then if they have profile info sql = """ SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) WHERE vector @@ to_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC + ORDER BY + ts_rank_cd(vector, to_tsquery('english', ?)) DESC, + display_name IS NULL, + avatar_url IS NULL LIMIT ? """ args = (search_query, search_query, limit + 1,) @@ -291,7 +297,10 @@ class UserDirectoryStore(SQLBaseStore): FROM user_directory_search INNER JOIN user_directory USING (user_id) WHERE value MATCH ? - ORDER BY rank(matchinfo(user_directory)) DESC + ORDER BY + rank(matchinfo(user_directory)) DESC, + display_name IS NULL, + avatar_url IS NULL LIMIT ? """ args = (search_query, limit + 1) -- cgit 1.5.1 From 0fe6f3c521498fc92c58c49d8edcc6984471da08 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 11:09:49 +0100 Subject: Bug fixes and logging - Check if room is public when a user joins before adding to user dir - Fix typo of field name "content.join_rules" -> "content.join_rule" --- synapse/handlers/user_directory.py | 22 +++++++++++++++++++++- synapse/storage/user_directory.py | 2 +- 2 files changed, 22 insertions(+), 2 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 7130cc6ee3..130ff45ec5 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -170,6 +170,8 @@ class UserDirectoyHandler(object): event_id = delta["event_id"] prev_event_id = delta["prev_event_id"] + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + # For join rule and visibility changes we need to check if the room # may have become public or not and add/remove the users in said room if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules): @@ -201,7 +203,14 @@ class UserDirectoyHandler(object): yield self._handle_remove_user(room_id, user_id) return + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + if change: # The user joined + if not is_public: + return + event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -211,7 +220,10 @@ class UserDirectoyHandler(object): yield self._handle_new_user(room_id, state_key, profile) else: # The user left yield self._handle_remove_user(room_id, state_key) + else: + logger.debug("Ignoring irrelevant type: %r", typ) + @defer.inlineCallbacks def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): """Handle a room having potentially changed from/to world_readable/publically joinable. @@ -222,6 +234,8 @@ class UserDirectoyHandler(object): event_id (str|None): The new event after the state change typ (str): Type of the event """ + logger.debug("Handling change for %s", typ) + if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( prev_event_id, event_id, @@ -231,7 +245,7 @@ class UserDirectoyHandler(object): elif typ == EventTypes.JoinRules: change = yield self._get_key_change( prev_event_id, event_id, - key_name="join_rules", + key_name="join_rule", public_value=JoinRules.PUBLIC, ) else: @@ -239,6 +253,7 @@ class UserDirectoyHandler(object): # If change is None, no change. True => become world_readable/public, # False => was world_readable/public if change is None: + logger.debug("No change") return # There's been a change to or from being world readable. @@ -247,6 +262,8 @@ class UserDirectoyHandler(object): room_id ) + logger.debug("Change: %r, is_public: %r", change, is_public) + if change and not is_public: # If we became world readable but room isn't currently public then # we ignore the change @@ -326,6 +343,7 @@ class UserDirectoyHandler(object): event = yield self.store.get_event(event_id, allow_none=True) if not event and not prev_event: + logger.debug("Neither event exists: %r %r", prev_event_id, event_id) defer.returnValue(None) prev_value = None @@ -337,6 +355,8 @@ class UserDirectoyHandler(object): if event: value = event.content.get(key_name, None) + logger.debug("prev_value: %r -> value: %r", prev_value, value) + if value == public_value and prev_value != public_value: defer.returnValue(True) elif value != public_value and prev_value == public_value: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 79161f2745..7323d783ac 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -38,7 +38,7 @@ class UserDirectoryStore(SQLBaseStore): if join_rules_id: join_rule_ev = yield self.get_event(join_rules_id, allow_none=True) if join_rule_ev: - if join_rule_ev.content.get("join_rules") == JoinRules.PUBLIC: + if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC: defer.returnValue(True) hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, "")) -- cgit 1.5.1 From 59dbb470654ff812975f888a3ec41537916091ab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 11:41:29 +0100 Subject: Remove spurious inlineCallbacks --- synapse/storage/user_directory.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 7323d783ac..a251aee465 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -127,7 +127,6 @@ class UserDirectoryStore(SQLBaseStore): ) self.get_user_in_directory.invalidate((user_id,)) - @defer.inlineCallbacks def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): self._simple_delete_txn( -- cgit 1.5.1 From 02a6108235610304b981939bd2c74ae7f36dd929 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 13:16:40 +0100 Subject: Tweak search query --- synapse/storage/user_directory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index a251aee465..c2ea261289 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -333,9 +333,9 @@ def _parse_query(database_engine, search_term): results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) if isinstance(database_engine, PostgresEngine): - return " & ".join("%s:* & %s" % (result, result,) for result in results) + return " & ".join("(%s:* | %s)" % (result, result,) for result in results) elif isinstance(database_engine, Sqlite3Engine): - return " & ".join("%s* & %s" % (result, result,) for result in results) + return " & ".join("(%s* | %s)" % (result, result,) for result in results) else: # This should be unreachable. raise Exception("Unrecognized database engine") -- cgit 1.5.1 From d5477c7afd884f200c55a1c6a187983756f49577 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 13:27:28 +0100 Subject: Tweak search query --- synapse/storage/user_directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index c2ea261289..4fe30ce72e 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -284,7 +284,7 @@ class UserDirectoryStore(SQLBaseStore): INNER JOIN user_directory USING (user_id) WHERE vector @@ to_tsquery('english', ?) ORDER BY - ts_rank_cd(vector, to_tsquery('english', ?)) DESC, + ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? -- cgit 1.5.1 From 21e255a8f1948c2fd298ce2e037d20bdd25f2f69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 14:50:46 +0100 Subject: Split the table in two --- synapse/handlers/user_directory.py | 77 +++++++++++++++++++---------- synapse/storage/_base.py | 5 ++ synapse/storage/schema/delta/42/user_dir.py | 10 +++- synapse/storage/user_directory.py | 77 +++++++++++++++++++++++++++-- 4 files changed, 138 insertions(+), 31 deletions(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index a8525fc86a..d795a9f8d5 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -50,6 +50,7 @@ class UserDirectoyHandler(object): # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already self.initially_handled_users = set() + self.initially_handled_users_in_public = set() # The current position in the current_state_delta stream self.pos = None @@ -145,8 +146,6 @@ class UserDirectoyHandler(object): return is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) - if not is_public: - return users_with_profile = yield self.state.get_current_user_in_room(room_id) unhandled_users = set(users_with_profile) - self.initially_handled_users @@ -159,6 +158,13 @@ class UserDirectoyHandler(object): self.initially_handled_users |= unhandled_users + if is_public: + yield self.store.add_users_to_public_room( + room_id, + user_ids=unhandled_users - self.initially_handled_users_in_public + ) + self.initially_handled_users_in_public != unhandled_users + @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process @@ -206,14 +212,7 @@ class UserDirectoyHandler(object): else: logger.debug("Server is still in room: %r", room_id) - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - room_id - ) - if change: # The user joined - if not is_public: - return - event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -276,11 +275,13 @@ class UserDirectoyHandler(object): # ignore the change return - users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): - if change: + if change: + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): yield self._handle_new_user(room_id, user_id, profile) - else: + else: + users = yield self.store.get_users_in_public_due_to_room(room_id) + for user_id in users: yield self._handle_remove_user(room_id, user_id) @defer.inlineCallbacks @@ -292,11 +293,21 @@ class UserDirectoyHandler(object): user_id (str) """ logger.debug("Adding user to dir, %r", user_id) + row = yield self.store.get_user_in_directory(user_id) - if row: + if not row: + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if not is_public: return - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + row = yield self.store.get_user_in_public_room(user_id) + if not row: + yield self.store.add_users_to_public_room(room_id, [user_id]) @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): @@ -309,15 +320,20 @@ class UserDirectoyHandler(object): logger.debug("Maybe removing user %r", user_id) row = yield self.store.get_user_in_directory(user_id) - if not row or row["room_id"] != room_id: - # Either the user wasn't in directory or we're still in a room that - # is public (i.e. the room_id in the database) - logger.debug("Not removing as row: %r", row) + update_user_dir = row and row["room_id"] == room_id + + row = yield self.store.get_user_in_public_room(user_id) + update_user_in_public = row and row["room_id"] == room_id + + if not update_user_in_public and not update_user_dir: return # XXX: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) for j_room_id in rooms: + if not update_user_in_public and not update_user_dir: + break + is_in_room = yield self.state.get_is_host_in_room( j_room_id, self.server_name, ) @@ -325,16 +341,23 @@ class UserDirectoyHandler(object): if not is_in_room: continue - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) - - if is_public: + if update_user_dir: + update_user_dir = False yield self.store.update_user_in_user_dir(user_id, j_room_id) - logger.debug("Not removing as found other public room: %r", j_room_id) - return - yield self.store.remove_from_user_dir(user_id) + if update_user_in_public: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + yield self.store.update_user_in_public_user_list(user_id, j_room_id) + update_user_in_public = False + + if update_user_dir: + yield self.store.remove_from_user_dir(user_id) + elif update_user_in_public: + yield self.store.remove_from_user_in_public_room(user_id) @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 58b73af7d2..db816346f5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -425,6 +425,11 @@ class SQLBaseStore(object): txn.execute(sql, vals) + def _simple_insert_many(self, table, values, desc): + return self.runInteraction( + desc, self._simple_insert_many_txn, table, values + ) + @staticmethod def _simple_insert_many_txn(txn, table, values): if not values: diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index c34aa5e7d2..ea6a18196d 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -31,13 +31,21 @@ INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); CREATE TABLE user_directory ( user_id TEXT NOT NULL, - room_id TEXT NOT NULL, -- A room_id that we know is public + room_id TEXT NOT NULL, -- A room_id that we know the user is joined to display_name TEXT, avatar_url TEXT ); CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id); + +CREATE TABLE users_in_pubic_room ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL -- A room_id that we know is public +); + +CREATE INDEX users_in_pubic_room_room_idx ON users_in_pubic_room(room_id); +CREATE UNIQUE INDEX users_in_pubic_room_user_idx ON users_in_pubic_room(user_id); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 4fe30ce72e..cab0afc5c3 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -50,12 +50,34 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(False) - def add_profiles_to_user_dir(self, room_id, users_with_profile): - """Add profiles to the user directory + @defer.inlineCallbacks + def add_users_to_public_room(self, room_id, user_ids): + """Add user to the list of users in public rooms Args: room_id (str): A room_id that all users are in that is world_readable or publically joinable + user_ids (list(str)): Users to add + """ + yield self._simple_insert_many( + table="users_in_pubic_room", + values=[ + { + "user_id": user_id, + "room_id": room_id, + } + for user_id in user_ids + ], + desc="add_users_to_public_room" + ) + for user_id in user_ids: + self.get_user_in_public_room.invalidate((user_id,)) + + def add_profiles_to_user_dir(self, room_id, users_with_profile): + """Add profiles to the user directory + + Args: + room_id (str): A room_id that all users are joined to users_with_profile (dict): Users to add to directory in the form of mapping of user_id -> ProfileInfo """ @@ -125,7 +147,15 @@ class UserDirectoryStore(SQLBaseStore): updatevalues={"room_id": room_id}, desc="update_user_in_user_dir", ) - self.get_user_in_directory.invalidate((user_id,)) + + @defer.inlineCallbacks + def update_user_in_public_user_list(self, user_id, room_id): + yield self._simple_update_one( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + updatevalues={"room_id": room_id}, + desc="update_user_in_public_user_list", + ) def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): @@ -139,13 +169,41 @@ class UserDirectoryStore(SQLBaseStore): table="user_directory_search", keyvalues={"user_id": user_id}, ) + self._simple_delete_txn( + txn, + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + ) txn.call_after( self.get_user_in_directory.invalidate, (user_id,) ) + txn.call_after( + self.get_user_in_public_room.invalidate, (user_id,) + ) return self.runInteraction( "remove_from_user_dir", _remove_from_user_dir_txn, ) + @defer.inlineCallbacks + def remove_from_user_in_public_room(self, user_id): + yield self._simple_delete( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + desc="remove_from_user_in_public_room", + ) + self.get_user_in_public_room.invalidate((user_id,)) + + def get_users_in_public_due_to_room(self, room_id): + """Get all user_ids that are in the room directory becuase they're + in the given room_id + """ + return self._simple_select_onecol( + table="users_in_pubic_room", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_public_due_to_room", + ) + def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're in the given room_id @@ -173,6 +231,7 @@ class UserDirectoryStore(SQLBaseStore): def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") + txn.execute("DELETE FROM users_in_pubic_room") txn.call_after(self.get_user_in_directory.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -188,6 +247,16 @@ class UserDirectoryStore(SQLBaseStore): desc="get_user_in_directory", ) + @cached() + def get_user_in_public_room(self, user_id): + return self._simple_select_one( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + retcols=("room_id",), + allow_none=True, + desc="get_user_in_public_room", + ) + def get_user_directory_stream_pos(self): return self._simple_select_one_onecol( table="user_directory_stream_pos", @@ -282,6 +351,7 @@ class UserDirectoryStore(SQLBaseStore): SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) + INNER JOIN users_in_pubic_room USING (user_id) WHERE vector @@ to_tsquery('english', ?) ORDER BY ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC, @@ -295,6 +365,7 @@ class UserDirectoryStore(SQLBaseStore): SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) + INNER JOIN users_in_pubic_room USING (user_id) WHERE value MATCH ? ORDER BY rank(matchinfo(user_directory)) DESC, -- cgit 1.5.1 From 4d039aa2ca78730ca5f8bb9043ab75328004d7a1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 14:58:48 +0100 Subject: Fix sqlite --- synapse/storage/user_directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index cab0afc5c3..bcf24fa4d0 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -368,7 +368,7 @@ class UserDirectoryStore(SQLBaseStore): INNER JOIN users_in_pubic_room USING (user_id) WHERE value MATCH ? ORDER BY - rank(matchinfo(user_directory)) DESC, + rank(matchinfo(user_directory_search)) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? -- cgit 1.5.1 From 1a01af079e97d25bbac46127b8bd069189c02c97 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 15:39:51 +0100 Subject: Handle profile updates in user directory --- synapse/handlers/user_directory.py | 25 +++++++++++++++++++ synapse/storage/user_directory.py | 49 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) (limited to 'synapse/storage/user_directory.py') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index d795a9f8d5..0182cf86d6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -192,6 +192,8 @@ class UserDirectoyHandler(object): ) if change is None: + # Handle any profile changes + yield self._handle_profile_change(state_key, prev_event_id, event_id) continue if not change: @@ -359,6 +361,29 @@ class UserDirectoyHandler(object): elif update_user_in_public: yield self.store.remove_from_user_in_public_room(user_id) + @defer.inlineCallbacks + def _handle_profile_change(self, user_id, prev_event_id, event_id): + """Check member event changes for any profile changes and update the + database if there are. + """ + if not prev_event_id or not event_id: + return + + prev_event = yield self.store.get_event(prev_event_id) + event = yield self.store.get_event(event_id) + + if event.membership != Membership.JOIN: + return + + prev_name = prev_event.content.get("displayname") + new_name = event.content.get("displayname") + + prev_avatar = prev_event.content.get("avatar_url") + new_avatar = event.content.get("avatar_url") + + if prev_name != new_name or prev_avatar != new_avatar: + yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar) + @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): """Given two events check if the `key_name` field in content changed diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index bcf24fa4d0..6a4bf63f0d 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -147,6 +147,53 @@ class UserDirectoryStore(SQLBaseStore): updatevalues={"room_id": room_id}, desc="update_user_in_user_dir", ) + self.get_user_in_directory.invalidate((user_id,)) + + def update_profile_in_user_dir(self, user_id, display_name, avatar_url): + def _update_profile_in_user_dir_txn(txn): + self._simple_update_one_txn( + txn, + table="user_directory", + keyvalues={"user_id": user_id}, + updatevalues={"display_name": display_name, "avatar_url": avatar_url}, + ) + + if isinstance(self.database_engine, PostgresEngine): + # We weight the loclpart most highly, then display name and finally + # server name + sql = """ + UPDATE user_directory_search + SET vector = setweight(to_tsvector('english', ?), 'A') + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') + WHERE user_id = ? + """ + args = ( + get_localpart_from_id(user_id), get_domain_from_id(user_id), + display_name, + user_id, + ) + elif isinstance(self.database_engine, Sqlite3Engine): + sql = """ + UPDATE user_directory_search + set value = ? + WHERE user_id = ? + """ + args = ( + "%s %s" % (user_id, display_name,) if display_name else user_id, + user_id, + ) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + txn.execute(sql, args) + + txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) + + return self.runInteraction( + "update_profile_in_user_dir", _update_profile_in_user_dir_txn + ) @defer.inlineCallbacks def update_user_in_public_user_list(self, user_id, room_id): @@ -156,6 +203,7 @@ class UserDirectoryStore(SQLBaseStore): updatevalues={"room_id": room_id}, desc="update_user_in_public_user_list", ) + self.get_user_in_public_room.invalidate((user_id,)) def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): @@ -233,6 +281,7 @@ class UserDirectoryStore(SQLBaseStore): txn.execute("DELETE FROM user_directory_search") txn.execute("DELETE FROM users_in_pubic_room") txn.call_after(self.get_user_in_directory.invalidate_all) + txn.call_after(self.get_user_in_public_room.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn ) -- cgit 1.5.1