summary refs log tree commit diff
path: root/synapse/storage/group_server.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2017-10-11 13:20:07 +0100
committerGitHub <noreply@github.com>2017-10-11 13:20:07 +0100
commit535cc49f27e28dd1148c6b92b6d1584cf3093e5c (patch)
treef1714e604abf696ca1adec12fe7653dad8a3ef61 /synapse/storage/group_server.py
parentMerge pull request #2501 from matrix-org/dbkr/channel_notifications (diff)
parentValidate room ids (diff)
downloadsynapse-535cc49f27e28dd1148c6b92b6d1584cf3093e5c.tar.xz
Merge pull request #2466 from matrix-org/erikj/groups_merged
Initial Group Implementation
Diffstat (limited to 'synapse/storage/group_server.py')
-rw-r--r--synapse/storage/group_server.py1187
1 files changed, 1187 insertions, 0 deletions
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
new file mode 100644
index 0000000000..4fe9172adc
--- /dev/null
+++ b/synapse/storage/group_server.py
@@ -0,0 +1,1187 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+
+from ._base import SQLBaseStore
+
+import ujson as json
+
+
+# The category ID for the "default" category. We don't store as null in the
+# database to avoid the fun of null != null
+_DEFAULT_CATEGORY_ID = ""
+_DEFAULT_ROLE_ID = ""
+
+
+class GroupServerStore(SQLBaseStore):
+    def get_group(self, group_id):
+        return self._simple_select_one(
+            table="groups",
+            keyvalues={
+                "group_id": group_id,
+            },
+            retcols=("name", "short_description", "long_description", "avatar_url",),
+            allow_none=True,
+            desc="is_user_in_group",
+        )
+
+    def get_users_in_group(self, group_id, include_private=False):
+        # TODO: Pagination
+
+        keyvalues = {
+            "group_id": group_id,
+        }
+        if not include_private:
+            keyvalues["is_public"] = True
+
+        return self._simple_select_list(
+            table="group_users",
+            keyvalues=keyvalues,
+            retcols=("user_id", "is_public",),
+            desc="get_users_in_group",
+        )
+
+    def get_rooms_in_group(self, group_id, include_private=False):
+        # TODO: Pagination
+
+        keyvalues = {
+            "group_id": group_id,
+        }
+        if not include_private:
+            keyvalues["is_public"] = True
+
+        return self._simple_select_list(
+            table="group_rooms",
+            keyvalues=keyvalues,
+            retcols=("room_id", "is_public",),
+            desc="get_rooms_in_group",
+        )
+
+    def get_rooms_for_summary_by_category(self, group_id, include_private=False):
+        """Get the rooms and categories that should be included in a summary request
+
+        Returns ([rooms], [categories])
+        """
+        def _get_rooms_for_summary_txn(txn):
+            keyvalues = {
+                "group_id": group_id,
+            }
+            if not include_private:
+                keyvalues["is_public"] = True
+
+            sql = """
+                SELECT room_id, is_public, category_id, room_order
+                FROM group_summary_rooms
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            rooms = [
+                {
+                    "room_id": row[0],
+                    "is_public": row[1],
+                    "category_id": row[2] if row[2] != _DEFAULT_CATEGORY_ID else None,
+                    "order": row[3],
+                }
+                for row in txn
+            ]
+
+            sql = """
+                SELECT category_id, is_public, profile, cat_order
+                FROM group_summary_room_categories
+                INNER JOIN group_room_categories USING (group_id, category_id)
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            categories = {
+                row[0]: {
+                    "is_public": row[1],
+                    "profile": json.loads(row[2]),
+                    "order": row[3],
+                }
+                for row in txn
+            }
+
+            return rooms, categories
+        return self.runInteraction(
+            "get_rooms_for_summary", _get_rooms_for_summary_txn
+        )
+
+    def add_room_to_summary(self, group_id, room_id, category_id, order, is_public):
+        return self.runInteraction(
+            "add_room_to_summary", self._add_room_to_summary_txn,
+            group_id, room_id, category_id, order, is_public,
+        )
+
+    def _add_room_to_summary_txn(self, txn, group_id, room_id, category_id, order,
+                                 is_public):
+        """Add (or update) room's entry in summary.
+
+        Args:
+            group_id (str)
+            room_id (str)
+            category_id (str): If not None then adds the category to the end of
+                the summary if its not already there. [Optional]
+            order (int): If not None inserts the room at that position, e.g.
+                an order of 1 will put the room first. Otherwise, the room gets
+                added to the end.
+        """
+        room_in_group = self._simple_select_one_onecol_txn(
+            txn,
+            table="group_rooms",
+            keyvalues={
+                "group_id": group_id,
+                "room_id": room_id,
+            },
+            retcol="room_id",
+            allow_none=True,
+        )
+        if not room_in_group:
+            raise SynapseError(400, "room not in group")
+
+        if category_id is None:
+            category_id = _DEFAULT_CATEGORY_ID
+        else:
+            cat_exists = self._simple_select_one_onecol_txn(
+                txn,
+                table="group_room_categories",
+                keyvalues={
+                    "group_id": group_id,
+                    "category_id": category_id,
+                },
+                retcol="group_id",
+                allow_none=True,
+            )
+            if not cat_exists:
+                raise SynapseError(400, "Category doesn't exist")
+
+            # TODO: Check category is part of summary already
+            cat_exists = self._simple_select_one_onecol_txn(
+                txn,
+                table="group_summary_room_categories",
+                keyvalues={
+                    "group_id": group_id,
+                    "category_id": category_id,
+                },
+                retcol="group_id",
+                allow_none=True,
+            )
+            if not cat_exists:
+                # If not, add it with an order larger than all others
+                txn.execute("""
+                    INSERT INTO group_summary_room_categories
+                    (group_id, category_id, cat_order)
+                    SELECT ?, ?, COALESCE(MAX(cat_order), 0) + 1
+                    FROM group_summary_room_categories
+                    WHERE group_id = ? AND category_id = ?
+                """, (group_id, category_id, group_id, category_id))
+
+        existing = self._simple_select_one_txn(
+            txn,
+            table="group_summary_rooms",
+            keyvalues={
+                "group_id": group_id,
+                "room_id": room_id,
+                "category_id": category_id,
+            },
+            retcols=("room_order", "is_public",),
+            allow_none=True,
+        )
+
+        if order is not None:
+            # Shuffle other room orders that come after the given order
+            sql = """
+                UPDATE group_summary_rooms SET room_order = room_order + 1
+                WHERE group_id = ? AND category_id = ? AND room_order >= ?
+            """
+            txn.execute(sql, (group_id, category_id, order,))
+        elif not existing:
+            sql = """
+                SELECT COALESCE(MAX(room_order), 0) + 1 FROM group_summary_rooms
+                WHERE group_id = ? AND category_id = ?
+            """
+            txn.execute(sql, (group_id, category_id,))
+            order, = txn.fetchone()
+
+        if existing:
+            to_update = {}
+            if order is not None:
+                to_update["room_order"] = order
+            if is_public is not None:
+                to_update["is_public"] = is_public
+            self._simple_update_txn(
+                txn,
+                table="group_summary_rooms",
+                keyvalues={
+                    "group_id": group_id,
+                    "category_id": category_id,
+                    "room_id": room_id,
+                },
+                values=to_update,
+            )
+        else:
+            if is_public is None:
+                is_public = True
+
+            self._simple_insert_txn(
+                txn,
+                table="group_summary_rooms",
+                values={
+                    "group_id": group_id,
+                    "category_id": category_id,
+                    "room_id": room_id,
+                    "room_order": order,
+                    "is_public": is_public,
+                },
+            )
+
+    def remove_room_from_summary(self, group_id, room_id, category_id):
+        if category_id is None:
+            category_id = _DEFAULT_CATEGORY_ID
+
+        return self._simple_delete(
+            table="group_summary_rooms",
+            keyvalues={
+                "group_id": group_id,
+                "category_id": category_id,
+                "room_id": room_id,
+            },
+            desc="remove_room_from_summary",
+        )
+
+    @defer.inlineCallbacks
+    def get_group_categories(self, group_id):
+        rows = yield self._simple_select_list(
+            table="group_room_categories",
+            keyvalues={
+                "group_id": group_id,
+            },
+            retcols=("category_id", "is_public", "profile"),
+            desc="get_group_categories",
+        )
+
+        defer.returnValue({
+            row["category_id"]: {
+                "is_public": row["is_public"],
+                "profile": json.loads(row["profile"]),
+            }
+            for row in rows
+        })
+
+    @defer.inlineCallbacks
+    def get_group_category(self, group_id, category_id):
+        category = yield self._simple_select_one(
+            table="group_room_categories",
+            keyvalues={
+                "group_id": group_id,
+                "category_id": category_id,
+            },
+            retcols=("is_public", "profile"),
+            desc="get_group_category",
+        )
+
+        category["profile"] = json.loads(category["profile"])
+
+        defer.returnValue(category)
+
+    def upsert_group_category(self, group_id, category_id, profile, is_public):
+        """Add/update room category for group
+        """
+        insertion_values = {}
+        update_values = {"category_id": category_id}  # This cannot be empty
+
+        if profile is None:
+            insertion_values["profile"] = "{}"
+        else:
+            update_values["profile"] = json.dumps(profile)
+
+        if is_public is None:
+            insertion_values["is_public"] = True
+        else:
+            update_values["is_public"] = is_public
+
+        return self._simple_upsert(
+            table="group_room_categories",
+            keyvalues={
+                "group_id": group_id,
+                "category_id": category_id,
+            },
+            values=update_values,
+            insertion_values=insertion_values,
+            desc="upsert_group_category",
+        )
+
+    def remove_group_category(self, group_id, category_id):
+        return self._simple_delete(
+            table="group_room_categories",
+            keyvalues={
+                "group_id": group_id,
+                "category_id": category_id,
+            },
+            desc="remove_group_category",
+        )
+
+    @defer.inlineCallbacks
+    def get_group_roles(self, group_id):
+        rows = yield self._simple_select_list(
+            table="group_roles",
+            keyvalues={
+                "group_id": group_id,
+            },
+            retcols=("role_id", "is_public", "profile"),
+            desc="get_group_roles",
+        )
+
+        defer.returnValue({
+            row["role_id"]: {
+                "is_public": row["is_public"],
+                "profile": json.loads(row["profile"]),
+            }
+            for row in rows
+        })
+
+    @defer.inlineCallbacks
+    def get_group_role(self, group_id, role_id):
+        role = yield self._simple_select_one(
+            table="group_roles",
+            keyvalues={
+                "group_id": group_id,
+                "role_id": role_id,
+            },
+            retcols=("is_public", "profile"),
+            desc="get_group_role",
+        )
+
+        role["profile"] = json.loads(role["profile"])
+
+        defer.returnValue(role)
+
+    def upsert_group_role(self, group_id, role_id, profile, is_public):
+        """Add/remove user role
+        """
+        insertion_values = {}
+        update_values = {"role_id": role_id}  # This cannot be empty
+
+        if profile is None:
+            insertion_values["profile"] = "{}"
+        else:
+            update_values["profile"] = json.dumps(profile)
+
+        if is_public is None:
+            insertion_values["is_public"] = True
+        else:
+            update_values["is_public"] = is_public
+
+        return self._simple_upsert(
+            table="group_roles",
+            keyvalues={
+                "group_id": group_id,
+                "role_id": role_id,
+            },
+            values=update_values,
+            insertion_values=insertion_values,
+            desc="upsert_group_role",
+        )
+
+    def remove_group_role(self, group_id, role_id):
+        return self._simple_delete(
+            table="group_roles",
+            keyvalues={
+                "group_id": group_id,
+                "role_id": role_id,
+            },
+            desc="remove_group_role",
+        )
+
+    def add_user_to_summary(self, group_id, user_id, role_id, order, is_public):
+        return self.runInteraction(
+            "add_user_to_summary", self._add_user_to_summary_txn,
+            group_id, user_id, role_id, order, is_public,
+        )
+
+    def _add_user_to_summary_txn(self, txn, group_id, user_id, role_id, order,
+                                 is_public):
+        """Add (or update) user's entry in summary.
+
+        Args:
+            group_id (str)
+            user_id (str)
+            role_id (str): If not None then adds the role to the end of
+                the summary if its not already there. [Optional]
+            order (int): If not None inserts the user at that position, e.g.
+                an order of 1 will put the user first. Otherwise, the user gets
+                added to the end.
+        """
+        user_in_group = self._simple_select_one_onecol_txn(
+            txn,
+            table="group_users",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            retcol="user_id",
+            allow_none=True,
+        )
+        if not user_in_group:
+            raise SynapseError(400, "user not in group")
+
+        if role_id is None:
+            role_id = _DEFAULT_ROLE_ID
+        else:
+            role_exists = self._simple_select_one_onecol_txn(
+                txn,
+                table="group_roles",
+                keyvalues={
+                    "group_id": group_id,
+                    "role_id": role_id,
+                },
+                retcol="group_id",
+                allow_none=True,
+            )
+            if not role_exists:
+                raise SynapseError(400, "Role doesn't exist")
+
+            # TODO: Check role is part of the summary already
+            role_exists = self._simple_select_one_onecol_txn(
+                txn,
+                table="group_summary_roles",
+                keyvalues={
+                    "group_id": group_id,
+                    "role_id": role_id,
+                },
+                retcol="group_id",
+                allow_none=True,
+            )
+            if not role_exists:
+                # If not, add it with an order larger than all others
+                txn.execute("""
+                    INSERT INTO group_summary_roles
+                    (group_id, role_id, role_order)
+                    SELECT ?, ?, COALESCE(MAX(role_order), 0) + 1
+                    FROM group_summary_roles
+                    WHERE group_id = ? AND role_id = ?
+                """, (group_id, role_id, group_id, role_id))
+
+        existing = self._simple_select_one_txn(
+            txn,
+            table="group_summary_users",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+                "role_id": role_id,
+            },
+            retcols=("user_order", "is_public",),
+            allow_none=True,
+        )
+
+        if order is not None:
+            # Shuffle other users orders that come after the given order
+            sql = """
+                UPDATE group_summary_users SET user_order = user_order + 1
+                WHERE group_id = ? AND role_id = ? AND user_order >= ?
+            """
+            txn.execute(sql, (group_id, role_id, order,))
+        elif not existing:
+            sql = """
+                SELECT COALESCE(MAX(user_order), 0) + 1 FROM group_summary_users
+                WHERE group_id = ? AND role_id = ?
+            """
+            txn.execute(sql, (group_id, role_id,))
+            order, = txn.fetchone()
+
+        if existing:
+            to_update = {}
+            if order is not None:
+                to_update["user_order"] = order
+            if is_public is not None:
+                to_update["is_public"] = is_public
+            self._simple_update_txn(
+                txn,
+                table="group_summary_users",
+                keyvalues={
+                    "group_id": group_id,
+                    "role_id": role_id,
+                    "user_id": user_id,
+                },
+                values=to_update,
+            )
+        else:
+            if is_public is None:
+                is_public = True
+
+            self._simple_insert_txn(
+                txn,
+                table="group_summary_users",
+                values={
+                    "group_id": group_id,
+                    "role_id": role_id,
+                    "user_id": user_id,
+                    "user_order": order,
+                    "is_public": is_public,
+                },
+            )
+
+    def remove_user_from_summary(self, group_id, user_id, role_id):
+        if role_id is None:
+            role_id = _DEFAULT_ROLE_ID
+
+        return self._simple_delete(
+            table="group_summary_users",
+            keyvalues={
+                "group_id": group_id,
+                "role_id": role_id,
+                "user_id": user_id,
+            },
+            desc="remove_user_from_summary",
+        )
+
+    def get_users_for_summary_by_role(self, group_id, include_private=False):
+        """Get the users and roles that should be included in a summary request
+
+        Returns ([users], [roles])
+        """
+        def _get_users_for_summary_txn(txn):
+            keyvalues = {
+                "group_id": group_id,
+            }
+            if not include_private:
+                keyvalues["is_public"] = True
+
+            sql = """
+                SELECT user_id, is_public, role_id, user_order
+                FROM group_summary_users
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            users = [
+                {
+                    "user_id": row[0],
+                    "is_public": row[1],
+                    "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
+                    "order": row[3],
+                }
+                for row in txn
+            ]
+
+            sql = """
+                SELECT role_id, is_public, profile, role_order
+                FROM group_summary_roles
+                INNER JOIN group_roles USING (group_id, role_id)
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            roles = {
+                row[0]: {
+                    "is_public": row[1],
+                    "profile": json.loads(row[2]),
+                    "order": row[3],
+                }
+                for row in txn
+            }
+
+            return users, roles
+        return self.runInteraction(
+            "get_users_for_summary_by_role", _get_users_for_summary_txn
+        )
+
+    def is_user_in_group(self, user_id, group_id):
+        return self._simple_select_one_onecol(
+            table="group_users",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            retcol="user_id",
+            allow_none=True,
+            desc="is_user_in_group",
+        ).addCallback(lambda r: bool(r))
+
+    def is_user_admin_in_group(self, group_id, user_id):
+        return self._simple_select_one_onecol(
+            table="group_users",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            retcol="is_admin",
+            allow_none=True,
+            desc="is_user_admin_in_group",
+        )
+
+    def add_group_invite(self, group_id, user_id):
+        """Record that the group server has invited a user
+        """
+        return self._simple_insert(
+            table="group_invites",
+            values={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            desc="add_group_invite",
+        )
+
+    def is_user_invited_to_local_group(self, group_id, user_id):
+        """Has the group server invited a user?
+        """
+        return self._simple_select_one_onecol(
+            table="group_invites",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            retcol="user_id",
+            desc="is_user_invited_to_local_group",
+            allow_none=True,
+        )
+
+    def get_users_membership_info_in_group(self, group_id, user_id):
+        """Get a dict describing the membership of a user in a group.
+
+        Example if joined:
+
+            {
+                "membership": "join",
+                "is_public": True,
+                "is_privileged": False,
+            }
+
+        Returns an empty dict if the user is not join/invite/etc
+        """
+        def _get_users_membership_in_group_txn(txn):
+            row = self._simple_select_one_txn(
+                txn,
+                table="group_users",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+                retcols=("is_admin", "is_public"),
+                allow_none=True,
+            )
+
+            if row:
+                return {
+                    "membership": "join",
+                    "is_public": row["is_public"],
+                    "is_privileged": row["is_admin"],
+                }
+
+            row = self._simple_select_one_onecol_txn(
+                txn,
+                table="group_invites",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+                retcol="user_id",
+                allow_none=True,
+            )
+
+            if row:
+                return {
+                    "membership": "invite",
+                }
+
+            return {}
+
+        return self.runInteraction(
+            "get_users_membership_info_in_group", _get_users_membership_in_group_txn,
+        )
+
+    def add_user_to_group(self, group_id, user_id, is_admin=False, is_public=True,
+                          local_attestation=None, remote_attestation=None):
+        """Add a user to the group server.
+
+        Args:
+            group_id (str)
+            user_id (str)
+            is_admin (bool)
+            is_public (bool)
+            local_attestation (dict): The attestation the GS created to give
+                to the remote server. Optional if the user and group are on the
+                same server
+            remote_attestation (dict): The attestation given to GS by remote
+                server. Optional if the user and group are on the same server
+        """
+        def _add_user_to_group_txn(txn):
+            self._simple_insert_txn(
+                txn,
+                table="group_users",
+                values={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                    "is_admin": is_admin,
+                    "is_public": is_public,
+                },
+            )
+
+            self._simple_delete_txn(
+                txn,
+                table="group_invites",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+
+            if local_attestation:
+                self._simple_insert_txn(
+                    txn,
+                    table="group_attestations_renewals",
+                    values={
+                        "group_id": group_id,
+                        "user_id": user_id,
+                        "valid_until_ms": local_attestation["valid_until_ms"],
+                    },
+                )
+            if remote_attestation:
+                self._simple_insert_txn(
+                    txn,
+                    table="group_attestations_remote",
+                    values={
+                        "group_id": group_id,
+                        "user_id": user_id,
+                        "valid_until_ms": remote_attestation["valid_until_ms"],
+                        "attestation_json": json.dumps(remote_attestation),
+                    },
+                )
+
+        return self.runInteraction(
+            "add_user_to_group", _add_user_to_group_txn
+        )
+
+    def remove_user_from_group(self, group_id, user_id):
+        def _remove_user_from_group_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                table="group_users",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+            self._simple_delete_txn(
+                txn,
+                table="group_invites",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+            self._simple_delete_txn(
+                txn,
+                table="group_attestations_renewals",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+            self._simple_delete_txn(
+                txn,
+                table="group_attestations_remote",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+            self._simple_delete_txn(
+                txn,
+                table="group_summary_users",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+        return self.runInteraction("remove_user_from_group", _remove_user_from_group_txn)
+
+    def add_room_to_group(self, group_id, room_id, is_public):
+        return self._simple_insert(
+            table="group_rooms",
+            values={
+                "group_id": group_id,
+                "room_id": room_id,
+                "is_public": is_public,
+            },
+            desc="add_room_to_group",
+        )
+
+    def remove_room_from_group(self, group_id, room_id):
+        def _remove_room_from_group_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                table="group_rooms",
+                keyvalues={
+                    "group_id": group_id,
+                    "room_id": room_id,
+                },
+            )
+
+            self._simple_delete_txn(
+                txn,
+                table="group_summary_rooms",
+                keyvalues={
+                    "group_id": group_id,
+                    "room_id": room_id,
+                },
+            )
+        return self.runInteraction(
+            "remove_room_from_group", _remove_room_from_group_txn,
+        )
+
+    def get_publicised_groups_for_user(self, user_id):
+        """Get all groups a user is publicising
+        """
+        return self._simple_select_onecol(
+            table="local_group_membership",
+            keyvalues={
+                "user_id": user_id,
+                "membership": "join",
+                "is_publicised": True,
+            },
+            retcol="group_id",
+            desc="get_publicised_groups_for_user",
+        )
+
+    def update_group_publicity(self, group_id, user_id, publicise):
+        """Update whether the user is publicising their membership of the group
+        """
+        return self._simple_update_one(
+            table="local_group_membership",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            updatevalues={
+                "is_publicised": publicise,
+            },
+            desc="update_group_publicity"
+        )
+
+    @defer.inlineCallbacks
+    def register_user_group_membership(self, group_id, user_id, membership,
+                                       is_admin=False, content={},
+                                       local_attestation=None,
+                                       remote_attestation=None,
+                                       is_publicised=False,
+                                       ):
+        """Registers that a local user is a member of a (local or remote) group.
+
+        Args:
+            group_id (str)
+            user_id (str)
+            membership (str)
+            is_admin (bool)
+            content (dict): Content of the membership, e.g. includes the inviter
+                if the user has been invited.
+            local_attestation (dict): If remote group then store the fact that we
+                have given out an attestation, else None.
+            remote_attestation (dict): If remote group then store the remote
+                attestation from the group, else None.
+        """
+        def _register_user_group_membership_txn(txn, next_id):
+            # TODO: Upsert?
+            self._simple_delete_txn(
+                txn,
+                table="local_group_membership",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+            self._simple_insert_txn(
+                txn,
+                table="local_group_membership",
+                values={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                    "is_admin": is_admin,
+                    "membership": membership,
+                    "is_publicised": is_publicised,
+                    "content": json.dumps(content),
+                },
+            )
+
+            self._simple_insert_txn(
+                txn,
+                table="local_group_updates",
+                values={
+                    "stream_id": next_id,
+                    "group_id": group_id,
+                    "user_id": user_id,
+                    "type": "membership",
+                    "content": json.dumps({"membership": membership, "content": content}),
+                }
+            )
+            self._group_updates_stream_cache.entity_has_changed(user_id, next_id)
+
+            # TODO: Insert profile to ensure it comes down stream if its a join.
+
+            if membership == "join":
+                if local_attestation:
+                    self._simple_insert_txn(
+                        txn,
+                        table="group_attestations_renewals",
+                        values={
+                            "group_id": group_id,
+                            "user_id": user_id,
+                            "valid_until_ms": local_attestation["valid_until_ms"],
+                        }
+                    )
+                if remote_attestation:
+                    self._simple_insert_txn(
+                        txn,
+                        table="group_attestations_remote",
+                        values={
+                            "group_id": group_id,
+                            "user_id": user_id,
+                            "valid_until_ms": remote_attestation["valid_until_ms"],
+                            "attestation_json": json.dumps(remote_attestation),
+                        }
+                    )
+            else:
+                self._simple_delete_txn(
+                    txn,
+                    table="group_attestations_renewals",
+                    keyvalues={
+                        "group_id": group_id,
+                        "user_id": user_id,
+                    },
+                )
+                self._simple_delete_txn(
+                    txn,
+                    table="group_attestations_remote",
+                    keyvalues={
+                        "group_id": group_id,
+                        "user_id": user_id,
+                    },
+                )
+
+            return next_id
+
+        with self._group_updates_id_gen.get_next() as next_id:
+            res = yield self.runInteraction(
+                "register_user_group_membership",
+                _register_user_group_membership_txn, next_id,
+            )
+        defer.returnValue(res)
+
+    @defer.inlineCallbacks
+    def create_group(self, group_id, user_id, name, avatar_url, short_description,
+                     long_description,):
+        yield self._simple_insert(
+            table="groups",
+            values={
+                "group_id": group_id,
+                "name": name,
+                "avatar_url": avatar_url,
+                "short_description": short_description,
+                "long_description": long_description,
+            },
+            desc="create_group",
+        )
+
+    @defer.inlineCallbacks
+    def update_group_profile(self, group_id, profile,):
+        yield self._simple_update_one(
+            table="groups",
+            keyvalues={
+                "group_id": group_id,
+            },
+            updatevalues=profile,
+            desc="update_group_profile",
+        )
+
+    def get_attestations_need_renewals(self, valid_until_ms):
+        """Get all attestations that need to be renewed until givent time
+        """
+        def _get_attestations_need_renewals_txn(txn):
+            sql = """
+                SELECT group_id, user_id FROM group_attestations_renewals
+                WHERE valid_until_ms <= ?
+            """
+            txn.execute(sql, (valid_until_ms,))
+            return self.cursor_to_dict(txn)
+        return self.runInteraction(
+            "get_attestations_need_renewals", _get_attestations_need_renewals_txn
+        )
+
+    def update_attestation_renewal(self, group_id, user_id, attestation):
+        """Update an attestation that we have renewed
+        """
+        return self._simple_update_one(
+            table="group_attestations_renewals",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            updatevalues={
+                "valid_until_ms": attestation["valid_until_ms"],
+            },
+            desc="update_attestation_renewal",
+        )
+
+    def update_remote_attestion(self, group_id, user_id, attestation):
+        """Update an attestation that a remote has renewed
+        """
+        return self._simple_update_one(
+            table="group_attestations_remote",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            updatevalues={
+                "valid_until_ms": attestation["valid_until_ms"],
+                "attestation_json": json.dumps(attestation)
+            },
+            desc="update_remote_attestion",
+        )
+
+    @defer.inlineCallbacks
+    def get_remote_attestation(self, group_id, user_id):
+        """Get the attestation that proves the remote agrees that the user is
+        in the group.
+        """
+        row = yield self._simple_select_one(
+            table="group_attestations_remote",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            retcols=("valid_until_ms", "attestation_json"),
+            desc="get_remote_attestation",
+            allow_none=True,
+        )
+
+        now = int(self._clock.time_msec())
+        if row and now < row["valid_until_ms"]:
+            defer.returnValue(json.loads(row["attestation_json"]))
+
+        defer.returnValue(None)
+
+    def get_joined_groups(self, user_id):
+        return self._simple_select_onecol(
+            table="local_group_membership",
+            keyvalues={
+                "user_id": user_id,
+                "membership": "join",
+            },
+            retcol="group_id",
+            desc="get_joined_groups",
+        )
+
+    def get_all_groups_for_user(self, user_id, now_token):
+        def _get_all_groups_for_user_txn(txn):
+            sql = """
+                SELECT group_id, type, membership, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND membership != 'leave'
+                    AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, now_token,))
+            return [
+                {
+                    "group_id": row[0],
+                    "type": row[1],
+                    "membership": row[2],
+                    "content": json.loads(row[3]),
+                }
+                for row in txn
+            ]
+        return self.runInteraction(
+            "get_all_groups_for_user", _get_all_groups_for_user_txn,
+        )
+
+    def get_groups_changes_for_user(self, user_id, from_token, to_token):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_entity_changed(
+            user_id, from_token,
+        )
+        if not has_changed:
+            return []
+
+        def _get_groups_changes_for_user_txn(txn):
+            sql = """
+                SELECT group_id, membership, type, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, from_token, to_token,))
+            return [{
+                "group_id": group_id,
+                "membership": membership,
+                "type": gtype,
+                "content": json.loads(content_json),
+            } for group_id, membership, gtype, content_json in txn]
+        return self.runInteraction(
+            "get_groups_changes_for_user", _get_groups_changes_for_user_txn,
+        )
+
+    def get_all_groups_changes(self, from_token, to_token, limit):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_any_entity_changed(
+            from_token,
+        )
+        if not has_changed:
+            return []
+
+        def _get_all_groups_changes_txn(txn):
+            sql = """
+                SELECT stream_id, group_id, user_id, type, content
+                FROM local_group_updates
+                WHERE ? < stream_id AND stream_id <= ?
+                LIMIT ?
+            """
+            txn.execute(sql, (from_token, to_token, limit,))
+            return [{
+                "stream_id": stream_id,
+                "group_id": group_id,
+                "user_id": user_id,
+                "type": gtype,
+                "content": json.loads(content_json),
+            } for stream_id, group_id, user_id, gtype, content_json in txn]
+        return self.runInteraction(
+            "get_all_groups_changes", _get_all_groups_changes_txn,
+        )
+
+    def get_group_stream_token(self):
+        return self._group_updates_id_gen.get_current_token()