summary refs log tree commit diff
path: root/synapse/storage/data_stores
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-02-07 11:14:19 +0000
committerGitHub <noreply@github.com>2020-02-07 11:14:19 +0000
commitde2d267375069c2d22bceb0d6ef9c6f5a77380e3 (patch)
tree12636dca8ea90ea5369207b7aea134521a1fc8f5 /synapse/storage/data_stores
parentAdmin api to add an email address (#6789) (diff)
downloadsynapse-de2d267375069c2d22bceb0d6ef9c6f5a77380e3.tar.xz
Allow moving group read APIs to workers (#6866)
Diffstat (limited to 'synapse/storage/data_stores')
-rw-r--r--synapse/storage/data_stores/main/group_server.py720
1 files changed, 361 insertions, 359 deletions
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index 6acd45e9f3..0963e6c250 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -27,21 +27,7 @@ _DEFAULT_CATEGORY_ID = ""
 _DEFAULT_ROLE_ID = ""
 
 
-class GroupServerStore(SQLBaseStore):
-    def set_group_join_policy(self, group_id, join_policy):
-        """Set the join policy of a group.
-
-        join_policy can be one of:
-         * "invite"
-         * "open"
-        """
-        return self.db.simple_update_one(
-            table="groups",
-            keyvalues={"group_id": group_id},
-            updatevalues={"join_policy": join_policy},
-            desc="set_group_join_policy",
-        )
-
+class GroupServerWorkerStore(SQLBaseStore):
     def get_group(self, group_id):
         return self.db.simple_select_one(
             table="groups",
@@ -157,6 +143,366 @@ class GroupServerStore(SQLBaseStore):
             "get_rooms_for_summary", _get_rooms_for_summary_txn
         )
 
+    @defer.inlineCallbacks
+    def get_group_categories(self, group_id):
+        rows = yield self.db.simple_select_list(
+            table="group_room_categories",
+            keyvalues={"group_id": group_id},
+            retcols=("category_id", "is_public", "profile"),
+            desc="get_group_categories",
+        )
+
+        return {
+            row["category_id"]: {
+                "is_public": row["is_public"],
+                "profile": json.loads(row["profile"]),
+            }
+            for row in rows
+        }
+
+    @defer.inlineCallbacks
+    def get_group_category(self, group_id, category_id):
+        category = yield self.db.simple_select_one(
+            table="group_room_categories",
+            keyvalues={"group_id": group_id, "category_id": category_id},
+            retcols=("is_public", "profile"),
+            desc="get_group_category",
+        )
+
+        category["profile"] = json.loads(category["profile"])
+
+        return category
+
+    @defer.inlineCallbacks
+    def get_group_roles(self, group_id):
+        rows = yield self.db.simple_select_list(
+            table="group_roles",
+            keyvalues={"group_id": group_id},
+            retcols=("role_id", "is_public", "profile"),
+            desc="get_group_roles",
+        )
+
+        return {
+            row["role_id"]: {
+                "is_public": row["is_public"],
+                "profile": json.loads(row["profile"]),
+            }
+            for row in rows
+        }
+
+    @defer.inlineCallbacks
+    def get_group_role(self, group_id, role_id):
+        role = yield self.db.simple_select_one(
+            table="group_roles",
+            keyvalues={"group_id": group_id, "role_id": role_id},
+            retcols=("is_public", "profile"),
+            desc="get_group_role",
+        )
+
+        role["profile"] = json.loads(role["profile"])
+
+        return role
+
+    def get_local_groups_for_room(self, room_id):
+        """Get all of the local group that contain a given room
+        Args:
+            room_id (str): The ID of a room
+        Returns:
+            Deferred[list[str]]: A twisted.Deferred containing a list of group ids
+                containing this room
+        """
+        return self.db.simple_select_onecol(
+            table="group_rooms",
+            keyvalues={"room_id": room_id},
+            retcol="group_id",
+            desc="get_local_groups_for_room",
+        )
+
+    def get_users_for_summary_by_role(self, group_id, include_private=False):
+        """Get the users and roles that should be included in a summary request
+
+        Returns ([users], [roles])
+        """
+
+        def _get_users_for_summary_txn(txn):
+            keyvalues = {"group_id": group_id}
+            if not include_private:
+                keyvalues["is_public"] = True
+
+            sql = """
+                SELECT user_id, is_public, role_id, user_order
+                FROM group_summary_users
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            users = [
+                {
+                    "user_id": row[0],
+                    "is_public": row[1],
+                    "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
+                    "order": row[3],
+                }
+                for row in txn
+            ]
+
+            sql = """
+                SELECT role_id, is_public, profile, role_order
+                FROM group_summary_roles
+                INNER JOIN group_roles USING (group_id, role_id)
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            roles = {
+                row[0]: {
+                    "is_public": row[1],
+                    "profile": json.loads(row[2]),
+                    "order": row[3],
+                }
+                for row in txn
+            }
+
+            return users, roles
+
+        return self.db.runInteraction(
+            "get_users_for_summary_by_role", _get_users_for_summary_txn
+        )
+
+    def is_user_in_group(self, user_id, group_id):
+        return self.db.simple_select_one_onecol(
+            table="group_users",
+            keyvalues={"group_id": group_id, "user_id": user_id},
+            retcol="user_id",
+            allow_none=True,
+            desc="is_user_in_group",
+        ).addCallback(lambda r: bool(r))
+
+    def is_user_admin_in_group(self, group_id, user_id):
+        return self.db.simple_select_one_onecol(
+            table="group_users",
+            keyvalues={"group_id": group_id, "user_id": user_id},
+            retcol="is_admin",
+            allow_none=True,
+            desc="is_user_admin_in_group",
+        )
+
+    def is_user_invited_to_local_group(self, group_id, user_id):
+        """Has the group server invited a user?
+        """
+        return self.db.simple_select_one_onecol(
+            table="group_invites",
+            keyvalues={"group_id": group_id, "user_id": user_id},
+            retcol="user_id",
+            desc="is_user_invited_to_local_group",
+            allow_none=True,
+        )
+
+    def get_users_membership_info_in_group(self, group_id, user_id):
+        """Get a dict describing the membership of a user in a group.
+
+        Example if joined:
+
+            {
+                "membership": "join",
+                "is_public": True,
+                "is_privileged": False,
+            }
+
+        Returns an empty dict if the user is not join/invite/etc
+        """
+
+        def _get_users_membership_in_group_txn(txn):
+            row = self.db.simple_select_one_txn(
+                txn,
+                table="group_users",
+                keyvalues={"group_id": group_id, "user_id": user_id},
+                retcols=("is_admin", "is_public"),
+                allow_none=True,
+            )
+
+            if row:
+                return {
+                    "membership": "join",
+                    "is_public": row["is_public"],
+                    "is_privileged": row["is_admin"],
+                }
+
+            row = self.db.simple_select_one_onecol_txn(
+                txn,
+                table="group_invites",
+                keyvalues={"group_id": group_id, "user_id": user_id},
+                retcol="user_id",
+                allow_none=True,
+            )
+
+            if row:
+                return {"membership": "invite"}
+
+            return {}
+
+        return self.db.runInteraction(
+            "get_users_membership_info_in_group", _get_users_membership_in_group_txn
+        )
+
+    def get_publicised_groups_for_user(self, user_id):
+        """Get all groups a user is publicising
+        """
+        return self.db.simple_select_onecol(
+            table="local_group_membership",
+            keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True},
+            retcol="group_id",
+            desc="get_publicised_groups_for_user",
+        )
+
+    def get_attestations_need_renewals(self, valid_until_ms):
+        """Get all attestations that need to be renewed until givent time
+        """
+
+        def _get_attestations_need_renewals_txn(txn):
+            sql = """
+                SELECT group_id, user_id FROM group_attestations_renewals
+                WHERE valid_until_ms <= ?
+            """
+            txn.execute(sql, (valid_until_ms,))
+            return self.db.cursor_to_dict(txn)
+
+        return self.db.runInteraction(
+            "get_attestations_need_renewals", _get_attestations_need_renewals_txn
+        )
+
+    @defer.inlineCallbacks
+    def get_remote_attestation(self, group_id, user_id):
+        """Get the attestation that proves the remote agrees that the user is
+        in the group.
+        """
+        row = yield self.db.simple_select_one(
+            table="group_attestations_remote",
+            keyvalues={"group_id": group_id, "user_id": user_id},
+            retcols=("valid_until_ms", "attestation_json"),
+            desc="get_remote_attestation",
+            allow_none=True,
+        )
+
+        now = int(self._clock.time_msec())
+        if row and now < row["valid_until_ms"]:
+            return json.loads(row["attestation_json"])
+
+        return None
+
+    def get_joined_groups(self, user_id):
+        return self.db.simple_select_onecol(
+            table="local_group_membership",
+            keyvalues={"user_id": user_id, "membership": "join"},
+            retcol="group_id",
+            desc="get_joined_groups",
+        )
+
+    def get_all_groups_for_user(self, user_id, now_token):
+        def _get_all_groups_for_user_txn(txn):
+            sql = """
+                SELECT group_id, type, membership, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND membership != 'leave'
+                    AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, now_token))
+            return [
+                {
+                    "group_id": row[0],
+                    "type": row[1],
+                    "membership": row[2],
+                    "content": json.loads(row[3]),
+                }
+                for row in txn
+            ]
+
+        return self.db.runInteraction(
+            "get_all_groups_for_user", _get_all_groups_for_user_txn
+        )
+
+    def get_groups_changes_for_user(self, user_id, from_token, to_token):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_entity_changed(
+            user_id, from_token
+        )
+        if not has_changed:
+            return defer.succeed([])
+
+        def _get_groups_changes_for_user_txn(txn):
+            sql = """
+                SELECT group_id, membership, type, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, from_token, to_token))
+            return [
+                {
+                    "group_id": group_id,
+                    "membership": membership,
+                    "type": gtype,
+                    "content": json.loads(content_json),
+                }
+                for group_id, membership, gtype, content_json in txn
+            ]
+
+        return self.db.runInteraction(
+            "get_groups_changes_for_user", _get_groups_changes_for_user_txn
+        )
+
+    def get_all_groups_changes(self, from_token, to_token, limit):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_any_entity_changed(
+            from_token
+        )
+        if not has_changed:
+            return defer.succeed([])
+
+        def _get_all_groups_changes_txn(txn):
+            sql = """
+                SELECT stream_id, group_id, user_id, type, content
+                FROM local_group_updates
+                WHERE ? < stream_id AND stream_id <= ?
+                LIMIT ?
+            """
+            txn.execute(sql, (from_token, to_token, limit))
+            return [
+                (stream_id, group_id, user_id, gtype, json.loads(content_json))
+                for stream_id, group_id, user_id, gtype, content_json in txn
+            ]
+
+        return self.db.runInteraction(
+            "get_all_groups_changes", _get_all_groups_changes_txn
+        )
+
+
+class GroupServerStore(GroupServerWorkerStore):
+    def set_group_join_policy(self, group_id, join_policy):
+        """Set the join policy of a group.
+
+        join_policy can be one of:
+         * "invite"
+         * "open"
+        """
+        return self.db.simple_update_one(
+            table="groups",
+            keyvalues={"group_id": group_id},
+            updatevalues={"join_policy": join_policy},
+            desc="set_group_join_policy",
+        )
+
     def add_room_to_summary(self, group_id, room_id, category_id, order, is_public):
         return self.db.runInteraction(
             "add_room_to_summary",
@@ -299,36 +645,6 @@ class GroupServerStore(SQLBaseStore):
             desc="remove_room_from_summary",
         )
 
