diff options
author | Erik Johnston <erik@matrix.org> | 2020-02-07 11:14:19 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-07 11:14:19 +0000 |
commit | de2d267375069c2d22bceb0d6ef9c6f5a77380e3 (patch) | |
tree | 12636dca8ea90ea5369207b7aea134521a1fc8f5 /synapse/handlers/groups_local.py | |
parent | Admin api to add an email address (#6789) (diff) | |
download | synapse-de2d267375069c2d22bceb0d6ef9c6f5a77380e3.tar.xz |
Allow moving group read APIs to workers (#6866)
Diffstat (limited to 'synapse/handlers/groups_local.py')
-rw-r--r-- | synapse/handlers/groups_local.py | 270 |
1 files changed, 139 insertions, 131 deletions
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 319565510f..ad22415782 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -63,7 +63,7 @@ def _create_rerouter(func_name): return f -class GroupsLocalHandler(object): +class GroupsLocalWorkerHandler(object): def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() @@ -81,40 +81,17 @@ class GroupsLocalHandler(object): self.profile_handler = hs.get_profile_handler() - # Ensure attestations get renewed - hs.get_groups_attestation_renewer() - # The following functions merely route the query to the local groups server # or federation depending on if the group is local or remote get_group_profile = _create_rerouter("get_group_profile") - update_group_profile = _create_rerouter("update_group_profile") get_rooms_in_group = _create_rerouter("get_rooms_in_group") - get_invited_users_in_group = _create_rerouter("get_invited_users_in_group") - - add_room_to_group = _create_rerouter("add_room_to_group") - update_room_in_group = _create_rerouter("update_room_in_group") - remove_room_from_group = _create_rerouter("remove_room_from_group") - - update_group_summary_room = _create_rerouter("update_group_summary_room") - delete_group_summary_room = _create_rerouter("delete_group_summary_room") - - update_group_category = _create_rerouter("update_group_category") - delete_group_category = _create_rerouter("delete_group_category") get_group_category = _create_rerouter("get_group_category") get_group_categories = _create_rerouter("get_group_categories") - - update_group_summary_user = _create_rerouter("update_group_summary_user") - delete_group_summary_user = _create_rerouter("delete_group_summary_user") - - update_group_role = _create_rerouter("update_group_role") - delete_group_role = _create_rerouter("delete_group_role") get_group_role = _create_rerouter("get_group_role") get_group_roles = _create_rerouter("get_group_roles") - set_group_join_policy = _create_rerouter("set_group_join_policy") - @defer.inlineCallbacks def get_group_summary(self, group_id, requester_user_id): """Get the group summary for a group. @@ -170,6 +147,144 @@ class GroupsLocalHandler(object): return res @defer.inlineCallbacks + def get_users_in_group(self, group_id, requester_user_id): + """Get users in a group + """ + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_users_in_group( + group_id, requester_user_id + ) + return res + + group_server_name = get_domain_from_id(group_id) + + try: + res = yield self.transport_client.get_users_in_group( + get_domain_from_id(group_id), group_id, requester_user_id + ) + except HttpResponseException as e: + raise e.to_synapse_error() + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") + + chunk = res["chunk"] + valid_entries = [] + for entry in chunk: + g_user_id = entry["user_id"] + attestation = entry.pop("attestation", {}) + try: + if get_domain_from_id(g_user_id) != group_server_name: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + server_name=get_domain_from_id(g_user_id), + ) + valid_entries.append(entry) + except Exception as e: + logger.info("Failed to verify user is in group: %s", e) + + res["chunk"] = valid_entries + + return res + + @defer.inlineCallbacks + def get_joined_groups(self, user_id): + group_ids = yield self.store.get_joined_groups(user_id) + return {"groups": group_ids} + + @defer.inlineCallbacks + def get_publicised_groups_for_user(self, user_id): + if self.hs.is_mine_id(user_id): + result = yield self.store.get_publicised_groups_for_user(user_id) + + # Check AS associated groups for this user - this depends on the + # RegExps in the AS registration file (under `users`) + for app_service in self.store.get_app_services(): + result.extend(app_service.get_groups_for_user(user_id)) + + return {"groups": result} + else: + try: + bulk_result = yield self.transport_client.bulk_get_publicised_groups( + get_domain_from_id(user_id), [user_id] + ) + except HttpResponseException as e: + raise e.to_synapse_error() + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") + + result = bulk_result.get("users", {}).get(user_id) + # TODO: Verify attestations + return {"groups": result} + + @defer.inlineCallbacks + def bulk_get_publicised_groups(self, user_ids, proxy=True): + destinations = {} + local_users = set() + + for user_id in user_ids: + if self.hs.is_mine_id(user_id): + local_users.add(user_id) + else: + destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id) + + if not proxy and destinations: + raise SynapseError(400, "Some user_ids are not local") + + results = {} + failed_results = [] + for destination, dest_user_ids in iteritems(destinations): + try: + r = yield self.transport_client.bulk_get_publicised_groups( + destination, list(dest_user_ids) + ) + results.update(r["users"]) + except Exception: + failed_results.extend(dest_user_ids) + + for uid in local_users: + results[uid] = yield self.store.get_publicised_groups_for_user(uid) + + # Check AS associated groups for this user - this depends on the + # RegExps in the AS registration file (under `users`) + for app_service in self.store.get_app_services(): + results[uid].extend(app_service.get_groups_for_user(uid)) + + return {"users": results} + + +class GroupsLocalHandler(GroupsLocalWorkerHandler): + def __init__(self, hs): + super(GroupsLocalHandler, self).__init__(hs) + + # Ensure attestations get renewed + hs.get_groups_attestation_renewer() + + # The following functions merely route the query to the local groups server + # or federation depending on if the group is local or remote + + update_group_profile = _create_rerouter("update_group_profile") + + add_room_to_group = _create_rerouter("add_room_to_group") + update_room_in_group = _create_rerouter("update_room_in_group") + remove_room_from_group = _create_rerouter("remove_room_from_group") + + update_group_summary_room = _create_rerouter("update_group_summary_room") + delete_group_summary_room = _create_rerouter("delete_group_summary_room") + + update_group_category = _create_rerouter("update_group_category") + delete_group_category = _create_rerouter("delete_group_category") + + update_group_summary_user = _create_rerouter("update_group_summary_user") + delete_group_summary_user = _create_rerouter("delete_group_summary_user") + + update_group_role = _create_rerouter("update_group_role") + delete_group_role = _create_rerouter("delete_group_role") + + set_group_join_policy = _create_rerouter("set_group_join_policy") + + @defer.inlineCallbacks def create_group(self, group_id, user_id, content): """Create a group """ @@ -220,48 +335,6 @@ class GroupsLocalHandler(object): return res @defer.inlineCallbacks - def get_users_in_group(self, group_id, requester_user_id): - """Get users in a group - """ - if self.is_mine_id(group_id): - res = yield self.groups_server_handler.get_users_in_group( - group_id, requester_user_id - ) - return res - - group_server_name = get_domain_from_id(group_id) - - try: - res = yield self.transport_client.get_users_in_group( - get_domain_from_id(group_id), group_id, requester_user_id - ) - except HttpResponseException as e: - raise e.to_synapse_error() - except RequestSendFailed: - raise SynapseError(502, "Failed to contact group server") - - chunk = res["chunk"] - valid_entries = [] - for entry in chunk: - g_user_id = entry["user_id"] - attestation = entry.pop("attestation", {}) - try: - if get_domain_from_id(g_user_id) != group_server_name: - yield self.attestations.verify_attestation( - attestation, - group_id=group_id, - user_id=g_user_id, - server_name=get_domain_from_id(g_user_id), - ) - valid_entries.append(entry) - except Exception as e: - logger.info("Failed to verify user is in group: %s", e) - - res["chunk"] = valid_entries - - return res - - @defer.inlineCallbacks def join_group(self, group_id, user_id, content): """Request to join a group """ @@ -452,68 +525,3 @@ class GroupsLocalHandler(object): group_id, user_id, membership="leave" ) self.notifier.on_new_event("groups_key", token, users=[user_id]) - - @defer.inlineCallbacks - def get_joined_groups(self, user_id): - group_ids = yield self.store.get_joined_groups(user_id) - return {"groups": group_ids} - - @defer.inlineCallbacks - def get_publicised_groups_for_user(self, user_id): - if self.hs.is_mine_id(user_id): - result = yield self.store.get_publicised_groups_for_user(user_id) - - # Check AS associated groups for this user - this depends on the - # RegExps in the AS registration file (under `users`) - for app_service in self.store.get_app_services(): - result.extend(app_service.get_groups_for_user(user_id)) - - return {"groups": result} - else: - try: - bulk_result = yield self.transport_client.bulk_get_publicised_groups( - get_domain_from_id(user_id), [user_id] - ) - except HttpResponseException as e: - raise e.to_synapse_error() - except RequestSendFailed: - raise SynapseError(502, "Failed to contact group server") - - result = bulk_result.get("users", {}).get(user_id) - # TODO: Verify attestations - return {"groups": result} - - @defer.inlineCallbacks - def bulk_get_publicised_groups(self, user_ids, proxy=True): - destinations = {} - local_users = set() - - for user_id in user_ids: - if self.hs.is_mine_id(user_id): - local_users.add(user_id) - else: - destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id) - - if not proxy and destinations: - raise SynapseError(400, "Some user_ids are not local") - - results = {} - failed_results = [] - for destination, dest_user_ids in iteritems(destinations): - try: - r = yield self.transport_client.bulk_get_publicised_groups( - destination, list(dest_user_ids) - ) - results.update(r["users"]) - except Exception: - failed_results.extend(dest_user_ids) - - for uid in local_users: - results[uid] = yield self.store.get_publicised_groups_for_user(uid) - - # Check AS associated groups for this user - this depends on the - # RegExps in the AS registration file (under `users`) - for app_service in self.store.get_app_services(): - results[uid].extend(app_service.get_groups_for_user(uid)) - - return {"users": results} |