summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-02-07 11:14:19 +0000
committerGitHub <noreply@github.com>2020-02-07 11:14:19 +0000
commitde2d267375069c2d22bceb0d6ef9c6f5a77380e3 (patch)
tree12636dca8ea90ea5369207b7aea134521a1fc8f5
parentAdmin api to add an email address (#6789) (diff)
downloadsynapse-de2d267375069c2d22bceb0d6ef9c6f5a77380e3.tar.xz
Allow moving group read APIs to workers (#6866)
-rw-r--r--changelog.d/6866.feature1
-rw-r--r--docs/workers.md8
-rw-r--r--synapse/app/client_reader.py3
-rw-r--r--synapse/app/federation_reader.py2
-rw-r--r--synapse/groups/groups_server.py377
-rw-r--r--synapse/handlers/groups_local.py270
-rw-r--r--synapse/replication/slave/storage/groups.py14
-rw-r--r--synapse/server.py14
-rw-r--r--synapse/storage/data_stores/main/group_server.py720
9 files changed, 722 insertions, 687 deletions
diff --git a/changelog.d/6866.feature b/changelog.d/6866.feature
new file mode 100644
index 0000000000..256feab6ff
--- /dev/null
+++ b/changelog.d/6866.feature
@@ -0,0 +1 @@
+Add ability to run some group APIs on workers.
diff --git a/docs/workers.md b/docs/workers.md
index 09a9d8a7b8..82442d6a0a 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -177,8 +177,13 @@ endpoints matching the following regular expressions:
     ^/_matrix/federation/v1/event_auth/
     ^/_matrix/federation/v1/exchange_third_party_invite/
     ^/_matrix/federation/v1/send/
+    ^/_matrix/federation/v1/get_groups_publicised$
     ^/_matrix/key/v2/query
 
+Additionally, the following REST endpoints can be handled for GET requests:
+
+    ^/_matrix/federation/v1/groups/
+
 The above endpoints should all be routed to the federation_reader worker by the
 reverse-proxy configuration.
 
@@ -254,10 +259,13 @@ following regular expressions:
     ^/_matrix/client/(api/v1|r0|unstable)/keys/changes$
     ^/_matrix/client/versions$
     ^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$
+    ^/_matrix/client/(api/v1|r0|unstable)/joined_groups$
+    ^/_matrix/client/(api/v1|r0|unstable)/get_groups_publicised$
 
 Additionally, the following REST endpoints can be handled for GET requests:
 
     ^/_matrix/client/(api/v1|r0|unstable)/pushrules/.*$
+    ^/_matrix/client/(api/v1|r0|unstable)/groups/.*$
 
 Additionally, the following REST endpoints can be handled, but all requests must
 be routed to the same instance:
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index ca96da6a4a..7fa91a3b11 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -57,6 +57,7 @@ from synapse.rest.client.v1.room import (
     RoomStateRestServlet,
 )
 from synapse.rest.client.v1.voip import VoipRestServlet
+from synapse.rest.client.v2_alpha import groups
 from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
 from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
 from synapse.rest.client.v2_alpha.register import RegisterRestServlet
@@ -124,6 +125,8 @@ class ClientReaderServer(HomeServer):
                     PushRuleRestServlet(self).register(resource)
                     VersionsRestServlet(self).register(resource)
 
+                    groups.register_servlets(self, resource)
+
                     resources.update({"/_matrix/client": resource})
 
         root_resource = create_resource_tree(resources, NoResource())
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 1f1cea1416..5e17ef1396 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -35,6 +35,7 @@ from synapse.replication.slave.storage.account_data import SlavedAccountDataStor
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.groups import SlavedGroupServerStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.profile import SlavedProfileStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
@@ -66,6 +67,7 @@ class FederationReaderSlavedStore(
     SlavedEventStore,
     SlavedKeyStore,
     SlavedRegistrationStore,
+    SlavedGroupServerStore,
     RoomStore,
     DirectoryStore,
     SlavedTransactionStore,
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 0ec9be3cb5..c106abae21 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -36,7 +36,7 @@ logger = logging.getLogger(__name__)
 # TODO: Flairs
 
 
-class GroupsServerHandler(object):
+class GroupsServerWorkerHandler(object):
     def __init__(self, hs):
         self.hs = hs
         self.store = hs.get_datastore()
@@ -51,9 +51,6 @@ class GroupsServerHandler(object):
         self.transport_client = hs.get_federation_transport_client()
         self.profile_handler = hs.get_profile_handler()
 
-        # Ensure attestations get renewed
-        hs.get_groups_attestation_renewer()
-
     @defer.inlineCallbacks
     def check_group_is_ours(
         self, group_id, requester_user_id, and_exists=False, and_is_admin=None
@@ -168,6 +165,197 @@ class GroupsServerHandler(object):
         }
 
     @defer.inlineCallbacks
+    def get_group_categories(self, group_id, requester_user_id):
+        """Get all categories in a group (as seen by user)
+        """
+        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+        categories = yield self.store.get_group_categories(group_id=group_id)
+        return {"categories": categories}
+
+    @defer.inlineCallbacks
+    def get_group_category(self, group_id, requester_user_id, category_id):
+        """Get a specific category in a group (as seen by user)
+        """
+        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+        res = yield self.store.get_group_category(
+            group_id=group_id, category_id=category_id
+        )
+
+        logger.info("group %s", res)
+
+        return res
+
+    @defer.inlineCallbacks
+    def get_group_roles(self, group_id, requester_user_id):
+        """Get all roles in a group (as seen by user)
+        """
+        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+        roles = yield self.store.get_group_roles(group_id=group_id)
+        return {"roles": roles}
+
+    @defer.inlineCallbacks
+    def get_group_role(self, group_id, requester_user_id, role_id):
+        """Get a specific role in a group (as seen by user)
+        """
+        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+        res = yield self.store.get_group_role(group_id=group_id, role_id=role_id)
+        return res
+
+    @defer.inlineCallbacks
+    def get_group_profile(self, group_id, requester_user_id):
+        """Get the group profile as seen by requester_user_id
+        """
+
+        yield self.check_group_is_ours(group_id, requester_user_id)
+
+        group = yield self.store.get_group(group_id)
+
+        if group:
+            cols = [
+                "name",
+                "short_description",
+                "long_description",
+                "avatar_url",
+                "is_public",
+            ]
+            group_description = {key: group[key] for key in cols}
+            group_description["is_openly_joinable"] = group["join_policy"] == "open"
+
+            return group_description
+        else:
+            raise SynapseError(404, "Unknown group")
+
+    @defer.inlineCallbacks
+    def get_users_in_group(self, group_id, requester_user_id):
+        """Get the users in group as seen by requester_user_id.
+
+        The ordering is arbitrary at the moment
+        """
+
+        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+        is_user_in_group = yield self.store.is_user_in_group(
+            requester_user_id, group_id
+        )
+
+        user_results = yield self.store.get_users_in_group(
+            group_id, include_private=is_user_in_group
+        )
+
+        chunk = []
+        for user_result in user_results:
+            g_user_id = user_result["user_id"]
+            is_public = user_result["is_public"]
+            is_privileged = user_result["is_admin"]
+
+            entry = {"user_id": g_user_id}
+
+            profile = yield self.profile_handler.get_profile_from_cache(g_user_id)
+            entry.update(profile)
+
+            entry["is_public"] = bool(is_public)
+            entry["is_privileged"] = bool(is_privileged)
+
+            if not self.is_mine_id(g_user_id):
+                attestation = yield self.store.get_remote_attestation(
+                    group_id, g_user_id
+                )
+                if not attestation:
+                    continue
+
+                entry["attestation"] = attestation
+            else:
+                entry["attestation"] = self.attestations.create_attestation(
+                    group_id, g_user_id
+                )
+
+            chunk.append(entry)
+
+        # TODO: If admin add lists of users whose attestations have timed out
+
+        return {"chunk": chunk, "total_user_count_estimate": len(user_results)}
+
+    @defer.inlineCallbacks
+    def get_invited_users_in_group(self, group_id, requester_user_id):
+        """Get the users that have been invited to a group as seen by requester_user_id.
+
+        The ordering is arbitrary at the moment
+        """
+
+        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+        is_user_in_group = yield self.store.is_user_in_group(
+            requester_user_id, group_id
+        )
+
+        if not is_user_in_group:
+            raise SynapseError(403, "User not in group")
+
+        invited_users = yield self.store.get_invited_users_in_group(group_id)
+
+        user_profiles = []
+
+        for user_id in invited_users:
+            user_profile = {"user_id": user_id}
+            try:
+                profile = yield self.profile_handler.get_profile_from_cache(user_id)
+                user_profile.update(profile)
+            except Exception as e:
+                logger.warning("Error getting profile for %s: %s", user_id, e)
+            user_profiles.append(user_profile)
+
+        return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)}
+
+    @defer.inlineCallbacks
+    def get_rooms_in_group(self, group_id, requester_user_id):
+        """Get the rooms in group as seen by requester_user_id
+
+        This returns rooms in order of decreasing number of joined users
+        """
+
+        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+        is_user_in_group = yield self.store.is_user_in_group(
+            requester_user_id, group_id
+        )
+
+        room_results = yield self.store.get_rooms_in_group(
+            group_id, include_private=is_user_in_group
+        )
+
+        chunk = []
+        for room_result in room_results:
+            room_id = room_result["room_id"]
+
+            joined_users = yield self.store.get_users_in_room(room_id)
+            entry = yield self.room_list_handler.generate_room_entry(
+                room_id, len(joined_users), with_alias=False, allow_private=True
+            )
+
+            if not entry:
+                continue
+
+            entry["is_public"] = bool(room_result["is_public"])
+
+            chunk.append(entry)
+
+        chunk.sort(key=lambda e: -e["num_joined_members"])
+
+        return {"chunk": chunk, "total_room_count_estimate": len(room_results)}
+
+
+class GroupsServerHandler(GroupsServerWorkerHandler):
+    def __init__(self, hs):
+        super(GroupsServerHandler, self).__init__(hs)
+
+        # Ensure attestations get renewed
+        hs.get_groups_attestation_renewer()
+
+    @defer.inlineCallbacks
     def update_group_summary_room(
         self, group_id, requester_user_id, room_id, category_id, content
     ):
@@ -230,27 +418,6 @@ class GroupsServerHandler(object):
         return {}
 
     @defer.inlineCallbacks
-    def get_group_categories(self, group_id, requester_user_id):
-        """Get all categories in a group (as seen by user)
-        """
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
-        categories = yield self.store.get_group_categories(group_id=group_id)
-        return {"categories": categories}
-
-    @defer.inlineCallbacks
-    def get_group_category(self, group_id, requester_user_id, category_id):
-        """Get a specific category in a group (as seen by user)
-        """
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
-        res = yield self.store.get_group_category(
-            group_id=group_id, category_id=category_id
-        )
-
-        return res
-
-    @defer.inlineCallbacks
     def update_group_category(self, group_id, requester_user_id, category_id, content):
         """Add/Update a group category
         """
@@ -285,24 +452,6 @@ class GroupsServerHandler(object):
         return {}
 
     @defer.inlineCallbacks
-    def get_group_roles(self, group_id, requester_user_id):
-        """Get all roles in a group (as seen by user)
-        """
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
-        roles = yield self.store.get_group_roles(group_id=group_id)
-        return {"roles": roles}
-
-    @defer.inlineCallbacks
-    def get_group_role(self, group_id, requester_user_id, role_id):
-        """Get a specific role in a group (as seen by user)
-        """
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
-        res = yield self.store.get_group_role(group_id=group_id, role_id=role_id)
-        return res
-
-    @defer.inlineCallbacks
     def update_group_role(self, group_id, requester_user_id, role_id, content):
         """Add/update a role in a group
         """
@@ -371,30 +520,6 @@ class GroupsServerHandler(object):
         return {}
 
     @defer.inlineCallbacks
-    def get_group_profile(self, group_id, requester_user_id):
-        """Get the group profile as seen by requester_user_id
-        """
-
-        yield self.check_group_is_ours(group_id, requester_user_id)
-
-        group = yield self.store.get_group(group_id)
-
-        if group:
-            cols = [
-                "name",
-                "short_description",
-                "long_description",
-                "avatar_url",
-                "is_public",
-            ]
-            group_description = {key: group[key] for key in cols}
-            group_description["is_openly_joinable"] = group["join_policy"] == "open"
-
-            return group_description
-        else:
-            raise SynapseError(404, "Unknown group")
-
-    @defer.inlineCallbacks
     def update_group_profile(self, group_id, requester_user_id, content):
         """Update the group profile
         """
@@ -413,124 +538,6 @@ class GroupsServerHandler(object):
         yield self.store.update_group_profile(group_id, profile)
 
     @defer.inlineCallbacks
-    def get_users_in_group(self, group_id, requester_user_id):
-        """Get the users in group as seen by requester_user_id.
-
-        The ordering is arbitrary at the moment
-        """
-
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
-        is_user_in_group = yield self.store.is_user_in_group(
-            requester_user_id, group_id
-        )
-
-        user_results = yield self.store.get_users_in_group(
-            group_id, include_private=is_user_in_group
-        )
-
-        chunk = []
-        for user_result in user_results:
-            g_user_id = user_result["user_id"]
-            is_public = user_result["is_public"]
-            is_privileged = user_result["is_admin"]
-
-            entry = {"user_id": g_user_id}
-
-            profile = yield self.profile_handler.get_profile_from_cache(g_user_id)
-            entry.update(profile)
-
-            entry["is_public"] = bool(is_public)
-            entry["is_privileged"] = bool(is_privileged)
-
-            if not self.is_mine_id(g_user_id):
-                attestation = yield self.store.get_remote_attestation(
-                    group_id, g_user_id
-                )
-                if not attestation:
-                    continue
-
-                entry["attestation"] = attestation
-            else:
-                entry["attestation"] = self.attestations.create_attestation(
-                    group_id, g_user_id
-                )
-
-            chunk.append(entry)
-
-        # TODO: If admin add lists of users whose attestations have timed out
-
-        return {"chunk": chunk, "total_user_count_estimate": len(user_results)}
-
-    @defer.inlineCallbacks
-    def get_invited_users_in_group(self, group_id, requester_user_id):
-        """Get the users that have been invited to a group as seen by requester_user_id.
-
-        The ordering is arbitrary at the moment
-        """
-
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
-        is_user_in_group = yield self.store.is_user_in_group(
-            requester_user_id, group_id
-        )
-
-        if not is_user_in_group:
-            raise SynapseError(403, "User not in group")
-
-        invited_users = yield self.store.get_invited_users_in_group(group_id)
-
-        user_profiles = []
-
-        for user_id in invited_users:
-            user_profile = {"user_id": user_id}
-            try:
-                profile = yield self.profile_handler.get_profile_from_cache(user_id)
-                user_profile.update(profile)
-            except Exception as e:
-                logger.warning("Error getting profile for %s: %s", user_id, e)
-            user_profiles.append(user_profile)
-
-        return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)}
-
-    @defer.inlineCallbacks
-    def get_rooms_in_group(self, group_id, requester_user_id):
-        """Get the rooms in group as seen by requester_user_id
-
-        This returns rooms in order of decreasing number of joined users
-        """
-
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
-        is_user_in_group = yield self.store.is_user_in_group(
-            requester_user_id, group_id
-        )
-
-        room_results = yield self.store.get_rooms_in_group(
-            group_id, include_private=is_user_in_group
-        )
-
-        chunk = []
-        for room_result in room_results:
-            room_id = room_result["room_id"]
-
-            joined_users = yield self.store.get_users_in_room(room_id)
-            entry = yield self.room_list_handler.generate_room_entry(
-                room_id, len(joined_users), with_alias=False, allow_private=True
-            )
-
-            if not entry:
-                continue
-
-            entry["is_public"] = bool(room_result["is_public"])
-
-            chunk.append(entry)
-
-        chunk.sort(key=lambda e: -e["num_joined_members"])
-
-        return {"chunk": chunk, "total_room_count_estimate": len(room_results)}
-
-    @defer.inlineCallbacks
     def add_room_to_group(self, group_id, requester_user_id, room_id, content):
         """Add room to group
         """
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 319565510f..ad22415782 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -63,7 +63,7 @@ def _create_rerouter(func_name):
     return f
 
 