-    @defer.inlineCallbacks
-    def get_group_categories(self, group_id):
-        rows = yield self.db.simple_select_list(
-            table="group_room_categories",
-            keyvalues={"group_id": group_id},
-            retcols=("category_id", "is_public", "profile"),
-            desc="get_group_categories",
-        )
-
-        return {
-            row["category_id"]: {
-                "is_public": row["is_public"],
-                "profile": json.loads(row["profile"]),
-            }
-            for row in rows
-        }
-
-    @defer.inlineCallbacks
-    def get_group_category(self, group_id, category_id):
-        category = yield self.db.simple_select_one(
-            table="group_room_categories",
-            keyvalues={"group_id": group_id, "category_id": category_id},
-            retcols=("is_public", "profile"),
-            desc="get_group_category",
-        )
-
-        category["profile"] = json.loads(category["profile"])
-
-        return category
-
     def upsert_group_category(self, group_id, category_id, profile, is_public):
         """Add/update room category for group
         """
@@ -360,36 +676,6 @@ class GroupServerStore(SQLBaseStore):
             desc="remove_group_category",
         )
 
-    @defer.inlineCallbacks
-    def get_group_roles(self, group_id):
-        rows = yield self.db.simple_select_list(
-            table="group_roles",
-            keyvalues={"group_id": group_id},
-            retcols=("role_id", "is_public", "profile"),
-            desc="get_group_roles",
-        )
-
-        return {
-            row["role_id"]: {
-                "is_public": row["is_public"],
-                "profile": json.loads(row["profile"]),
-            }
-            for row in rows
-        }
-
-    @defer.inlineCallbacks
-    def get_group_role(self, group_id, role_id):
-        role = yield self.db.simple_select_one(
-            table="group_roles",
-            keyvalues={"group_id": group_id, "role_id": role_id},
-            retcols=("is_public", "profile"),
-            desc="get_group_role",
-        )
-
-        role["profile"] = json.loads(role["profile"])
-
-        return role
-
     def upsert_group_role(self, group_id, role_id, profile, is_public):
         """Add/remove user role
         """
