diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index f4451e5dfb..581c078bb2 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -14,12 +14,12 @@
# limitations under the License.
import logging
-
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.storage.roommember import ProfileInfo
from synapse.util.metrics import Measure
+from synapse.util.async import sleep
logger = logging.getLogger(__name__)
@@ -41,12 +41,15 @@ class UserDirectoyHandler(object):
one public room.
"""
+ INITIAL_SLEEP_MS = 50
+
def __init__(self, hs):
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
+ self.is_mine_id = hs.is_mine_id
self.notifier.add_replication_callback(self.notify_new_event)
@@ -55,6 +58,9 @@ class UserDirectoyHandler(object):
self.initially_handled_users = set()
self.initially_handled_users_in_public = set()
+ self.initially_handled_users_share = set()
+ self.initially_handled_users_share_private_room = set()
+
# The current position in the current_state_delta stream
self.pos = None
@@ -140,10 +146,14 @@ class UserDirectoyHandler(object):
logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
yield self._handle_intial_room(room_id)
num_processed_rooms += 1
+ yield sleep(self.INITIAL_SLEEP_MS / 1000.)
logger.info("Processed all rooms.")
self.initially_handled_users = None
+ self.initially_handled_users_in_public = None
+ self.initially_handled_users_share = None
+ self.initially_handled_users_share_private_room = None
yield self.store.update_user_directory_stream_pos(new_pos)
@@ -158,7 +168,8 @@ class UserDirectoyHandler(object):
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
users_with_profile = yield self.state.get_current_user_in_room(room_id)
- unhandled_users = set(users_with_profile) - self.initially_handled_users
+ user_ids = set(users_with_profile)
+ unhandled_users = user_ids - self.initially_handled_users
yield self.store.add_profiles_to_user_dir(
room_id, {
@@ -175,6 +186,69 @@ class UserDirectoyHandler(object):
)
self.initially_handled_users_in_public != unhandled_users
+ # We now go and figure out the new users who share rooms with user entries
+ # We sleep aggressively here as otherwise it can starve resources.
+ # We also batch up inserts/updates, but try to avoid too many at once.
+ to_insert = set()
+ to_update = set()
+ count = 0
+ for user_id in user_ids:
+ if count % 100 == 0:
+ yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+
+ if not self.is_mine_id(user_id):
+ count += 1
+ continue
+
+ for other_user_id in user_ids:
+ if user_id == other_user_id:
+ continue
+
+ if count % 100 == 0:
+ yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+ count += 1
+
+ user_set = (user_id, other_user_id)
+
+ if user_set in self.initially_handled_users_share_private_room:
+ continue
+
+ if user_set in self.initially_handled_users_share:
+ if is_public:
+ continue
+ to_update.add(user_set)
+ else:
+ to_insert.add(user_set)
+
+ if is_public:
+ self.initially_handled_users_share.add(user_set)
+ else:
+ self.initially_handled_users_share_private_room.add(user_set)
+
+ if len(to_insert) > 100:
+ yield self.store.add_users_who_share_room(
+ room_id, not is_public, to_insert,
+ )
+ to_insert.clear()
+
+ if len(to_update) > 100:
+ yield self.store.update_users_who_share_room(
+ room_id, not is_public, to_update,
+ )
+ to_update.clear()
+
+ if to_insert:
+ yield self.store.add_users_who_share_room(
+ room_id, not is_public, to_insert,
+ )
+ to_insert.clear()
+
+ if to_update:
+ yield self.store.update_users_who_share_room(
+ room_id, not is_public, to_update,
+ )
+ to_update.clear()
+
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 137aca2881..0123e28f99 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -16,16 +16,19 @@
from twisted.internet import defer
from ._base import SQLBaseStore
+
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id, get_localpart_from_id
import re
+import logging
+logger = logging.getLogger(__name__)
-class UserDirectoryStore(SQLBaseStore):
+class UserDirectoryStore(SQLBaseStore):
@cachedInlineCallbacks(cache_context=True)
def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context):
"""Check if the room is either world_readable or publically joinable
@@ -281,14 +284,118 @@ class UserDirectoryStore(SQLBaseStore):
desc="get_users_in_dir_due_to_room",
)
+ @defer.inlineCallbacks
def get_all_rooms(self):
- """Get all room_ids we've ever known about
+ """Get all room_ids we've ever known about, in ascending order of "size"
"""
- return self._simple_select_onecol(
- table="current_state_events",
- keyvalues={},
- retcol="DISTINCT room_id",
- desc="get_all_rooms",
+ sql = """
+ SELECT room_id FROM current_state_events
+ GROUP BY room_id
+ ORDER BY count(*) ASC
+ """
+ rows = yield self._execute("get_all_rooms", None, sql)
+ defer.returnValue([room_id for room_id, in rows])
+
+ def add_users_who_share_room(self, room_id, share_private, user_id_tuples):
+ """Insert entries into the users_who_share_rooms table. The first
+ user should be a local user.
+
+ Args:
+ room_id (str)
+ share_private (bool): Is the room private
+ user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
+ """
+ def _add_users_who_share_room_txn(txn):
+ self._simple_insert_many_txn(
+ txn,
+ table="users_who_share_rooms",
+ values=[
+ {
+ "user_id": user_id,
+ "other_user_id": other_user_id,
+ "room_id": room_id,
+ "share_private": share_private,
+ }
+ for user_id, other_user_id in user_id_tuples
+ ],
+ )
+ for user_id, other_user_id in user_id_tuples:
+ txn.call_after(
+ self.get_users_who_share_room_from_dir.invalidate,
+ (user_id,),
+ )
+ txn.call_after(
+ self.get_if_users_share_a_room.invalidate,
+ (user_id, other_user_id),
+ )
+ return self.runInteraction(
+ "add_users_who_share_room", _add_users_who_share_room_txn
+ )
+
+ def update_users_who_share_room(self, room_id, share_private, user_id_sets):
+ """Updates entries in the users_who_share_rooms table. The first
+ user should be a local user.
+
+ Args:
+ room_id (str)
+ share_private (bool): Is the room private
+ user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
+ """
+ def _update_users_who_share_room_txn(txn):
+ sql = """
+ UPDATE users_who_share_rooms
+ SET room_id = ?, share_private = ?
+ WHERE user_id = ? AND other_user_id = ?
+ """
+ txn.executemany(
+ sql,
+ (
+ (room_id, share_private, uid, oid)
+ for uid, oid in user_id_sets
+ )
+ )
+ for user_id, other_user_id in user_id_sets:
+ txn.call_after(
+ self.get_users_who_share_room_from_dir.invalidate,
+ (user_id,),
+ )
+ txn.call_after(
+ self.get_if_users_share_a_room.invalidate,
+ (user_id, other_user_id),
+ )
+ return self.runInteraction(
+ "update_users_who_share_room", _update_users_who_share_room_txn
+ )
+
+ def remove_user_who_share_room(self, user_id, other_user_id):
+ """Deletes entries in the users_who_share_rooms table. The first
+ user should be a local user.
+
+ Args:
+ room_id (str)
+ share_private (bool): Is the room private
+ user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
+ """
+ def _remove_user_who_share_room_txn(txn):
+ self._simple_delete_txn(
+ txn,
+ table="users_who_share_rooms",
+ keyvalues={
+ "user_id": user_id,
+ "other_user_id": other_user_id,
+ },
+ )
+ txn.call_after(
+ self.get_users_who_share_room_from_dir.invalidate,
+ (user_id,),
+ )
+ txn.call_after(
+ self.get_if_users_share_a_room.invalidate,
+ (user_id, other_user_id),
+ )
+
+ return self.runInteraction(
+ "remove_user_who_share_room", _remove_user_who_share_room_txn
)
def delete_all_from_user_dir(self):
@@ -298,8 +405,11 @@ class UserDirectoryStore(SQLBaseStore):
txn.execute("DELETE FROM user_directory")
txn.execute("DELETE FROM user_directory_search")
txn.execute("DELETE FROM users_in_pubic_room")
+ txn.execute("DELETE FROM users_who_share_rooms")
txn.call_after(self.get_user_in_directory.invalidate_all)
txn.call_after(self.get_user_in_public_room.invalidate_all)
+ txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all)
+ txn.call_after(self.get_if_users_share_a_room.invalidate_all)
return self.runInteraction(
"delete_all_from_user_dir", _delete_all_from_user_dir_txn
)
|