diff options
Diffstat (limited to 'synapse/groups')
-rw-r--r-- | synapse/groups/attestations.py | 17 | ||||
-rw-r--r-- | synapse/groups/groups_server.py | 635 |
2 files changed, 321 insertions, 331 deletions
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index dfd7ae041b..27b0c02655 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -37,13 +37,13 @@ An attestation is a signed blob of json that looks like: import logging import random +from typing import Tuple from signedjson.sign import sign_json from twisted.internet import defer from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError -from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id @@ -162,26 +162,26 @@ class GroupAttestionRenewer(object): def _start_renew_attestations(self): return run_as_background_process("renew_attestations", self._renew_attestations) - @defer.inlineCallbacks - def _renew_attestations(self): + async def _renew_attestations(self): """Called periodically to check if we need to update any of our attestations """ now = self.clock.time_msec() - rows = yield self.store.get_attestations_need_renewals( + rows = await self.store.get_attestations_need_renewals( now + UPDATE_ATTESTATION_TIME_MS ) @defer.inlineCallbacks - def _renew_attestation(group_id, user_id): + def _renew_attestation(group_user: Tuple[str, str]): + group_id, user_id = group_user try: if not self.is_mine_id(group_id): destination = get_domain_from_id(group_id) elif not self.is_mine_id(user_id): destination = get_domain_from_id(user_id) else: - logger.warn( + logger.warning( "Incorrectly trying to do attestations for user: %r in %r", user_id, group_id, @@ -208,7 +208,4 @@ class GroupAttestionRenewer(object): ) for row in rows: - group_id = row["group_id"] - user_id = row["user_id"] - - run_in_background(_renew_attestation, group_id, user_id) + await _renew_attestation((row["group_id"], row["user_id"])) diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index d50e691436..8a9de913b3 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # Copyright 2018 New Vector Ltd +# Copyright 2019 Michael Telatynski <7t3chguy@gmail.com> # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,24 +19,22 @@ import logging from six import string_types -from twisted.internet import defer - -from synapse.api.errors import SynapseError +from synapse.api.errors import Codes, SynapseError from synapse.types import GroupID, RoomID, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute logger = logging.getLogger(__name__) -# TODO: Allow users to "knock" or simpkly join depending on rules +# TODO: Allow users to "knock" or simply join depending on rules # TODO: Federation admin APIs -# TODO: is_priveged flag to users and is_public to users and rooms +# TODO: is_privileged flag to users and is_public to users and rooms # TODO: Audit log for admins (profile updates, membership changes, users who tried # to join but were rejected, etc) # TODO: Flairs -class GroupsServerHandler(object): +class GroupsServerWorkerHandler(object): def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() @@ -50,11 +49,7 @@ class GroupsServerHandler(object): self.transport_client = hs.get_federation_transport_client() self.profile_handler = hs.get_profile_handler() - # Ensure attestations get renewed - hs.get_groups_attestation_renewer() - - @defer.inlineCallbacks - def check_group_is_ours( + async def check_group_is_ours( self, group_id, requester_user_id, and_exists=False, and_is_admin=None ): """Check that the group is ours, and optionally if it exists. @@ -70,25 +65,24 @@ class GroupsServerHandler(object): if not self.is_mine_id(group_id): raise SynapseError(400, "Group not on this server") - group = yield self.store.get_group(group_id) + group = await self.store.get_group(group_id) if and_exists and not group: raise SynapseError(404, "Unknown group") - is_user_in_group = yield self.store.is_user_in_group( + is_user_in_group = await self.store.is_user_in_group( requester_user_id, group_id ) if group and not is_user_in_group and not group["is_public"]: raise SynapseError(404, "Unknown group") if and_is_admin: - is_admin = yield self.store.is_user_admin_in_group(group_id, and_is_admin) + is_admin = await self.store.is_user_admin_in_group(group_id, and_is_admin) if not is_admin: raise SynapseError(403, "User is not admin in group") return group - @defer.inlineCallbacks - def get_group_summary(self, group_id, requester_user_id): + async def get_group_summary(self, group_id, requester_user_id): """Get the summary for a group as seen by requester_user_id. The group summary consists of the profile of the room, and a curated @@ -97,28 +91,28 @@ class GroupsServerHandler(object): A user/room may appear in multiple roles/categories. """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - is_user_in_group = yield self.store.is_user_in_group( + is_user_in_group = await self.store.is_user_in_group( requester_user_id, group_id ) - profile = yield self.get_group_profile(group_id, requester_user_id) + profile = await self.get_group_profile(group_id, requester_user_id) - users, roles = yield self.store.get_users_for_summary_by_role( + users, roles = await self.store.get_users_for_summary_by_role( group_id, include_private=is_user_in_group ) # TODO: Add profiles to users - rooms, categories = yield self.store.get_rooms_for_summary_by_category( + rooms, categories = await self.store.get_rooms_for_summary_by_category( group_id, include_private=is_user_in_group ) for room_entry in rooms: room_id = room_entry["room_id"] - joined_users = yield self.store.get_users_in_room(room_id) - entry = yield self.room_list_handler.generate_room_entry( + joined_users = await self.store.get_users_in_room(room_id) + entry = await self.room_list_handler.generate_room_entry( room_id, len(joined_users), with_alias=False, allow_private=True ) entry = dict(entry) # so we don't change whats cached @@ -132,7 +126,7 @@ class GroupsServerHandler(object): user_id = entry["user_id"] if not self.is_mine_id(requester_user_id): - attestation = yield self.store.get_remote_attestation(group_id, user_id) + attestation = await self.store.get_remote_attestation(group_id, user_id) if not attestation: continue @@ -142,12 +136,12 @@ class GroupsServerHandler(object): group_id, user_id ) - user_profile = yield self.profile_handler.get_profile_from_cache(user_id) + user_profile = await self.profile_handler.get_profile_from_cache(user_id) entry.update(user_profile) users.sort(key=lambda e: e.get("order", 0)) - membership_info = yield self.store.get_users_membership_info_in_group( + membership_info = await self.store.get_users_membership_info_in_group( group_id, requester_user_id ) @@ -166,13 +160,195 @@ class GroupsServerHandler(object): "user": membership_info, } - @defer.inlineCallbacks - def update_group_summary_room( + async def get_group_categories(self, group_id, requester_user_id): + """Get all categories in a group (as seen by user) + """ + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + categories = await self.store.get_group_categories(group_id=group_id) + return {"categories": categories} + + async def get_group_category(self, group_id, requester_user_id, category_id): + """Get a specific category in a group (as seen by user) + """ + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + res = await self.store.get_group_category( + group_id=group_id, category_id=category_id + ) + + logger.info("group %s", res) + + return res + + async def get_group_roles(self, group_id, requester_user_id): + """Get all roles in a group (as seen by user) + """ + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + roles = await self.store.get_group_roles(group_id=group_id) + return {"roles": roles} + + async def get_group_role(self, group_id, requester_user_id, role_id): + """Get a specific role in a group (as seen by user) + """ + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + res = await self.store.get_group_role(group_id=group_id, role_id=role_id) + return res + + async def get_group_profile(self, group_id, requester_user_id): + """Get the group profile as seen by requester_user_id + """ + + await self.check_group_is_ours(group_id, requester_user_id) + + group = await self.store.get_group(group_id) + + if group: + cols = [ + "name", + "short_description", + "long_description", + "avatar_url", + "is_public", + ] + group_description = {key: group[key] for key in cols} + group_description["is_openly_joinable"] = group["join_policy"] == "open" + + return group_description + else: + raise SynapseError(404, "Unknown group") + + async def get_users_in_group(self, group_id, requester_user_id): + """Get the users in group as seen by requester_user_id. + + The ordering is arbitrary at the moment + """ + + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + is_user_in_group = await self.store.is_user_in_group( + requester_user_id, group_id + ) + + user_results = await self.store.get_users_in_group( + group_id, include_private=is_user_in_group + ) + + chunk = [] + for user_result in user_results: + g_user_id = user_result["user_id"] + is_public = user_result["is_public"] + is_privileged = user_result["is_admin"] + + entry = {"user_id": g_user_id} + + profile = await self.profile_handler.get_profile_from_cache(g_user_id) + entry.update(profile) + + entry["is_public"] = bool(is_public) + entry["is_privileged"] = bool(is_privileged) + + if not self.is_mine_id(g_user_id): + attestation = await self.store.get_remote_attestation( + group_id, g_user_id + ) + if not attestation: + continue + + entry["attestation"] = attestation + else: + entry["attestation"] = self.attestations.create_attestation( + group_id, g_user_id + ) + + chunk.append(entry) + + # TODO: If admin add lists of users whose attestations have timed out + + return {"chunk": chunk, "total_user_count_estimate": len(user_results)} + + async def get_invited_users_in_group(self, group_id, requester_user_id): + """Get the users that have been invited to a group as seen by requester_user_id. + + The ordering is arbitrary at the moment + """ + + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + is_user_in_group = await self.store.is_user_in_group( + requester_user_id, group_id + ) + + if not is_user_in_group: + raise SynapseError(403, "User not in group") + + invited_users = await self.store.get_invited_users_in_group(group_id) + + user_profiles = [] + + for user_id in invited_users: + user_profile = {"user_id": user_id} + try: + profile = await self.profile_handler.get_profile_from_cache(user_id) + user_profile.update(profile) + except Exception as e: + logger.warning("Error getting profile for %s: %s", user_id, e) + user_profiles.append(user_profile) + + return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)} + + async def get_rooms_in_group(self, group_id, requester_user_id): + """Get the rooms in group as seen by requester_user_id + + This returns rooms in order of decreasing number of joined users + """ + + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + is_user_in_group = await self.store.is_user_in_group( + requester_user_id, group_id + ) + + room_results = await self.store.get_rooms_in_group( + group_id, include_private=is_user_in_group + ) + + chunk = [] + for room_result in room_results: + room_id = room_result["room_id"] + + joined_users = await self.store.get_users_in_room(room_id) + entry = await self.room_list_handler.generate_room_entry( + room_id, len(joined_users), with_alias=False, allow_private=True + ) + + if not entry: + continue + + entry["is_public"] = bool(room_result["is_public"]) + + chunk.append(entry) + + chunk.sort(key=lambda e: -e["num_joined_members"]) + + return {"chunk": chunk, "total_room_count_estimate": len(room_results)} + + +class GroupsServerHandler(GroupsServerWorkerHandler): + def __init__(self, hs): + super(GroupsServerHandler, self).__init__(hs) + + # Ensure attestations get renewed + hs.get_groups_attestation_renewer() + + async def update_group_summary_room( self, group_id, requester_user_id, room_id, category_id, content ): """Add/update a room to the group summary """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) @@ -182,7 +358,7 @@ class GroupsServerHandler(object): is_public = _parse_visibility_from_contents(content) - yield self.store.add_room_to_summary( + await self.store.add_room_to_summary( group_id=group_id, room_id=room_id, category_id=category_id, @@ -192,31 +368,29 @@ class GroupsServerHandler(object): return {} - @defer.inlineCallbacks - def delete_group_summary_room( + async def delete_group_summary_room( self, group_id, requester_user_id, room_id, category_id ): """Remove a room from the summary """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) - yield self.store.remove_room_from_summary( + await self.store.remove_room_from_summary( group_id=group_id, room_id=room_id, category_id=category_id ) return {} - @defer.inlineCallbacks - def set_group_join_policy(self, group_id, requester_user_id, content): + async def set_group_join_policy(self, group_id, requester_user_id, content): """Sets the group join policy. Currently supported policies are: - "invite": an invite must be received and accepted in order to join. - "open": anyone can join. """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) @@ -224,43 +398,23 @@ class GroupsServerHandler(object): if join_policy is None: raise SynapseError(400, "No value specified for 'm.join_policy'") - yield self.store.set_group_join_policy(group_id, join_policy=join_policy) + await self.store.set_group_join_policy(group_id, join_policy=join_policy) return {} - @defer.inlineCallbacks - def get_group_categories(self, group_id, requester_user_id): - """Get all categories in a group (as seen by user) - """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - categories = yield self.store.get_group_categories(group_id=group_id) - return {"categories": categories} - - @defer.inlineCallbacks - def get_group_category(self, group_id, requester_user_id, category_id): - """Get a specific category in a group (as seen by user) - """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - res = yield self.store.get_group_category( - group_id=group_id, category_id=category_id - ) - - return res - - @defer.inlineCallbacks - def update_group_category(self, group_id, requester_user_id, category_id, content): + async def update_group_category( + self, group_id, requester_user_id, category_id, content + ): """Add/Update a group category """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) is_public = _parse_visibility_from_contents(content) profile = content.get("profile") - yield self.store.upsert_group_category( + await self.store.upsert_group_category( group_id=group_id, category_id=category_id, is_public=is_public, @@ -269,43 +423,23 @@ class GroupsServerHandler(object): return {} - @defer.inlineCallbacks - def delete_group_category(self, group_id, requester_user_id, category_id): + async def delete_group_category(self, group_id, requester_user_id, category_id): """Delete a group category """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) - yield self.store.remove_group_category( + await self.store.remove_group_category( group_id=group_id, category_id=category_id ) return {} - @defer.inlineCallbacks - def get_group_roles(self, group_id, requester_user_id): - """Get all roles in a group (as seen by user) - """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - roles = yield self.store.get_group_roles(group_id=group_id) - return {"roles": roles} - - @defer.inlineCallbacks - def get_group_role(self, group_id, requester_user_id, role_id): - """Get a specific role in a group (as seen by user) - """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - res = yield self.store.get_group_role(group_id=group_id, role_id=role_id) - return res - - @defer.inlineCallbacks - def update_group_role(self, group_id, requester_user_id, role_id, content): + async def update_group_role(self, group_id, requester_user_id, role_id, content): """Add/update a role in a group """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) @@ -313,31 +447,29 @@ class GroupsServerHandler(object): profile = content.get("profile") - yield self.store.upsert_group_role( + await self.store.upsert_group_role( group_id=group_id, role_id=role_id, is_public=is_public, profile=profile ) return {} - @defer.inlineCallbacks - def delete_group_role(self, group_id, requester_user_id, role_id): + async def delete_group_role(self, group_id, requester_user_id, role_id): """Remove role from group """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) - yield self.store.remove_group_role(group_id=group_id, role_id=role_id) + await self.store.remove_group_role(group_id=group_id, role_id=role_id) return {} - @defer.inlineCallbacks - def update_group_summary_user( + async def update_group_summary_user( self, group_id, requester_user_id, user_id, role_id, content ): """Add/update a users entry in the group summary """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) @@ -345,7 +477,7 @@ class GroupsServerHandler(object): is_public = _parse_visibility_from_contents(content) - yield self.store.add_user_to_summary( + await self.store.add_user_to_summary( group_id=group_id, user_id=user_id, role_id=role_id, @@ -355,49 +487,25 @@ class GroupsServerHandler(object): return {} - @defer.inlineCallbacks - def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id): + async def delete_group_summary_user( + self, group_id, requester_user_id, user_id, role_id + ): """Remove a user from the group summary """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) - yield self.store.remove_user_from_summary( + await self.store.remove_user_from_summary( group_id=group_id, user_id=user_id, role_id=role_id ) return {} - @defer.inlineCallbacks - def get_group_profile(self, group_id, requester_user_id): - """Get the group profile as seen by requester_user_id - """ - - yield self.check_group_is_ours(group_id, requester_user_id) - - group = yield self.store.get_group(group_id) - - if group: - cols = [ - "name", - "short_description", - "long_description", - "avatar_url", - "is_public", - ] - group_description = {key: group[key] for key in cols} - group_description["is_openly_joinable"] = group["join_policy"] == "open" - - return group_description - else: - raise SynapseError(404, "Unknown group") - - @defer.inlineCallbacks - def update_group_profile(self, group_id, requester_user_id, content): + async def update_group_profile(self, group_id, requester_user_id, content): """Update the group profile """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) @@ -409,158 +517,38 @@ class GroupsServerHandler(object): raise SynapseError(400, "%r value is not a string" % (keyname,)) profile[keyname] = value - yield self.store.update_group_profile(group_id, profile) - - @defer.inlineCallbacks - def get_users_in_group(self, group_id, requester_user_id): - """Get the users in group as seen by requester_user_id. + await self.store.update_group_profile(group_id, profile) - The ordering is arbitrary at the moment - """ - - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - is_user_in_group = yield self.store.is_user_in_group( - requester_user_id, group_id - ) - - user_results = yield self.store.get_users_in_group( - group_id, include_private=is_user_in_group - ) - - chunk = [] - for user_result in user_results: - g_user_id = user_result["user_id"] - is_public = user_result["is_public"] - is_privileged = user_result["is_admin"] - - entry = {"user_id": g_user_id} - - profile = yield self.profile_handler.get_profile_from_cache(g_user_id) - entry.update(profile) - - entry["is_public"] = bool(is_public) - entry["is_privileged"] = bool(is_privileged) - - if not self.is_mine_id(g_user_id): - attestation = yield self.store.get_remote_attestation( - group_id, g_user_id - ) - if not attestation: - continue - - entry["attestation"] = attestation - else: - entry["attestation"] = self.attestations.create_attestation( - group_id, g_user_id - ) - - chunk.append(entry) - - # TODO: If admin add lists of users whose attestations have timed out - - return {"chunk": chunk, "total_user_count_estimate": len(user_results)} - - @defer.inlineCallbacks - def get_invited_users_in_group(self, group_id, requester_user_id): - """Get the users that have been invited to a group as seen by requester_user_id. - - The ordering is arbitrary at the moment - """ - - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - is_user_in_group = yield self.store.is_user_in_group( - requester_user_id, group_id - ) - - if not is_user_in_group: - raise SynapseError(403, "User not in group") - - invited_users = yield self.store.get_invited_users_in_group(group_id) - - user_profiles = [] - - for user_id in invited_users: - user_profile = {"user_id": user_id} - try: - profile = yield self.profile_handler.get_profile_from_cache(user_id) - user_profile.update(profile) - except Exception as e: - logger.warn("Error getting profile for %s: %s", user_id, e) - user_profiles.append(user_profile) - - return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)} - - @defer.inlineCallbacks - def get_rooms_in_group(self, group_id, requester_user_id): - """Get the rooms in group as seen by requester_user_id - - This returns rooms in order of decreasing number of joined users - """ - - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - is_user_in_group = yield self.store.is_user_in_group( - requester_user_id, group_id - ) - - room_results = yield self.store.get_rooms_in_group( - group_id, include_private=is_user_in_group - ) - - chunk = [] - for room_result in room_results: - room_id = room_result["room_id"] - - joined_users = yield self.store.get_users_in_room(room_id) - entry = yield self.room_list_handler.generate_room_entry( - room_id, len(joined_users), with_alias=False, allow_private=True - ) - - if not entry: - continue - - entry["is_public"] = bool(room_result["is_public"]) - - chunk.append(entry) - - chunk.sort(key=lambda e: -e["num_joined_members"]) - - return {"chunk": chunk, "total_room_count_estimate": len(room_results)} - - @defer.inlineCallbacks - def add_room_to_group(self, group_id, requester_user_id, room_id, content): + async def add_room_to_group(self, group_id, requester_user_id, room_id, content): """Add room to group """ RoomID.from_string(room_id) # Ensure valid room id - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) is_public = _parse_visibility_from_contents(content) - yield self.store.add_room_to_group(group_id, room_id, is_public=is_public) + await self.store.add_room_to_group(group_id, room_id, is_public=is_public) return {} - @defer.inlineCallbacks - def update_room_in_group( + async def update_room_in_group( self, group_id, requester_user_id, room_id, config_key, content ): """Update room in group """ RoomID.from_string(room_id) # Ensure valid room id - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) if config_key == "m.visibility": is_public = _parse_visibility_dict(content) - yield self.store.update_room_in_group_visibility( + await self.store.update_room_in_group_visibility( group_id, room_id, is_public=is_public ) else: @@ -568,29 +556,38 @@ class GroupsServerHandler(object): return {} - @defer.inlineCallbacks - def remove_room_from_group(self, group_id, requester_user_id, room_id): + async def remove_room_from_group(self, group_id, requester_user_id, room_id): """Remove room from group """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) - yield self.store.remove_room_from_group(group_id, room_id) + await self.store.remove_room_from_group(group_id, room_id) return {} - @defer.inlineCallbacks - def invite_to_group(self, group_id, user_id, requester_user_id, content): + async def invite_to_group(self, group_id, user_id, requester_user_id, content): """Invite user to group """ - group = yield self.check_group_is_ours( + group = await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) # TODO: Check if user knocked - # TODO: Check if user is already invited + + invited_users = await self.store.get_invited_users_in_group(group_id) + if user_id in invited_users: + raise SynapseError( + 400, "User already invited to group", errcode=Codes.BAD_STATE + ) + + user_results = await self.store.get_users_in_group( + group_id, include_private=True + ) + if user_id in (user_result["user_id"] for user_result in user_results): + raise SynapseError(400, "User already in group") content = { "profile": {"name": group["name"], "avatar_url": group["avatar_url"]}, @@ -599,18 +596,18 @@ class GroupsServerHandler(object): if self.hs.is_mine_id(user_id): groups_local = self.hs.get_groups_local_handler() - res = yield groups_local.on_invite(group_id, user_id, content) + res = await groups_local.on_invite(group_id, user_id, content) local_attestation = None else: local_attestation = self.attestations.create_attestation(group_id, user_id) content.update({"attestation": local_attestation}) - res = yield self.transport_client.invite_to_group_notification( + res = await self.transport_client.invite_to_group_notification( get_domain_from_id(user_id), group_id, user_id, content ) user_profile = res.get("user_profile", {}) - yield self.store.add_remote_profile_cache( + await self.store.add_remote_profile_cache( user_id, displayname=user_profile.get("displayname"), avatar_url=user_profile.get("avatar_url"), @@ -620,13 +617,13 @@ class GroupsServerHandler(object): if not self.hs.is_mine_id(user_id): remote_attestation = res["attestation"] - yield self.attestations.verify_attestation( + await self.attestations.verify_attestation( remote_attestation, user_id=user_id, group_id=group_id ) else: remote_attestation = None - yield self.store.add_user_to_group( + await self.store.add_user_to_group( group_id, user_id, is_admin=False, @@ -635,15 +632,14 @@ class GroupsServerHandler(object): remote_attestation=remote_attestation, ) elif res["state"] == "invite": - yield self.store.add_group_invite(group_id, user_id) + await self.store.add_group_invite(group_id, user_id) return {"state": "invite"} elif res["state"] == "reject": return {"state": "reject"} else: raise SynapseError(502, "Unknown state returned by HS") - @defer.inlineCallbacks - def _add_user(self, group_id, user_id, content): + async def _add_user(self, group_id, user_id, content): """Add a user to a group based on a content dict. See accept_invite, join_group. @@ -653,7 +649,7 @@ class GroupsServerHandler(object): remote_attestation = content["attestation"] - yield self.attestations.verify_attestation( + await self.attestations.verify_attestation( remote_attestation, user_id=user_id, group_id=group_id ) else: @@ -662,7 +658,7 @@ class GroupsServerHandler(object): is_public = _parse_visibility_from_contents(content) - yield self.store.add_user_to_group( + await self.store.add_user_to_group( group_id, user_id, is_admin=False, @@ -673,73 +669,70 @@ class GroupsServerHandler(object): return local_attestation - @defer.inlineCallbacks - def accept_invite(self, group_id, requester_user_id, content): + async def accept_invite(self, group_id, requester_user_id, content): """User tries to accept an invite to the group. This is different from them asking to join, and so should error if no invite exists (and they're not a member of the group) """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - is_invited = yield self.store.is_user_invited_to_local_group( + is_invited = await self.store.is_user_invited_to_local_group( group_id, requester_user_id ) if not is_invited: raise SynapseError(403, "User not invited to group") - local_attestation = yield self._add_user(group_id, requester_user_id, content) + local_attestation = await self._add_user(group_id, requester_user_id, content) return {"state": "join", "attestation": local_attestation} - @defer.inlineCallbacks - def join_group(self, group_id, requester_user_id, content): + async def join_group(self, group_id, requester_user_id, content): """User tries to join the group. This will error if the group requires an invite/knock to join """ - group_info = yield self.check_group_is_ours( + group_info = await self.check_group_is_ours( group_id, requester_user_id, and_exists=True ) if group_info["join_policy"] != "open": raise SynapseError(403, "Group is not publicly joinable") - local_attestation = yield self._add_user(group_id, requester_user_id, content) + local_attestation = await self._add_user(group_id, requester_user_id, content) return {"state": "join", "attestation": local_attestation} - @defer.inlineCallbacks - def knock(self, group_id, requester_user_id, content): + async def knock(self, group_id, requester_user_id, content): """A user requests becoming a member of the group """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) raise NotImplementedError() - @defer.inlineCallbacks - def accept_knock(self, group_id, requester_user_id, content): + async def accept_knock(self, group_id, requester_user_id, content): """Accept a users knock to the room. Errors if the user hasn't knocked, rather than inviting them. """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) raise NotImplementedError() - @defer.inlineCallbacks - def remove_user_from_group(self, group_id, user_id, requester_user_id, content): + async def remove_user_from_group( + self, group_id, user_id, requester_user_id, content + ): """Remove a user from the group; either a user is leaving or an admin kicked them. """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) is_kick = False if requester_user_id != user_id: - is_admin = yield self.store.is_user_admin_in_group( + is_admin = await self.store.is_user_admin_in_group( group_id, requester_user_id ) if not is_admin: @@ -747,25 +740,29 @@ class GroupsServerHandler(object): is_kick = True - yield self.store.remove_user_from_group(group_id, user_id) + await self.store.remove_user_from_group(group_id, user_id) if is_kick: if self.hs.is_mine_id(user_id): groups_local = self.hs.get_groups_local_handler() - yield groups_local.user_removed_from_group(group_id, user_id, {}) + await groups_local.user_removed_from_group(group_id, user_id, {}) else: - yield self.transport_client.remove_user_from_group_notification( + await self.transport_client.remove_user_from_group_notification( get_domain_from_id(user_id), group_id, user_id, {} ) if not self.hs.is_mine_id(user_id): - yield self.store.maybe_delete_remote_profile_cache(user_id) + await self.store.maybe_delete_remote_profile_cache(user_id) + + # Delete group if the last user has left + users = await self.store.get_users_in_group(group_id, include_private=True) + if not users: + await self.store.delete_group(group_id) return {} - @defer.inlineCallbacks - def create_group(self, group_id, requester_user_id, content): - group = yield self.check_group_is_ours(group_id, requester_user_id) + async def create_group(self, group_id, requester_user_id, content): + group = await self.check_group_is_ours(group_id, requester_user_id) logger.info("Attempting to create group with ID: %r", group_id) @@ -775,7 +772,7 @@ class GroupsServerHandler(object): if group: raise SynapseError(400, "Group already exists") - is_admin = yield self.auth.is_server_admin( + is_admin = await self.auth.is_server_admin( UserID.from_string(requester_user_id) ) if not is_admin: @@ -798,7 +795,7 @@ class GroupsServerHandler(object): long_description = profile.get("long_description") user_profile = content.get("user_profile", {}) - yield self.store.create_group( + await self.store.create_group( group_id, requester_user_id, name=name, @@ -810,7 +807,7 @@ class GroupsServerHandler(object): if not self.hs.is_mine_id(requester_user_id): remote_attestation = content["attestation"] - yield self.attestations.verify_attestation( + await self.attestations.verify_attestation( remote_attestation, user_id=requester_user_id, group_id=group_id ) @@ -821,7 +818,7 @@ class GroupsServerHandler(object): local_attestation = None remote_attestation = None - yield self.store.add_user_to_group( + await self.store.add_user_to_group( group_id, requester_user_id, is_admin=True, @@ -831,7 +828,7 @@ class GroupsServerHandler(object): ) if not self.hs.is_mine_id(requester_user_id): - yield self.store.add_remote_profile_cache( + await self.store.add_remote_profile_cache( requester_user_id, displayname=user_profile.get("displayname"), avatar_url=user_profile.get("avatar_url"), @@ -839,8 +836,7 @@ class GroupsServerHandler(object): return {"group_id": group_id} - @defer.inlineCallbacks - def delete_group(self, group_id, requester_user_id): + async def delete_group(self, group_id, requester_user_id): """Deletes a group, kicking out all current members. Only group admins or server admins can call this request @@ -849,18 +845,16 @@ class GroupsServerHandler(object): group_id (str) request_user_id (str) - Returns: - Deferred """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) # Only server admins or group admins can delete groups. - is_admin = yield self.store.is_user_admin_in_group(group_id, requester_user_id) + is_admin = await self.store.is_user_admin_in_group(group_id, requester_user_id) if not is_admin: - is_admin = yield self.auth.is_server_admin( + is_admin = await self.auth.is_server_admin( UserID.from_string(requester_user_id) ) @@ -868,18 +862,17 @@ class GroupsServerHandler(object): raise SynapseError(403, "User is not an admin") # Before deleting the group lets kick everyone out of it - users = yield self.store.get_users_in_group(group_id, include_private=True) + users = await self.store.get_users_in_group(group_id, include_private=True) - @defer.inlineCallbacks - def _kick_user_from_group(user_id): + async def _kick_user_from_group(user_id): if self.hs.is_mine_id(user_id): groups_local = self.hs.get_groups_local_handler() - yield groups_local.user_removed_from_group(group_id, user_id, {}) + await groups_local.user_removed_from_group(group_id, user_id, {}) else: - yield self.transport_client.remove_user_from_group_notification( + await self.transport_client.remove_user_from_group_notification( get_domain_from_id(user_id), group_id, user_id, {} ) - yield self.store.maybe_delete_remote_profile_cache(user_id) + await self.store.maybe_delete_remote_profile_cache(user_id) # We kick users out in the order of: # 1. Non-admins @@ -898,11 +891,11 @@ class GroupsServerHandler(object): else: non_admins.append(u["user_id"]) - yield concurrently_execute(_kick_user_from_group, non_admins, 10) - yield concurrently_execute(_kick_user_from_group, admins, 10) - yield _kick_user_from_group(requester_user_id) + await concurrently_execute(_kick_user_from_group, non_admins, 10) + await concurrently_execute(_kick_user_from_group, admins, 10) + await _kick_user_from_group(requester_user_id) - yield self.store.delete_group(group_id) + await self.store.delete_group(group_id) def _parse_join_policy_from_contents(content): |