diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/user_directory.py | 78 | ||||
-rw-r--r-- | synapse/storage/user_directory.py | 124 |
2 files changed, 193 insertions, 9 deletions
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index f4451e5dfb..581c078bb2 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -14,12 +14,12 @@ # limitations under the License. import logging - from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.storage.roommember import ProfileInfo from synapse.util.metrics import Measure +from synapse.util.async import sleep logger = logging.getLogger(__name__) @@ -41,12 +41,15 @@ class UserDirectoyHandler(object): one public room. """ + INITIAL_SLEEP_MS = 50 + def __init__(self, hs): self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id self.notifier.add_replication_callback(self.notify_new_event) @@ -55,6 +58,9 @@ class UserDirectoyHandler(object): self.initially_handled_users = set() self.initially_handled_users_in_public = set() + self.initially_handled_users_share = set() + self.initially_handled_users_share_private_room = set() + # The current position in the current_state_delta stream self.pos = None @@ -140,10 +146,14 @@ class UserDirectoyHandler(object): logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids)) yield self._handle_intial_room(room_id) num_processed_rooms += 1 + yield sleep(self.INITIAL_SLEEP_MS / 1000.) logger.info("Processed all rooms.") self.initially_handled_users = None + self.initially_handled_users_in_public = None + self.initially_handled_users_share = None + self.initially_handled_users_share_private_room = None yield self.store.update_user_directory_stream_pos(new_pos) @@ -158,7 +168,8 @@ class UserDirectoyHandler(object): is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) users_with_profile = yield self.state.get_current_user_in_room(room_id) - unhandled_users = set(users_with_profile) - self.initially_handled_users + user_ids = set(users_with_profile) + unhandled_users = user_ids - self.initially_handled_users yield self.store.add_profiles_to_user_dir( room_id, { @@ -175,6 +186,69 @@ class UserDirectoyHandler(object): ) self.initially_handled_users_in_public != unhandled_users + # We now go and figure out the new users who share rooms with user entries + # We sleep aggressively here as otherwise it can starve resources. + # We also batch up inserts/updates, but try to avoid too many at once. + to_insert = set() + to_update = set() + count = 0 + for user_id in user_ids: + if count % 100 == 0: + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + + if not self.is_mine_id(user_id): + count += 1 + continue + + for other_user_id in user_ids: + if user_id == other_user_id: + continue + + if count % 100 == 0: + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + count += 1 + + user_set = (user_id, other_user_id) + + if user_set in self.initially_handled_users_share_private_room: + continue + + if user_set in self.initially_handled_users_share: + if is_public: + continue + to_update.add(user_set) + else: + to_insert.add(user_set) + + if is_public: + self.initially_handled_users_share.add(user_set) + else: + self.initially_handled_users_share_private_room.add(user_set) + + if len(to_insert) > 100: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + to_insert.clear() + + if len(to_update) > 100: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) + to_update.clear() + + if to_insert: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + to_insert.clear() + + if to_update: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) + to_update.clear() + @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 137aca2881..0123e28f99 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -16,16 +16,19 @@ from twisted.internet import defer from ._base import SQLBaseStore + from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id, get_localpart_from_id import re +import logging +logger = logging.getLogger(__name__) -class UserDirectoryStore(SQLBaseStore): +class UserDirectoryStore(SQLBaseStore): @cachedInlineCallbacks(cache_context=True) def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): """Check if the room is either world_readable or publically joinable @@ -281,14 +284,118 @@ class UserDirectoryStore(SQLBaseStore): desc="get_users_in_dir_due_to_room", ) + @defer.inlineCallbacks def get_all_rooms(self): - """Get all room_ids we've ever known about + """Get all room_ids we've ever known about, in ascending order of "size" """ - return self._simple_select_onecol( - table="current_state_events", - keyvalues={}, - retcol="DISTINCT room_id", - desc="get_all_rooms", + sql = """ + SELECT room_id FROM current_state_events + GROUP BY room_id + ORDER BY count(*) ASC + """ + rows = yield self._execute("get_all_rooms", None, sql) + defer.returnValue([room_id for room_id, in rows]) + + def add_users_who_share_room(self, room_id, share_private, user_id_tuples): + """Insert entries into the users_who_share_rooms table. The first + user should be a local user. + + Args: + room_id (str) + share_private (bool): Is the room private + user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. + """ + def _add_users_who_share_room_txn(txn): + self._simple_insert_many_txn( + txn, + table="users_who_share_rooms", + values=[ + { + "user_id": user_id, + "other_user_id": other_user_id, + "room_id": room_id, + "share_private": share_private, + } + for user_id, other_user_id in user_id_tuples + ], + ) + for user_id, other_user_id in user_id_tuples: + txn.call_after( + self.get_users_who_share_room_from_dir.invalidate, + (user_id,), + ) + txn.call_after( + self.get_if_users_share_a_room.invalidate, + (user_id, other_user_id), + ) + return self.runInteraction( + "add_users_who_share_room", _add_users_who_share_room_txn + ) + + def update_users_who_share_room(self, room_id, share_private, user_id_sets): + """Updates entries in the users_who_share_rooms table. The first + user should be a local user. + + Args: + room_id (str) + share_private (bool): Is the room private + user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. + """ + def _update_users_who_share_room_txn(txn): + sql = """ + UPDATE users_who_share_rooms + SET room_id = ?, share_private = ? + WHERE user_id = ? AND other_user_id = ? + """ + txn.executemany( + sql, + ( + (room_id, share_private, uid, oid) + for uid, oid in user_id_sets + ) + ) + for user_id, other_user_id in user_id_sets: + txn.call_after( + self.get_users_who_share_room_from_dir.invalidate, + (user_id,), + ) + txn.call_after( + self.get_if_users_share_a_room.invalidate, + (user_id, other_user_id), + ) + return self.runInteraction( + "update_users_who_share_room", _update_users_who_share_room_txn + ) + + def remove_user_who_share_room(self, user_id, other_user_id): + """Deletes entries in the users_who_share_rooms table. The first + user should be a local user. + + Args: + room_id (str) + share_private (bool): Is the room private + user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. + """ + def _remove_user_who_share_room_txn(txn): + self._simple_delete_txn( + txn, + table="users_who_share_rooms", + keyvalues={ + "user_id": user_id, + "other_user_id": other_user_id, + }, + ) + txn.call_after( + self.get_users_who_share_room_from_dir.invalidate, + (user_id,), + ) + txn.call_after( + self.get_if_users_share_a_room.invalidate, + (user_id, other_user_id), + ) + + return self.runInteraction( + "remove_user_who_share_room", _remove_user_who_share_room_txn ) def delete_all_from_user_dir(self): @@ -298,8 +405,11 @@ class UserDirectoryStore(SQLBaseStore): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") txn.execute("DELETE FROM users_in_pubic_room") + txn.execute("DELETE FROM users_who_share_rooms") txn.call_after(self.get_user_in_directory.invalidate_all) txn.call_after(self.get_user_in_public_room.invalidate_all) + txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all) + txn.call_after(self.get_if_users_share_a_room.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn ) |