-class GroupsLocalHandler(object):
+class GroupsLocalWorkerHandler(object):
     def __init__(self, hs):
         self.hs = hs
         self.store = hs.get_datastore()
@@ -81,40 +81,17 @@ class GroupsLocalHandler(object):
 
         self.profile_handler = hs.get_profile_handler()
 
-        # Ensure attestations get renewed
-        hs.get_groups_attestation_renewer()
-
     # The following functions merely route the query to the local groups server
     # or federation depending on if the group is local or remote
 
     get_group_profile = _create_rerouter("get_group_profile")
-    update_group_profile = _create_rerouter("update_group_profile")
     get_rooms_in_group = _create_rerouter("get_rooms_in_group")
-
     get_invited_users_in_group = _create_rerouter("get_invited_users_in_group")
-
-    add_room_to_group = _create_rerouter("add_room_to_group")
-    update_room_in_group = _create_rerouter("update_room_in_group")
-    remove_room_from_group = _create_rerouter("remove_room_from_group")
-
-    update_group_summary_room = _create_rerouter("update_group_summary_room")
-    delete_group_summary_room = _create_rerouter("delete_group_summary_room")
-
-    update_group_category = _create_rerouter("update_group_category")
-    delete_group_category = _create_rerouter("delete_group_category")
     get_group_category = _create_rerouter("get_group_category")
     get_group_categories = _create_rerouter("get_group_categories")
