diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 60cdc884e6..a2f8c23a65 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -52,7 +52,9 @@ class BackgroundUpdatePerformance(object):
Returns:
A duration in ms as a float
"""
- if self.total_item_count == 0:
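+        # No time has been recorded yet; return 0 rather than dividing by zero below.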
+ if self.avg_duration_ms == 0:
+ return 0
+ elif self.total_item_count == 0:
return None
else:
# Use the exponential moving average so that we can adapt to
@@ -64,7 +66,9 @@ class BackgroundUpdatePerformance(object):
Returns:
A duration in ms as a float
"""
- if self.total_item_count == 0:
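+        # No time has been spent on this update yet; return 0 rather than dividing by zero below.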
+ if self.total_duration_ms == 0:
+ return 0
+ elif self.total_item_count == 0:
return None
else:
return float(self.total_item_count) / float(self.total_duration_ms)
diff --git a/synapse/storage/schema/delta/53/user_dir_populate.sql b/synapse/storage/schema/delta/53/user_dir_populate.sql
new file mode 100644
index 0000000000..955b8fdbd6
--- /dev/null
+++ b/synapse/storage/schema/delta/53/user_dir_populate.sql
@@ -0,0 +1,30 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
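+-- Each background update below is only started once the update named in its
+-- depends_on column has completed.
+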
+-- Set up staging tables
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('populate_user_directory_createtables', '{}');
+
+-- Run through each room and update the user directory according to who is in it
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables');
+
+-- Insert all users, if search_all_users is on
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users');
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 1c00b956e5..4ee653210f 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -16,12 +16,10 @@
import logging
import re
-from six import iteritems
-
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
-from synapse.storage._base import SQLBaseStore
+from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.state import StateFilter
from synapse.types import get_domain_from_id, get_localpart_from_id
@@ -30,7 +28,276 @@ from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
-class UserDirectoryStore(SQLBaseStore):
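+# Prefix for the staging tables (_rooms, _users, _position) used while populating.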
+TEMP_TABLE = "_temp_populate_user_directory"
+
+
+class UserDirectoryStore(BackgroundUpdateStore):
+ def __init__(self, db_conn, hs):
+ super(UserDirectoryStore, self).__init__(db_conn, hs)
+
+ self.server_name = hs.hostname
+
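+        # Handlers for the background updates queued by the 53/user_dir_populate.sql delta.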
+ self.register_background_update_handler(
+ "populate_user_directory_createtables",
+ self._populate_user_directory_createtables,
+ )
+ self.register_background_update_handler(
+ "populate_user_directory_process_rooms",
+ self._populate_user_directory_process_rooms,
+ )
+ self.register_background_update_handler(
+ "populate_user_directory_process_users",
+ self._populate_user_directory_process_users,
+ )
+ self.register_background_update_handler(
+ "populate_user_directory_cleanup", self._populate_user_directory_cleanup
+ )
+
+ @defer.inlineCallbacks
+ def _populate_user_directory_createtables(self, progress, batch_size):
+
+        # Build the staging area: create the temporary tables and fill them with
+        # the rooms (and, if search_all_users is enabled, the users) to process.
+ def _make_staging_area(txn):
+ sql = (
+ "CREATE TABLE IF NOT EXISTS "
+ + TEMP_TABLE
+ + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
+ )
+ txn.execute(sql)
+
+ sql = (
+ "CREATE TABLE IF NOT EXISTS "
+ + TEMP_TABLE
+ + "_position(position TEXT NOT NULL)"
+ )
+ txn.execute(sql)
+
+ # Get rooms we want to process from the database
+ sql = """
+ SELECT room_id, count(*) FROM current_state_events
+ GROUP BY room_id
+ """
+ txn.execute(sql)
+ rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
+ self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
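+            # Free the room list now that it has been copied into the staging table.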
+ del rooms
+
+ # If search all users is on, get all the users we want to add.
+ if self.hs.config.user_directory_search_all_users:
+ sql = (
+ "CREATE TABLE IF NOT EXISTS "
+ + TEMP_TABLE
+ + "_users(user_id TEXT NOT NULL)"
+ )
+ txn.execute(sql)
+
+ txn.execute("SELECT name FROM users")
+ users = [{"user_id": x[0]} for x in txn.fetchall()]
+
+ self._simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
+
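+        # Stash the current stream position; the cleanup update later records it
+        # as the user directory's stream position.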
+ new_pos = yield self.get_max_stream_id_in_current_state_deltas()
+ yield self.runInteraction(
+ "populate_user_directory_temp_build", _make_staging_area
+ )
+ yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
+
+ yield self._end_background_update("populate_user_directory_createtables")
+ defer.returnValue(1)
+
+ @defer.inlineCallbacks
+ def _populate_user_directory_cleanup(self, progress, batch_size):
+ """
+ Update the user directory stream position, then clean up the old tables.
+ """
+ position = yield self._simple_select_one_onecol(
+ TEMP_TABLE + "_position", None, "position"
+ )
+ yield self.update_user_directory_stream_pos(position)
+
+ def _delete_staging_area(txn):
+ txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
+ txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
+ txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+
+ yield self.runInteraction(
+ "populate_user_directory_cleanup", _delete_staging_area
+ )
+
+ yield self._end_background_update("populate_user_directory_cleanup")
+ defer.returnValue(1)
+
+ @defer.inlineCallbacks
+ def _populate_user_directory_process_rooms(self, progress, batch_size):
+
+ state = self.hs.get_state_handler()
+
+        # If no progress has been recorded, this is the first pass: wipe the
+        # existing user directory so it can be rebuilt from scratch.
+ if not progress:
+ yield self.delete_all_from_user_dir()
+
+ def _get_next_batch(txn):
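+            # Fetch a batch of rooms to process, largest (by state event count) first.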
+ sql = """
+ SELECT room_id FROM %s
+ ORDER BY events DESC
+ LIMIT %s
+ """ % (
+ TEMP_TABLE + "_rooms",
+ str(batch_size),
+ )
+ txn.execute(sql)
+ rooms_to_work_on = txn.fetchall()
+
+ if not rooms_to_work_on:
+ return None
+
+ rooms_to_work_on = [x[0] for x in rooms_to_work_on]
+
+            # Count how many rooms are left so that we can report progress.
+ txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+ progress["remaining"] = txn.fetchone()[0]
+
+ return rooms_to_work_on
+
+ rooms_to_work_on = yield self.runInteraction(
+ "populate_user_directory_temp_read", _get_next_batch
+ )
+
+        # No more rooms -- mark this background update as complete.
+ if not rooms_to_work_on:
+ yield self._end_background_update("populate_user_directory_process_rooms")
+ defer.returnValue(1)
+
+ logger.info(
+ "Processing the next %d rooms of %d remaining"
+ % (len(rooms_to_work_on), progress["remaining"])
+ )
+
+ for room_id in rooms_to_work_on:
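+            # Rooms this server is not joined to are skipped, but still removed
+            # from the staging table at the end of the loop.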
+ is_in_room = yield self.is_host_joined(room_id, self.server_name)
+
+ if is_in_room:
+ is_public = yield self.is_room_world_readable_or_publicly_joinable(
+ room_id
+ )
+
+ users_with_profile = yield state.get_current_user_in_room(room_id)
+ user_ids = set(users_with_profile)
+
+ # Update each user in the user directory.
+ for user_id, profile in users_with_profile.items():
+ yield self.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
+
+ to_insert = set()
+
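+                # Users in publicly joinable rooms go into the public list;
+                # otherwise record which pairs of users share this private room.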
+ if is_public:
+ for user_id in user_ids:
+ if self.get_if_app_services_interested_in_user(user_id):
+ continue
+
+ to_insert.add(user_id)
+
+ if to_insert:
+ yield self.add_users_in_public_rooms(room_id, to_insert)
+ to_insert.clear()
+ else:
+ for user_id in user_ids:
+ if not self.hs.is_mine_id(user_id):
+ continue
+
+ if self.get_if_app_services_interested_in_user(user_id):
+ continue
+
+ for other_user_id in user_ids:
+ if user_id == other_user_id:
+ continue
+
+ user_set = (user_id, other_user_id)
+ to_insert.add(user_set)
+
+ if to_insert:
+ yield self.add_users_who_share_private_room(room_id, to_insert)
+ to_insert.clear()
+
+ # We've finished a room. Delete it from the table.
+ yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
+ # Update the remaining counter.
+ progress["remaining"] -= 1
+ yield self.runInteraction(
+ "populate_user_directory",
+ self._background_update_progress_txn,
+ "populate_user_directory_process_rooms",
+ progress,
+ )
+
+ defer.returnValue(len(rooms_to_work_on))
+
+ @defer.inlineCallbacks
+ def _populate_user_directory_process_users(self, progress, batch_size):
+ """
+ If search_all_users is enabled, add all of the users to the user directory.
+ """
+ if not self.hs.config.user_directory_search_all_users:
+ yield self._end_background_update("populate_user_directory_process_users")
+ defer.returnValue(1)
+
+ def _get_next_batch(txn):
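+            # Fetch a batch of users still waiting in the staging table.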
+ sql = "SELECT user_id FROM %s LIMIT %s" % (
+ TEMP_TABLE + "_users",
+ str(batch_size),
+ )
+ txn.execute(sql)
+ users_to_work_on = txn.fetchall()
+
+ if not users_to_work_on:
+ return None
+
+ users_to_work_on = [x[0] for x in users_to_work_on]
+
+            # Count how many users are left so that we can report progress.
+ sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
+ txn.execute(sql)
+ progress["remaining"] = txn.fetchone()[0]
+
+ return users_to_work_on
+
+ users_to_work_on = yield self.runInteraction(
+ "populate_user_directory_temp_read", _get_next_batch
+ )
+
+        # No more users -- mark this background update as complete.
+ if not users_to_work_on:
+ yield self._end_background_update("populate_user_directory_process_users")
+ defer.returnValue(1)
+
+ logger.info(
+ "Processing the next %d users of %d remaining"
+ % (len(users_to_work_on), progress["remaining"])
+ )
+
+ for user_id in users_to_work_on:
+ profile = yield self.get_profileinfo(get_localpart_from_id(user_id))
+ yield self.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
+
+ # We've finished processing a user. Delete it from the table.
+ yield self._simple_delete_one(TEMP_TABLE + "_users", {"user_id": user_id})
+ # Update the remaining counter.
+ progress["remaining"] -= 1
+ yield self.runInteraction(
+ "populate_user_directory",
+ self._background_update_progress_txn,
+ "populate_user_directory_process_users",
+ progress,
+ )
+
+ defer.returnValue(len(users_to_work_on))
+
@defer.inlineCallbacks
def is_room_world_readable_or_publicly_joinable(self, room_id):
"""Check if the room is either world_readable or publically joinable
@@ -62,89 +329,16 @@ class UserDirectoryStore(SQLBaseStore):
defer.returnValue(False)
- def add_profiles_to_user_dir(self, users_with_profile):
- """Add profiles to the user directory
-
- Args:
- users_with_profile (dict): Users to add to directory in the form of
- mapping of user_id -> ProfileInfo
+ def update_profile_in_user_dir(self, user_id, display_name, avatar_url):
+ """
+ Update or add a user's profile in the user directory.
"""
- if isinstance(self.database_engine, PostgresEngine):
- # We weight the loclpart most highly, then display name and finally
- # server name
- sql = """
- INSERT INTO user_directory_search(user_id, vector)
- VALUES (?,
- setweight(to_tsvector('english', ?), 'A')
- || setweight(to_tsvector('english', ?), 'D')
- || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
- )
- """
- args = (
- (
- user_id,
- get_localpart_from_id(user_id),
- get_domain_from_id(user_id),
- profile.display_name,
- )
- for user_id, profile in iteritems(users_with_profile)
- )
- elif isinstance(self.database_engine, Sqlite3Engine):
- sql = """
- INSERT INTO user_directory_search(user_id, value)
- VALUES (?,?)
- """
- args = tuple(
- (
- user_id,
- "%s %s" % (user_id, p.display_name) if p.display_name else user_id,
- )
- for user_id, p in iteritems(users_with_profile)
- )
- else:
- # This should be unreachable.
- raise Exception("Unrecognized database engine")
-
- def _add_profiles_to_user_dir_txn(txn):
- txn.executemany(sql, args)
- self._simple_insert_many_txn(
- txn,
- table="user_directory",
- values=[
- {
- "user_id": user_id,
- "room_id": None,
- "display_name": profile.display_name,
- "avatar_url": profile.avatar_url,
- }
- for user_id, profile in iteritems(users_with_profile)
- ],
- )
- for user_id in users_with_profile:
- txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
-
- return self.runInteraction(
- "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn
- )
-
- @defer.inlineCallbacks
- def update_user_in_user_dir(self, user_id, room_id):
- yield self._simple_update_one(
- table="user_directory",
- keyvalues={"user_id": user_id},
- updatevalues={"room_id": room_id},
- desc="update_user_in_user_dir",
- )
- self.get_user_in_directory.invalidate((user_id,))
-
- def update_profile_in_user_dir(self, user_id, display_name, avatar_url, room_id):
def _update_profile_in_user_dir_txn(txn):
new_entry = self._simple_upsert_txn(
txn,
table="user_directory",
keyvalues={"user_id": user_id},
- insertion_values={"room_id": room_id},
values={"display_name": display_name, "avatar_url": avatar_url},
lock=False, # We're only inserter
)
@@ -282,18 +476,6 @@ class UserDirectoryStore(SQLBaseStore):
defer.returnValue(user_ids)
@defer.inlineCallbacks
- def get_all_rooms(self):
- """Get all room_ids we've ever known about, in ascending order of "size"
- """
- sql = """
- SELECT room_id FROM current_state_events
- GROUP BY room_id
- ORDER BY count(*) ASC
- """
- rows = yield self._execute("get_all_rooms", None, sql)
- defer.returnValue([room_id for room_id, in rows])
-
- @defer.inlineCallbacks
def get_all_local_users(self):
"""Get all local users
"""
@@ -553,8 +735,8 @@ class UserDirectoryStore(SQLBaseStore):
"""
if self.hs.config.user_directory_search_all_users:
- join_args = ()
- where_clause = "1=1"
+ join_args = (user_id,)
+ where_clause = "user_id != ?"
else:
join_args = (user_id,)
where_clause = """
|