From b8ca494ee9e42e5b1aca8958088bd35cc5707437 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 15:44:15 +0100 Subject: Initial group server implementation --- synapse/storage/group_server.py | 280 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 280 insertions(+) create mode 100644 synapse/storage/group_server.py (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py new file mode 100644 index 0000000000..01d9a982c8 --- /dev/null +++ b/synapse/storage/group_server.py @@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import SQLBaseStore + +import ujson as json + + +class GroupServerStore(SQLBaseStore): + def get_group(self, group_id): + return self._simple_select_one( + table="groups", + keyvalues={ + "group_id": group_id, + }, + retcols=("name", "short_description", "long_description", "avatar_url",), + allow_none=True, + desc="is_user_in_group", + ) + + def get_users_in_group(self, group_id, include_private=False): + # TODO: Pagination + + keyvalues = { + "group_id": group_id, + } + if not include_private: + keyvalues["is_public"] = True + + return self._simple_select_list( + table="group_users", + keyvalues=keyvalues, + retcols=("user_id", "is_public",), + desc="get_users_in_group", + ) + + def get_rooms_in_group(self, group_id, include_private=False): + # TODO: Pagination + + keyvalues = { + "group_id": group_id, + } + if not include_private: + keyvalues["is_public"] = True + + return self._simple_select_list( + table="group_rooms", + keyvalues=keyvalues, + retcols=("room_id", "is_public",), + desc="get_rooms_in_group", + ) + + def is_user_in_group(self, user_id, group_id): + return self._simple_select_one_onecol( + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="user_id", + allow_none=True, + desc="is_user_in_group", + ).addCallback(lambda r: bool(r)) + + def is_user_admin_in_group(self, group_id, user_id): + return self._simple_select_one_onecol( + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="is_admin", + allow_none=True, + desc="is_user_adim_in_group", + ) + + def add_group_invite(self, group_id, user_id): + return self._simple_insert( + table="group_invites", + values={ + "group_id": group_id, + "user_id": user_id, + }, + desc="add_group_invite", + ) + + def is_user_invited_to_local_group(self, group_id, user_id): + return self._simple_select_one_onecol( + table="group_invites", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="user_id", + desc="is_user_invited_to_local_group", + allow_none=True, + ) + + def add_user_to_group(self, group_id, user_id, is_admin=False, is_public=True, + local_attestation=None, remote_attestation=None): + def _add_user_to_group_txn(txn): + self._simple_insert_txn( + txn, + table="group_users", + values={ + "group_id": group_id, + "user_id": user_id, + "is_admin": is_admin, + "is_public": is_public, + }, + ) + + self._simple_delete_txn( + txn, + table="group_invites", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + + if local_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_renewals", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": local_attestation["valid_until_ms"], + }, + ) + if remote_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_remote", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": remote_attestation["valid_until_ms"], + "attestation": json.dumps(remote_attestation), + }, + ) + + return self.runInteraction( + "add_user_to_group", _add_user_to_group_txn + ) + + def remove_user_to_group(self, group_id, user_id): + def _remove_user_to_group_txn(txn): + self._simple_delete_txn( + txn, + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_invites", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + return self.runInteraction("remove_user_to_group", _remove_user_to_group_txn) + + def add_room_to_group(self, group_id, room_id, is_public): + return self._simple_insert( + table="group_rooms", + values={ + "group_id": group_id, + "room_id": room_id, + "is_public": is_public, + }, + desc="add_room_to_group", + ) + + @defer.inlineCallbacks + def create_group(self, group_id, user_id, name, avatar_url, short_description, + long_description,): + yield self._simple_insert( + table="groups", + values={ + "group_id": group_id, + "name": name, + "avatar_url": avatar_url, + "short_description": short_description, + "long_description": long_description, + }, + desc="create_group", + ) + + def get_attestations_need_renewals(self, valid_until_ms): + def _get_attestations_need_renewals_txn(txn): + sql = """ + SELECT group_id, user_id FROM group_attestations_renewals + WHERE valid_until_ms <= ? + """ + txn.execute(sql, (valid_until_ms,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_attestations_need_renewals", _get_attestations_need_renewals_txn + ) + + def update_attestation_renewal(self, group_id, user_id, attestation): + return self._simple_update_one( + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + updatevalues={ + "valid_until_ms": attestation["valid_until_ms"], + }, + desc="update_attestation_renewal", + ) + + def update_remote_attestion(self, group_id, user_id, attestation): + return self._simple_update_one( + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + updatevalues={ + "valid_until_ms": attestation["valid_until_ms"], + "attestation": json.dumps(attestation) + }, + desc="update_remote_attestion", + ) + + @defer.inlineCallbacks + def get_remote_attestation(self, group_id, user_id): + row = yield self._simple_select_one( + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcols=("valid_until_ms", "attestation"), + desc="get_remote_attestation", + allow_none=True, + ) + + now = int(self._clock.time_msec()) + if row and now < row["valid_until_ms"]: + defer.returnValue(json.loads(row["attestation"])) + + defer.returnValue(None) -- cgit 1.4.1 From 83936293eb3ddb8998191b537153eaeec5e7afb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Jul 2017 09:58:59 +0100 Subject: Comments --- synapse/groups/attestations.py | 29 +++++- synapse/groups/groups_server.py | 108 +++++++++++++++-------- synapse/storage/group_server.py | 32 ++++++- synapse/storage/schema/delta/43/group_server.sql | 6 +- 4 files changed, 132 insertions(+), 43 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index d83076a9b3..6937fa44cf 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -28,6 +28,8 @@ UPDATE_ATTESTATION_TIME_MS = 1 * 24 * 60 * 60 * 1000 class GroupAttestationSigning(object): + """Creates and verifies group attestations. + """ def __init__(self, hs): self.keyring = hs.get_keyring() self.clock = hs.get_clock() @@ -36,11 +38,20 @@ class GroupAttestationSigning(object): @defer.inlineCallbacks def verify_attestation(self, attestation, group_id, user_id, server_name=None): + """Verifies that the given attestation matches the given paramaters. + + An optional server_name can be supplied to explicitly set which server's + signature is expected. Otherwise assumes that either the group_id or user_id + is local and uses the other's server as the one to check. + """ + if not server_name: if get_domain_from_id(group_id) == self.server_name: server_name = get_domain_from_id(user_id) - else: + elif get_domain_from_id(user_id) == self.server_name: server_name = get_domain_from_id(group_id) + else: + raise Exception("Expected eitehr group_id or user_id to be local") if user_id != attestation["user_id"]: raise SynapseError(400, "Attestation has incorrect user_id") @@ -48,6 +59,7 @@ class GroupAttestationSigning(object): if group_id != attestation["group_id"]: raise SynapseError(400, "Attestation has incorrect group_id") + # TODO: valid_until_ms = attestation["valid_until_ms"] if valid_until_ms - self.clock.time_msec() < MIN_ATTESTATION_LENGTH_MS: raise SynapseError(400, "Attestation not valid for long enough") @@ -55,6 +67,9 @@ class GroupAttestationSigning(object): yield self.keyring.verify_json_for_server(server_name, attestation) def create_attestation(self, group_id, user_id): + """Create an attestation for the group_id and user_id with default + validity length. + """ return sign_json({ "group_id": group_id, "user_id": user_id, @@ -63,11 +78,15 @@ class GroupAttestationSigning(object): class GroupAttestionRenewer(object): + """Responsible for sending and receiving attestation updates. + """ + def __init__(self, hs): self.clock = hs.get_clock() self.store = hs.get_datastore() self.assestations = hs.get_groups_attestation_signing() self.transport_client = hs.get_federation_transport_client() + self.is_mine_id = hs.is_mind_id self._renew_attestations_loop = self.clock.looping_call( self._renew_attestations, 30 * 60 * 1000, @@ -75,8 +94,13 @@ class GroupAttestionRenewer(object): @defer.inlineCallbacks def on_renew_attestation(self, group_id, user_id, content): + """When a remote updates an attestation + """ attestation = content["attestation"] + if not self.is_mine_id(group_id) and not self.is_mine_id(user_id): + raise SynapseError(400, "Neither user not group are on this server") + yield self.attestations.verify_attestation( attestation, user_id=user_id, @@ -89,6 +113,9 @@ class GroupAttestionRenewer(object): @defer.inlineCallbacks def _renew_attestations(self): + """Called periodically to check if we need to update any of our attestations + """ + now = self.clock.time_msec() rows = yield self.store.get_attestations_need_renewals( diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 195f1eae54..44083100f7 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -19,7 +19,6 @@ from synapse.api.errors import SynapseError from synapse.types import UserID, get_domain_from_id -import functools import logging logger = logging.getLogger(__name__) @@ -33,28 +32,6 @@ logger = logging.getLogger(__name__) # TODO: Flairs -UPDATE_ATTESTATION_TIME_MS = 1 * 24 * 60 * 60 * 1000 - - -def check_group_is_ours(and_exists=False): - def g(func): - @functools.wraps(func) - @defer.inlineCallbacks - def h(self, group_id, *args, **kwargs): - if not self.is_mine_id(group_id): - raise SynapseError(400, "Group not on this server") - if and_exists: - group = yield self.store.get_group(group_id) - if not group: - raise SynapseError(404, "Unknown group") - - res = yield func(self, group_id, *args, **kwargs) - defer.returnValue(res) - - return h - return g - - class GroupsServerHandler(object): def __init__(self, hs): self.hs = hs @@ -72,9 +49,28 @@ class GroupsServerHandler(object): # Ensure attestations get renewed hs.get_groups_attestation_renewer() - @check_group_is_ours() + @defer.inlineCallbacks + def check_group_is_ours(self, group_id, and_exists=False): + """Check that the group is ours, and optionally if it exists. + + If group does exist then return group. + """ + if not self.is_mine_id(group_id): + raise SynapseError(400, "Group not on this server") + + group = yield self.store.get_group(group_id) + if and_exists and not group: + raise SynapseError(404, "Unknown group") + + defer.returnValue(group) + @defer.inlineCallbacks def get_group_profile(self, group_id, requester_user_id): + """Get the group profile as seen by requester_user_id + """ + + yield self.check_group_is_ours(group_id) + group_description = yield self.store.get_group(group_id) if group_description: @@ -82,9 +78,13 @@ class GroupsServerHandler(object): else: raise SynapseError(404, "Unknown group") - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): + """Get the users in group as seen by requester_user_id + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) user_results = yield self.store.get_users_in_group( @@ -123,9 +123,13 @@ class GroupsServerHandler(object): "total_user_count_estimate": len(user_results), }) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def get_rooms_in_group(self, group_id, requester_user_id): + """Get the rooms in group as seen by requester_user_id + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) room_results = yield self.store.get_rooms_in_group( @@ -158,9 +162,13 @@ class GroupsServerHandler(object): "total_room_count_estimate": len(room_results), }) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def add_room(self, group_id, requester_user_id, room_id, content): + """Add room to group + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_admin = yield self.store.is_user_admin_in_group(group_id, requester_user_id) if not is_admin: raise SynapseError(403, "User is not admin in group") @@ -182,9 +190,13 @@ class GroupsServerHandler(object): defer.returnValue({}) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def invite_to_group(self, group_id, user_id, requester_user_id, content): + """Invite user to group + """ + + group = yield self.check_group_is_ours(group_id, and_exists=True) + is_admin = yield self.store.is_user_admin_in_group( group_id, requester_user_id ) @@ -194,7 +206,6 @@ class GroupsServerHandler(object): # TODO: Check if user knocked # TODO: Check if user is already invited - group = yield self.store.get_group(group_id) content = { "profile": { "name": group["name"], @@ -248,9 +259,16 @@ class GroupsServerHandler(object): else: raise SynapseError(502, "Unknown state returned by HS") - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def accept_invite(self, group_id, user_id, content): + """User tries to accept an invite to the group. + + This is different from them asking to join, and so should error if no + invite exists (and they're not a member of the group) + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + if not self.store.is_user_invited_to_local_group(group_id, user_id): raise SynapseError(403, "User not invited to group") @@ -291,19 +309,33 @@ class GroupsServerHandler(object): "attestation": local_attestation, }) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def knock(self, group_id, user_id, content): - pass + """A user requests becoming a member of the group + """ + yield self.check_group_is_ours(group_id, and_exists=True) + + raise NotImplementedError() - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def accept_knock(self, group_id, user_id, content): - pass + """Accept a users knock to the room. + + Errors if the user hasn't knocked, rather than inviting them. + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + + raise NotImplementedError() - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def remove_user_from_group(self, group_id, user_id, requester_user_id, content): + """Remove a user from the group; either a user is leaving or and admin + kicked htem. + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_kick = False if requester_user_id != user_id: is_admin = yield self.store.is_user_admin_in_group( @@ -314,7 +346,7 @@ class GroupsServerHandler(object): is_kick = True - yield self.store.remove_user_to_group( + yield self.store.remove_user_from_group( group_id, user_id, ) @@ -328,11 +360,11 @@ class GroupsServerHandler(object): defer.returnValue({}) - @check_group_is_ours() @defer.inlineCallbacks def create_group(self, group_id, user_id, content): + group = yield self.check_group_is_ours(group_id) + logger.info("Attempting to create group with ID: %r", group_id) - group = yield self.store.get_group(group_id) if group: raise SynapseError(400, "Group already exists") diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 01d9a982c8..327d770862 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -89,6 +89,8 @@ class GroupServerStore(SQLBaseStore): ) def add_group_invite(self, group_id, user_id): + """Record that the group server has invited a user + """ return self._simple_insert( table="group_invites", values={ @@ -99,6 +101,8 @@ class GroupServerStore(SQLBaseStore): ) def is_user_invited_to_local_group(self, group_id, user_id): + """Has the group server invited a user? + """ return self._simple_select_one_onecol( table="group_invites", keyvalues={ @@ -112,6 +116,19 @@ class GroupServerStore(SQLBaseStore): def add_user_to_group(self, group_id, user_id, is_admin=False, is_public=True, local_attestation=None, remote_attestation=None): + """Add a user to the group server. + + Args: + group_id (str) + user_id (str) + is_admin (bool) + is_public (bool) + local_attestation (dict): The attestation the GS created to give + to the remote server. Optional if the user and group are on the + same server + remote_attestation (dict): The attestation given to GS by remote + server. Optional if the user and group are on the same server + """ def _add_user_to_group_txn(txn): self._simple_insert_txn( txn, @@ -159,8 +176,8 @@ class GroupServerStore(SQLBaseStore): "add_user_to_group", _add_user_to_group_txn ) - def remove_user_to_group(self, group_id, user_id): - def _remove_user_to_group_txn(txn): + def remove_user_from_group(self, group_id, user_id): + def _remove_user_from_group_txn(txn): self._simple_delete_txn( txn, table="group_users", @@ -193,7 +210,7 @@ class GroupServerStore(SQLBaseStore): "user_id": user_id, }, ) - return self.runInteraction("remove_user_to_group", _remove_user_to_group_txn) + return self.runInteraction("remove_user_from_group", _remove_user_from_group_txn) def add_room_to_group(self, group_id, room_id, is_public): return self._simple_insert( @@ -222,6 +239,8 @@ class GroupServerStore(SQLBaseStore): ) def get_attestations_need_renewals(self, valid_until_ms): + """Get all attestations that need to be renewed until givent time + """ def _get_attestations_need_renewals_txn(txn): sql = """ SELECT group_id, user_id FROM group_attestations_renewals @@ -234,6 +253,8 @@ class GroupServerStore(SQLBaseStore): ) def update_attestation_renewal(self, group_id, user_id, attestation): + """Update an attestation that we have renewed + """ return self._simple_update_one( table="group_attestations_renewals", keyvalues={ @@ -247,6 +268,8 @@ class GroupServerStore(SQLBaseStore): ) def update_remote_attestion(self, group_id, user_id, attestation): + """Update an attestation that a remote has renewed + """ return self._simple_update_one( table="group_attestations_remote", keyvalues={ @@ -262,6 +285,9 @@ class GroupServerStore(SQLBaseStore): @defer.inlineCallbacks def get_remote_attestation(self, group_id, user_id): + """Get the attestation that proves the remote agrees that the user is + in the group. + """ row = yield self._simple_select_one( table="group_attestations_remote", keyvalues={ diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 6f1a941990..5dc7a497e2 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -24,6 +24,7 @@ CREATE TABLE groups ( CREATE UNIQUE INDEX groups_idx ON groups(group_id); +-- list of users the group server thinks are joined CREATE TABLE group_users ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -35,7 +36,7 @@ CREATE TABLE group_users ( CREATE INDEX groups_users_g_idx ON group_users(group_id, user_id); CREATE INDEX groups_users_u_idx ON group_users(user_id); - +-- list of users the group server thinks are invited CREATE TABLE group_invites ( group_id TEXT NOT NULL, user_id TEXT NOT NULL @@ -55,6 +56,7 @@ CREATE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id); +-- List of attestations we've given out and need to renew CREATE TABLE group_attestations_renewals ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -65,6 +67,8 @@ CREATE INDEX group_attestations_renewals_g_idx ON group_attestations_renewals(gr CREATE INDEX group_attestations_renewals_u_idx ON group_attestations_renewals(user_id); CREATE INDEX group_attestations_renewals_v_idx ON group_attestations_renewals(valid_until_ms); + +-- List of attestations we've received from remotes and are interested in. CREATE TABLE group_attestations_remote ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, -- cgit 1.4.1 From e52c391cd452077fc219fad0db8b9e5499251e5b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Jul 2017 14:25:46 +0100 Subject: Rename column to attestation_json --- synapse/storage/group_server.py | 8 ++++---- synapse/storage/schema/delta/43/group_server.sql | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 327d770862..105ab9920e 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -168,7 +168,7 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, "user_id": user_id, "valid_until_ms": remote_attestation["valid_until_ms"], - "attestation": json.dumps(remote_attestation), + "attestation_json": json.dumps(remote_attestation), }, ) @@ -278,7 +278,7 @@ class GroupServerStore(SQLBaseStore): }, updatevalues={ "valid_until_ms": attestation["valid_until_ms"], - "attestation": json.dumps(attestation) + "attestation_json": json.dumps(attestation) }, desc="update_remote_attestion", ) @@ -294,13 +294,13 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, "user_id": user_id, }, - retcols=("valid_until_ms", "attestation"), + retcols=("valid_until_ms", "attestation_json"), desc="get_remote_attestation", allow_none=True, ) now = int(self._clock.time_msec()) if row and now < row["valid_until_ms"]: - defer.returnValue(json.loads(row["attestation"])) + defer.returnValue(json.loads(row["attestation_json"])) defer.returnValue(None) diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index bfe8c2ca4a..b55b0a8deb 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -73,7 +73,7 @@ CREATE TABLE group_attestations_remote ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, valid_until_ms BIGINT NOT NULL, - attestation TEXT NOT NULL + attestation_json TEXT NOT NULL ); CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); -- cgit 1.4.1