@@ -555,100 +841,6 @@ class GroupServerStore(SQLBaseStore):
             desc="remove_user_from_summary",
         )
 
-    def get_local_groups_for_room(self, room_id):
-        """Get all of the local group that contain a given room
-        Args:
-            room_id (str): The ID of a room
-        Returns:
-            Deferred[list[str]]: A twisted.Deferred containing a list of group ids
-                containing this room
-        """
-        return self.db.simple_select_onecol(
-            table="group_rooms",
-            keyvalues={"room_id": room_id},
-            retcol="group_id",
-            desc="get_local_groups_for_room",
-        )
-
-    def get_users_for_summary_by_role(self, group_id, include_private=False):
-        """Get the users and roles that should be included in a summary request
-
-        Returns ([users], [roles])
-        """
-
-        def _get_users_for_summary_txn(txn):
-            keyvalues = {"group_id": group_id}
-            if not include_private:
-                keyvalues["is_public"] = True
-
-            sql = """
-                SELECT user_id, is_public, role_id, user_order
-                FROM group_summary_users
-                WHERE group_id = ?
-            """
-
-            if not include_private:
-                sql += " AND is_public = ?"
-                txn.execute(sql, (group_id, True))
-            else:
-                txn.execute(sql, (group_id,))
-
-            users = [
-                {
-                    "user_id": row[0],
-                    "is_public": row[1],
-                    "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
-                    "order": row[3],
-                }
-                for row in txn
-            ]
-
-            sql = """
-                SELECT role_id, is_public, profile, role_order
-                FROM group_summary_roles
-                INNER JOIN group_roles USING (group_id, role_id)
-                WHERE group_id = ?
-            """
-
-            if not include_private:
-                sql += " AND is_public = ?"
-                txn.execute(sql, (group_id, True))
-            else:
-                txn.execute(sql, (group_id,))
-
-            roles = {
-                row[0]: {
-                    "is_public": row[1],
-                    "profile": json.loads(row[2]),
-                    "order": row[3],
-                }
-                for row in txn
-            }
-
-            return users, roles
-
-        return self.db.runInteraction(
-            "get_users_for_summary_by_role", _get_users_for_summary_txn
-        )
-
-    def is_user_in_group(self, user_id, group_id):
-        return self.db.simple_select_one_onecol(
-            table="group_users",
-            keyvalues={"group_id": group_id, "user_id": user_id},
-            retcol="user_id",
-            allow_none=True,
-            desc="is_user_in_group",
-        ).addCallback(lambda r: bool(r))
-
-    def is_user_admin_in_group(self, group_id, user_id):
-        return self.db.simple_select_one_onecol(
-            table="group_users",
-            keyvalues={"group_id": group_id, "user_id": user_id},
-            retcol="is_admin",
-            allow_none=True,
-            desc="is_user_admin_in_group",
-        )
-
     def add_group_invite(self, group_id, user_id):
         """Record that the group server has invited a user
         """