-
-    update_group_summary_user = _create_rerouter("update_group_summary_user")
-    delete_group_summary_user = _create_rerouter("delete_group_summary_user")
-
-    update_group_role = _create_rerouter("update_group_role")
-    delete_group_role = _create_rerouter("delete_group_role")
     get_group_role = _create_rerouter("get_group_role")
     get_group_roles = _create_rerouter("get_group_roles")
 
-    set_group_join_policy = _create_rerouter("set_group_join_policy")
-
     @defer.inlineCallbacks
     def get_group_summary(self, group_id, requester_user_id):
         """Get the group summary for a group.
@@ -170,6 +147,144 @@ class GroupsLocalHandler(object):
         return res
 
     @defer.inlineCallbacks
+    def get_users_in_group(self, group_id, requester_user_id):
+        """Get users in a group
+        """
+        if self.is_mine_id(group_id):
+            res = yield self.groups_server_handler.get_users_in_group(
+                group_id, requester_user_id
+            )
+            return res
+
+        group_server_name = get_domain_from_id(group_id)
+
+        try:
+            res = yield self.transport_client.get_users_in_group(
+                get_domain_from_id(group_id), group_id, requester_user_id
+            )
+        except HttpResponseException as e:
+            raise e.to_synapse_error()
+        except RequestSendFailed:
+            raise SynapseError(502, "Failed to contact group server")
+
+        chunk = res["chunk"]
+        valid_entries = []
+        for entry in chunk:
+            g_user_id = entry["user_id"]
+            attestation = entry.pop("attestation", {})
+            try:
+                if get_domain_from_id(g_user_id) != group_server_name:
+                    yield self.attestations.verify_attestation(
+                        attestation,
+                        group_id=group_id,
+                        user_id=g_user_id,
+                        server_name=get_domain_from_id(g_user_id),
+                    )
+                valid_entries.append(entry)
+            except Exception as e:
+                logger.info("Failed to verify user is in group: %s", e)
+
+        res["chunk"] = valid_entries
+
+        return res
+
+    @defer.inlineCallbacks
+    def get_joined_groups(self, user_id):
+        group_ids = yield self.store.get_joined_groups(user_id)
+        return {"groups": group_ids}
+
+    @defer.inlineCallbacks
+    def get_publicised_groups_for_user(self, user_id):
+        if self.hs.is_mine_id(user_id):
+            result = yield self.store.get_publicised_groups_for_user(user_id)
+
+            # Check AS associated groups for this user - this depends on the
+            # RegExps in the AS registration file (under `users`)
+            for app_service in self.store.get_app_services():
+                result.extend(app_service.get_groups_for_user(user_id))
+
+            return {"groups": result}
+        else:
+            try:
+                bulk_result = yield self.transport_client.bulk_get_publicised_groups(
+                    get_domain_from_id(user_id), [user_id]
+                )
+            except HttpResponseException as e:
+                raise e.to_synapse_error()
+            except RequestSendFailed:
+                raise SynapseError(502, "Failed to contact group server")
+
+            result = bulk_result.get("users", {}).get(user_id)
+            # TODO: Verify attestations
+            return {"groups": result}
+
+    @defer.inlineCallbacks
+    def bulk_get_publicised_groups(self, user_ids, proxy=True):
+        destinations = {}
+        local_users = set()
+
+        for user_id in user_ids:
+            if self.hs.is_mine_id(user_id):
+                local_users.add(user_id)
+            else:
+                destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
+
+        if not proxy and destinations:
+            raise SynapseError(400, "Some user_ids are not local")
+
+        results = {}
+        failed_results = []
+        for destination, dest_user_ids in iteritems(destinations):
+            try:
+                r = yield self.transport_client.bulk_get_publicised_groups(
+                    destination, list(dest_user_ids)
+                )
+                results.update(r["users"])
+            except Exception:
+                failed_results.extend(dest_user_ids)
+
+        for uid in local_users:
+            results[uid] = yield self.store.get_publicised_groups_for_user(uid)
+
+            # Check AS associated groups for this user - this depends on the
+            # RegExps in the AS registration file (under `users`)
+            for app_service in self.store.get_app_services():
+                results[uid].extend(app_service.get_groups_for_user(uid))
+
+        return {"users": results}
+
+
+class GroupsLocalHandler(GroupsLocalWorkerHandler):
+    def __init__(self, hs):
+        super(GroupsLocalHandler, self).__init__(hs)
+
+        # Ensure attestations get renewed
+        hs.get_groups_attestation_renewer()
+
+    # The following functions merely route the query to the local groups server
+    # or federation depending on if the group is local or remote
+
+    update_group_profile = _create_rerouter("update_group_profile")
+
+    add_room_to_group = _create_rerouter("add_room_to_group")
+    update_room_in_group = _create_rerouter("update_room_in_group")
+    remove_room_from_group = _create_rerouter("remove_room_from_group")
+
+    update_group_summary_room = _create_rerouter("update_group_summary_room")
+    delete_group_summary_room = _create_rerouter("delete_group_summary_room")
+
+    update_group_category = _create_rerouter("update_group_category")
+    delete_group_category = _create_rerouter("delete_group_category")
+
+    update_group_summary_user = _create_rerouter("update_group_summary_user")
+    delete_group_summary_user = _create_rerouter("delete_group_summary_user")
+
+    update_group_role = _create_rerouter("update_group_role")
+    delete_group_role = _create_rerouter("delete_group_role")
+
+    set_group_join_policy = _create_rerouter("set_group_join_policy")
+
+    @defer.inlineCallbacks
     def create_group(self, group_id, user_id, content):
         """Create a group
         """
@@ -220,48 +335,6 @@ class GroupsLocalHandler(object):
         return res
 
     @defer.inlineCallbacks
-    def get_users_in_group(self, group_id, requester_user_id):
-        """Get users in a group
-        """
-        if self.is_mine_id(group_id):
-            res = yield self.groups_server_handler.get_users_in_group(
-                group_id, requester_user_id
-            )
-            return res
-
-        group_server_name = get_domain_from_id(group_id)
-
-        try:
-            res = yield self.transport_client.get_users_in_group(
-                get_domain_from_id(group_id), group_id, requester_user_id
-            )
-        except HttpResponseException as e:
-            raise e.to_synapse_error()
-        except RequestSendFailed:
-            raise SynapseError(502, "Failed to contact group server")
-
-        chunk = res["chunk"]
-        valid_entries = []
-        for entry in chunk:
-            g_user_id = entry["user_id"]
-            attestation = entry.pop("attestation", {})
-            try:
-                if get_domain_from_id(g_user_id) != group_server_name:
-                    yield self.attestations.verify_attestation(
-                        attestation,
-                        group_id=group_id,
-                        user_id=g_user_id,
-                        server_name=get_domain_from_id(g_user_id),
-                    )
-                valid_entries.append(entry)
-            except Exception as e:
-                logger.info("Failed to verify user is in group: %s", e)
-
-        res["chunk"] = valid_entries
-
-        return res
-
-    @defer.inlineCallbacks
     def join_group(self, group_id, user_id, content):
         """Request to join a group
         """
@@ -452,68 +525,3 @@ class GroupsLocalHandler(object):
             group_id, user_id, membership="leave"
         )
         self.notifier.on_new_event("groups_key", token, users=[user_id])
-
-    @defer.inlineCallbacks
-    def get_joined_groups(self, user_id):
-        group_ids = yield self.store.get_joined_groups(user_id)
-        return {"groups": group_ids}
-
-    @defer.inlineCallbacks
-    def get_publicised_groups_for_user(self, user_id):
-        if self.hs.is_mine_id(user_id):
-            result = yield self.store.get_publicised_groups_for_user(user_id)
-
-            # Check AS associated groups for this user - this depends on the
-            # RegExps in the AS registration file (under `users`)
-            for app_service in self.store.get_app_services():
-                result.extend(app_service.get_groups_for_user(user_id))
-
-            return {"groups": result}
-        else:
-            try:
-                bulk_result = yield self.transport_client.bulk_get_publicised_groups(
-                    get_domain_from_id(user_id), [user_id]
-                )
-            except HttpResponseException as e:
-                raise e.to_synapse_error()
-            except RequestSendFailed:
-                raise SynapseError(502, "Failed to contact group server")
-
-            result = bulk_result.get("users", {}).get(user_id)
-            # TODO: Verify attestations
-            return {"groups": result}
-
-    @defer.inlineCallbacks
-    def bulk_get_publicised_groups(self, user_ids, proxy=True):
-        destinations = {}
-        local_users = set()
-
-        for user_id in user_ids:
-            if self.hs.is_mine_id(user_id):
-                local_users.add(user_id)
-            else:
-                destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
-
-        if not proxy and destinations:
-            raise SynapseError(400, "Some user_ids are not local")
-
-        results = {}
-        failed_results = []
-        for destination, dest_user_ids in iteritems(destinations):
-            try:
-                r = yield self.transport_client.bulk_get_publicised_groups(
-                    destination, list(dest_user_ids)
-                )
-                results.update(r["users"])
-            except Exception:
-                failed_results.extend(dest_user_ids)
-
-        for uid in local_users:
-            results[uid] = yield self.store.get_publicised_groups_for_user(uid)
-
-            # Check AS associated groups for this user - this depends on the
-            # RegExps in the AS registration file (under `users`)
-            for app_service in self.store.get_app_services():
-                results[uid].extend(app_service.get_groups_for_user(uid))
-
-        return {"users": results}
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 69a4ae42f9..2d4fd08cf5 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -13,15 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage import DataStore
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.storage.data_stores.main.group_server import GroupServerWorkerStore
 from synapse.storage.database import Database
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-from ._base import BaseSlavedStore, __func__
-from ._slaved_id_tracker import SlavedIdTracker
 
-
-class SlavedGroupServerStore(BaseSlavedStore):
+class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
     def __init__(self, database: Database, db_conn, hs):
         super(SlavedGroupServerStore, self).__init__(database, db_conn, hs)
 
@@ -35,9 +34,8 @@ class SlavedGroupServerStore(BaseSlavedStore):
             self._group_updates_id_gen.get_current_token(),
         )
 
-    get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
-    get_group_stream_token = __func__(DataStore.get_group_stream_token)
-    get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user)
+    def get_group_stream_token(self):
+        return self._group_updates_id_gen.get_current_token()
 
     def stream_positions(self):
         result = super(SlavedGroupServerStore, self).stream_positions()
diff --git a/synapse/server.py b/synapse/server.py
index 7926867b77..fd2f69e928 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -50,7 +50,7 @@ from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.sender import FederationSender
 from synapse.federation.transport.client import TransportLayerClient
 from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
-from synapse.groups.groups_server import GroupsServerHandler
+from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler
 from synapse.handlers import Handlers
 from synapse.handlers.account_validity import AccountValidityHandler
 from synapse.handlers.acme import AcmeHandler
@@ -62,7 +62,7 @@ from synapse.handlers.devicemessage import DeviceMessageHandler
 from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
 from synapse.handlers.events import EventHandler, EventStreamHandler
-from synapse.handlers.groups_local import GroupsLocalHandler
+from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
 from synapse.handlers.message import EventCreationHandler, MessageHandler
 from synapse.handlers.pagination import PaginationHandler
@@ -460,10 +460,16 @@ class HomeServer(object):
         return UserDirectoryHandler(self)
 
     def build_groups_local_handler(self):
-        return GroupsLocalHandler(self)
+        if self.config.worker_app:
+            return GroupsLocalWorkerHandler(self)
+        else:
+            return GroupsLocalHandler(self)
 
     def build_groups_server_handler(self):
-        return GroupsServerHandler(self)
+        if self.config.worker_app:
+            return GroupsServerWorkerHandler(self)
+        else:
+            return GroupsServerHandler(self)
 
     def build_groups_attestation_signing(self):
         return GroupAttestationSigning(self)
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index 6acd45e9f3..0963e6c250 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -27,21 +27,7 @@ _DEFAULT_CATEGORY_ID = ""
 _DEFAULT_ROLE_ID = ""
 
 
-class GroupServerStore(SQLBaseStore):
-    def set_group_join_policy(self, group_id, join_policy):
-        """Set the join policy of a group.
-
-        join_policy can be one of:
-         * "invite"
-         * "open"
-        """
-        return self.db.simple_update_one(
-            table="groups",
-            keyvalues={"group_id": group_id},
-            updatevalues={"join_policy": join_policy},
-            desc="set_group_join_policy",
-        )
-
+class GroupServerWorkerStore(SQLBaseStore):
     def get_group(self, group_id):
         return self.db.simple_select_one(
             table="groups",
@@ -157,6 +143,366 @@ class GroupServerStore(SQLBaseStore):
             "get_rooms_for_summary", _get_rooms_for_summary_txn
         )
 
+    @defer.inlineCallbacks
+    def get_group_categories(self, group_id):
+        rows = yield self.db.simple_select_list(
+            table="group_room_categories",
+            keyvalues={"group_id": group_id},
+            retcols=("category_id", "is_public", "profile"),
+            desc="get_group_categories",
+        )
+
+        return {
+            row["category_id"]: {
+                "is_public": row["is_public"],
+                "profile": json.loads(row["profile"]),
+            }
+            for row in rows
+        }
+
+    @defer.inlineCallbacks
+    def get_group_category(self, group_id, category_id):
+        category = yield self.db.simple_select_one(
+            table="group_room_categories",
+            keyvalues={"group_id": group_id, "category_id": category_id},
+            retcols=("is_public", "profile"),
+            desc="get_group_category",
+        )
+
+        category["profile"] = json.loads(category["profile"])
+
+        return category
+
+    @defer.inlineCallbacks
+    def get_group_roles(self, group_id):
+        rows = yield self.db.simple_select_list(
+            table="group_roles",
+            keyvalues={"group_id": group_id},
+            retcols=("role_id", "is_public", "profile"),
+            desc="get_group_roles",
+        )
+
+        return {
+            row["role_id"]: {
+                "is_public": row["is_public"],
+                "profile": json.loads(row["profile"]),
+            }
+            for row in rows
+        }
+
+    @defer.inlineCallbacks
+    def get_group_role(self, group_id, role_id):
+        role = yield self.db.simple_select_one(
+            table="group_roles",
+            keyvalues={"group_id": group_id, "role_id": role_id},
+            retcols=("is_public", "profile"),
+            desc="get_group_role",
+        )
+
+        role["profile"] = json.loads(role["profile"])
+
+        return role
+
+    def get_local_groups_for_room(self, room_id):
+        """Get all of the local group that contain a given room
+        Args:
+            room_id (str): The ID of a room
+        Returns:
+            Deferred[list[str]]: A twisted.Deferred containing a list of group ids
+                containing this room
+        """
+        return self.db.simple_select_onecol(
+            table="group_rooms",
+            keyvalues={"room_id": room_id},
+            retcol="group_id",
+            desc="get_local_groups_for_room",
+        )
+
+    def get_users_for_summary_by_role(self, group_id, include_private=False):
+        """Get the users and roles that should be included in a summary request
+
+        Returns ([users], [roles])
+        """
+
+        def _get_users_for_summary_txn(txn):
+            keyvalues = {"group_id": group_id}
+            if not include_private:
+                keyvalues["is_public"] = True
+
+            sql = """
+                SELECT user_id, is_public, role_id, user_order
+                FROM group_summary_users
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            users = [
+                {
+                    "user_id": row[0],
+                    "is_public": row[1],
+                    "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
+                    "order": row[3],
+                }
+                for row in txn
+            ]
+
+            sql = """
+                SELECT role_id, is_public, profile, role_order
+                FROM group_summary_roles
+                INNER JOIN group_roles USING (group_id, role_id)
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            roles = {
+                row[0]: {
+                    "is_public": row[1],
+                    "profile": json.loads(row[2]),
+                    "order": row[3],
+                }
+                for row in txn
+            }
+
+            return users, roles
+
+        return self.db.runInteraction(
+            "get_users_for_summary_by_role", _get_users_for_summary_txn
+        )
+
+    def is_user_in_group(self, user_id, group_id):
+        return self.db.simple_select_one_onecol(
+            table="group_users",
+            keyvalues={"group_id": group_id, "user_id": user_id},
+            retcol="user_id",
+            allow_none=True,
+            desc="is_user_in_group",
+        ).addCallback(lambda r: bool(r))
+
+    def is_user_admin_in_group(self, group_id, user_id):
+        return self.db.simple_select_one_onecol(
+            table="group_users",
+            keyvalues={"group_id": group_id, "user_id": user_id},
+            retcol="is_admin",
+            allow_none=True,
+            desc="is_user_admin_in_group",
+        )
+
+    def is_user_invited_to_local_group(self, group_id, user_id):
+        """Has the group server invited a user?
+        """
+        return self.db.simple_select_one_onecol(
+            table="group_invites",
+            keyvalues={"group_id": group_id, "user_id": user_id},
+            retcol="user_id",
+            desc="is_user_invited_to_local_group",
+            allow_none=True,
+        )
+
+    def get_users_membership_info_in_group(self, group_id, user_id):
+        """Get a dict describing the membership of a user in a group.
+
+        Example if joined:
+
+            {
+                "membership": "join",
+                "is_public": True,
+                "is_privileged": False,
+            }
+
+        Returns an empty dict if the user is not join/invite/etc
+        """
+
+        def _get_users_membership_in_group_txn(txn):
+            row = self.db.simple_select_one_txn(
+                txn,
+                table="group_users",
+                keyvalues={"group_id": group_id, "user_id": user_id},
+                retcols=("is_admin", "is_public"),
+                allow_none=True,
+            )
+
+            if row:
+                return {
+                    "membership": "join",
+                    "is_public": row["is_public"],
+                    "is_privileged": row["is_admin"],
+                }
+
+            row = self.db.simple_select_one_onecol_txn(
+                txn,
+                table="group_invites",
+                keyvalues={"group_id": group_id, "user_id": user_id},
+                retcol="user_id",
+                allow_none=True,
+            )
+
+            if row:
+                return {"membership": "invite"}
+
+            return {}
+
+        return self.db.runInteraction(
+            "get_users_membership_info_in_group", _get_users_membership_in_group_txn
+        )
+
+    def get_publicised_groups_for_user(self, user_id):
+        """Get all groups a user is publicising
+        """
+        return self.db.simple_select_onecol(
+            table="local_group_membership",
+            keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True},
+            retcol="group_id",
+            desc="get_publicised_groups_for_user",
+        )
+
+    def get_attestations_need_renewals(self, valid_until_ms):
+        """Get all attestations that need to be renewed until givent time
+        """
+
+        def _get_attestations_need_renewals_txn(txn):
+            sql = """
+                SELECT group_id, user_id FROM group_attestations_renewals
+                WHERE valid_until_ms <= ?
+            """
+            txn.execute(sql, (valid_until_ms,))
+            return self.db.cursor_to_dict(txn)
+
+        return self.db.runInteraction(
+            "get_attestations_need_renewals", _get_attestations_need_renewals_txn
+        )
+
+    @defer.inlineCallbacks
+    def get_remote_attestation(self, group_id, user_id):
+        """Get the attestation that proves the remote agrees that the user is
+        in the group.
+        """
+        row = yield self.db.simple_select_one(
+            table="group_attestations_remote",
+            keyvalues={"group_id": group_id, "user_id": user_id},
+            retcols=("valid_until_ms", "attestation_json"),
+            desc="get_remote_attestation",
+            allow_none=True,
+        )
+
+        now = int(self._clock.time_msec())
+        if row and now < row["valid_until_ms"]:
+            return json.loads(row["attestation_json"])
+
+        return None
+
+    def get_joined_groups(self, user_id):
+        return self.db.simple_select_onecol(
+            table="local_group_membership",
+            keyvalues={"user_id": user_id, "membership": "join"},
+            retcol="group_id",
+            desc="get_joined_groups",
+        )
+
+    def get_all_groups_for_user(self, user_id, now_token):
+        def _get_all_groups_for_user_txn(txn):
+            sql = """
+                SELECT group_id, type, membership, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND membership != 'leave'
+                    AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, now_token))
+            return [
+                {
+                    "group_id": row[0],
+                    "type": row[1],
+                    "membership": row[2],
+                    "content": json.loads(row[3]),
+                }
+                for row in txn
+            ]
+
+        return self.db.runInteraction(
+            "get_all_groups_for_user", _get_all_groups_for_user_txn
+        )
+
+    def get_groups_changes_for_user(self, user_id, from_token, to_token):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_entity_changed(
+            user_id, from_token
+        )
+        if not has_changed:
+            return defer.succeed([])
+
+        def _get_groups_changes_for_user_txn(txn):
+            sql = """
+                SELECT group_id, membership, type, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, from_token, to_token))
+            return [
+                {
+                    "group_id": group_id,
+                    "membership": membership,
+                    "type": gtype,
+                    "content": json.loads(content_json),
+                }
+                for group_id, membership, gtype, content_json in txn
+            ]
+
+        return self.db.runInteraction(
+            "get_groups_changes_for_user", _get_groups_changes_for_user_txn
+        )
+
+    def get_all_groups_changes(self, from_token, to_token, limit):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_any_entity_changed(
+            from_token
+        )
+        if not has_changed:
+            return defer.succeed([])
+
+        def _get_all_groups_changes_txn(txn):
+            sql = """
+                SELECT stream_id, group_id, user_id, type, content
+                FROM local_group_updates
+                WHERE ? < stream_id AND stream_id <= ?
+                LIMIT ?
+            """
+            txn.execute(sql, (from_token, to_token, limit))
+            return [
+                (stream_id, group_id, user_id, gtype, json.loads(content_json))
+                for stream_id, group_id, user_id, gtype, content_json in txn
+            ]
+
+        return self.db.runInteraction(
+            "get_all_groups_changes", _get_all_groups_changes_txn
+        )
+
+
+class GroupServerStore(GroupServerWorkerStore):
+    def set_group_join_policy(self, group_id, join_policy):
+        """Set the join policy of a group.
+
+        join_policy can be one of:
+         * "invite"
+         * "open"
+        """
+        return self.db.simple_update_one(
+            table="groups",
+            keyvalues={"group_id": group_id},
+            updatevalues={"join_policy": join_policy},
+            desc="set_group_join_policy",
+        )
+
     def add_room_to_summary(self, group_id, room_id, category_id, order, is_public):
         return self.db.runInteraction(
             "add_room_to_summary",
@@ -299,36 +645,6 @@ class GroupServerStore(SQLBaseStore):
             desc="remove_room_from_summary",
         )
 
-    @defer.inlineCallbacks
-    def get_group_categories(self, group_id):
-        rows = yield self.db.simple_select_list(
-            table="group_room_categories",
-            keyvalues={"group_id": group_id},
-            retcols=("category_id", "is_public", "profile"),
-            desc="get_group_categories",
-        )
-
-        return {
-            row["category_id"]: {
-                "is_public": row["is_public"],
-                "profile": json.loads(row["profile"]),
-            }
-            for row in rows
-        }
-
-    @defer.inlineCallbacks
-    def get_group_category(self, group_id, category_id):
-        category = yield self.db.simple_select_one(
-            table="group_room_categories",
-            keyvalues={"group_id": group_id, "category_id": category_id},
-            retcols=("is_public", "profile"),
-            desc="get_group_category",
-        )
-
-        category["profile"] = json.loads(category["profile"])
-
-        return category
-
     def upsert_group_category(self, group_id, category_id, profile, is_public):
         """Add/update room category for group
         """
@@ -360,36 +676,6 @@ class GroupServerStore(SQLBaseStore):
             desc="remove_group_category",
         )
 
-    @defer.inlineCallbacks
-    def get_group_roles(self, group_id):
-        rows = yield self.db.simple_select_list(
-            table="group_roles",
-            keyvalues={"group_id": group_id},
-            retcols=("role_id", "is_public", "profile"),
-            desc="get_group_roles",
-        )
-
-        return {
-            row["role_id"]: {
-                "is_public": row["is_public"],
-                "profile": json.loads(row["profile"]),
-            }
-            for row in rows
-        }
-
-    @defer.inlineCallbacks
-    def get_group_role(self, group_id, role_id):
-        role = yield self.db.simple_select_one(
-            table="group_roles",
-            keyvalues={"group_id": group_id, "role_id": role_id},
-            retcols=("is_public", "profile"),
-            desc="get_group_role",
-        )
-
-        role["profile"] = json.loads(role["profile"])
-
-        return role
-
     def upsert_group_role(self, group_id, role_id, profile, is_public):
         """Add/remove user role
         """
@@ -555,100 +841,6 @@ class GroupServerStore(SQLBaseStore):
             desc="remove_user_from_summary",
         )
 
-    def get_local_groups_for_room(self, room_id):
-        """Get all of the local group that contain a given room
-        Args:
-            room_id (str): The ID of a room
-        Returns:
-            Deferred[list[str]]: A twisted.Deferred containing a list of group ids
-                containing this room
-        """
-        return self.db.simple_select_onecol(
-            table="group_rooms",
-            keyvalues={"room_id": room_id},
-            retcol="group_id",
-            desc="get_local_groups_for_room",
-        )
-
-    def get_users_for_summary_by_role(self, group_id, include_private=False):
-        """Get the users and roles that should be included in a summary request
-
-        Returns ([users], [roles])
-        """
-
-        def _get_users_for_summary_txn(txn):
-            keyvalues = {"group_id": group_id}
-            if not include_private:
-                keyvalues["is_public"] = True
-
-            sql = """
-                SELECT user_id, is_public, role_id, user_order
-                FROM group_summary_users
-                WHERE group_id = ?
-            """
-
-            if not include_private:
-                sql += " AND is_public = ?"
-                txn.execute(sql, (group_id, True))
-            else:
-                txn.execute(sql, (group_id,))
-
-            users = [
-                {
-                    "user_id": row[0],
-                    "is_public": row[1],
-                    "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
-                    "order": row[3],
-                }
-                for row in txn
-            ]
-
-            sql = """
-                SELECT role_id, is_public, profile, role_order
-                FROM group_summary_roles
-                INNER JOIN group_roles USING (group_id, role_id)
-                WHERE group_id = ?
-            """
-
-            if not include_private:
-                sql += " AND is_public = ?"
-                txn.execute(sql, (group_id, True))
-            else:
-                txn.execute(sql, (group_id,))
-
-            roles = {
-                row[0]: {
-                    "is_public": row[1],
-                    "profile": json.loads(row[2]),
-                    "order": row[3],
-                }
-                for row in txn
-            }
-
-            return users, roles
-
-        return self.db.runInteraction(
-            "get_users_for_summary_by_role", _get_users_for_summary_txn
-        )
-
-    def is_user_in_group(self, user_id, group_id):
-        return self.db.simple_select_one_onecol(
-            table="group_users",
-            keyvalues={"group_id": group_id, "user_id": user_id},
-            retcol="user_id",
-            allow_none=True,
-            desc="is_user_in_group",
-        ).addCallback(lambda r: bool(r))
-
-    def is_user_admin_in_group(self, group_id, user_id):
-        return self.db.simple_select_one_onecol(
-            table="group_users",
-            keyvalues={"group_id": group_id, "user_id": user_id},
-            retcol="is_admin",
-            allow_none=True,
-            desc="is_user_admin_in_group",
-        )
-
     def add_group_invite(self, group_id, user_id):
         """Record that the group server has invited a user
         """
@@ -658,64 +850,6 @@ class GroupServerStore(SQLBaseStore):
             desc="add_group_invite",
         )
 
-    def is_user_invited_to_local_group(self, group_id, user_id):
-        """Has the group server invited a user?
-        """
-        return self.db.simple_select_one_onecol(
-            table="group_invites",
-            keyvalues={"group_id": group_id, "user_id": user_id},
-            retcol="user_id",
-            desc="is_user_invited_to_local_group",
-            allow_none=True,
-        )
-
-    def get_users_membership_info_in_group(self, group_id, user_id):
-        """Get a dict describing the membership of a user in a group.
-
-        Example if joined:
-
-            {
-                "membership": "join",
-                "is_public": True,
-                "is_privileged": False,
-            }
-
-        Returns an empty dict if the user is not join/invite/etc
-        """
-
-        def _get_users_membership_in_group_txn(txn):
-            row = self.db.simple_select_one_txn(
-                txn,
-                table="group_users",
-                keyvalues={"group_id": group_id, "user_id": user_id},
-                retcols=("is_admin", "is_public"),
-                allow_none=True,
-            )
-
-            if row:
-                return {
-                    "membership": "join",
-                    "is_public": row["is_public"],
-                    "is_privileged": row["is_admin"],
-                }
-
-            row = self.db.simple_select_one_onecol_txn(
-                txn,
-                table="group_invites",
-                keyvalues={"group_id": group_id, "user_id": user_id},
-                retcol="user_id",
-                allow_none=True,
-            )
-
-            if row:
-                return {"membership": "invite"}
-
-            return {}
-
-        return self.db.runInteraction(
-            "get_users_membership_info_in_group", _get_users_membership_in_group_txn
-        )
-
     def add_user_to_group(
         self,
         group_id,
@@ -846,16 +980,6 @@ class GroupServerStore(SQLBaseStore):
             "remove_room_from_group", _remove_room_from_group_txn
         )
 
-    def get_publicised_groups_for_user(self, user_id):
-        """Get all groups a user is publicising
-        """
-        return self.db.simple_select_onecol(
-            table="local_group_membership",
-            keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True},
-            retcol="group_id",
-            desc="get_publicised_groups_for_user",
-        )
-
     def update_group_publicity(self, group_id, user_id, publicise):
         """Update whether the user is publicising their membership of the group
         """
@@ -1000,22 +1124,6 @@ class GroupServerStore(SQLBaseStore):
             desc="update_group_profile",
         )
 
-    def get_attestations_need_renewals(self, valid_until_ms):
-        """Get all attestations that need to be renewed until givent time
-        """
-
-        def _get_attestations_need_renewals_txn(txn):
-            sql = """
-                SELECT group_id, user_id FROM group_attestations_renewals
-                WHERE valid_until_ms <= ?
-            """
-            txn.execute(sql, (valid_until_ms,))
-            return self.db.cursor_to_dict(txn)
-
-        return self.db.runInteraction(
-            "get_attestations_need_renewals", _get_attestations_need_renewals_txn
-        )
-
     def update_attestation_renewal(self, group_id, user_id, attestation):
         """Update an attestation that we have renewed
         """
@@ -1054,112 +1162,6 @@ class GroupServerStore(SQLBaseStore):
             desc="remove_attestation_renewal",
         )
 
-    @defer.inlineCallbacks
-    def get_remote_attestation(self, group_id, user_id):
-        """Get the attestation that proves the remote agrees that the user is
-        in the group.
-        """
-        row = yield self.db.simple_select_one(
-            table="group_attestations_remote",
-            keyvalues={"group_id": group_id, "user_id": user_id},
-            retcols=("valid_until_ms", "attestation_json"),
-            desc="get_remote_attestation",
-            allow_none=True,
-        )
-
-        now = int(self._clock.time_msec())
-        if row and now < row["valid_until_ms"]:
-            return json.loads(row["attestation_json"])
-
-        return None
-
-    def get_joined_groups(self, user_id):
-        return self.db.simple_select_onecol(
-            table="local_group_membership",
-            keyvalues={"user_id": user_id, "membership": "join"},
-            retcol="group_id",
-            desc="get_joined_groups",
-        )
-
-    def get_all_groups_for_user(self, user_id, now_token):
-        def _get_all_groups_for_user_txn(txn):
-            sql = """
-                SELECT group_id, type, membership, u.content
-                FROM local_group_updates AS u
-                INNER JOIN local_group_membership USING (group_id, user_id)
-                WHERE user_id = ? AND membership != 'leave'
-                    AND stream_id <= ?
-            """
-            txn.execute(sql, (user_id, now_token))
-            return [
-                {
-                    "group_id": row[0],
-                    "type": row[1],
-                    "membership": row[2],
-                    "content": json.loads(row[3]),
-                }
-                for row in txn
-            ]
-
-        return self.db.runInteraction(
-            "get_all_groups_for_user", _get_all_groups_for_user_txn
-        )
-
-    def get_groups_changes_for_user(self, user_id, from_token, to_token):
-        from_token = int(from_token)
-        has_changed = self._group_updates_stream_cache.has_entity_changed(
-            user_id, from_token
-        )
-        if not has_changed:
-            return defer.succeed([])
-
-        def _get_groups_changes_for_user_txn(txn):
-            sql = """
-                SELECT group_id, membership, type, u.content
-                FROM local_group_updates AS u
-                INNER JOIN local_group_membership USING (group_id, user_id)
-                WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
-            """
-            txn.execute(sql, (user_id, from_token, to_token))
-            return [
-                {
-                    "group_id": group_id,
-                    "membership": membership,
-                    "type": gtype,
-                    "content": json.loads(content_json),
-                }
-                for group_id, membership, gtype, content_json in txn
-            ]
-
-        return self.db.runInteraction(
-            "get_groups_changes_for_user", _get_groups_changes_for_user_txn
-        )
-
-    def get_all_groups_changes(self, from_token, to_token, limit):
-        from_token = int(from_token)
-        has_changed = self._group_updates_stream_cache.has_any_entity_changed(
-            from_token
-        )
-        if not has_changed:
-            return defer.succeed([])
-
-        def _get_all_groups_changes_txn(txn):
-            sql = """
-                SELECT stream_id, group_id, user_id, type, content
-                FROM local_group_updates
-                WHERE ? < stream_id AND stream_id <= ?
-                LIMIT ?
-            """
-            txn.execute(sql, (from_token, to_token, limit))
-            return [
-                (stream_id, group_id, user_id, gtype, json.loads(content_json))
-                for stream_id, group_id, user_id, gtype, content_json in txn
-            ]
-
-        return self.db.runInteraction(
-            "get_all_groups_changes", _get_all_groups_changes_txn
-        )
-
     def get_group_stream_token(self):
         return self._group_updates_id_gen.get_current_token()