diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index fc45123d0c..20a026e776 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -60,14 +60,16 @@ class UserDirectoryHandler(object):
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users
+ # If we're a worker, don't sleep when doing the initial room work, as it
+ # won't monopolise the master's CPU.
+ if hs.config.worker_app:
+ self.INITIAL_ROOM_SLEEP_MS = 0
+ self.INITIAL_USER_SLEEP_MS = 0
+
# When start up for the first time we need to populate the user_directory.
# This is a set of user_id's we've inserted already
self.initially_handled_users = set()
- self.register_background_update_handler(
- "users_in_public_rooms_initial", self._populate_users_in_public_rooms
- )
-
# The current position in the current_state_delta stream
self.pos = None
@@ -81,41 +83,6 @@ class UserDirectoryHandler(object):
# we start populating the user directory
self.clock.call_later(0, self.notify_new_event)
- @defer.inlineCallbacks
- def _populate_users_in_public_rooms(self, progress, batch_size):
- """
- Populate the users_in_public_rooms table with the contents of the
- users_who_share_public_rooms table.
- """
-
- def _fetch(txn):
- sql = "SELECT DISTINCT other_user_id FROM users_who_share_public_rooms"
- txn.execute(sql)
- return txn.fetchall()
-
- users = yield self.store.runInteraction(
- "populate_users_in_public_rooms_fetch", _fetch
- )
-
- if users:
-
- def _fill(txn):
- self._simple_upsert_many_txn(
- txn,
- table="users_in_public_rooms",
- key_names=["user_id"],
- key_values=users,
- value_names=(),
- value_values=None,
- )
-
- users = yield self.store.runInteraction(
- "populate_users_in_public_rooms_fill", _fill
- )
-
- yield self._end_background_update("users_in_public_rooms_initial")
- defer.returnValue(1)
-
def search_users(self, user_id, search_term, limit):
"""Searches for users in directory
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index a0333d5309..7e3903859b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -767,18 +767,25 @@ class SQLBaseStore(object):
"""
allvalues = {}
allvalues.update(keyvalues)
- allvalues.update(values)
allvalues.update(insertion_values)
+ if not values:
+ latter = "NOTHING"
+ else:
+ allvalues.update(values)
+ latter = (
+ "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
+ )
+
sql = (
"INSERT INTO %s (%s) VALUES (%s) "
- "ON CONFLICT (%s) DO UPDATE SET %s"
+ "ON CONFLICT (%s) DO %s"
) % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues),
", ".join(k for k in keyvalues),
- ", ".join(k + "=EXCLUDED." + k for k in values),
+ latter
)
txn.execute(sql, list(allvalues.values()))
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 8f40277b50..a15366a117 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -22,16 +22,57 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.state import StateFilter
from synapse.types import get_domain_from_id, get_localpart_from_id
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from ._base import SQLBaseStore
-
logger = logging.getLogger(__name__)
-class UserDirectoryStore(SQLBaseStore):
+class UserDirectoryStore(BackgroundUpdateStore):
+ def __init__(self, dbconn, hs):
+ super(UserDirectoryStore, self).__init__(dbconn, hs)
+
+ self.register_background_update_handler(
+ "users_in_public_rooms_initial", self._populate_users_in_public_rooms
+ )
+
+
+ @defer.inlineCallbacks
+ def _populate_users_in_public_rooms(self, progress, batch_size):
+ """
+ Populate the users_in_public_rooms table with the contents of the
+ users_who_share_public_rooms table.
+ """
+
+ def _fetch(txn):
+ sql = "SELECT DISTINCT other_user_id FROM users_who_share_public_rooms"
+ txn.execute(sql)
+ return txn.fetchall()
+
+ users = yield self.runInteraction(
+ "populate_users_in_public_rooms_fetch", _fetch
+ )
+
+ if users:
+ def _fill(txn):
+ self._simple_upsert_many_txn(
+ txn,
+ table="users_in_public_rooms",
+ key_names=["user_id"],
+ key_values=users,
+ value_names=(),
+ value_values=None,
+ )
+
+ users = yield self.runInteraction(
+ "populate_users_in_public_rooms_fill", _fill
+ )
+
+ yield self._end_background_update("users_in_public_rooms_initial")
+ defer.returnValue(1)
+
@defer.inlineCallbacks
def is_room_world_readable_or_publicly_joinable(self, room_id):
"""Check if the room is either world_readable or publically joinable
@@ -353,8 +394,7 @@ class UserDirectoryStore(SQLBaseStore):
txn,
"users_in_public_rooms",
keyvalues={"user_id": user_id},
- values={},
- desc="add_user_as_in_public_room",
+ values=None,
)
for user_id, other_user_id in user_id_tuples:
@@ -603,7 +643,7 @@ class UserDirectoryStore(SQLBaseStore):
else:
join_clause = """
LEFT JOIN (
- SELECT other_user_id AS user_id FROM users_who_share_public_rooms
+ SELECT user_id FROM users_in_public_rooms
UNION
SELECT other_user_id AS user_id FROM users_who_share_private_rooms
WHERE user_id = ?
|