diff options
author | Erik Johnston <erik@matrix.org> | 2017-07-10 15:44:15 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-07-10 15:44:15 +0100 |
commit | b8ca494ee9e42e5b1aca8958088bd35cc5707437 (patch) | |
tree | b5e9cfaebfe6014f510f346dc9758e03a8be1f70 /synapse/groups | |
parent | Include registration and as stores in frontend proxy (diff) | |
download | synapse-b8ca494ee9e42e5b1aca8958088bd35cc5707437.tar.xz |
Initial group server implementation
Diffstat (limited to 'synapse/groups')
-rw-r--r-- | synapse/groups/__init__.py | 0 | ||||
-rw-r--r-- | synapse/groups/attestations.py | 120 | ||||
-rw-r--r-- | synapse/groups/groups_server.py | 382 |
3 files changed, 502 insertions, 0 deletions
diff --git a/synapse/groups/__init__.py b/synapse/groups/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/synapse/groups/__init__.py diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py new file mode 100644 index 0000000000..d83076a9b3 --- /dev/null +++ b/synapse/groups/attestations.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import SynapseError +from synapse.types import get_domain_from_id +from synapse.util.logcontext import preserve_fn + +from signedjson.sign import sign_json + + +DEFAULT_ATTESTATION_LENGTH_MS = 3 * 24 * 60 * 60 * 1000 +MIN_ATTESTATION_LENGTH_MS = 1 * 60 * 60 * 1000 +UPDATE_ATTESTATION_TIME_MS = 1 * 24 * 60 * 60 * 1000 + + +class GroupAttestationSigning(object): + def __init__(self, hs): + self.keyring = hs.get_keyring() + self.clock = hs.get_clock() + self.server_name = hs.hostname + self.signing_key = hs.config.signing_key[0] + + @defer.inlineCallbacks + def verify_attestation(self, attestation, group_id, user_id, server_name=None): + if not server_name: + if get_domain_from_id(group_id) == self.server_name: + server_name = get_domain_from_id(user_id) + else: + server_name = get_domain_from_id(group_id) + + if user_id != attestation["user_id"]: + raise SynapseError(400, "Attestation has incorrect user_id") + + if group_id != attestation["group_id"]: + raise SynapseError(400, "Attestation has incorrect group_id") + + valid_until_ms = attestation["valid_until_ms"] + if valid_until_ms - self.clock.time_msec() < MIN_ATTESTATION_LENGTH_MS: + raise SynapseError(400, "Attestation not valid for long enough") + + yield self.keyring.verify_json_for_server(server_name, attestation) + + def create_attestation(self, group_id, user_id): + return sign_json({ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": self.clock.time_msec() + DEFAULT_ATTESTATION_LENGTH_MS, + }, self.server_name, self.signing_key) + + +class GroupAttestionRenewer(object): + def __init__(self, hs): + self.clock = hs.get_clock() + self.store = hs.get_datastore() + self.assestations = hs.get_groups_attestation_signing() + self.transport_client = hs.get_federation_transport_client() + + self._renew_attestations_loop = self.clock.looping_call( + self._renew_attestations, 30 * 60 * 1000, + ) + + @defer.inlineCallbacks + def on_renew_attestation(self, group_id, user_id, content): + attestation = content["attestation"] + + yield self.attestations.verify_attestation( + attestation, + user_id=user_id, + group_id=group_id, + ) + + yield self.store.update_remote_attestion(group_id, user_id, attestation) + + defer.returnValue({}) + + @defer.inlineCallbacks + def _renew_attestations(self): + now = self.clock.time_msec() + + rows = yield self.store.get_attestations_need_renewals( + now + UPDATE_ATTESTATION_TIME_MS + ) + + @defer.inlineCallbacks + def _renew_attestation(self, group_id, user_id): + attestation = self.attestations.create_attestation(group_id, user_id) + + if self.hs.is_mine_id(group_id): + destination = get_domain_from_id(user_id) + else: + destination = get_domain_from_id(group_id) + + yield self.transport_client.renew_group_attestation( + destination, group_id, user_id, + content={"attestation": attestation}, + ) + + yield self.store.update_attestation_renewal( + group_id, user_id, attestation + ) + + for row in rows: + group_id = row["group_id"] + user_id = row["user_id"] + + preserve_fn(_renew_attestation)(group_id, user_id) diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py new file mode 100644 index 0000000000..195f1eae54 --- /dev/null +++ b/synapse/groups/groups_server.py @@ -0,0 +1,382 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import SynapseError +from synapse.types import UserID, get_domain_from_id + + +import functools +import logging + +logger = logging.getLogger(__name__) + + +# TODO: Allow users to "knock" or simpkly join depending on rules +# TODO: Federation admin APIs +# TODO: is_priveged flag to users and is_public to users and rooms +# TODO: Audit log for admins (profile updates, membership changes, users who tried +# to join but were rejected, etc) +# TODO: Flairs + + +UPDATE_ATTESTATION_TIME_MS = 1 * 24 * 60 * 60 * 1000 + + +def check_group_is_ours(and_exists=False): + def g(func): + @functools.wraps(func) + @defer.inlineCallbacks + def h(self, group_id, *args, **kwargs): + if not self.is_mine_id(group_id): + raise SynapseError(400, "Group not on this server") + if and_exists: + group = yield self.store.get_group(group_id) + if not group: + raise SynapseError(404, "Unknown group") + + res = yield func(self, group_id, *args, **kwargs) + defer.returnValue(res) + + return h + return g + + +class GroupsServerHandler(object): + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + self.room_list_handler = hs.get_room_list_handler() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.keyring = hs.get_keyring() + self.is_mine_id = hs.is_mine_id + self.signing_key = hs.config.signing_key[0] + self.server_name = hs.hostname + self.attestations = hs.get_groups_attestation_signing() + self.transport_client = hs.get_federation_transport_client() + + # Ensure attestations get renewed + hs.get_groups_attestation_renewer() + + @check_group_is_ours() + @defer.inlineCallbacks + def get_group_profile(self, group_id, requester_user_id): + group_description = yield self.store.get_group(group_id) + + if group_description: + defer.returnValue(group_description) + else: + raise SynapseError(404, "Unknown group") + + @check_group_is_ours(and_exists=True) + @defer.inlineCallbacks + def get_users_in_group(self, group_id, requester_user_id): + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) + + user_results = yield self.store.get_users_in_group( + group_id, include_private=is_user_in_group, + ) + + chunk = [] + for user_result in user_results: + g_user_id = user_result["user_id"] + is_public = user_result["is_public"] + + entry = {"user_id": g_user_id} + + # TODO: Get profile information + + if not is_public: + entry["is_public"] = False + + if not self.is_mine_id(requester_user_id): + attestation = yield self.store.get_remote_attestation(group_id, g_user_id) + if not attestation: + continue + + entry["attestation"] = attestation + else: + entry["attestation"] = self.attestations.create_attestation( + group_id, g_user_id, + ) + + chunk.append(entry) + + # TODO: If admin add lists of users whose attestations have timed out + + defer.returnValue({ + "chunk": chunk, + "total_user_count_estimate": len(user_results), + }) + + @check_group_is_ours(and_exists=True) + @defer.inlineCallbacks + def get_rooms_in_group(self, group_id, requester_user_id): + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) + + room_results = yield self.store.get_rooms_in_group( + group_id, include_private=is_user_in_group, + ) + + chunk = [] + for room_result in room_results: + room_id = room_result["room_id"] + is_public = room_result["is_public"] + + joined_users = yield self.store.get_users_in_room(room_id) + entry = yield self.room_list_handler.generate_room_entry( + room_id, len(joined_users), + with_alias=False, allow_private=True, + ) + + if not entry: + continue + + if not is_public: + entry["is_public"] = False + + chunk.append(entry) + + chunk.sort(key=lambda e: -e["num_joined_members"]) + + defer.returnValue({ + "chunk": chunk, + "total_room_count_estimate": len(room_results), + }) + + @check_group_is_ours(and_exists=True) + @defer.inlineCallbacks + def add_room(self, group_id, requester_user_id, room_id, content): + is_admin = yield self.store.is_user_admin_in_group(group_id, requester_user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + # TODO: Check if room has already been added + + visibility = content.get("visibility") + if visibility: + vis_type = visibility["type"] + if vis_type not in ("public", "private"): + raise SynapseError( + 400, "Synapse only supports 'public'/'private' visibility" + ) + is_public = vis_type == "public" + else: + is_public = True + + yield self.store.add_room_to_group(group_id, room_id, is_public=is_public) + + defer.returnValue({}) + + @check_group_is_ours(and_exists=True) + @defer.inlineCallbacks + def invite_to_group(self, group_id, user_id, requester_user_id, content): + is_admin = yield self.store.is_user_admin_in_group( + group_id, requester_user_id + ) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + # TODO: Check if user knocked + # TODO: Check if user is already invited + + group = yield self.store.get_group(group_id) + content = { + "profile": { + "name": group["name"], + "avatar_url": group["avatar_url"], + }, + "inviter": requester_user_id, + } + + if self.hs.is_mine_id(user_id): + raise NotImplementedError() + else: + local_attestation = self.attestations.create_attestation(group_id, user_id) + content.update({ + "attestation": local_attestation, + }) + + res = yield self.transport_client.invite_to_group_notification( + get_domain_from_id(user_id), group_id, user_id, content + ) + + if res["state"] == "join": + if not self.hs.is_mine_id(user_id): + remote_attestation = res["attestation"] + + yield self.attestations.verify_attestation( + remote_attestation, + user_id=user_id, + group_id=group_id, + ) + else: + remote_attestation = None + + yield self.store.add_user_to_group( + group_id, user_id, + is_admin=False, + is_public=False, # TODO + local_attestation=local_attestation, + remote_attestation=remote_attestation, + ) + elif res["state"] == "invite": + yield self.store.add_group_invite( + group_id, user_id, + ) + defer.returnValue({ + "state": "invite" + }) + elif res["state"] == "reject": + defer.returnValue({ + "state": "reject" + }) + else: + raise SynapseError(502, "Unknown state returned by HS") + + @check_group_is_ours(and_exists=True) + @defer.inlineCallbacks + def accept_invite(self, group_id, user_id, content): + if not self.store.is_user_invited_to_local_group(group_id, user_id): + raise SynapseError(403, "User not invited to group") + + if not self.hs.is_mine_id(user_id): + remote_attestation = content["attestation"] + + yield self.attestations.verify_attestation( + remote_attestation, + user_id=user_id, + group_id=group_id, + ) + else: + remote_attestation = None + + local_attestation = self.attestations.create_attestation(group_id, user_id) + + visibility = content.get("visibility") + if visibility: + vis_type = visibility["type"] + if vis_type not in ("public", "private"): + raise SynapseError( + 400, "Synapse only supports 'public'/'private' visibility" + ) + is_public = vis_type == "public" + else: + is_public = True + + yield self.store.add_user_to_group( + group_id, user_id, + is_admin=False, + is_public=is_public, + local_attestation=local_attestation, + remote_attestation=remote_attestation, + ) + + defer.returnValue({ + "state": "join", + "attestation": local_attestation, + }) + + @check_group_is_ours(and_exists=True) + @defer.inlineCallbacks + def knock(self, group_id, user_id, content): + pass + + @check_group_is_ours(and_exists=True) + @defer.inlineCallbacks + def accept_knock(self, group_id, user_id, content): + pass + + @check_group_is_ours(and_exists=True) + @defer.inlineCallbacks + def remove_user_from_group(self, group_id, user_id, requester_user_id, content): + is_kick = False + if requester_user_id != user_id: + is_admin = yield self.store.is_user_admin_in_group( + group_id, requester_user_id + ) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + is_kick = True + + yield self.store.remove_user_to_group( + group_id, user_id, + ) + + if is_kick: + if self.hs.is_mine_id(user_id): + raise NotImplementedError() + else: + yield self.transport_client.remove_user_from_group_notification( + get_domain_from_id(user_id), group_id, user_id, {} + ) + + defer.returnValue({}) + + @check_group_is_ours() + @defer.inlineCallbacks + def create_group(self, group_id, user_id, content): + logger.info("Attempting to create group with ID: %r", group_id) + group = yield self.store.get_group(group_id) + if group: + raise SynapseError(400, "Group already exists") + + is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id)) + if not is_admin and not group_id.startswith("+u/"): + raise SynapseError(403, "Group ID must start with '+u/' or be a server admin") + + profile = content.get("profile", {}) + name = profile.get("name") + avatar_url = profile.get("avatar_url") + short_description = profile.get("short_description") + long_description = profile.get("long_description") + + yield self.store.create_group( + group_id, + user_id, + name=name, + avatar_url=avatar_url, + short_description=short_description, + long_description=long_description, + ) + + if not self.hs.is_mine_id(user_id): + remote_attestation = content["attestation"] + + yield self.attestations.verify_attestation( + remote_attestation, + user_id=user_id, + group_id=group_id, + ) + + local_attestation = self.attestations.create_attestation(group_id, user_id) + else: + local_attestation = None + remote_attestation = None + + yield self.store.add_user_to_group( + group_id, user_id, + is_admin=True, + is_public=True, # TODO + local_attestation=local_attestation, + remote_attestation=remote_attestation, + ) + + defer.returnValue({ + "group_id": group_id, + }) |