summary refs log tree commit diff
path: root/synapse/handlers/user_directory.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-06-15 09:59:04 +0100
committerErik Johnston <erik@matrix.org>2017-06-15 09:59:04 +0100
commit72613bc3798d34a7bf93defd6624b84669078e2a (patch)
tree8aba738f65014a0c6bc56459f719e6f86cc18d59 /synapse/handlers/user_directory.py
parentAdd DB schema for tracking users who share rooms (diff)
downloadsynapse-72613bc3798d34a7bf93defd6624b84669078e2a.tar.xz
Implement initial population of users who share rooms table
Diffstat (limited to 'synapse/handlers/user_directory.py')
-rw-r--r--synapse/handlers/user_directory.py78
1 files changed, 76 insertions, 2 deletions
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index f4451e5dfb..581c078bb2 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -14,12 +14,12 @@
 # limitations under the License.
 
 import logging
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.storage.roommember import ProfileInfo
 from synapse.util.metrics import Measure
+from synapse.util.async import sleep
 
 
 logger = logging.getLogger(__name__)
@@ -41,12 +41,15 @@ class UserDirectoyHandler(object):
     one public room.
     """
 
+    INITIAL_SLEEP_MS = 50
+
     def __init__(self, hs):
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
 
         self.notifier.add_replication_callback(self.notify_new_event)
 
@@ -55,6 +58,9 @@ class UserDirectoyHandler(object):
         self.initially_handled_users = set()
         self.initially_handled_users_in_public = set()
 
+        self.initially_handled_users_share = set()
+        self.initially_handled_users_share_private_room = set()
+
         # The current position in the current_state_delta stream
         self.pos = None
 
@@ -140,10 +146,14 @@ class UserDirectoyHandler(object):
             logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
             yield self._handle_intial_room(room_id)
             num_processed_rooms += 1
+            yield sleep(self.INITIAL_SLEEP_MS / 1000.)
 
         logger.info("Processed all rooms.")
 
         self.initially_handled_users = None
+        self.initially_handled_users_in_public = None
+        self.initially_handled_users_share = None
+        self.initially_handled_users_share_private_room = None
 
         yield self.store.update_user_directory_stream_pos(new_pos)
 
@@ -158,7 +168,8 @@ class UserDirectoyHandler(object):
         is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
 
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
-        unhandled_users = set(users_with_profile) - self.initially_handled_users
+        user_ids = set(users_with_profile)
+        unhandled_users = user_ids - self.initially_handled_users
 
         yield self.store.add_profiles_to_user_dir(
             room_id, {
@@ -175,6 +186,69 @@ class UserDirectoyHandler(object):
             )
             self.initially_handled_users_in_public != unhandled_users
 
+        # We now go and figure out the new users who share rooms with user entries
+        # We sleep aggressively here as otherwise it can starve resources.
+        # We also batch up inserts/updates, but try to avoid too many at once.
+        to_insert = set()
+        to_update = set()
+        count = 0
+        for user_id in user_ids:
+            if count % 100 == 0:
+                yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+
+            if not self.is_mine_id(user_id):
+                count += 1
+                continue
+
+            for other_user_id in user_ids:
+                if user_id == other_user_id:
+                    continue
+
+                if count % 100 == 0:
+                    yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+                count += 1
+
+                user_set = (user_id, other_user_id)
+
+                if user_set in self.initially_handled_users_share_private_room:
+                    continue
+
+                if user_set in self.initially_handled_users_share:
+                    if is_public:
+                        continue
+                    to_update.add(user_set)
+                else:
+                    to_insert.add(user_set)
+
+                if is_public:
+                    self.initially_handled_users_share.add(user_set)
+                else:
+                    self.initially_handled_users_share_private_room.add(user_set)
+
+                if len(to_insert) > 100:
+                    yield self.store.add_users_who_share_room(
+                        room_id, not is_public, to_insert,
+                    )
+                    to_insert.clear()
+
+                if len(to_update) > 100:
+                    yield self.store.update_users_who_share_room(
+                        room_id, not is_public, to_update,
+                    )
+                    to_update.clear()
+
+        if to_insert:
+            yield self.store.add_users_who_share_room(
+                room_id, not is_public, to_insert,
+            )
+            to_insert.clear()
+
+        if to_update:
+            yield self.store.update_users_who_share_room(
+                room_id, not is_public, to_update,
+            )
+            to_update.clear()
+
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
         """Called with the state deltas to process