summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-06-15 09:59:04 +0100
committerErik Johnston <erik@matrix.org>2017-06-15 09:59:04 +0100
commit72613bc3798d34a7bf93defd6624b84669078e2a (patch)
tree8aba738f65014a0c6bc56459f719e6f86cc18d59
parentAdd DB schema for tracking users who share rooms (diff)
downloadsynapse-72613bc3798d34a7bf93defd6624b84669078e2a.tar.xz
Implement initial population of users who share rooms table
Diffstat (limited to '')
-rw-r--r--synapse/handlers/user_directory.py78
-rw-r--r--synapse/storage/user_directory.py124
2 files changed, 193 insertions, 9 deletions
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index f4451e5dfb..581c078bb2 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -14,12 +14,12 @@
 # limitations under the License.
 
 import logging
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.storage.roommember import ProfileInfo
 from synapse.util.metrics import Measure
+from synapse.util.async import sleep
 
 
 logger = logging.getLogger(__name__)
@@ -41,12 +41,15 @@ class UserDirectoyHandler(object):
     one public room.
     """
 
+    INITIAL_SLEEP_MS = 50
+
     def __init__(self, hs):
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
 
         self.notifier.add_replication_callback(self.notify_new_event)
 
@@ -55,6 +58,9 @@ class UserDirectoyHandler(object):
         self.initially_handled_users = set()
         self.initially_handled_users_in_public = set()
 
+        self.initially_handled_users_share = set()
+        self.initially_handled_users_share_private_room = set()
+
         # The current position in the current_state_delta stream
         self.pos = None
 
@@ -140,10 +146,14 @@ class UserDirectoyHandler(object):
             logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
             yield self._handle_intial_room(room_id)
             num_processed_rooms += 1
+            yield sleep(self.INITIAL_SLEEP_MS / 1000.)
 
         logger.info("Processed all rooms.")
 
         self.initially_handled_users = None
+        self.initially_handled_users_in_public = None
+        self.initially_handled_users_share = None
+        self.initially_handled_users_share_private_room = None
 
         yield self.store.update_user_directory_stream_pos(new_pos)
 
@@ -158,7 +168,8 @@ class UserDirectoyHandler(object):
         is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
 
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
-        unhandled_users = set(users_with_profile) - self.initially_handled_users
+        user_ids = set(users_with_profile)
+        unhandled_users = user_ids - self.initially_handled_users
 
         yield self.store.add_profiles_to_user_dir(
             room_id, {
@@ -175,6 +186,69 @@ class UserDirectoyHandler(object):
             )
             self.initially_handled_users_in_public != unhandled_users
 
+        # We now go and figure out the new users who share rooms with user entries
+        # We sleep aggressively here as otherwise it can starve resources.
+        # We also batch up inserts/updates, but try to avoid too many at once.
+        to_insert = set()
+        to_update = set()
+        count = 0
+        for user_id in user_ids:
+            if count % 100 == 0:
+                yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+
+            if not self.is_mine_id(user_id):
+                count += 1
+                continue
+
+            for other_user_id in user_ids:
+                if user_id == other_user_id:
+                    continue
+
+                if count % 100 == 0:
+                    yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+                count += 1
+
+                user_set = (user_id, other_user_id)
+
+                if user_set in self.initially_handled_users_share_private_room:
+                    continue
+
+                if user_set in self.initially_handled_users_share:
+                    if is_public:
+                        continue
+                    to_update.add(user_set)
+                else:
+                    to_insert.add(user_set)
+
+                if is_public:
+                    self.initially_handled_users_share.add(user_set)
+                else:
+                    self.initially_handled_users_share_private_room.add(user_set)
+
+                if len(to_insert) > 100:
+                    yield self.store.add_users_who_share_room(
+                        room_id, not is_public, to_insert,
+                    )
+                    to_insert.clear()
+
+                if len(to_update) > 100:
+                    yield self.store.update_users_who_share_room(
+                        room_id, not is_public, to_update,
+                    )
+                    to_update.clear()
+
+        if to_insert:
+            yield self.store.add_users_who_share_room(
+                room_id, not is_public, to_insert,
+            )
+            to_insert.clear()
+
+        if to_update:
+            yield self.store.update_users_who_share_room(
+                room_id, not is_public, to_update,
+            )
+            to_update.clear()
+
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
         """Called with the state deltas to process
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 137aca2881..0123e28f99 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -16,16 +16,19 @@
 from twisted.internet import defer
 
 from ._base import SQLBaseStore
+
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.api.constants import EventTypes, JoinRules
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id, get_localpart_from_id
 
 import re
+import logging
 
+logger = logging.getLogger(__name__)
 
-class UserDirectoryStore(SQLBaseStore):
 
