summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-03-23 17:10:38 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2020-03-23 17:10:38 +0000
commit5bb1eb61367558f20a5fc350b743a020896e33f3 (patch)
treed539122fffa66e0de05645f0f8b91fccc115b2f9 /synapse/handlers
parentAdmin api to add an email address (#6789) (diff)
parentAllow moving group read APIs to workers (#6866) (diff)
downloadsynapse-5bb1eb61367558f20a5fc350b743a020896e33f3.tar.xz
Allow moving group read APIs to workers (#6866)
* commit 'de2d26737':
  Allow moving group read APIs to workers (#6866)
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/groups_local.py270
1 files changed, 139 insertions, 131 deletions
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 319565510f..ad22415782 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -63,7 +63,7 @@ def _create_rerouter(func_name):
     return f
 
 
-class GroupsLocalHandler(object):
+class GroupsLocalWorkerHandler(object):
     def __init__(self, hs):
         self.hs = hs
         self.store = hs.get_datastore()
@@ -81,40 +81,17 @@ class GroupsLocalHandler(object):
 
         self.profile_handler = hs.get_profile_handler()
 
-        # Ensure attestations get renewed
-        hs.get_groups_attestation_renewer()
-
     # The following functions merely route the query to the local groups server
     # or federation depending on if the group is local or remote
 
     get_group_profile = _create_rerouter("get_group_profile")
-    update_group_profile = _create_rerouter("update_group_profile")
     get_rooms_in_group = _create_rerouter("get_rooms_in_group")
-
     get_invited_users_in_group = _create_rerouter("get_invited_users_in_group")
-
-    add_room_to_group = _create_rerouter("add_room_to_group")
-    update_room_in_group = _create_rerouter("update_room_in_group")
-    remove_room_from_group = _create_rerouter("remove_room_from_group")
-
-    update_group_summary_room = _create_rerouter("update_group_summary_room")
-    delete_group_summary_room = _create_rerouter("delete_group_summary_room")
-
-    update_group_category = _create_rerouter("update_group_category")
-    delete_group_category = _create_rerouter("delete_group_category")
     get_group_category = _create_rerouter("get_group_category")
     get_group_categories = _create_rerouter("get_group_categories")
-
-    update_group_summary_user = _create_rerouter("update_group_summary_user")
-    delete_group_summary_user = _create_rerouter("delete_group_summary_user")
-
-    update_group_role = _create_rerouter("update_group_role")
-    delete_group_role = _create_rerouter("delete_group_role")
     get_group_role = _create_rerouter("get_group_role")
     get_group_roles = _create_rerouter("get_group_roles")
 
-    set_group_join_policy = _create_rerouter("set_group_join_policy")
-
     @defer.inlineCallbacks
     def get_group_summary(self, group_id, requester_user_id):
         """Get the group summary for a group.
@@ -170,6 +147,144 @@ class GroupsLocalHandler(object):
         return res
 
     @defer.inlineCallbacks
+    def get_users_in_group(self, group_id, requester_user_id):
+        """Get users in a group
+        """
+        if self.is_mine_id(group_id):
+            res = yield self.groups_server_handler.get_users_in_group(
+                group_id, requester_user_id
+            )
+            return res
+
+        group_server_name = get_domain_from_id(group_id)
+
+        try:
+            res = yield self.transport_client.get_users_in_group(
+                get_domain_from_id(group_id), group_id, requester_user_id
+            )
+        except HttpResponseException as e:
+            raise e.to_synapse_error()
+        except RequestSendFailed:
+            raise SynapseError(502, "Failed to contact group server")
+
+        chunk = res["chunk"]
+        valid_entries = []
+        for entry in chunk:
+            g_user_id = entry["user_id"]
+            attestation = entry.pop("attestation", {})
+            try:
+                if get_domain_from_id(g_user_id) != group_server_name:
+                    yield self.attestations.verify_attestation(
+                        attestation,
+                        group_id=group_id,
+                        user_id=g_user_id,
+                        server_name=get_domain_from_id(g_user_id),
+                    )
+                valid_entries.append(entry)
+            except Exception as e:
+                logger.info("Failed to verify user is in group: %s", e)
+
+        res["chunk"] = valid_entries
+
+        return res
+
+    @defer.inlineCallbacks
+    def get_joined_groups(self, user_id):
+        group_ids = yield self.store.get_joined_groups(user_id)
+        return {"groups": group_ids}
+
+    @defer.inlineCallbacks
+    def get_publicised_groups_for_user(self, user_id):
+        if self.hs.is_mine_id(user_id):
+            result = yield self.store.get_publicised_groups_for_user(user_id)
+
+            # Check AS associated groups for this user - this depends on the
+            # RegExps in the AS registration file (under `users`)
+            for app_service in self.store.get_app_services():
+                result.extend(app_service.get_groups_for_user(user_id))
+
+            return {"groups": result}
+        else:
+            try:
+                bulk_result = yield self.transport_client.bulk_get_publicised_groups(
+                    get_domain_from_id(user_id), [user_id]
+                )
+            except HttpResponseException as e:
+                raise e.to_synapse_error()
+            except RequestSendFailed:
+                raise SynapseError(502, "Failed to contact group server")
+
+            result = bulk_result.get("users", {}).get(user_id)
+            # TODO: Verify attestations
+            return {"groups": result}
+
+    @defer.inlineCallbacks
+    def bulk_get_publicised_groups(self, user_ids, proxy=True):
+        destinations = {}
+        local_users = set()
+
+        for user_id in user_ids:
+            if self.hs.is_mine_id(user_id):
+                local_users.add(user_id)
+            else:
+                destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
+
+        if not proxy and destinations:
+            raise SynapseError(400, "Some user_ids are not local")
+
+        results = {}
+        failed_results = []
+        for destination, dest_user_ids in iteritems(destinations):
+            try:
+                r = yield self.transport_client.bulk_get_publicised_groups(
+                    destination, list(dest_user_ids)
+                )
+                results.update(r["users"])
+            except Exception:
+                failed_results.extend(dest_user_ids)
+
+        for uid in local_users:
+            results[uid] = yield self.store.get_publicised_groups_for_user(uid)
+
+            # Check AS associated groups for this user - this depends on the
+            # RegExps in the AS registration file (under `users`)
+            for app_service in self.store.get_app_services():
+                results[uid].extend(app_service.get_groups_for_user(uid))
+
+        return {"users": results}
+
+
+class GroupsLocalHandler(GroupsLocalWorkerHandler):
+    def __init__(self, hs):
+        super(GroupsLocalHandler, self).__init__(hs)
+
+        # Ensure attestations get renewed
+        hs.get_groups_attestation_renewer()
+
+    # The following functions merely route the query to the local groups server
+    # or federation depending on if the group is local or remote
+
+    update_group_profile = _create_rerouter("update_group_profile")
+
+    add_room_to_group = _create_rerouter("add_room_to_group")
+    update_room_in_group = _create_rerouter("update_room_in_group")
+    remove_room_from_group = _create_rerouter("remove_room_from_group")
+
+    update_group_summary_room = _create_rerouter("update_group_summary_room")
+    delete_group_summary_room = _create_rerouter("delete_group_summary_room")
+
+    update_group_category = _create_rerouter("update_group_category")
+    delete_group_category = _create_rerouter("delete_group_category")
+
+    update_group_summary_user = _create_rerouter("update_group_summary_user")
+    delete_group_summary_user = _create_rerouter("delete_group_summary_user")
+
+    update_group_role = _create_rerouter("update_group_role")
+    delete_group_role = _create_rerouter("delete_group_role")
+
+    set_group_join_policy = _create_rerouter("set_group_join_policy")
+
+    @defer.inlineCallbacks
     def create_group(self, group_id, user_id, content):
         """Create a group
         """
@@ -220,48 +335,6 @@ class GroupsLocalHandler(object):
         return res
 
     @defer.inlineCallbacks
-    def get_users_in_group(self, group_id, requester_user_id):
-        """Get users in a group
-        """
-        if self.is_mine_id(group_id):
-            res = yield self.groups_server_handler.get_users_in_group(
-                group_id, requester_user_id
-            )
-            return res
-
-        group_server_name = get_domain_from_id(group_id)
-
-        try:
-            res = yield self.transport_client.get_users_in_group(
-                get_domain_from_id(group_id), group_id, requester_user_id
-            )
-        except HttpResponseException as e:
-            raise e.to_synapse_error()
-        except RequestSendFailed:
-            raise SynapseError(502, "Failed to contact group server")
-
-        chunk = res["chunk"]
-        valid_entries = []
-        for entry in chunk:
-            g_user_id = entry["user_id"]
-            attestation = entry.pop("attestation", {})
-            try:
-                if get_domain_from_id(g_user_id) != group_server_name:
-                    yield self.attestations.verify_attestation(
-                        attestation,
-                        group_id=group_id,
-                        user_id=g_user_id,
-                        server_name=get_domain_from_id(g_user_id),
-                    )
-                valid_entries.append(entry)
-            except Exception as e:
-                logger.info("Failed to verify user is in group: %s", e)
-
-        res["chunk"] = valid_entries
-
-        return res
-
-    @defer.inlineCallbacks
     def join_group(self, group_id, user_id, content):
         """Request to join a group
         """
@@ -452,68 +525,3 @@ class GroupsLocalHandler(object):
             group_id, user_id, membership="leave"
         )
         self.notifier.on_new_event("groups_key", token, users=[user_id])
-
-    @defer.inlineCallbacks
-    def get_joined_groups(self, user_id):
-        group_ids = yield self.store.get_joined_groups(user_id)
-        return {"groups": group_ids}
-
-    @defer.inlineCallbacks
-    def get_publicised_groups_for_user(self, user_id):
-        if self.hs.is_mine_id(user_id):
-            result = yield self.store.get_publicised_groups_for_user(user_id)
-
-            # Check AS associated groups for this user - this depends on the
-            # RegExps in the AS registration file (under `users`)
-            for app_service in self.store.get_app_services():
-                result.extend(app_service.get_groups_for_user(user_id))
-
-            return {"groups": result}
-        else:
-            try:
-                bulk_result = yield self.transport_client.bulk_get_publicised_groups(
-                    get_domain_from_id(user_id), [user_id]
-                )
-            except HttpResponseException as e:
-                raise e.to_synapse_error()
-            except RequestSendFailed:
-                raise SynapseError(502, "Failed to contact group server")
-
-            result = bulk_result.get("users", {}).get(user_id)
-            # TODO: Verify attestations
-            return {"groups": result}
-
-    @defer.inlineCallbacks
-    def bulk_get_publicised_groups(self, user_ids, proxy=True):
-        destinations = {}
-        local_users = set()
-
-        for user_id in user_ids:
-            if self.hs.is_mine_id(user_id):
-                local_users.add(user_id)
-            else:
-                destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
-
-        if not proxy and destinations:
-            raise SynapseError(400, "Some user_ids are not local")
-
-        results = {}
-        failed_results = []
-        for destination, dest_user_ids in iteritems(destinations):
-            try:
-                r = yield self.transport_client.bulk_get_publicised_groups(
-                    destination, list(dest_user_ids)
-                )
-                results.update(r["users"])
-            except Exception:
-                failed_results.extend(dest_user_ids)
-
-        for uid in local_users:
-            results[uid] = yield self.store.get_publicised_groups_for_user(uid)
-
-            # Check AS associated groups for this user - this depends on the
-            # RegExps in the AS registration file (under `users`)
-            for app_service in self.store.get_app_services():
-                results[uid].extend(app_service.get_groups_for_user(uid))
-
-        return {"users": results}