@@ -658,64 +850,6 @@ class GroupServerStore(SQLBaseStore):
             desc="add_group_invite",
         )
 
-    def is_user_invited_to_local_group(self, group_id, user_id):
-        """Has the group server invited a user?
-        """
-        return self.db.simple_select_one_onecol(
-            table="group_invites",
-            keyvalues={"group_id": group_id, "user_id": user_id},
-            retcol="user_id",
-            desc="is_user_invited_to_local_group",
-            allow_none=True,
-        )
-
-    def get_users_membership_info_in_group(self, group_id, user_id):
-        """Get a dict describing the membership of a user in a group.
-
-        Example if joined:
-
-            {
-                "membership": "join",
-                "is_public": True,
-                "is_privileged": False,
-            }
-
-        Returns an empty dict if the user is not join/invite/etc
-        """
-
-        def _get_users_membership_in_group_txn(txn):
-            row = self.db.simple_select_one_txn(
-                txn,
-                table="group_users",
-                keyvalues={"group_id": group_id, "user_id": user_id},
-                retcols=("is_admin", "is_public"),
-                allow_none=True,
-            )
-
-            if row:
-                return {
-                    "membership": "join",
-                    "is_public": row["is_public"],
-                    "is_privileged": row["is_admin"],
-                }
-
-            row = self.db.simple_select_one_onecol_txn(
-                txn,
-                table="group_invites",
-                keyvalues={"group_id": group_id, "user_id": user_id},
-                retcol="user_id",
-                allow_none=True,
-            )
-
-            if row:
-                return {"membership": "invite"}
-
-            return {}
-
-        return self.db.runInteraction(
-            "get_users_membership_info_in_group", _get_users_membership_in_group_txn
-        )
-
     def add_user_to_group(
         self,
         group_id,
@@ -846,16 +980,6 @@ class GroupServerStore(SQLBaseStore):
             "remove_room_from_group", _remove_room_from_group_txn
         )
 