+class UserDirectoryStore(SQLBaseStore):
     @cachedInlineCallbacks(cache_context=True)
     def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context):
         """Check if the room is either world_readable or publically joinable
@@ -281,14 +284,118 @@ class UserDirectoryStore(SQLBaseStore):
             desc="get_users_in_dir_due_to_room",
         )
 
+    @defer.inlineCallbacks
     def get_all_rooms(self):
-        """Get all room_ids we've ever known about
+        """Get all room_ids we've ever known about, in ascending order of "size"
         """
-        return self._simple_select_onecol(
-            table="current_state_events",
-            keyvalues={},
-            retcol="DISTINCT room_id",
-            desc="get_all_rooms",
+        sql = """
+            SELECT room_id FROM current_state_events
+            GROUP BY room_id
+            ORDER BY count(*) ASC
+        """
+        rows = yield self._execute("get_all_rooms", None, sql)
+        defer.returnValue([room_id for room_id, in rows])
+
+    def add_users_who_share_room(self, room_id, share_private, user_id_tuples):
+        """Insert entries into the users_who_share_rooms table. The first
+        user should be a local user.
+
+        Args:
+            room_id (str)
+            share_private (bool): Is the room private
+            user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
+        """
+        def _add_users_who_share_room_txn(txn):
+            self._simple_insert_many_txn(
+                txn,
+                table="users_who_share_rooms",
+                values=[
+                    {
+                        "user_id": user_id,
+                        "other_user_id": other_user_id,
+                        "room_id": room_id,
+                        "share_private": share_private,
+                    }
+                    for user_id, other_user_id in user_id_tuples
+                ],
+            )
+            for user_id, other_user_id in user_id_tuples:
+                txn.call_after(
+                    self.get_users_who_share_room_from_dir.invalidate,
+                    (user_id,),
+                )
+                txn.call_after(
+                    self.get_if_users_share_a_room.invalidate,
+                    (user_id, other_user_id),
+                )
+        return self.runInteraction(
+            "add_users_who_share_room", _add_users_who_share_room_txn
+        )
+
+    def update_users_who_share_room(self, room_id, share_private, user_id_sets):
+        """Updates entries in the users_who_share_rooms table. The first
+        user should be a local user.
+
+        Args:
+            room_id (str)
+            share_private (bool): Is the room private
+            user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
+        """
+        def _update_users_who_share_room_txn(txn):
+            sql = """
+                UPDATE users_who_share_rooms
+                SET room_id = ?, share_private = ?
+                WHERE user_id = ? AND other_user_id = ?
+            """
+            txn.executemany(
+                sql,
+                (
+                    (room_id, share_private, uid, oid)
+                    for uid, oid in user_id_sets
+                )
+            )
+            for user_id, other_user_id in user_id_sets:
+                txn.call_after(
+                    self.get_users_who_share_room_from_dir.invalidate,
+                    (user_id,),
+                )
+                txn.call_after(
+                    self.get_if_users_share_a_room.invalidate,
+                    (user_id, other_user_id),
+                )
+        return self.runInteraction(
+            "update_users_who_share_room", _update_users_who_share_room_txn
+        )
+
+    def remove_user_who_share_room(self, user_id, other_user_id):
+        """Deletes entries in the users_who_share_rooms table. The first
+        user should be a local user.
+
+        Args:
+            room_id (str)
+            share_private (bool): Is the room private
+            user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
+        """
+        def _remove_user_who_share_room_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                table="users_who_share_rooms",
+                keyvalues={
+                    "user_id": user_id,
+                    "other_user_id": other_user_id,
+                },
+            )
+            txn.call_after(
+                self.get_users_who_share_room_from_dir.invalidate,
+                (user_id,),
+            )
+            txn.call_after(
+                self.get_if_users_share_a_room.invalidate,
+                (user_id, other_user_id),
+            )
+
+        return self.runInteraction(
+            "remove_user_who_share_room", _remove_user_who_share_room_txn
         )
 
     def delete_all_from_user_dir(self):
@@ -298,8 +405,11 @@ class UserDirectoryStore(SQLBaseStore):
             txn.execute("DELETE FROM user_directory")
             txn.execute("DELETE FROM user_directory_search")
             txn.execute("DELETE FROM users_in_pubic_room")
+            txn.execute("DELETE FROM users_who_share_rooms")
             txn.call_after(self.get_user_in_directory.invalidate_all)
             txn.call_after(self.get_user_in_public_room.invalidate_all)
+            txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all)
+            txn.call_after(self.get_if_users_share_a_room.invalidate_all)
         return self.runInteraction(
             "delete_all_from_user_dir", _delete_all_from_user_dir_txn
         )