-    def get_publicised_groups_for_user(self, user_id):
-        """Get all groups a user is publicising
-        """
-        return self.db.simple_select_onecol(
-            table="local_group_membership",
-            keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True},
-            retcol="group_id",
-            desc="get_publicised_groups_for_user",
-        )
-
     def update_group_publicity(self, group_id, user_id, publicise):
         """Update whether the user is publicising their membership of the group
         """
@@ -1000,22 +1124,6 @@ class GroupServerStore(SQLBaseStore):
             desc="update_group_profile",
         )
 
-    def get_attestations_need_renewals(self, valid_until_ms):
-        """Get all attestations that need to be renewed until givent time
-        """
-
-        def _get_attestations_need_renewals_txn(txn):
-            sql = """
-                SELECT group_id, user_id FROM group_attestations_renewals
-                WHERE valid_until_ms <= ?
-            """
-            txn.execute(sql, (valid_until_ms,))
-            return self.db.cursor_to_dict(txn)
-
-        return self.db.runInteraction(
-            "get_attestations_need_renewals", _get_attestations_need_renewals_txn
-        )
-
     def update_attestation_renewal(self, group_id, user_id, attestation):
         """Update an attestation that we have renewed
         """
@@ -1054,112 +1162,6 @@ class GroupServerStore(SQLBaseStore):
             desc="remove_attestation_renewal",
         )
 
-    @defer.inlineCallbacks
-    def get_remote_attestation(self, group_id, user_id):
-        """Get the attestation that proves the remote agrees that the user is
-        in the group.
-        """
-        row = yield self.db.simple_select_one(
-            table="group_attestations_remote",
-            keyvalues={"group_id": group_id, "user_id": user_id},
-            retcols=("valid_until_ms", "attestation_json"),
-            desc="get_remote_attestation",
-            allow_none=True,
-        )
-
-        now = int(self._clock.time_msec())
-        if row and now < row["valid_until_ms"]:
-            return json.loads(row["attestation_json"])
-
-        return None
-
-    def get_joined_groups(self, user_id):
-        return self.db.simple_select_onecol(
-            table="local_group_membership",
-            keyvalues={"user_id": user_id, "membership": "join"},
-            retcol="group_id",
-            desc="get_joined_groups",
-        )
-
-    def get_all_groups_for_user(self, user_id, now_token):
-        def _get_all_groups_for_user_txn(txn):
-            sql = """
-                SELECT group_id, type, membership, u.content
-                FROM local_group_updates AS u
-                INNER JOIN local_group_membership USING (group_id, user_id)
-                WHERE user_id = ? AND membership != 'leave'
-                    AND stream_id <= ?
-            """
-            txn.execute(sql, (user_id, now_token))
-            return [
-                {
-                    "group_id": row[0],
-                    "type": row[1],
-                    "membership": row[2],
-                    "content": json.loads(row[3]),
-                }
-                for row in txn
-            ]
-
-        return self.db.runInteraction(
-            "get_all_groups_for_user", _get_all_groups_for_user_txn
-        )
-
-    def get_groups_changes_for_user(self, user_id, from_token, to_token):
-        from_token = int(from_token)
-        has_changed = self._group_updates_stream_cache.has_entity_changed(
-            user_id, from_token
-        )
-        if not has_changed:
-            return defer.succeed([])
-
-        def _get_groups_changes_for_user_txn(txn):
-            sql = """
-                SELECT group_id, membership, type, u.content
-                FROM local_group_updates AS u
-                INNER JOIN local_group_membership USING (group_id, user_id)
-                WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
-            """
-            txn.execute(sql, (user_id, from_token, to_token))
-            return [
-                {
-                    "group_id": group_id,
-                    "membership": membership,
-                    "type": gtype,
-                    "content": json.loads(content_json),
-                }
-                for group_id, membership, gtype, content_json in txn
-            ]
-
-        return self.db.runInteraction(
-            "get_groups_changes_for_user", _get_groups_changes_for_user_txn
-        )
-
-    def get_all_groups_changes(self, from_token, to_token, limit):
-        from_token = int(from_token)
-        has_changed = self._group_updates_stream_cache.has_any_entity_changed(
-            from_token
-        )
-        if not has_changed:
-            return defer.succeed([])
-
-        def _get_all_groups_changes_txn(txn):
-            sql = """
-                SELECT stream_id, group_id, user_id, type, content
-                FROM local_group_updates
-                WHERE ? < stream_id AND stream_id <= ?
-                LIMIT ?
-            """
-            txn.execute(sql, (from_token, to_token, limit))
-            return [
-                (stream_id, group_id, user_id, gtype, json.loads(content_json))
-                for stream_id, group_id, user_id, gtype, content_json in txn
-            ]
-
-        return self.db.runInteraction(
-            "get_all_groups_changes", _get_all_groups_changes_txn
-        )
-
     def get_group_stream_token(self):
         return self._group_updates_id_gen.get_current_token()