From b8ca494ee9e42e5b1aca8958088bd35cc5707437 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 15:44:15 +0100 Subject: Initial group server implementation --- synapse/storage/group_server.py | 280 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 280 insertions(+) create mode 100644 synapse/storage/group_server.py (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py new file mode 100644 index 0000000000..01d9a982c8 --- /dev/null +++ b/synapse/storage/group_server.py @@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import SQLBaseStore + +import ujson as json + + +class GroupServerStore(SQLBaseStore): + def get_group(self, group_id): + return self._simple_select_one( + table="groups", + keyvalues={ + "group_id": group_id, + }, + retcols=("name", "short_description", "long_description", "avatar_url",), + allow_none=True, + desc="is_user_in_group", + ) + + def get_users_in_group(self, group_id, include_private=False): + # TODO: Pagination + + keyvalues = { + "group_id": group_id, + } + if not include_private: + keyvalues["is_public"] = True + + return self._simple_select_list( + table="group_users", + keyvalues=keyvalues, + retcols=("user_id", "is_public",), + desc="get_users_in_group", + ) + + def get_rooms_in_group(self, group_id, include_private=False): + # TODO: Pagination + + keyvalues = { + "group_id": group_id, + } + if not include_private: + keyvalues["is_public"] = True + + return self._simple_select_list( + table="group_rooms", + keyvalues=keyvalues, + retcols=("room_id", "is_public",), + desc="get_rooms_in_group", + ) + + def is_user_in_group(self, user_id, group_id): + return self._simple_select_one_onecol( + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="user_id", + allow_none=True, + desc="is_user_in_group", + ).addCallback(lambda r: bool(r)) + + def is_user_admin_in_group(self, group_id, user_id): + return self._simple_select_one_onecol( + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="is_admin", + allow_none=True, + desc="is_user_adim_in_group", + ) + + def add_group_invite(self, group_id, user_id): + return self._simple_insert( + table="group_invites", + values={ + "group_id": group_id, + "user_id": user_id, + }, + desc="add_group_invite", + ) + + def is_user_invited_to_local_group(self, group_id, user_id): + return self._simple_select_one_onecol( + table="group_invites", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="user_id", + desc="is_user_invited_to_local_group", + allow_none=True, + ) + + def add_user_to_group(self, group_id, user_id, is_admin=False, is_public=True, + local_attestation=None, remote_attestation=None): + def _add_user_to_group_txn(txn): + self._simple_insert_txn( + txn, + table="group_users", + values={ + "group_id": group_id, + "user_id": user_id, + "is_admin": is_admin, + "is_public": is_public, + }, + ) + + self._simple_delete_txn( + txn, + table="group_invites", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + + if local_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_renewals", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": local_attestation["valid_until_ms"], + }, + ) + if remote_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_remote", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": remote_attestation["valid_until_ms"], + "attestation": json.dumps(remote_attestation), + }, + ) + + return self.runInteraction( + "add_user_to_group", _add_user_to_group_txn + ) + + def remove_user_to_group(self, group_id, user_id): + def _remove_user_to_group_txn(txn): + self._simple_delete_txn( + txn, + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_invites", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + return self.runInteraction("remove_user_to_group", _remove_user_to_group_txn) + + def add_room_to_group(self, group_id, room_id, is_public): + return self._simple_insert( + table="group_rooms", + values={ + "group_id": group_id, + "room_id": room_id, + "is_public": is_public, + }, + desc="add_room_to_group", + ) + + @defer.inlineCallbacks + def create_group(self, group_id, user_id, name, avatar_url, short_description, + long_description,): + yield self._simple_insert( + table="groups", + values={ + "group_id": group_id, + "name": name, + "avatar_url": avatar_url, + "short_description": short_description, + "long_description": long_description, + }, + desc="create_group", + ) + + def get_attestations_need_renewals(self, valid_until_ms): + def _get_attestations_need_renewals_txn(txn): + sql = """ + SELECT group_id, user_id FROM group_attestations_renewals + WHERE valid_until_ms <= ? + """ + txn.execute(sql, (valid_until_ms,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_attestations_need_renewals", _get_attestations_need_renewals_txn + ) + + def update_attestation_renewal(self, group_id, user_id, attestation): + return self._simple_update_one( + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + updatevalues={ + "valid_until_ms": attestation["valid_until_ms"], + }, + desc="update_attestation_renewal", + ) + + def update_remote_attestion(self, group_id, user_id, attestation): + return self._simple_update_one( + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + updatevalues={ + "valid_until_ms": attestation["valid_until_ms"], + "attestation": json.dumps(attestation) + }, + desc="update_remote_attestion", + ) + + @defer.inlineCallbacks + def get_remote_attestation(self, group_id, user_id): + row = yield self._simple_select_one( + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcols=("valid_until_ms", "attestation"), + desc="get_remote_attestation", + allow_none=True, + ) + + now = int(self._clock.time_msec()) + if row and now < row["valid_until_ms"]: + defer.returnValue(json.loads(row["attestation"])) + + defer.returnValue(None) -- cgit 1.5.1 From 83936293eb3ddb8998191b537153eaeec5e7afb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Jul 2017 09:58:59 +0100 Subject: Comments --- synapse/groups/attestations.py | 29 +++++- synapse/groups/groups_server.py | 108 +++++++++++++++-------- synapse/storage/group_server.py | 32 ++++++- synapse/storage/schema/delta/43/group_server.sql | 6 +- 4 files changed, 132 insertions(+), 43 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index d83076a9b3..6937fa44cf 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -28,6 +28,8 @@ UPDATE_ATTESTATION_TIME_MS = 1 * 24 * 60 * 60 * 1000 class GroupAttestationSigning(object): + """Creates and verifies group attestations. + """ def __init__(self, hs): self.keyring = hs.get_keyring() self.clock = hs.get_clock() @@ -36,11 +38,20 @@ class GroupAttestationSigning(object): @defer.inlineCallbacks def verify_attestation(self, attestation, group_id, user_id, server_name=None): + """Verifies that the given attestation matches the given paramaters. + + An optional server_name can be supplied to explicitly set which server's + signature is expected. Otherwise assumes that either the group_id or user_id + is local and uses the other's server as the one to check. + """ + if not server_name: if get_domain_from_id(group_id) == self.server_name: server_name = get_domain_from_id(user_id) - else: + elif get_domain_from_id(user_id) == self.server_name: server_name = get_domain_from_id(group_id) + else: + raise Exception("Expected eitehr group_id or user_id to be local") if user_id != attestation["user_id"]: raise SynapseError(400, "Attestation has incorrect user_id") @@ -48,6 +59,7 @@ class GroupAttestationSigning(object): if group_id != attestation["group_id"]: raise SynapseError(400, "Attestation has incorrect group_id") + # TODO: valid_until_ms = attestation["valid_until_ms"] if valid_until_ms - self.clock.time_msec() < MIN_ATTESTATION_LENGTH_MS: raise SynapseError(400, "Attestation not valid for long enough") @@ -55,6 +67,9 @@ class GroupAttestationSigning(object): yield self.keyring.verify_json_for_server(server_name, attestation) def create_attestation(self, group_id, user_id): + """Create an attestation for the group_id and user_id with default + validity length. + """ return sign_json({ "group_id": group_id, "user_id": user_id, @@ -63,11 +78,15 @@ class GroupAttestationSigning(object): class GroupAttestionRenewer(object): + """Responsible for sending and receiving attestation updates. + """ + def __init__(self, hs): self.clock = hs.get_clock() self.store = hs.get_datastore() self.assestations = hs.get_groups_attestation_signing() self.transport_client = hs.get_federation_transport_client() + self.is_mine_id = hs.is_mind_id self._renew_attestations_loop = self.clock.looping_call( self._renew_attestations, 30 * 60 * 1000, @@ -75,8 +94,13 @@ class GroupAttestionRenewer(object): @defer.inlineCallbacks def on_renew_attestation(self, group_id, user_id, content): + """When a remote updates an attestation + """ attestation = content["attestation"] + if not self.is_mine_id(group_id) and not self.is_mine_id(user_id): + raise SynapseError(400, "Neither user not group are on this server") + yield self.attestations.verify_attestation( attestation, user_id=user_id, @@ -89,6 +113,9 @@ class GroupAttestionRenewer(object): @defer.inlineCallbacks def _renew_attestations(self): + """Called periodically to check if we need to update any of our attestations + """ + now = self.clock.time_msec() rows = yield self.store.get_attestations_need_renewals( diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 195f1eae54..44083100f7 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -19,7 +19,6 @@ from synapse.api.errors import SynapseError from synapse.types import UserID, get_domain_from_id -import functools import logging logger = logging.getLogger(__name__) @@ -33,28 +32,6 @@ logger = logging.getLogger(__name__) # TODO: Flairs -UPDATE_ATTESTATION_TIME_MS = 1 * 24 * 60 * 60 * 1000 - - -def check_group_is_ours(and_exists=False): - def g(func): - @functools.wraps(func) - @defer.inlineCallbacks - def h(self, group_id, *args, **kwargs): - if not self.is_mine_id(group_id): - raise SynapseError(400, "Group not on this server") - if and_exists: - group = yield self.store.get_group(group_id) - if not group: - raise SynapseError(404, "Unknown group") - - res = yield func(self, group_id, *args, **kwargs) - defer.returnValue(res) - - return h - return g - - class GroupsServerHandler(object): def __init__(self, hs): self.hs = hs @@ -72,9 +49,28 @@ class GroupsServerHandler(object): # Ensure attestations get renewed hs.get_groups_attestation_renewer() - @check_group_is_ours() + @defer.inlineCallbacks + def check_group_is_ours(self, group_id, and_exists=False): + """Check that the group is ours, and optionally if it exists. + + If group does exist then return group. + """ + if not self.is_mine_id(group_id): + raise SynapseError(400, "Group not on this server") + + group = yield self.store.get_group(group_id) + if and_exists and not group: + raise SynapseError(404, "Unknown group") + + defer.returnValue(group) + @defer.inlineCallbacks def get_group_profile(self, group_id, requester_user_id): + """Get the group profile as seen by requester_user_id + """ + + yield self.check_group_is_ours(group_id) + group_description = yield self.store.get_group(group_id) if group_description: @@ -82,9 +78,13 @@ class GroupsServerHandler(object): else: raise SynapseError(404, "Unknown group") - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): + """Get the users in group as seen by requester_user_id + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) user_results = yield self.store.get_users_in_group( @@ -123,9 +123,13 @@ class GroupsServerHandler(object): "total_user_count_estimate": len(user_results), }) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def get_rooms_in_group(self, group_id, requester_user_id): + """Get the rooms in group as seen by requester_user_id + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) room_results = yield self.store.get_rooms_in_group( @@ -158,9 +162,13 @@ class GroupsServerHandler(object): "total_room_count_estimate": len(room_results), }) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def add_room(self, group_id, requester_user_id, room_id, content): + """Add room to group + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_admin = yield self.store.is_user_admin_in_group(group_id, requester_user_id) if not is_admin: raise SynapseError(403, "User is not admin in group") @@ -182,9 +190,13 @@ class GroupsServerHandler(object): defer.returnValue({}) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def invite_to_group(self, group_id, user_id, requester_user_id, content): + """Invite user to group + """ + + group = yield self.check_group_is_ours(group_id, and_exists=True) + is_admin = yield self.store.is_user_admin_in_group( group_id, requester_user_id ) @@ -194,7 +206,6 @@ class GroupsServerHandler(object): # TODO: Check if user knocked # TODO: Check if user is already invited - group = yield self.store.get_group(group_id) content = { "profile": { "name": group["name"], @@ -248,9 +259,16 @@ class GroupsServerHandler(object): else: raise SynapseError(502, "Unknown state returned by HS") - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def accept_invite(self, group_id, user_id, content): + """User tries to accept an invite to the group. + + This is different from them asking to join, and so should error if no + invite exists (and they're not a member of the group) + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + if not self.store.is_user_invited_to_local_group(group_id, user_id): raise SynapseError(403, "User not invited to group") @@ -291,19 +309,33 @@ class GroupsServerHandler(object): "attestation": local_attestation, }) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def knock(self, group_id, user_id, content): - pass + """A user requests becoming a member of the group + """ + yield self.check_group_is_ours(group_id, and_exists=True) + + raise NotImplementedError() - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def accept_knock(self, group_id, user_id, content): - pass + """Accept a users knock to the room. + + Errors if the user hasn't knocked, rather than inviting them. + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + + raise NotImplementedError() - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def remove_user_from_group(self, group_id, user_id, requester_user_id, content): + """Remove a user from the group; either a user is leaving or and admin + kicked htem. + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_kick = False if requester_user_id != user_id: is_admin = yield self.store.is_user_admin_in_group( @@ -314,7 +346,7 @@ class GroupsServerHandler(object): is_kick = True - yield self.store.remove_user_to_group( + yield self.store.remove_user_from_group( group_id, user_id, ) @@ -328,11 +360,11 @@ class GroupsServerHandler(object): defer.returnValue({}) - @check_group_is_ours() @defer.inlineCallbacks def create_group(self, group_id, user_id, content): + group = yield self.check_group_is_ours(group_id) + logger.info("Attempting to create group with ID: %r", group_id) - group = yield self.store.get_group(group_id) if group: raise SynapseError(400, "Group already exists") diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 01d9a982c8..327d770862 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -89,6 +89,8 @@ class GroupServerStore(SQLBaseStore): ) def add_group_invite(self, group_id, user_id): + """Record that the group server has invited a user + """ return self._simple_insert( table="group_invites", values={ @@ -99,6 +101,8 @@ class GroupServerStore(SQLBaseStore): ) def is_user_invited_to_local_group(self, group_id, user_id): + """Has the group server invited a user? + """ return self._simple_select_one_onecol( table="group_invites", keyvalues={ @@ -112,6 +116,19 @@ class GroupServerStore(SQLBaseStore): def add_user_to_group(self, group_id, user_id, is_admin=False, is_public=True, local_attestation=None, remote_attestation=None): + """Add a user to the group server. + + Args: + group_id (str) + user_id (str) + is_admin (bool) + is_public (bool) + local_attestation (dict): The attestation the GS created to give + to the remote server. Optional if the user and group are on the + same server + remote_attestation (dict): The attestation given to GS by remote + server. Optional if the user and group are on the same server + """ def _add_user_to_group_txn(txn): self._simple_insert_txn( txn, @@ -159,8 +176,8 @@ class GroupServerStore(SQLBaseStore): "add_user_to_group", _add_user_to_group_txn ) - def remove_user_to_group(self, group_id, user_id): - def _remove_user_to_group_txn(txn): + def remove_user_from_group(self, group_id, user_id): + def _remove_user_from_group_txn(txn): self._simple_delete_txn( txn, table="group_users", @@ -193,7 +210,7 @@ class GroupServerStore(SQLBaseStore): "user_id": user_id, }, ) - return self.runInteraction("remove_user_to_group", _remove_user_to_group_txn) + return self.runInteraction("remove_user_from_group", _remove_user_from_group_txn) def add_room_to_group(self, group_id, room_id, is_public): return self._simple_insert( @@ -222,6 +239,8 @@ class GroupServerStore(SQLBaseStore): ) def get_attestations_need_renewals(self, valid_until_ms): + """Get all attestations that need to be renewed until givent time + """ def _get_attestations_need_renewals_txn(txn): sql = """ SELECT group_id, user_id FROM group_attestations_renewals @@ -234,6 +253,8 @@ class GroupServerStore(SQLBaseStore): ) def update_attestation_renewal(self, group_id, user_id, attestation): + """Update an attestation that we have renewed + """ return self._simple_update_one( table="group_attestations_renewals", keyvalues={ @@ -247,6 +268,8 @@ class GroupServerStore(SQLBaseStore): ) def update_remote_attestion(self, group_id, user_id, attestation): + """Update an attestation that a remote has renewed + """ return self._simple_update_one( table="group_attestations_remote", keyvalues={ @@ -262,6 +285,9 @@ class GroupServerStore(SQLBaseStore): @defer.inlineCallbacks def get_remote_attestation(self, group_id, user_id): + """Get the attestation that proves the remote agrees that the user is + in the group. + """ row = yield self._simple_select_one( table="group_attestations_remote", keyvalues={ diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 6f1a941990..5dc7a497e2 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -24,6 +24,7 @@ CREATE TABLE groups ( CREATE UNIQUE INDEX groups_idx ON groups(group_id); +-- list of users the group server thinks are joined CREATE TABLE group_users ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -35,7 +36,7 @@ CREATE TABLE group_users ( CREATE INDEX groups_users_g_idx ON group_users(group_id, user_id); CREATE INDEX groups_users_u_idx ON group_users(user_id); - +-- list of users the group server thinks are invited CREATE TABLE group_invites ( group_id TEXT NOT NULL, user_id TEXT NOT NULL @@ -55,6 +56,7 @@ CREATE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id); +-- List of attestations we've given out and need to renew CREATE TABLE group_attestations_renewals ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -65,6 +67,8 @@ CREATE INDEX group_attestations_renewals_g_idx ON group_attestations_renewals(gr CREATE INDEX group_attestations_renewals_u_idx ON group_attestations_renewals(user_id); CREATE INDEX group_attestations_renewals_v_idx ON group_attestations_renewals(valid_until_ms); + +-- List of attestations we've received from remotes and are interested in. CREATE TABLE group_attestations_remote ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, -- cgit 1.5.1 From e52c391cd452077fc219fad0db8b9e5499251e5b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Jul 2017 14:25:46 +0100 Subject: Rename column to attestation_json --- synapse/storage/group_server.py | 8 ++++---- synapse/storage/schema/delta/43/group_server.sql | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 327d770862..105ab9920e 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -168,7 +168,7 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, "user_id": user_id, "valid_until_ms": remote_attestation["valid_until_ms"], - "attestation": json.dumps(remote_attestation), + "attestation_json": json.dumps(remote_attestation), }, ) @@ -278,7 +278,7 @@ class GroupServerStore(SQLBaseStore): }, updatevalues={ "valid_until_ms": attestation["valid_until_ms"], - "attestation": json.dumps(attestation) + "attestation_json": json.dumps(attestation) }, desc="update_remote_attestion", ) @@ -294,13 +294,13 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, "user_id": user_id, }, - retcols=("valid_until_ms", "attestation"), + retcols=("valid_until_ms", "attestation_json"), desc="get_remote_attestation", allow_none=True, ) now = int(self._clock.time_msec()) if row and now < row["valid_until_ms"]: - defer.returnValue(json.loads(row["attestation"])) + defer.returnValue(json.loads(row["attestation_json"])) defer.returnValue(None) diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index bfe8c2ca4a..b55b0a8deb 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -73,7 +73,7 @@ CREATE TABLE group_attestations_remote ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, valid_until_ms BIGINT NOT NULL, - attestation TEXT NOT NULL + attestation_json TEXT NOT NULL ); CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); -- cgit 1.5.1 From a62406aaa5c4ef3780e42c9de443a2cc1e82cd9a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 15:44:40 +0100 Subject: Add group summary APIs --- synapse/federation/transport/server.py | 17 + synapse/groups/groups_server.py | 256 ++++++++- synapse/storage/group_server.py | 643 +++++++++++++++++++++++ synapse/storage/schema/delta/43/group_server.sql | 56 ++ 4 files changed, 970 insertions(+), 2 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 5d6ff79235..bbb66190e0 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -627,6 +627,22 @@ class FederationGroupsProfileServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsSummaryServlet(BaseFederationServlet): + PATH = "/groups/(?P[^/]*)/summary$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id): + requester_user_id = content["requester_user_id"] + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.get_group_summary( + group_id, requester_user_id + ) + + defer.returnValue((200, new_content)) + + class FederationGroupsRoomsServlet(BaseFederationServlet): """Get the rooms in a group on behalf of a user """ @@ -784,6 +800,7 @@ ROOM_LIST_CLASSES = ( GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsProfileServlet, + FederationGroupsSummaryServlet, FederationGroupsRoomsServlet, FederationGroupsUsersServlet, FederationGroupsInviteServlet, diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 414c95e3fe..29a911e18e 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -64,6 +64,255 @@ class GroupsServerHandler(object): defer.returnValue(group) + @defer.inlineCallbacks + def get_group_summary(self, group_id, requester_user_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) + + profile = yield self.get_group_profile(group_id, requester_user_id) + + users, roles = yield self.store.get_users_for_summary_by_role( + group_id, include_private=is_user_in_group, + ) + + # TODO: Add profiles to users + # TODO: Add assestations to users + + rooms, categories = yield self.store.get_rooms_for_summary_by_category( + group_id, include_private=is_user_in_group, + ) + + for room_entry in rooms: + room_id = room_entry["room_id"] + joined_users = yield self.store.get_users_in_room(room_id) + entry = yield self.room_list_handler.generate_room_entry( + room_id, len(joined_users), + with_alias=False, allow_private=True, + ) + entry.pop("room_id", None) + + room_entry["profile"] = entry + + rooms.sort(key=lambda e: e.get("order", 0)) + + for entry in users: + user_id = entry["user_id"] + + if not self.is_mine_id(requester_user_id): + attestation = yield self.store.get_remote_attestation(group_id, user_id) + if not attestation: + continue + + entry["attestation"] = attestation + else: + entry["attestation"] = self.attestations.create_attestation( + group_id, user_id, + ) + + users.sort(key=lambda e: e.get("order", 0)) + + defer.returnValue({ + "profile": profile, + "users_section": { + "users": users, + "roles": roles, + "total_user_count_estimate": 0, # TODO + }, + "rooms_section": { + "rooms": rooms, + "categories": categories, + "total_room_count_estimate": 0, # TODO + }, + }) + + @defer.inlineCallbacks + def update_group_summary_room(self, group_id, user_id, room_id, category_id, content): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + order = content.get("order", None) + + is_public = _parse_visibility_from_contents(content) + + yield self.store.add_room_to_summary( + group_id=group_id, + room_id=room_id, + category_id=category_id, + order=order, + is_public=is_public, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def delete_group_summary_room(self, group_id, user_id, room_id, category_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + yield self.store.remove_room_from_summary( + group_id=group_id, + room_id=room_id, + category_id=category_id, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def get_group_categories(self, group_id, user_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + categories = yield self.store.get_group_categories( + group_id=group_id, + ) + defer.returnValue({"categories": categories}) + + @defer.inlineCallbacks + def get_group_category(self, group_id, user_id, category_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + res = yield self.store.get_group_category( + group_id=group_id, + category_id=category_id, + ) + + defer.returnValue(res) + + @defer.inlineCallbacks + def update_group_category(self, group_id, user_id, category_id, content): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + is_public = _parse_visibility_from_contents(content) + profile = content.get("profile") + + yield self.store.upsert_group_category( + group_id=group_id, + category_id=category_id, + is_public=is_public, + profile=profile, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def delete_group_category(self, group_id, user_id, category_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + yield self.store.remove_group_category( + group_id=group_id, + category_id=category_id, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def get_group_roles(self, group_id, user_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + roles = yield self.store.get_group_roles( + group_id=group_id, + ) + defer.returnValue({"roles": roles}) + + @defer.inlineCallbacks + def get_group_role(self, group_id, user_id, role_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + res = yield self.store.get_group_role( + group_id=group_id, + role_id=role_id, + ) + defer.returnValue(res) + + @defer.inlineCallbacks + def update_group_role(self, group_id, user_id, role_id, content): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + is_public = _parse_visibility_from_contents(content) + + profile = content.get("profile") + + yield self.store.upsert_group_role( + group_id=group_id, + role_id=role_id, + is_public=is_public, + profile=profile, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def delete_group_role(self, group_id, user_id, role_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + yield self.store.remove_group_role( + group_id=group_id, + role_id=role_id, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def update_group_summary_user(self, group_id, requester_user_id, user_id, role_id, + content): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, requester_user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + order = content.get("order", None) + + is_public = _parse_visibility_from_contents(content) + + yield self.store.add_user_to_summary( + group_id=group_id, + user_id=user_id, + role_id=role_id, + order=order, + is_public=is_public, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, requester_user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + yield self.store.remove_user_from_summary( + group_id=group_id, + user_id=user_id, + role_id=role_id, + ) + + defer.returnValue({}) + @defer.inlineCallbacks def get_group_profile(self, group_id, requester_user_id): """Get the group profile as seen by requester_user_id @@ -210,7 +459,9 @@ class GroupsServerHandler(object): } if self.hs.is_mine_id(user_id): - raise NotImplementedError() + groups_local = self.hs.get_groups_local_handler() + res = yield groups_local.on_invite(group_id, user_id, content) + local_attestation = None else: local_attestation = self.attestations.create_attestation(group_id, user_id) content.update({ @@ -338,7 +589,8 @@ class GroupsServerHandler(object): if is_kick: if self.hs.is_mine_id(user_id): - raise NotImplementedError() + groups_local = self.hs.get_groups_local_handler() + yield groups_local.user_removed_from_group(group_id, user_id, {}) else: yield self.transport_client.remove_user_from_group_notification( get_domain_from_id(user_id), group_id, user_id, {} diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 105ab9920e..f4818ff174 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -15,11 +15,16 @@ from twisted.internet import defer +from synapse.api.errors import SynapseError + from ._base import SQLBaseStore import ujson as json +_DEFAULT_CATEGORY_ID = "default" + + class GroupServerStore(SQLBaseStore): def get_group(self, group_id): return self._simple_select_one( @@ -64,6 +69,492 @@ class GroupServerStore(SQLBaseStore): desc="get_rooms_in_group", ) + def get_rooms_for_summary_by_category(self, group_id, include_private=False): + def _get_rooms_for_summary_txn(txn): + keyvalues = { + "group_id": group_id, + } + if not include_private: + keyvalues["is_public"] = True + + sql = """ + SELECT room_id, is_public, category_id, room_order + FROM group_summary_rooms + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + rooms = [ + { + "room_id": row[0], + "is_public": row[1], + "category_id": row[2] if row[2] != _DEFAULT_CATEGORY_ID else None, + "order": row[3], + } + for row in txn + ] + + sql = """ + SELECT category_id, is_public, profile, cat_order + FROM group_summary_room_categories + INNER JOIN group_room_categories USING (group_id, category_id) + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + categories = { + row[0]: { + "is_public": row[1], + "profile": json.loads(row[2]), + "order": row[3], + } + for row in txn + } + + return rooms, categories + return self.runInteraction( + "get_rooms_for_summary", _get_rooms_for_summary_txn + ) + + def add_room_to_summary(self, group_id, room_id, category_id, order, is_public): + return self.runInteraction( + "add_room_to_summary", self._add_room_to_summary_txn, + group_id, room_id, category_id, order, is_public, + ) + + def _add_room_to_summary_txn(self, txn, group_id, room_id, category_id, order, + is_public): + if category_id is None: + category_id = _DEFAULT_CATEGORY_ID + else: + cat_exists = self._simple_select_one_onecol_txn( + txn, + table="group_room_categories", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + }, + retcol="group_id", + allow_none=True, + ) + if not cat_exists: + raise SynapseError(400, "Category doesn't exist") + + # TODO: Check room is part of group already + cat_exists = self._simple_select_one_onecol_txn( + txn, + table="group_summary_room_categories", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + }, + retcol="group_id", + allow_none=True, + ) + if not cat_exists: + txn.execute(""" + INSERT INTO group_summary_room_categories + (group_id, category_id, cat_order) + SELECT ?, ?, COALESCE(MAX(cat_order), 1) + FROM group_summary_room_categories + WHERE group_id = ? AND category_id = ? + """, (group_id, category_id, group_id, category_id)) + + existing = self._simple_select_one_txn( + txn, + table="group_summary_rooms", + keyvalues={ + "group_id": group_id, + "room_id": room_id, + "category_id": category_id, + }, + retcols=("room_order", "is_public",), + allow_none=True, + ) + + if order is not None: + sql = """ + UPDATE group_summary_rooms SET room_order = room_order + 1 + WHERE group_id = ? AND category_id = ? AND room_order >= ? + """ + txn.execute(sql, (group_id, category_id, order,)) + elif not existing: + sql = """ + SELECT COALESCE(MAX(room_order), 0) + 1 FROM group_summary_rooms + WHERE group_id = ? AND category_id = ? + """ + txn.execute(sql, (group_id, category_id,)) + order, = txn.fetchone() + + if existing: + to_update = {} + if order is not None: + to_update["room_order"] = order + if is_public is not None: + to_update["is_public"] = is_public + self._simple_update_txn( + txn, + table="group_summary_rooms", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + "room_id": room_id, + }, + values=to_update, + ) + else: + if is_public is None: + is_public = True + + self._simple_insert_txn( + txn, + table="group_summary_rooms", + values={ + "group_id": group_id, + "category_id": category_id, + "room_id": room_id, + "room_order": order, + "is_public": is_public, + }, + ) + + def remove_room_from_summary(self, group_id, room_id, category_id): + if category_id is None: + category_id = _DEFAULT_CATEGORY_ID + + return self._simple_delete( + table="group_summary_rooms", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + "room_id": room_id, + }, + desc="remove_room_from_summary", + ) + + @defer.inlineCallbacks + def get_group_categories(self, group_id): + rows = yield self._simple_select_list( + table="group_room_categories", + keyvalues={ + "group_id": group_id, + }, + retcols=("category_id", "is_public", "profile"), + desc="get_group_categories", + ) + + defer.returnValue({ + row["category_id"]: { + "is_public": row["is_public"], + "profile": json.loads(row["profile"]), + } + for row in rows + }) + + @defer.inlineCallbacks + def get_group_category(self, group_id, category_id): + category = yield self._simple_select_one( + table="group_room_categories", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + }, + retcols=("is_public", "profile"), + desc="get_group_category", + ) + + category["profile"] = json.loads(category["profile"]) + + defer.returnValue(category) + + def upsert_group_category(self, group_id, category_id, profile, is_public): + insertion_values = {} + update_values = {"category_id": category_id} # This cannot be empty + + if profile is None: + insertion_values["profile"] = "{}" + else: + update_values["profile"] = json.dumps(profile) + + if is_public is None: + insertion_values["is_public"] = True + else: + update_values["is_public"] = is_public + + return self._simple_upsert( + table="group_room_categories", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + }, + values=update_values, + insertion_values=insertion_values, + desc="upsert_group_category", + ) + + def remove_group_category(self, group_id, category_id): + return self._simple_delete( + table="group_room_categories", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + }, + desc="remove_group_category", + ) + + @defer.inlineCallbacks + def get_group_roles(self, group_id): + rows = yield self._simple_select_list( + table="group_roles", + keyvalues={ + "group_id": group_id, + }, + retcols=("role_id", "is_public", "profile"), + desc="get_group_roles", + ) + + defer.returnValue({ + row["role_id"]: { + "is_public": row["is_public"], + "profile": json.loads(row["profile"]), + } + for row in rows + }) + + @defer.inlineCallbacks + def get_group_role(self, group_id, role_id): + role = yield self._simple_select_one( + table="group_roles", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + }, + retcols=("is_public", "profile"), + desc="get_group_role", + ) + + role["profile"] = json.loads(role["profile"]) + + defer.returnValue(role) + + def upsert_group_role(self, group_id, role_id, profile, is_public): + insertion_values = {} + update_values = {"role_id": role_id} # This cannot be empty + + if profile is None: + insertion_values["profile"] = "{}" + else: + update_values["profile"] = json.dumps(profile) + + if is_public is None: + insertion_values["is_public"] = True + else: + update_values["is_public"] = is_public + + return self._simple_upsert( + table="group_roles", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + }, + values=update_values, + insertion_values=insertion_values, + desc="upsert_group_role", + ) + + def remove_group_role(self, group_id, role_id): + return self._simple_delete( + table="group_roles", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + }, + desc="remove_group_role", + ) + + def add_user_to_summary(self, group_id, user_id, role_id, order, is_public): + return self.runInteraction( + "add_user_to_summary", self._add_user_to_summary_txn, + group_id, user_id, role_id, order, is_public, + ) + + def _add_user_to_summary_txn(self, txn, group_id, user_id, role_id, order, + is_public): + if role_id is None: + role_id = _DEFAULT_CATEGORY_ID + else: + role_exists = self._simple_select_one_onecol_txn( + txn, + table="group_roles", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + }, + retcol="group_id", + allow_none=True, + ) + if not role_exists: + raise SynapseError(400, "Role doesn't exist") + + # TODO: Check room is part of group already + role_exists = self._simple_select_one_onecol_txn( + txn, + table="group_summary_roles", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + }, + retcol="group_id", + allow_none=True, + ) + if not role_exists: + txn.execute(""" + INSERT INTO group_summary_roles + (group_id, role_id, role_order) + SELECT ?, ?, COALESCE(MAX(role_order), 1) + FROM group_summary_roles + WHERE group_id = ? AND role_id = ? + """, (group_id, role_id, group_id, role_id)) + + existing = self._simple_select_one_txn( + txn, + table="group_summary_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + "role_id": role_id, + }, + retcols=("user_order", "is_public",), + allow_none=True, + ) + + if order is not None: + sql = """ + UPDATE group_summary_users SET user_order = user_order + 1 + WHERE group_id = ? AND role_id = ? AND user_order >= ? + """ + txn.execute(sql, (group_id, role_id, order,)) + elif not existing: + sql = """ + SELECT COALESCE(MAX(user_order), 0) + 1 FROM group_summary_users + WHERE group_id = ? AND role_id = ? + """ + txn.execute(sql, (group_id, role_id,)) + order, = txn.fetchone() + + if existing: + to_update = {} + if order is not None: + to_update["user_order"] = order + if is_public is not None: + to_update["is_public"] = is_public + self._simple_update_txn( + txn, + table="group_summary_users", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + "user_id": user_id, + }, + values=to_update, + ) + else: + if is_public is None: + is_public = True + + self._simple_insert_txn( + txn, + table="group_summary_users", + values={ + "group_id": group_id, + "role_id": role_id, + "user_id": user_id, + "user_order": order, + "is_public": is_public, + }, + ) + + def remove_user_from_summary(self, group_id, user_id, role_id): + if role_id is None: + role_id = _DEFAULT_CATEGORY_ID + + return self._simple_delete( + table="group_summary_users", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + "user_id": user_id, + }, + desc="remove_user_from_summary", + ) + + def get_users_for_summary_by_role(self, group_id, include_private=False): + def _get_users_for_summary_txn(txn): + keyvalues = { + "group_id": group_id, + } + if not include_private: + keyvalues["is_public"] = True + + sql = """ + SELECT user_id, is_public, role_id, user_order + FROM group_summary_users + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + users = [ + { + "user_id": row[0], + "is_public": row[1], + "role_id": row[2] if row[2] != _DEFAULT_CATEGORY_ID else None, + "order": row[3], + } + for row in txn + ] + + sql = """ + SELECT role_id, is_public, profile, role_order + FROM group_summary_roles + INNER JOIN group_roles USING (group_id, role_id) + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + roles = { + row[0]: { + "is_public": row[1], + "profile": json.loads(row[2]), + "order": row[3], + } + for row in txn + } + + return users, roles + return self.runInteraction( + "get_users_for_summary_by_role", _get_users_for_summary_txn + ) + def is_user_in_group(self, user_id, group_id): return self._simple_select_one_onecol( table="group_users", @@ -223,6 +714,103 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + @defer.inlineCallbacks + def register_user_group_membership(self, group_id, user_id, membership, + is_admin=False, content={}, + local_attestation=None, + remote_attestation=None, + ): + def _register_user_group_membership_txn(txn, next_id): + # TODO: Upsert? + self._simple_delete_txn( + txn, + table="local_group_membership", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_insert_txn( + txn, + table="local_group_membership", + values={ + "group_id": group_id, + "user_id": user_id, + "is_admin": is_admin, + "membership": membership, + "content": json.dumps(content), + }, + ) + self._simple_delete_txn( + txn, + table="local_group_updates", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + "type": "membership", + }, + ) + self._simple_insert_txn( + txn, + table="local_group_updates", + values={ + "stream_id": next_id, + "group_id": group_id, + "user_id": user_id, + "type": "membership", + "content": json.dumps({"membership": membership, "content": content}), + } + ) + self._group_updates_stream_cache.entity_has_changed(user_id, next_id) + + # TODO: Insert profile to ensuer it comes down stream if its a join. + + if membership == "join": + if local_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_renewals", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": local_attestation["valid_until_ms"], + } + ) + if remote_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_remote", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": remote_attestation["valid_until_ms"], + "attestation": json.dumps(remote_attestation), + } + ) + else: + self._simple_delete_txn( + txn, + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + + with self._group_updates_id_gen.get_next() as next_id: + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, next_id, + ) + @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, long_description,): @@ -238,6 +826,61 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) + def get_joined_groups(self, user_id): + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + }, + retcol="group_id", + desc="get_joined_groups", + ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn, + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token, + ) + if not has_changed: + return [] + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token,)) + return [{ + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } for group_id, membership, gtype, content_json in txn] + return self.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn, + ) + + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() + def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index c223ee275a..3013b89b7e 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -56,6 +56,62 @@ CREATE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id); +CREATE TABLE group_summary_rooms ( + group_id TEXT NOT NULL, + room_id TEXT NOT NULL, + category_id TEXT NOT NULL, + room_order BIGINT NOT NULL, + is_public BOOLEAN NOT NULL, + UNIQUE (group_id, category_id, room_id, room_order), + CHECK (room_order > 0) +); + +CREATE UNIQUE INDEX group_summary_rooms_g_idx ON group_summary_rooms(group_id, room_id, category_id); + +CREATE TABLE group_summary_room_categories ( + group_id TEXT NOT NULL, + category_id TEXT NOT NULL, + cat_order BIGINT NOT NULL, + UNIQUE (group_id, category_id, cat_order), + CHECK (cat_order > 0) +); + +CREATE TABLE group_room_categories ( + group_id TEXT NOT NULL, + category_id TEXT NOT NULL, + profile TEXT NOT NULL, + is_public BOOLEAN NOT NULL, + UNIQUE (group_id, category_id) +); + + +CREATE TABLE group_summary_users ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + role_id TEXT NOT NULL, + user_order BIGINT NOT NULL, + is_public BOOLEAN NOT NULL +); + +CREATE INDEX group_summary_users_g_idx ON group_summary_users(group_id); + +CREATE TABLE group_summary_roles ( + group_id TEXT NOT NULL, + role_id TEXT NOT NULL, + role_order BIGINT NOT NULL, + UNIQUE (group_id, role_id, role_order), + CHECK (role_order > 0) +); + +CREATE TABLE group_roles ( + group_id TEXT NOT NULL, + role_id TEXT NOT NULL, + profile TEXT NOT NULL, + is_public BOOLEAN NOT NULL, + UNIQUE (group_id, role_id) +); + + -- List of attestations we've given out and need to renew CREATE TABLE group_attestations_renewals ( group_id TEXT NOT NULL, -- cgit 1.5.1 From 26451a09eb938e6a72be3d77ff8c9e3fd2b33539 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Jul 2017 14:11:59 +0100 Subject: Comments --- synapse/groups/groups_server.py | 38 ++++++++++++++++++++++++ synapse/storage/group_server.py | 29 ++++++++++++++++++ synapse/storage/schema/delta/43/group_server.sql | 17 +++++++---- 3 files changed, 79 insertions(+), 5 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index ec45da2d7a..83dfcd0fd4 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -54,6 +54,12 @@ class GroupsServerHandler(object): """Check that the group is ours, and optionally if it exists. If group does exist then return group. + + Args: + group_id (str) + and_exists (bool): whether to also check if group exists + and_is_admin (str): whether to also check if given str is a user_id + that is an admin """ if not self.is_mine_id(group_id): raise SynapseError(400, "Group not on this server") @@ -71,6 +77,14 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_group_summary(self, group_id, requester_user_id): + """Get the summary for a group as seen by requester_user_id. + + The group summary consists of the profile of the room, and a curated + list of users and rooms. These list *may* be organised by role/category. + The roles/categories are ordered, and so are the users/rooms within them. + + A user/room may appear in multiple roles/categories. + """ yield self.check_group_is_ours(group_id, and_exists=True) is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) @@ -133,6 +147,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def update_group_summary_room(self, group_id, user_id, room_id, category_id, content): + """Add/update a room to the group summary + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) order = content.get("order", None) @@ -151,6 +167,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def delete_group_summary_room(self, group_id, user_id, room_id, category_id): + """Remove a room from the summary + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) yield self.store.remove_room_from_summary( @@ -163,6 +181,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_group_categories(self, group_id, user_id): + """Get all categories in a group (as seen by user) + """ yield self.check_group_is_ours(group_id, and_exists=True) categories = yield self.store.get_group_categories( @@ -172,6 +192,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_group_category(self, group_id, user_id, category_id): + """Get a specific category in a group (as seen by user) + """ yield self.check_group_is_ours(group_id, and_exists=True) res = yield self.store.get_group_category( @@ -183,6 +205,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def update_group_category(self, group_id, user_id, category_id, content): + """Add/Update a group category + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) is_public = _parse_visibility_from_contents(content) @@ -199,6 +223,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def delete_group_category(self, group_id, user_id, category_id): + """Delete a group category + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) yield self.store.remove_group_category( @@ -210,6 +236,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_group_roles(self, group_id, user_id): + """Get all roles in a group (as seen by user) + """ yield self.check_group_is_ours(group_id, and_exists=True) roles = yield self.store.get_group_roles( @@ -219,6 +247,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_group_role(self, group_id, user_id, role_id): + """Get a specific role in a group (as seen by user) + """ yield self.check_group_is_ours(group_id, and_exists=True) res = yield self.store.get_group_role( @@ -229,6 +259,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def update_group_role(self, group_id, user_id, role_id, content): + """Add/update a role in a group + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) is_public = _parse_visibility_from_contents(content) @@ -246,6 +278,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def delete_group_role(self, group_id, user_id, role_id): + """Remove role from group + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) yield self.store.remove_group_role( @@ -258,6 +292,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def update_group_summary_user(self, group_id, requester_user_id, user_id, role_id, content): + """Add/update a users entry in the group summary + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) order = content.get("order", None) @@ -276,6 +312,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id): + """Remove a user from the group summary + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) yield self.store.remove_user_from_summary( diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index f4818ff174..18bfaeda6e 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -22,6 +22,8 @@ from ._base import SQLBaseStore import ujson as json +# The category ID for the "default" category. We don't store as null in the +# database to avoid the fun of null != null _DEFAULT_CATEGORY_ID = "default" @@ -70,6 +72,10 @@ class GroupServerStore(SQLBaseStore): ) def get_rooms_for_summary_by_category(self, group_id, include_private=False): + """Get the rooms and categories that should be included in a summary request + + Returns ([rooms], [categories]) + """ def _get_rooms_for_summary_txn(txn): keyvalues = { "group_id": group_id, @@ -134,6 +140,14 @@ class GroupServerStore(SQLBaseStore): def _add_room_to_summary_txn(self, txn, group_id, room_id, category_id, order, is_public): + """Add room to summary. + + This automatically adds the room to the end of the list of rooms to be + included in the summary response. If a role is given then user will + be added under that category (the category will automatically be added tothe + the summary if a user is listed under that role in the summary). + """ + if category_id is None: category_id = _DEFAULT_CATEGORY_ID else: @@ -278,6 +292,8 @@ class GroupServerStore(SQLBaseStore): defer.returnValue(category) def upsert_group_category(self, group_id, category_id, profile, is_public): + """Add/update room category for group + """ insertion_values = {} update_values = {"category_id": category_id} # This cannot be empty @@ -348,6 +364,8 @@ class GroupServerStore(SQLBaseStore): defer.returnValue(role) def upsert_group_role(self, group_id, role_id, profile, is_public): + """Add/remove user role + """ insertion_values = {} update_values = {"role_id": role_id} # This cannot be empty @@ -390,6 +408,13 @@ class GroupServerStore(SQLBaseStore): def _add_user_to_summary_txn(self, txn, group_id, user_id, role_id, order, is_public): + """Add user to summary. + + This automatically adds the user to the end of the list of users to be + included in the summary response. If a role is given then user will + be added under that role (the role will automatically be added to the + summary if a user is listed under that role in the summary). + """ if role_id is None: role_id = _DEFAULT_CATEGORY_ID else: @@ -499,6 +524,10 @@ class GroupServerStore(SQLBaseStore): ) def get_users_for_summary_by_role(self, group_id, include_private=False): + """Get the users and roles that should be included in a summary request + + Returns ([users], [roles]) + """ def _get_users_for_summary_txn(txn): keyvalues = { "group_id": group_id, diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 3013b89b7e..472aab0a78 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -56,18 +56,21 @@ CREATE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id); +-- Rooms to include in the summary CREATE TABLE group_summary_rooms ( group_id TEXT NOT NULL, room_id TEXT NOT NULL, category_id TEXT NOT NULL, room_order BIGINT NOT NULL, - is_public BOOLEAN NOT NULL, + is_public BOOLEAN NOT NULL, -- whether the room should be show to everyone UNIQUE (group_id, category_id, room_id, room_order), CHECK (room_order > 0) ); CREATE UNIQUE INDEX group_summary_rooms_g_idx ON group_summary_rooms(group_id, room_id, category_id); + +-- Categories to include in the summary CREATE TABLE group_summary_room_categories ( group_id TEXT NOT NULL, category_id TEXT NOT NULL, @@ -76,25 +79,27 @@ CREATE TABLE group_summary_room_categories ( CHECK (cat_order > 0) ); +-- The categories in the group CREATE TABLE group_room_categories ( group_id TEXT NOT NULL, category_id TEXT NOT NULL, profile TEXT NOT NULL, - is_public BOOLEAN NOT NULL, + is_public BOOLEAN NOT NULL, -- whether the category should be show to everyone UNIQUE (group_id, category_id) ); - +-- The users to include in the group summary CREATE TABLE group_summary_users ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, role_id TEXT NOT NULL, user_order BIGINT NOT NULL, - is_public BOOLEAN NOT NULL + is_public BOOLEAN NOT NULL -- whether the user should be show to everyone ); CREATE INDEX group_summary_users_g_idx ON group_summary_users(group_id); +-- The roles to include in the group summary CREATE TABLE group_summary_roles ( group_id TEXT NOT NULL, role_id TEXT NOT NULL, @@ -103,11 +108,13 @@ CREATE TABLE group_summary_roles ( CHECK (role_order > 0) ); + +-- The roles in a groups CREATE TABLE group_roles ( group_id TEXT NOT NULL, role_id TEXT NOT NULL, profile TEXT NOT NULL, - is_public BOOLEAN NOT NULL, + is_public BOOLEAN NOT NULL, -- whether the role should be show to everyone UNIQUE (group_id, role_id) ); -- cgit 1.5.1 From 8575e3160f98a0b33cd0ec6080389701dcb535e8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Jul 2017 13:32:40 +0100 Subject: Comments --- synapse/federation/transport/server.py | 8 ++++++++ synapse/storage/group_server.py | 36 ++++++++++++++++++++++------------ 2 files changed, 32 insertions(+), 12 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 1ea2b37ce8..304c2a2a4c 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -810,6 +810,8 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): class FederationGroupsCategoriesServlet(BaseFederationServlet): + """Get all categories for a group + """ PATH = ( "/groups/(?P[^/]*)/categories/$" ) @@ -828,6 +830,8 @@ class FederationGroupsCategoriesServlet(BaseFederationServlet): class FederationGroupsCategoryServlet(BaseFederationServlet): + """Add/remove/get a category in a group + """ PATH = ( "/groups/(?P[^/]*)/categories/(?P[^/]+)$" ) @@ -870,6 +874,8 @@ class FederationGroupsCategoryServlet(BaseFederationServlet): class FederationGroupsRolesServlet(BaseFederationServlet): + """Get roles in a group + """ PATH = ( "/groups/(?P[^/]*)/roles/$" ) @@ -888,6 +894,8 @@ class FederationGroupsRolesServlet(BaseFederationServlet): class FederationGroupsRoleServlet(BaseFederationServlet): + """Add/remove/get a role in a group + """ PATH = ( "/groups/(?P[^/]*)/roles/(?P[^/]+)$" ) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 18bfaeda6e..b328ef8bc4 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -140,12 +140,16 @@ class GroupServerStore(SQLBaseStore): def _add_room_to_summary_txn(self, txn, group_id, room_id, category_id, order, is_public): - """Add room to summary. + """Add (or update) room's entry in summary. - This automatically adds the room to the end of the list of rooms to be - included in the summary response. If a role is given then user will - be added under that category (the category will automatically be added tothe - the summary if a user is listed under that role in the summary). + Args: + group_id (str) + room_id (str) + category_id (str): If not None then adds the category to the end of + the summary if its not already there. [Optional] + order (int): If not None inserts the room at that position, e.g. + an order of 1 will put the room first. Otherwise, the room gets + added to the end. """ if category_id is None: @@ -164,7 +168,7 @@ class GroupServerStore(SQLBaseStore): if not cat_exists: raise SynapseError(400, "Category doesn't exist") - # TODO: Check room is part of group already + # TODO: Check category is part of summary already cat_exists = self._simple_select_one_onecol_txn( txn, table="group_summary_room_categories", @@ -176,6 +180,7 @@ class GroupServerStore(SQLBaseStore): allow_none=True, ) if not cat_exists: + # If not, add it with an order larger than all others txn.execute(""" INSERT INTO group_summary_room_categories (group_id, category_id, cat_order) @@ -197,6 +202,7 @@ class GroupServerStore(SQLBaseStore): ) if order is not None: + # Shuffle other room orders that come after the given order sql = """ UPDATE group_summary_rooms SET room_order = room_order + 1 WHERE group_id = ? AND category_id = ? AND room_order >= ? @@ -408,12 +414,16 @@ class GroupServerStore(SQLBaseStore): def _add_user_to_summary_txn(self, txn, group_id, user_id, role_id, order, is_public): - """Add user to summary. + """Add (or update) user's entry in summary. - This automatically adds the user to the end of the list of users to be - included in the summary response. If a role is given then user will - be added under that role (the role will automatically be added to the - summary if a user is listed under that role in the summary). + Args: + group_id (str) + user_id (str) + role_id (str): If not None then adds the role to the end of + the summary if its not already there. [Optional] + order (int): If not None inserts the user at that position, e.g. + an order of 1 will put the user first. Otherwise, the user gets + added to the end. """ if role_id is None: role_id = _DEFAULT_CATEGORY_ID @@ -431,7 +441,7 @@ class GroupServerStore(SQLBaseStore): if not role_exists: raise SynapseError(400, "Role doesn't exist") - # TODO: Check room is part of group already + # TODO: Check role is part of the summary already role_exists = self._simple_select_one_onecol_txn( txn, table="group_summary_roles", @@ -443,6 +453,7 @@ class GroupServerStore(SQLBaseStore): allow_none=True, ) if not role_exists: + # If not, add it with an order larger than all others txn.execute(""" INSERT INTO group_summary_roles (group_id, role_id, role_order) @@ -464,6 +475,7 @@ class GroupServerStore(SQLBaseStore): ) if order is not None: + # Shuffle other users orders that come after the given order sql = """ UPDATE group_summary_users SET user_order = user_order + 1 WHERE group_id = ? AND role_id = ? AND user_order >= ? -- cgit 1.5.1 From 3b0470dba59274c65e69a4eab8909eaa55393a2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Jul 2017 13:53:21 +0100 Subject: Remove unused functions --- synapse/storage/group_server.py | 152 ---------------------------------------- 1 file changed, 152 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index b328ef8bc4..2e05c23fd7 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -755,103 +755,6 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) - @defer.inlineCallbacks - def register_user_group_membership(self, group_id, user_id, membership, - is_admin=False, content={}, - local_attestation=None, - remote_attestation=None, - ): - def _register_user_group_membership_txn(txn, next_id): - # TODO: Upsert? - self._simple_delete_txn( - txn, - table="local_group_membership", - keyvalues={ - "group_id": group_id, - "user_id": user_id, - }, - ) - self._simple_insert_txn( - txn, - table="local_group_membership", - values={ - "group_id": group_id, - "user_id": user_id, - "is_admin": is_admin, - "membership": membership, - "content": json.dumps(content), - }, - ) - self._simple_delete_txn( - txn, - table="local_group_updates", - keyvalues={ - "group_id": group_id, - "user_id": user_id, - "type": "membership", - }, - ) - self._simple_insert_txn( - txn, - table="local_group_updates", - values={ - "stream_id": next_id, - "group_id": group_id, - "user_id": user_id, - "type": "membership", - "content": json.dumps({"membership": membership, "content": content}), - } - ) - self._group_updates_stream_cache.entity_has_changed(user_id, next_id) - - # TODO: Insert profile to ensuer it comes down stream if its a join. - - if membership == "join": - if local_attestation: - self._simple_insert_txn( - txn, - table="group_attestations_renewals", - values={ - "group_id": group_id, - "user_id": user_id, - "valid_until_ms": local_attestation["valid_until_ms"], - } - ) - if remote_attestation: - self._simple_insert_txn( - txn, - table="group_attestations_remote", - values={ - "group_id": group_id, - "user_id": user_id, - "valid_until_ms": remote_attestation["valid_until_ms"], - "attestation": json.dumps(remote_attestation), - } - ) - else: - self._simple_delete_txn( - txn, - table="group_attestations_renewals", - keyvalues={ - "group_id": group_id, - "user_id": user_id, - }, - ) - self._simple_delete_txn( - txn, - table="group_attestations_remote", - keyvalues={ - "group_id": group_id, - "user_id": user_id, - }, - ) - - with self._group_updates_id_gen.get_next() as next_id: - yield self.runInteraction( - "register_user_group_membership", - _register_user_group_membership_txn, next_id, - ) - @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, long_description,): @@ -867,61 +770,6 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) - def get_joined_groups(self, user_id): - return self._simple_select_onecol( - table="local_group_membership", - keyvalues={ - "user_id": user_id, - "membership": "join", - }, - retcol="group_id", - desc="get_joined_groups", - ) - - def get_all_groups_for_user(self, user_id, now_token): - def _get_all_groups_for_user_txn(txn): - sql = """ - SELECT group_id, type, membership, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND membership != 'leave' - AND stream_id <= ? - """ - txn.execute(sql, (user_id, now_token,)) - return self.cursor_to_dict(txn) - return self.runInteraction( - "get_all_groups_for_user", _get_all_groups_for_user_txn, - ) - - def get_groups_changes_for_user(self, user_id, from_token, to_token): - from_token = int(from_token) - has_changed = self._group_updates_stream_cache.has_entity_changed( - user_id, from_token, - ) - if not has_changed: - return [] - - def _get_groups_changes_for_user_txn(txn): - sql = """ - SELECT group_id, membership, type, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND ? < stream_id AND stream_id <= ? - """ - txn.execute(sql, (user_id, from_token, to_token,)) - return [{ - "group_id": group_id, - "membership": membership, - "type": gtype, - "content": json.loads(content_json), - } for group_id, membership, gtype, content_json in txn] - return self.runInteraction( - "get_groups_changes_for_user", _get_groups_changes_for_user_txn, - ) - - def get_group_stream_token(self): - return self._group_updates_id_gen.get_current_token() - def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ -- cgit 1.5.1 From 4b203bdba51a314abef56ccee4d77e1945d16735 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Jul 2017 14:02:00 +0100 Subject: Correctly increment orders --- synapse/storage/group_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 2e05c23fd7..c23dc79ca5 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -184,7 +184,7 @@ class GroupServerStore(SQLBaseStore): txn.execute(""" INSERT INTO group_summary_room_categories (group_id, category_id, cat_order) - SELECT ?, ?, COALESCE(MAX(cat_order), 1) + SELECT ?, ?, COALESCE(MAX(cat_order), 0) + 1 FROM group_summary_room_categories WHERE group_id = ? AND category_id = ? """, (group_id, category_id, group_id, category_id)) @@ -457,7 +457,7 @@ class GroupServerStore(SQLBaseStore): txn.execute(""" INSERT INTO group_summary_roles (group_id, role_id, role_order) - SELECT ?, ?, COALESCE(MAX(role_order), 1) + SELECT ?, ?, COALESCE(MAX(role_order), 0) + 1 FROM group_summary_roles WHERE group_id = ? AND role_id = ? """, (group_id, role_id, group_id, role_id)) -- cgit 1.5.1 From 85fda57208bb79e54fe473fda64351f04ffe1cda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Jul 2017 14:03:54 +0100 Subject: Add DEFAULT_ROLE_ID --- synapse/storage/group_server.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index c23dc79ca5..e8a799d8c7 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -24,7 +24,8 @@ import ujson as json # The category ID for the "default" category. We don't store as null in the # database to avoid the fun of null != null -_DEFAULT_CATEGORY_ID = "default" +_DEFAULT_CATEGORY_ID = "" +_DEFAULT_ROLE_ID = "" class GroupServerStore(SQLBaseStore): @@ -426,7 +427,7 @@ class GroupServerStore(SQLBaseStore): added to the end. """ if role_id is None: - role_id = _DEFAULT_CATEGORY_ID + role_id = _DEFAULT_ROLE_ID else: role_exists = self._simple_select_one_onecol_txn( txn, @@ -523,7 +524,7 @@ class GroupServerStore(SQLBaseStore): def remove_user_from_summary(self, group_id, user_id, role_id): if role_id is None: - role_id = _DEFAULT_CATEGORY_ID + role_id = _DEFAULT_ROLE_ID return self._simple_delete( table="group_summary_users", @@ -563,7 +564,7 @@ class GroupServerStore(SQLBaseStore): { "user_id": row[0], "is_public": row[1], - "role_id": row[2] if row[2] != _DEFAULT_CATEGORY_ID else None, + "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None, "order": row[3], } for row in txn -- cgit 1.5.1 From 2f9eafdd369796d8b7731b24ab8cf6a98ad19e29 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 14:52:27 +0100 Subject: Add local group server support --- synapse/federation/transport/client.py | 77 +++ synapse/federation/transport/server.py | 44 ++ synapse/groups/groups_server.py | 7 +- synapse/handlers/groups_local.py | 278 ++++++++++ synapse/rest/__init__.py | 2 + synapse/rest/client/v2_alpha/groups.py | 642 +++++++++++++++++++++++ synapse/server.py | 5 + synapse/storage/__init__.py | 15 + synapse/storage/group_server.py | 152 ++++++ synapse/storage/schema/delta/43/group_server.sql | 28 + 10 files changed, 1248 insertions(+), 2 deletions(-) create mode 100644 synapse/handlers/groups_local.py create mode 100644 synapse/rest/client/v2_alpha/groups.py (limited to 'synapse/storage/group_server.py') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index d0f8da7516..ea340e345c 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -472,6 +472,72 @@ class TransportLayerClient(object): defer.returnValue(content) + @log_function + def get_group_profile(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/profile" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_summary(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/summary" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_rooms(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/rooms" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_users(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/users" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def accept_group_invite(self, destination, group_id, user_id, content): + path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + + @log_function + def invite_to_group(self, destination, group_id, user_id, content): + path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + @log_function def invite_to_group_notification(self, destination, group_id, user_id, content): """Sent by group server to inform a user's server that they have been @@ -487,6 +553,17 @@ class TransportLayerClient(object): ignore_backoff=True, ) + @log_function + def remove_user_from_group(self, destination, group_id, user_id, content): + path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + @log_function def remove_user_from_group_notification(self, destination, group_id, user_id, content): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 4f7d2546cf..0f08334f33 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -715,6 +715,21 @@ class FederationGroupsInviteServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsLocalInviteServlet(BaseFederationServlet): + PATH = "/groups/local/(?P[^/]*)/users/(?P[^/]*)/invite$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(group_id) != origin: + raise SynapseError(403, "group_id doesn't match origin") + + new_content = yield self.handler.on_invite( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + class FederationGroupsAcceptInviteServlet(BaseFederationServlet): """Accept an invitation from the group server """ @@ -750,6 +765,21 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): + PATH = "/groups/local/(?P[^/]*)/users/(?P[^/]*)/remove$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(group_id) != origin: + raise SynapseError(403, "user_id doesn't match origin") + + new_content = yield self.handler.user_removed_from_group( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + class FederationGroupsRenewAttestaionServlet(BaseFederationServlet): """A group or user's server renews their attestation """ @@ -1053,6 +1083,12 @@ GROUP_SERVER_SERVLET_CLASSES = ( ) +GROUP_LOCAL_SERVLET_CLASSES = ( + FederationGroupsLocalInviteServlet, + FederationGroupsRemoveLocalUserServlet, +) + + GROUP_ATTESTATION_SERVLET_CLASSES = ( FederationGroupsRenewAttestaionServlet, ) @@ -1083,6 +1119,14 @@ def register_servlets(hs, resource, authenticator, ratelimiter): server_name=hs.hostname, ).register(resource) + for servletclass in GROUP_LOCAL_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_local_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES: servletclass( handler=hs.get_groups_attestation_renewer(), diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index a00bafe3af..c8559577f7 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -462,7 +462,9 @@ class GroupsServerHandler(object): } if self.hs.is_mine_id(user_id): - raise NotImplementedError() + groups_local = self.hs.get_groups_local_handler() + res = yield groups_local.on_invite(group_id, user_id, content) + local_attestation = None else: local_attestation = self.attestations.create_attestation(group_id, user_id) content.update({ @@ -590,7 +592,8 @@ class GroupsServerHandler(object): if is_kick: if self.hs.is_mine_id(user_id): - raise NotImplementedError() + groups_local = self.hs.get_groups_local_handler() + yield groups_local.user_removed_from_group(group_id, user_id, {}) else: yield self.transport_client.remove_user_from_group_notification( get_domain_from_id(user_id), group_id, user_id, {} diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py new file mode 100644 index 0000000000..3df255b05a --- /dev/null +++ b/synapse/handlers/groups_local.py @@ -0,0 +1,278 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import SynapseError + +import logging + +logger = logging.getLogger(__name__) + + +# TODO: Validate attestations +# TODO: Allow users to "knock" or simpkly join depending on rules +# TODO: is_priveged flag to users and is_public to users and rooms +# TODO: Roles +# TODO: Audit log for admins (profile updates, membership changes, users who tried +# to join but were rejected, etc) +# TODO: Flairs +# TODO: Add group memebership /sync + + +def _create_rerouter(name): + def f(self, group_id, *args, **kwargs): + if self.is_mine_id(group_id): + return getattr(self.groups_server_handler, name)( + group_id, *args, **kwargs + ) + + repl_layer = self.hs.get_replication_layer() + return getattr(repl_layer, name)(group_id, *args, **kwargs) + return f + + +class GroupsLocalHandler(object): + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + self.room_list_handler = hs.get_room_list_handler() + self.groups_server_handler = hs.get_groups_server_handler() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.keyring = hs.get_keyring() + self.is_mine_id = hs.is_mine_id + self.signing_key = hs.config.signing_key[0] + self.server_name = hs.hostname + self.attestations = hs.get_groups_attestation_signing() + + # Ensure attestations get renewed + hs.get_groups_attestation_renewer() + + get_group_profile = _create_rerouter("get_group_profile") + get_rooms_in_group = _create_rerouter("get_rooms_in_group") + + update_group_summary_room = _create_rerouter("update_group_summary_room") + delete_group_summary_room = _create_rerouter("delete_group_summary_room") + + update_group_category = _create_rerouter("update_group_category") + delete_group_category = _create_rerouter("delete_group_category") + get_group_category = _create_rerouter("get_group_category") + get_group_categories = _create_rerouter("get_group_categories") + + update_group_summary_user = _create_rerouter("update_group_summary_user") + delete_group_summary_user = _create_rerouter("delete_group_summary_user") + + update_group_role = _create_rerouter("update_group_role") + delete_group_role = _create_rerouter("delete_group_role") + get_group_role = _create_rerouter("get_group_role") + get_group_roles = _create_rerouter("get_group_roles") + + @defer.inlineCallbacks + def get_group_summary(self, group_id, requester_user_id): + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_group_summary( + group_id, requester_user_id + ) + defer.returnValue(res) + + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.get_group_summary(group_id, requester_user_id) + + chunk = res["users_section"]["users"] + valid_users = [] + for entry in chunk: + g_user_id = entry["user_id"] + attestation = entry.pop("attestation") + try: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + ) + valid_users.append(entry) + except Exception as e: + logger.info("Failed to verify user is in group: %s", e) + + res["users_section"]["users"] = valid_users + + res["users_section"]["users"].sort(key=lambda e: e.get("order", 0)) + res["rooms_section"]["rooms"].sort(key=lambda e: e.get("order", 0)) + + defer.returnValue(res) + + def create_group(self, group_id, user_id, content): + logger.info("Asking to create group with ID: %r", group_id) + + if self.is_mine_id(group_id): + return self.groups_server_handler.create_group( + group_id, user_id, content + ) + + repl_layer = self.hs.get_replication_layer() + return repl_layer.create_group(group_id, user_id, content) # TODO + + def add_room(self, group_id, user_id, room_id, content): + if self.is_mine_id(group_id): + return self.groups_server_handler.add_room( + group_id, user_id, room_id, content + ) + + repl_layer = self.hs.get_replication_layer() + return repl_layer.add_room_to_group(group_id, user_id, room_id, content) # TODO + + @defer.inlineCallbacks + def get_users_in_group(self, group_id, requester_user_id): + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_users_in_group( + group_id, requester_user_id + ) + defer.returnValue(res) + + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.get_users_in_group(group_id, requester_user_id) # TODO + + chunk = res["chunk"] + valid_entries = [] + for entry in chunk: + g_user_id = entry["user_id"] + attestation = entry.pop("attestation") + try: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + ) + valid_entries.append(entry) + except Exception as e: + logger.info("Failed to verify user is in group: %s", e) + + res["chunk"] = valid_entries + + defer.returnValue(res) + + @defer.inlineCallbacks + def join_group(self, group_id, user_id, content): + raise NotImplementedError() # TODO + + @defer.inlineCallbacks + def accept_invite(self, group_id, user_id, content): + if self.is_mine_id(group_id): + yield self.groups_server_handler.accept_invite( + group_id, user_id, content + ) + local_attestation = None + remote_attestation = None + else: + local_attestation = self.attestations.create_attestation(group_id, user_id) + content["attestation"] = local_attestation + + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.accept_group_invite(group_id, user_id, content) + + remote_attestation = res["attestation"] + + yield self.attestations.verify_attestation( + remote_attestation, + group_id=group_id, + user_id=user_id, + ) + + yield self.store.register_user_group_membership( + group_id, user_id, + membership="join", + is_admin=False, + local_attestation=local_attestation, + remote_attestation=remote_attestation, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def invite(self, group_id, user_id, requester_user_id, config): + content = { + "requester_user_id": requester_user_id, + "config": config, + } + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.invite_to_group( + group_id, user_id, requester_user_id, content, + ) + else: + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.invite_to_group( + group_id, user_id, content, + ) + + defer.returnValue(res) + + @defer.inlineCallbacks + def on_invite(self, group_id, user_id, content): + # TODO: Support auto join and rejection + + if not self.is_mine_id(user_id): + raise SynapseError(400, "User not on this server") + + local_profile = {} + if "profile" in content: + if "name" in content["profile"]: + local_profile["name"] = content["profile"]["name"] + if "avatar_url" in content["profile"]: + local_profile["avatar_url"] = content["profile"]["avatar_url"] + + yield self.store.register_user_group_membership( + group_id, user_id, + membership="invite", + content={"profile": local_profile, "inviter": content["inviter"]}, + ) + + defer.returnValue({"state": "invite"}) + + @defer.inlineCallbacks + def remove_user_from_group(self, group_id, user_id, requester_user_id, content): + if user_id == requester_user_id: + yield self.store.register_user_group_membership( + group_id, user_id, + membership="leave", + ) + + # TODO: Should probably remember that we tried to leave so that we can + # retry if the group server is currently down. + + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.remove_user_from_group( + group_id, user_id, requester_user_id, content, + ) + else: + content["requester_user_id"] = requester_user_id + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.remove_user_from_group( + group_id, user_id, content + ) # TODO + + defer.returnValue(res) + + @defer.inlineCallbacks + def user_removed_from_group(self, group_id, user_id, content): + # TODO: Check if user in group + yield self.store.register_user_group_membership( + group_id, user_id, + membership="leave", + ) + + @defer.inlineCallbacks + def get_joined_groups(self, user_id): + group_ids = yield self.store.get_joined_groups(user_id) + defer.returnValue({"groups": group_ids}) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 3d809d181b..16f5a73b95 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -52,6 +52,7 @@ from synapse.rest.client.v2_alpha import ( thirdparty, sendtodevice, user_directory, + groups, ) from synapse.http.server import JsonResource @@ -102,3 +103,4 @@ class ClientRestResource(JsonResource): thirdparty.register_servlets(hs, client_resource) sendtodevice.register_servlets(hs, client_resource) user_directory.register_servlets(hs, client_resource) + groups.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py new file mode 100644 index 0000000000..255552c365 --- /dev/null +++ b/synapse/rest/client/v2_alpha/groups.py @@ -0,0 +1,642 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.types import GroupID + +from ._base import client_v2_patterns + +import logging + +logger = logging.getLogger(__name__) + + +class GroupServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/profile$") + + def __init__(self, hs): + super(GroupServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + group_description = yield self.groups_handler.get_group_profile(group_id, user_id) + + defer.returnValue((200, group_description)) + + +class GroupSummaryServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/summary$") + + def __init__(self, hs): + super(GroupSummaryServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + get_group_summary = yield self.groups_handler.get_group_summary(group_id, user_id) + + defer.returnValue((200, get_group_summary)) + + +class GroupSummaryRoomsServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/summary/rooms$") + + def __init__(self, hs): + super(GroupSummaryServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + get_group_summary = yield self.groups_handler.get_group_summary(group_id, user_id) + + defer.returnValue((200, get_group_summary)) + + +class GroupSummaryRoomsDefaultCatServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary/rooms/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupSummaryRoomsDefaultCatServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=None, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=None, + ) + + defer.returnValue((200, resp)) + + +class GroupSummaryRoomsCatServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary" + "/categories/(?P[^/]+)/rooms/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupSummaryRoomsCatServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, category_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=category_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, category_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=category_id, + ) + + defer.returnValue((200, resp)) + + +class GroupCategoryServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/categories/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupCategoryServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id, category_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_category( + group_id, user_id, + category_id=category_id, + ) + + defer.returnValue((200, category)) + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, category_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_category( + group_id, user_id, + category_id=category_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, category_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_category( + group_id, user_id, + category_id=category_id, + ) + + defer.returnValue((200, resp)) + + +class GroupCategoriesServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/categories/$" + ) + + def __init__(self, hs): + super(GroupCategoriesServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_categories( + group_id, user_id, + ) + + defer.returnValue((200, category)) + + +class GroupRoleServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/roles/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupRoleServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id, role_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_role( + group_id, user_id, + role_id=role_id, + ) + + defer.returnValue((200, category)) + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, role_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_role( + group_id, user_id, + role_id=role_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, role_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_role( + group_id, user_id, + role_id=role_id, + ) + + defer.returnValue((200, resp)) + + +class GroupRolesServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/roles/$" + ) + + def __init__(self, hs): + super(GroupRolesServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_roles( + group_id, user_id, + ) + + defer.returnValue((200, category)) + + +class GroupSummaryUsersDefaultRoleServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary/users/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupSummaryUsersDefaultRoleServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=None, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=None, + ) + + defer.returnValue((200, resp)) + + +class GroupSummaryUsersRoleServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary" + "/roles/(?P[^/]+)/users/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupSummaryUsersRoleServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, role_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=role_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, role_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=role_id, + ) + + defer.returnValue((200, resp)) + + +class GroupRoomServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/rooms$") + + def __init__(self, hs): + super(GroupRoomServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_rooms_in_group(group_id, user_id) + + defer.returnValue((200, result)) + + +class GroupUsersServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/users$") + + def __init__(self, hs): + super(GroupUsersServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_users_in_group(group_id, user_id) + + defer.returnValue((200, result)) + + +class GroupCreateServlet(RestServlet): + PATTERNS = client_v2_patterns("/create_group$") + + def __init__(self, hs): + super(GroupCreateServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + self.server_name = hs.hostname + + @defer.inlineCallbacks + def on_POST(self, request): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + # TODO: Create group on remote server + content = parse_json_object_from_request(request) + localpart = content.pop("localpart") + group_id = GroupID.create(localpart, self.server_name).to_string() + + result = yield self.groups_handler.create_group(group_id, user_id, content) + + defer.returnValue((200, result)) + + +class GroupAdminRoomsServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/admin/rooms/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupAdminRoomsServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.add_room(group_id, user_id, room_id, content) + + defer.returnValue((200, result)) + + +class GroupAdminUsersInviteServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/admin/users/invite/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupAdminUsersInviteServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + self.store = hs.get_datastore() + self.is_mine_id = hs.is_mine_id + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + config = content.get("config", {}) + result = yield self.groups_handler.invite( + group_id, user_id, requester_user_id, config, + ) + + defer.returnValue((200, result)) + + +class GroupAdminUsersKickServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/admin/users/remove/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupAdminUsersKickServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.remove_user_from_group( + group_id, user_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupSelfLeaveServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/self/leave$" + ) + + def __init__(self, hs): + super(GroupSelfLeaveServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.remove_user_from_group( + group_id, requester_user_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupSelfJoinServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/self/join$" + ) + + def __init__(self, hs): + super(GroupSelfJoinServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.join_group( + group_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupSelfAcceptInviteServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/self/accept_invite$" + ) + + def __init__(self, hs): + super(GroupSelfAcceptInviteServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.accept_invite( + group_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupsForUserServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/joined_groups$" + ) + + def __init__(self, hs): + super(GroupsForUserServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_joined_groups(user_id) + + defer.returnValue((200, result)) + + +def register_servlets(hs, http_server): + GroupServlet(hs).register(http_server) + GroupSummaryServlet(hs).register(http_server) + GroupUsersServlet(hs).register(http_server) + GroupRoomServlet(hs).register(http_server) + GroupCreateServlet(hs).register(http_server) + GroupAdminRoomsServlet(hs).register(http_server) + GroupAdminUsersInviteServlet(hs).register(http_server) + GroupAdminUsersKickServlet(hs).register(http_server) + GroupSelfLeaveServlet(hs).register(http_server) + GroupSelfJoinServlet(hs).register(http_server) + GroupSelfAcceptInviteServlet(hs).register(http_server) + GroupsForUserServlet(hs).register(http_server) + GroupSummaryRoomsDefaultCatServlet(hs).register(http_server) + GroupCategoryServlet(hs).register(http_server) + GroupCategoriesServlet(hs).register(http_server) + GroupSummaryRoomsCatServlet(hs).register(http_server) + GroupRoleServlet(hs).register(http_server) + GroupRolesServlet(hs).register(http_server) + GroupSummaryUsersDefaultRoleServlet(hs).register(http_server) + GroupSummaryUsersRoleServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index d857cca848..d0a6272766 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -50,6 +50,7 @@ from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.user_directory import UserDirectoyHandler +from synapse.handlers.groups_local import GroupsLocalHandler from synapse.groups.groups_server import GroupsServerHandler from synapse.groups.attestations import GroupAttestionRenewer, GroupAttestationSigning from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory @@ -141,6 +142,7 @@ class HomeServer(object): 'read_marker_handler', 'action_generator', 'user_directory_handler', + 'groups_local_handler', 'groups_server_handler', 'groups_attestation_signing', 'groups_attestation_renewer', @@ -314,6 +316,9 @@ class HomeServer(object): def build_user_directory_handler(self): return UserDirectoyHandler(self) + def build_groups_local_handler(self): + return GroupsLocalHandler(self) + def build_groups_server_handler(self): return GroupsServerHandler(self) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index fdee9f1ad5..594566eb38 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -136,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) + self._group_updates_id_gen = StreamIdGenerator( + db_conn, "local_group_updates", "stream_id", + ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( @@ -236,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=curr_state_delta_prefill, ) + _group_updates_prefill, min_group_updates_id = self._get_cache_dict( + db_conn, "local_group_updates", + entity_column="user_id", + stream_column="stream_id", + max_value=self._group_updates_id_gen.get_current_token(), + limit=1000, + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", min_group_updates_id, + prefilled_cache=_group_updates_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index e8a799d8c7..036549d437 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -756,6 +756,103 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + @defer.inlineCallbacks + def register_user_group_membership(self, group_id, user_id, membership, + is_admin=False, content={}, + local_attestation=None, + remote_attestation=None, + ): + def _register_user_group_membership_txn(txn, next_id): + # TODO: Upsert? + self._simple_delete_txn( + txn, + table="local_group_membership", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_insert_txn( + txn, + table="local_group_membership", + values={ + "group_id": group_id, + "user_id": user_id, + "is_admin": is_admin, + "membership": membership, + "content": json.dumps(content), + }, + ) + self._simple_delete_txn( + txn, + table="local_group_updates", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + "type": "membership", + }, + ) + self._simple_insert_txn( + txn, + table="local_group_updates", + values={ + "stream_id": next_id, + "group_id": group_id, + "user_id": user_id, + "type": "membership", + "content": json.dumps({"membership": membership, "content": content}), + } + ) + self._group_updates_stream_cache.entity_has_changed(user_id, next_id) + + # TODO: Insert profile to ensuer it comes down stream if its a join. + + if membership == "join": + if local_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_renewals", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": local_attestation["valid_until_ms"], + } + ) + if remote_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_remote", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": remote_attestation["valid_until_ms"], + "attestation": json.dumps(remote_attestation), + } + ) + else: + self._simple_delete_txn( + txn, + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + + with self._group_updates_id_gen.get_next() as next_id: + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, next_id, + ) + @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, long_description,): @@ -771,6 +868,61 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) + def get_joined_groups(self, user_id): + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + }, + retcol="group_id", + desc="get_joined_groups", + ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn, + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token, + ) + if not has_changed: + return [] + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token,)) + return [{ + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } for group_id, membership, gtype, content_json in txn] + return self.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn, + ) + + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() + def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 472aab0a78..e32db8b313 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -142,3 +142,31 @@ CREATE TABLE group_attestations_remote ( CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id); CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); + + +CREATE TABLE local_group_membership ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + is_admin BOOLEAN NOT NULL, + membership TEXT NOT NULL, + content TEXT NOT NULL +); + +CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); +CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); + + +CREATE TABLE local_group_updates ( + stream_id BIGINT NOT NULL, + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, + content TEXT NOT NULL +); + + +CREATE TABLE local_group_profiles ( + group_id TEXT NOT NULL, + name TEXT, + avatar_url TEXT +); -- cgit 1.5.1 From e96ee95a7e84e7d75ac57395bb64e1c3428596e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 09:33:16 +0100 Subject: Remove sync stuff --- synapse/storage/__init__.py | 15 ----------- synapse/storage/group_server.py | 55 ----------------------------------------- 2 files changed, 70 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 594566eb38..fdee9f1ad5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -136,9 +136,6 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) - self._group_updates_id_gen = StreamIdGenerator( - db_conn, "local_group_updates", "stream_id", - ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( @@ -239,18 +236,6 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=curr_state_delta_prefill, ) - _group_updates_prefill, min_group_updates_id = self._get_cache_dict( - db_conn, "local_group_updates", - entity_column="user_id", - stream_column="stream_id", - max_value=self._group_updates_id_gen.get_current_token(), - limit=1000, - ) - self._group_updates_stream_cache = StreamChangeCache( - "_group_updates_stream_cache", min_group_updates_id, - prefilled_cache=_group_updates_prefill, - ) - cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 036549d437..2dcdcbfdfc 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -868,61 +868,6 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) - def get_joined_groups(self, user_id): - return self._simple_select_onecol( - table="local_group_membership", - keyvalues={ - "user_id": user_id, - "membership": "join", - }, - retcol="group_id", - desc="get_joined_groups", - ) - - def get_all_groups_for_user(self, user_id, now_token): - def _get_all_groups_for_user_txn(txn): - sql = """ - SELECT group_id, type, membership, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND membership != 'leave' - AND stream_id <= ? - """ - txn.execute(sql, (user_id, now_token,)) - return self.cursor_to_dict(txn) - return self.runInteraction( - "get_all_groups_for_user", _get_all_groups_for_user_txn, - ) - - def get_groups_changes_for_user(self, user_id, from_token, to_token): - from_token = int(from_token) - has_changed = self._group_updates_stream_cache.has_entity_changed( - user_id, from_token, - ) - if not has_changed: - return [] - - def _get_groups_changes_for_user_txn(txn): - sql = """ - SELECT group_id, membership, type, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND ? < stream_id AND stream_id <= ? - """ - txn.execute(sql, (user_id, from_token, to_token,)) - return [{ - "group_id": group_id, - "membership": membership, - "type": gtype, - "content": json.loads(content_json), - } for group_id, membership, gtype, content_json in txn] - return self.runInteraction( - "get_groups_changes_for_user", _get_groups_changes_for_user_txn, - ) - - def get_group_stream_token(self): - return self._group_updates_id_gen.get_current_token() - def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ -- cgit 1.5.1 From 508460f24077da74d8a3d3ce891c0b55ebbce2e8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 09:55:46 +0100 Subject: Remove sync stuff --- synapse/storage/group_server.py | 20 -------------------- synapse/storage/schema/delta/43/group_server.sql | 10 +--------- 2 files changed, 1 insertion(+), 29 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 2dcdcbfdfc..bff2324cc7 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -783,26 +783,6 @@ class GroupServerStore(SQLBaseStore): "content": json.dumps(content), }, ) - self._simple_delete_txn( - txn, - table="local_group_updates", - keyvalues={ - "group_id": group_id, - "user_id": user_id, - "type": "membership", - }, - ) - self._simple_insert_txn( - txn, - table="local_group_updates", - values={ - "stream_id": next_id, - "group_id": group_id, - "user_id": user_id, - "type": "membership", - "content": json.dumps({"membership": membership, "content": content}), - } - ) self._group_updates_stream_cache.entity_has_changed(user_id, next_id) # TODO: Insert profile to ensuer it comes down stream if its a join. diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index f9e11c9146..e1fd47aa7f 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -144,6 +144,7 @@ CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_i CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); +-- The group membership for the HS's users CREATE TABLE local_group_membership ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -154,12 +155,3 @@ CREATE TABLE local_group_membership ( CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); - - -CREATE TABLE local_group_updates ( - stream_id BIGINT NOT NULL, - group_id TEXT NOT NULL, - user_id TEXT NOT NULL, - type TEXT NOT NULL, - content TEXT NOT NULL -); -- cgit 1.5.1 From 3e703eb04e1b30dc2bce03d3895ac79ac24a063d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 10:17:25 +0100 Subject: Comment --- synapse/storage/group_server.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index bff2324cc7..3c6ee7df68 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -762,6 +762,20 @@ class GroupServerStore(SQLBaseStore): local_attestation=None, remote_attestation=None, ): + """Registers that a local user is a member of a (local or remote) group. + + Args: + group_id (str) + user_id (str) + membership (str) + is_admin (bool) + content (dict): Content of the membership, e.g. includes the inviter + if the user has been invited. + local_attestation (dict): If remote group then store the fact that we + have given out an attestation, else None. + remote_attestation (dict): If remote group then store the remote + attestation from the group, else None. + """ def _register_user_group_membership_txn(txn, next_id): # TODO: Upsert? self._simple_delete_txn( -- cgit 1.5.1 From 94ecd871a047707da5998f83440c039d064de8aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 16:38:54 +0100 Subject: Fix typos --- synapse/federation/transport/client.py | 4 ++-- synapse/handlers/groups_local.py | 5 +++-- synapse/storage/group_server.py | 25 +++++++++++++++++-------- 3 files changed, 22 insertions(+), 12 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 500f3622a2..e4d84c06c1 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -495,7 +495,7 @@ class TransportLayerClient(object): ) @log_function - def get_group_rooms(self, destination, group_id, requester_user_id): + def get_rooms_in_group(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/rooms" % (group_id,) return self.client.get_json( @@ -518,7 +518,7 @@ class TransportLayerClient(object): ) @log_function - def get_group_users(self, destination, group_id, requester_user_id): + def get_users_in_group(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/users" % (group_id,) return self.client.get_json( diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 7d7fc5d976..50f7fce885 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -159,7 +159,7 @@ class GroupsLocalHandler(object): ) defer.returnValue(res) - res = yield self.transport_client.get_group_users( + res = yield self.transport_client.get_users_in_group( get_domain_from_id(group_id), group_id, requester_user_id, ) @@ -278,7 +278,8 @@ class GroupsLocalHandler(object): else: content["requester_user_id"] = requester_user_id res = yield self.transport_client.remove_user_from_group( - get_domain_from_id(group_id), group_id, user_id, content + get_domain_from_id(group_id), group_id, requester_user_id, + user_id, content, ) defer.returnValue(res) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 3c6ee7df68..0a69e0f501 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore): remote_attestation (dict): If remote group then store the remote attestation from the group, else None. """ - def _register_user_group_membership_txn(txn, next_id): + def _register_user_group_membership_txn(txn): # TODO: Upsert? self._simple_delete_txn( txn, @@ -797,7 +797,6 @@ class GroupServerStore(SQLBaseStore): "content": json.dumps(content), }, ) - self._group_updates_stream_cache.entity_has_changed(user_id, next_id) # TODO: Insert profile to ensuer it comes down stream if its a join. @@ -820,7 +819,7 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, "user_id": user_id, "valid_until_ms": remote_attestation["valid_until_ms"], - "attestation": json.dumps(remote_attestation), + "attestation_json": json.dumps(remote_attestation), } ) else: @@ -841,11 +840,10 @@ class GroupServerStore(SQLBaseStore): }, ) - with self._group_updates_id_gen.get_next() as next_id: - yield self.runInteraction( - "register_user_group_membership", - _register_user_group_membership_txn, next_id, - ) + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, + ) @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, @@ -928,3 +926,14 @@ class GroupServerStore(SQLBaseStore): defer.returnValue(json.loads(row["attestation_json"])) defer.returnValue(None) + + def get_joined_groups(self, user_id): + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + }, + retcol="group_id", + desc="get_joined_groups", + ) -- cgit 1.5.1 From 6f443a74cf6ab0bfe452289f9888580725987765 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 09:46:33 +0100 Subject: Add update group profile API --- synapse/federation/transport/server.py | 12 ++++++++++++ synapse/groups/groups_server.py | 16 ++++++++++++++++ synapse/handlers/groups_local.py | 1 + synapse/rest/client/v2_alpha/groups.py | 12 ++++++++++++ synapse/storage/group_server.py | 11 +++++++++++ 5 files changed, 52 insertions(+) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 1332b49f35..e04750fd2a 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -642,6 +642,18 @@ class FederationGroupsSummaryServlet(BaseFederationServlet): defer.returnValue((200, new_content)) + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.update_group_profile( + group_id, requester_user_id, content + ) + + defer.returnValue((200, new_content)) + class FederationGroupsRoomsServlet(BaseFederationServlet): """Get the rooms in a group on behalf of a user diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 1b6e354ca3..322aad2a6f 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -341,6 +341,22 @@ class GroupsServerHandler(object): else: raise SynapseError(404, "Unknown group") + @defer.inlineCallbacks + def update_group_profile(self, group_id, requester_user_id, content): + """Update the group profile + """ + yield self.check_group_is_ours( + group_id, and_exists=True, and_is_admin=requester_user_id, + ) + + profile = {} + for keyname in ("name", "avatar_url", "short_description", + "long_description"): + if keyname in content: + profile[keyname] = content[keyname] + + yield self.store.update_group_profile(group_id, profile) + @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): """Get the users in group as seen by requester_user_id. diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 0b80348c82..b2c920da38 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -72,6 +72,7 @@ class GroupsLocalHandler(object): # or federation depending on if the group is local or remote get_group_profile = _create_rerouter("get_group_profile") + update_group_profile = _create_rerouter("update_group_profile") get_rooms_in_group = _create_rerouter("get_rooms_in_group") add_room_to_group = _create_rerouter("add_room_to_group") diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index f937d856fd..64d803d489 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -45,6 +45,18 @@ class GroupServlet(RestServlet): defer.returnValue((200, group_description)) + @defer.inlineCallbacks + def on_POST(self, request, group_id, content): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + yield self.groups_handler.update_group_profile( + group_id, user_id, content, + ) + + defer.returnValue((200, {})) + class GroupSummaryServlet(RestServlet): """Get the full group summary diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 0a69e0f501..4197d22d88 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -860,6 +860,17 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) + @defer.inlineCallbacks + def update_group_profile(self, group_id, profile,): + yield self._simple_update_one( + table="groups", + keyvalues={ + "group_id": group_id, + }, + updatevalues=profile, + desc="create_group", + ) + def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ -- cgit 1.5.1 From 57826d645bd62ab534dbcab8d66a98daec145459 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 13:15:22 +0100 Subject: Fix typo --- synapse/storage/group_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 0a69e0f501..a2e7aa47d8 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -798,7 +798,7 @@ class GroupServerStore(SQLBaseStore): }, ) - # TODO: Insert profile to ensuer it comes down stream if its a join. + # TODO: Insert profile to ensure it comes down stream if its a join. if membership == "join": if local_attestation: -- cgit 1.5.1 From 8209b5f033417ab018fdd1114170b89fb0b18aa9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 16:22:22 +0100 Subject: Fix a storage desc --- synapse/storage/group_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 4197d22d88..2331ec79bd 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -868,7 +868,7 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, }, updatevalues=profile, - desc="create_group", + desc="update_group_profile", ) def get_attestations_need_renewals(self, valid_until_ms): -- cgit 1.5.1 From c544188ee3644c85a97a3c4e09e63ad4e3c6f0cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 14:53:19 +0100 Subject: Add groups to sync stream --- synapse/handlers/sync.py | 64 +++++++++++++++++++++- synapse/rest/client/v2_alpha/sync.py | 5 ++ synapse/storage/__init__.py | 15 ++++++ synapse/storage/group_server.py | 68 ++++++++++++++++++++++-- synapse/storage/schema/delta/43/group_server.sql | 9 ++++ synapse/streams/events.py | 2 + synapse/types.py | 2 + tests/rest/client/v1/test_rooms.py | 4 +- 8 files changed, 161 insertions(+), 8 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 91c6c6be3c..c01fcd3d59 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -108,6 +108,17 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ return True +class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ + "join", + "invite", + "leave", +])): + __slots__ = [] + + def __nonzero__(self): + return self.join or self.invite or self.leave + + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. @@ -119,6 +130,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ "device_lists", # List of user_ids whose devices have chanegd "device_one_time_keys_count", # Dict of algorithm to count for one time keys # for this device + "groups", ])): __slots__ = [] @@ -134,7 +146,8 @@ class SyncResult(collections.namedtuple("SyncResult", [ self.archived or self.account_data or self.to_device or - self.device_lists + self.device_lists or + self.groups ) @@ -560,6 +573,8 @@ class SyncHandler(object): user_id, device_id ) + yield self._generate_sync_entry_for_groups(sync_result_builder) + defer.returnValue(SyncResult( presence=sync_result_builder.presence, account_data=sync_result_builder.account_data, @@ -568,10 +583,56 @@ class SyncHandler(object): archived=sync_result_builder.archived, to_device=sync_result_builder.to_device, device_lists=device_lists, + groups=sync_result_builder.groups, device_one_time_keys_count=one_time_key_counts, next_batch=sync_result_builder.now_token, )) + @measure_func("_generate_sync_entry_for_groups") + @defer.inlineCallbacks + def _generate_sync_entry_for_groups(self, sync_result_builder): + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + + if since_token and since_token.groups_key: + results = yield self.store.get_groups_changes_for_user( + user_id, since_token.groups_key, now_token.groups_key, + ) + else: + results = yield self.store.get_all_groups_for_user( + user_id, now_token.groups_key, + ) + + invited = {} + joined = {} + left = {} + for result in results: + membership = result["membership"] + group_id = result["group_id"] + gtype = result["type"] + content = result["content"] + + if membership == "join": + if gtype == "membership": + content.pop("membership", None) + invited[group_id] = content["content"] + else: + joined.setdefault(group_id, {})[gtype] = content + elif membership == "invite": + if gtype == "membership": + content.pop("membership", None) + invited[group_id] = content["content"] + else: + if gtype == "membership": + left[group_id] = content["content"] + + sync_result_builder.groups = GroupsSyncResult( + join=joined, + invite=invited, + leave=left, + ) + @measure_func("_generate_sync_entry_for_device_list") @defer.inlineCallbacks def _generate_sync_entry_for_device_list(self, sync_result_builder): @@ -1260,6 +1321,7 @@ class SyncResultBuilder(object): self.invited = [] self.archived = [] self.device = [] + self.groups = None class RoomSyncResultBuilder(object): diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 6dcc407451..5f208a4c1c 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -199,6 +199,11 @@ class SyncRestServlet(RestServlet): "invite": invited, "leave": archived, }, + "groups": { + "join": sync_result.groups.join, + "invite": sync_result.groups.invite, + "leave": sync_result.groups.leave, + }, "device_one_time_keys_count": sync_result.device_one_time_keys_count, "next_batch": sync_result.next_batch.to_string(), } diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index fdee9f1ad5..594566eb38 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -136,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) + self._group_updates_id_gen = StreamIdGenerator( + db_conn, "local_group_updates", "stream_id", + ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( @@ -236,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=curr_state_delta_prefill, ) + _group_updates_prefill, min_group_updates_id = self._get_cache_dict( + db_conn, "local_group_updates", + entity_column="user_id", + stream_column="stream_id", + max_value=self._group_updates_id_gen.get_current_token(), + limit=1000, + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", min_group_updates_id, + prefilled_cache=_group_updates_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index a2e7aa47d8..45f0a4c599 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore): remote_attestation (dict): If remote group then store the remote attestation from the group, else None. """ - def _register_user_group_membership_txn(txn): + def _register_user_group_membership_txn(txn, next_id): # TODO: Upsert? self._simple_delete_txn( txn, @@ -798,6 +798,19 @@ class GroupServerStore(SQLBaseStore): }, ) + self._simple_insert_txn( + txn, + table="local_group_updates", + values={ + "stream_id": next_id, + "group_id": group_id, + "user_id": user_id, + "type": "membership", + "content": json.dumps({"membership": membership, "content": content}), + } + ) + self._group_updates_stream_cache.entity_has_changed(user_id, next_id) + # TODO: Insert profile to ensure it comes down stream if its a join. if membership == "join": @@ -840,10 +853,11 @@ class GroupServerStore(SQLBaseStore): }, ) - yield self.runInteraction( - "register_user_group_membership", - _register_user_group_membership_txn, - ) + with self._group_updates_id_gen.get_next() as next_id: + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, next_id, + ) @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, @@ -937,3 +951,47 @@ class GroupServerStore(SQLBaseStore): retcol="group_id", desc="get_joined_groups", ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn, + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token, + ) + if not has_changed: + return [] + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token,)) + return [{ + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } for group_id, membership, gtype, content_json in txn] + return self.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn, + ) + + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index e1fd47aa7f..92f3339c94 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -155,3 +155,12 @@ CREATE TABLE local_group_membership ( CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); + + +CREATE TABLE local_group_updates ( + stream_id BIGINT NOT NULL, + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, + content TEXT NOT NULL +); diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 91a59b0bae..e2be500815 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -45,6 +45,7 @@ class EventSources(object): push_rules_key, _ = self.store.get_push_rules_stream_token() to_device_key = self.store.get_to_device_stream_token() device_list_key = self.store.get_device_stream_token() + groups_key = self.store.get_group_stream_token() token = StreamToken( room_key=( @@ -65,6 +66,7 @@ class EventSources(object): push_rules_key=push_rules_key, to_device_key=to_device_key, device_list_key=device_list_key, + groups_key=groups_key, ) defer.returnValue(token) diff --git a/synapse/types.py b/synapse/types.py index b32c0e360d..37d5fa7f9f 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -171,6 +171,7 @@ class StreamToken( "push_rules_key", "to_device_key", "device_list_key", + "groups_key", )) ): _SEPARATOR = "_" @@ -209,6 +210,7 @@ class StreamToken( or (int(other.push_rules_key) < int(self.push_rules_key)) or (int(other.to_device_key) < int(self.to_device_key)) or (int(other.device_list_key) < int(self.device_list_key)) + or (int(other.groups_key) < int(self.groups_key)) ) def copy_and_advance(self, key, new_value): diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index d746ea8568..de376fb514 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -1032,7 +1032,7 @@ class RoomMessageListTestCase(RestTestCase): @defer.inlineCallbacks def test_topo_token_is_accepted(self): - token = "t1-0_0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0_0" (code, response) = yield self.mock_resource.trigger_get( "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)) @@ -1044,7 +1044,7 @@ class RoomMessageListTestCase(RestTestCase): @defer.inlineCallbacks def test_stream_token_is_accepted_for_fwd_pagianation(self): - token = "s0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0" (code, response) = yield self.mock_resource.trigger_get( "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)) -- cgit 1.5.1 From 2cc998fed879357376edb35d5088d88a078dd576 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 17:13:18 +0100 Subject: Fix replication. And notify --- synapse/app/synchrotron.py | 6 ++++ synapse/handlers/groups_local.py | 20 ++++++++--- synapse/replication/slave/storage/groups.py | 54 +++++++++++++++++++++++++++++ synapse/replication/tcp/streams.py | 20 +++++++++++ synapse/storage/group_server.py | 23 ++++++++++++ 5 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 synapse/replication/slave/storage/groups.py (limited to 'synapse/storage/group_server.py') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 4bdd99a966..d06a05acd9 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -41,6 +41,7 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -75,6 +76,7 @@ class SynchrotronSlavedStore( SlavedRegistrationStore, SlavedFilteringStore, SlavedPresenceStore, + SlavedGroupServerStore, SlavedDeviceInboxStore, SlavedDeviceStore, SlavedClientIpStore, @@ -409,6 +411,10 @@ class SyncReplicationHandler(ReplicationClientHandler): ) elif stream_name == "presence": yield self.presence_handler.process_replication_rows(token, rows) + elif stream_name == "receipts": + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows], + ) def start(config_options): diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 0b80348c82..4182ea5afa 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -211,13 +211,16 @@ class GroupsLocalHandler(object): user_id=user_id, ) - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="join", is_admin=False, local_attestation=local_attestation, remote_attestation=remote_attestation, ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) defer.returnValue({}) @@ -257,11 +260,14 @@ class GroupsLocalHandler(object): if "avatar_url" in content["profile"]: local_profile["avatar_url"] = content["profile"]["avatar_url"] - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="invite", content={"profile": local_profile, "inviter": content["inviter"]}, ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) defer.returnValue({"state": "invite"}) @@ -270,10 +276,13 @@ class GroupsLocalHandler(object): """Remove a user from a group """ if user_id == requester_user_id: - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="leave", ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) # TODO: Should probably remember that we tried to leave so that we can # retry if the group server is currently down. @@ -296,10 +305,13 @@ class GroupsLocalHandler(object): """One of our users was removed/kicked from a group """ # TODO: Check if user in group - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="leave", ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) @defer.inlineCallbacks def get_joined_groups(self, user_id): diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py new file mode 100644 index 0000000000..0bc4bce5b0 --- /dev/null +++ b/synapse/replication/slave/storage/groups.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + + +class SlavedGroupServerStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedGroupServerStore, self).__init__(db_conn, hs) + + self.hs = hs + + self._group_updates_id_gen = SlavedIdTracker( + db_conn, "local_group_updates", "stream_id", + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(), + ) + + get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__ + get_group_stream_token = DataStore.get_group_stream_token.__func__ + get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__ + + def stream_positions(self): + result = super(SlavedGroupServerStore, self).stream_positions() + result["groups"] = self._group_updates_id_gen.get_current_token() + return result + + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "groups": + self._group_updates_id_gen.advance(token) + for row in rows: + self._group_updates_stream_cache.entity_has_changed( + row.user_id, token + ) + + return super(SlavedGroupServerStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index fbafe12cc2..4c60bf79f9 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -118,6 +118,12 @@ CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( "state_key", # str "event_id", # str, optional )) +GroupsStreamRow = namedtuple("GroupsStreamRow", ( + "group_id", # str + "user_id", # str + "type", # str + "content", # dict +)) class Stream(object): @@ -464,6 +470,19 @@ class CurrentStateDeltaStream(Stream): super(CurrentStateDeltaStream, self).__init__(hs) +class GroupServerStream(Stream): + NAME = "groups" + ROW_TYPE = GroupsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_group_stream_token + self.update_function = store.get_all_groups_changes + + super(GroupServerStream, self).__init__(hs) + + STREAMS_MAP = { stream.NAME: stream for stream in ( @@ -482,5 +501,6 @@ STREAMS_MAP = { TagAccountDataStream, AccountDataStream, CurrentStateDeltaStream, + GroupServerStream, ) } diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 45f0a4c599..5006ac863f 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -853,6 +853,8 @@ class GroupServerStore(SQLBaseStore): }, ) + return next_id + with self._group_updates_id_gen.get_next() as next_id: yield self.runInteraction( "register_user_group_membership", @@ -993,5 +995,26 @@ class GroupServerStore(SQLBaseStore): "get_groups_changes_for_user", _get_groups_changes_for_user_txn, ) + def get_all_groups_changes(self, from_token, to_token, limit): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_any_entity_changed( + from_token, + ) + if not has_changed: + return [] + + def _get_all_groups_changes_txn(txn): + sql = """ + SELECT stream_id, group_id, user_id, type, content + FROM local_group_updates + WHERE ? < stream_id AND stream_id <= ? + LIMIT ? + """ + txn.execute(sql, (from_token, to_token, limit,)) + return txn.fetchall() + return self.runInteraction( + "get_all_groups_changes", _get_all_groups_changes_txn, + ) + def get_group_stream_token(self): return self._group_updates_id_gen.get_current_token() -- cgit 1.5.1 From 851aeae7c796b127dddf7ca6df882df6104ee5e7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2017 13:40:56 +0100 Subject: Check users/rooms are in group before adding to summary --- synapse/storage/group_server.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index d42e215b26..258c3168aa 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -152,6 +152,18 @@ class GroupServerStore(SQLBaseStore): an order of 1 will put the room first. Otherwise, the room gets added to the end. """ + room_in_group = self._simple_select_one_onecol_txn( + txn, + table="group_rooms", + keyvalues={ + "group_id": group_id, + "room_id": room_id, + }, + retcol="room_id", + allow_none=True, + ) + if not room_in_group: + raise SynapseError(400, "room not in group") if category_id is None: category_id = _DEFAULT_CATEGORY_ID @@ -426,6 +438,19 @@ class GroupServerStore(SQLBaseStore): an order of 1 will put the user first. Otherwise, the user gets added to the end. """ + user_in_group = self._simple_select_one_onecol_txn( + txn, + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="user_id", + allow_none=True, + ) + if not user_in_group: + raise SynapseError(400, "user not in group") + if role_id is None: role_id = _DEFAULT_ROLE_ID else: -- cgit 1.5.1 From b76ef6ccb8e00610c791a78b940d396da82fb1ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2017 13:55:39 +0100 Subject: Include users membership in group in summary API --- synapse/groups/groups_server.py | 5 ++++ synapse/storage/group_server.py | 55 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index b1ee43ef90..f25f327eb9 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -130,6 +130,10 @@ class GroupsServerHandler(object): users.sort(key=lambda e: e.get("order", 0)) + membership_info = yield self.store.get_users_membership_info_in_group( + group_id, requester_user_id, + ) + defer.returnValue({ "profile": profile, "users_section": { @@ -142,6 +146,7 @@ class GroupsServerHandler(object): "categories": categories, "total_room_count_estimate": 0, # TODO }, + "user": membership_info, }) @defer.inlineCallbacks diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 258c3168aa..989a10eea6 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -672,6 +672,61 @@ class GroupServerStore(SQLBaseStore): allow_none=True, ) + @defer.inlineCallbacks + def get_users_membership_info_in_group(self, group_id, user_id): + """Get a dict describing the memebrship of a user in a group. + + Example if joined: + + { + "memebrship": "joined", + "is_public": True, + "is_privileged": False, + } + + Returns an empty dict if the user is not joined/invited/etc + """ + def _get_users_membership_in_group_txn(txn): + row = self._simple_select_one_txn( + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcols=("is_admin", "is_public"), + allow_none=True, + desc="is_user_adim_in_group", + ) + + if row: + return { + "memebrship": "joined", + "is_public": row["is_public"], + "is_privileged": row["is_admin"], + } + + row = self._simple_select_one_onecol_txn( + table="group_invites", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="user_id", + desc="is_user_invited_to_local_group", + allow_none=True, + ) + + if row: + return { + "memebrship": "invited", + } + + return {} + + return self.runInteraction( + "get_users_membership_info_in_group", _get_users_membership_in_group_txn, + ) + def add_user_to_group(self, group_id, user_id, is_admin=False, is_public=True, local_attestation=None, remote_attestation=None): """Add a user to the group server. -- cgit 1.5.1 From ed666d396985c1fa2b9acb1c69199dd55670a88f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2017 14:05:09 +0100 Subject: Fix all the typos --- synapse/storage/group_server.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 989a10eea6..357111e305 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -643,7 +643,7 @@ class GroupServerStore(SQLBaseStore): }, retcol="is_admin", allow_none=True, - desc="is_user_adim_in_group", + desc="is_user_admin_in_group", ) def add_group_invite(self, group_id, user_id): @@ -672,14 +672,13 @@ class GroupServerStore(SQLBaseStore): allow_none=True, ) - @defer.inlineCallbacks def get_users_membership_info_in_group(self, group_id, user_id): - """Get a dict describing the memebrship of a user in a group. + """Get a dict describing the membership of a user in a group. Example if joined: { - "memebrship": "joined", + "membership": "joined", "is_public": True, "is_privileged": False, } @@ -688,6 +687,7 @@ class GroupServerStore(SQLBaseStore): """ def _get_users_membership_in_group_txn(txn): row = self._simple_select_one_txn( + txn, table="group_users", keyvalues={ "group_id": group_id, @@ -695,30 +695,29 @@ class GroupServerStore(SQLBaseStore): }, retcols=("is_admin", "is_public"), allow_none=True, - desc="is_user_adim_in_group", ) if row: return { - "memebrship": "joined", + "membership": "joined", "is_public": row["is_public"], "is_privileged": row["is_admin"], } row = self._simple_select_one_onecol_txn( + txn, table="group_invites", keyvalues={ "group_id": group_id, "user_id": user_id, }, retcol="user_id", - desc="is_user_invited_to_local_group", allow_none=True, ) if row: return { - "memebrship": "invited", + "membership": "invited", } return {} -- cgit 1.5.1 From 629cdfb124c013c07b50116386d05162e40871aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2017 14:54:05 +0100 Subject: Use join rather than joined, etc. --- synapse/storage/group_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 357111e305..9c55e10e77 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -699,7 +699,7 @@ class GroupServerStore(SQLBaseStore): if row: return { - "membership": "joined", + "membership": "join", "is_public": row["is_public"], "is_privileged": row["is_admin"], } @@ -717,7 +717,7 @@ class GroupServerStore(SQLBaseStore): if row: return { - "membership": "invited", + "membership": "invite", } return {} -- cgit 1.5.1 From 966a70f1fa74192866ff5b0dbae67ee8f490d97d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2017 17:49:39 +0100 Subject: Update comment --- synapse/storage/group_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 9c55e10e77..f44e80b514 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -678,12 +678,12 @@ class GroupServerStore(SQLBaseStore): Example if joined: { - "membership": "joined", + "membership": "join", "is_public": True, "is_privileged": False, } - Returns an empty dict if the user is not joined/invited/etc + Returns an empty dict if the user is not join/invite/etc """ def _get_users_membership_in_group_txn(txn): row = self._simple_select_one_txn( -- cgit 1.5.1 From 05e21285aae4a0411a9ec1151ce006297fa3ca91 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Aug 2017 11:50:09 +0100 Subject: Store whether the user wants to publicise their membership of a group --- synapse/handlers/groups_local.py | 4 ++++ synapse/storage/group_server.py | 2 ++ synapse/storage/schema/delta/43/group_server.sql | 1 + 3 files changed, 7 insertions(+) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index b8b1e754c7..3a738ef36f 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -203,12 +203,16 @@ class GroupsLocalHandler(object): user_id=user_id, ) + # TODO: Check that the group is public and we're being added publically + is_publicised = content.get("publicise", False) + token = yield self.store.register_user_group_membership( group_id, user_id, membership="join", is_admin=False, local_attestation=local_attestation, remote_attestation=remote_attestation, + is_publicised=is_publicised, ) self.notifier.on_new_event( "groups_key", token, users=[user_id], diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index f44e80b514..31514f3cdb 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -840,6 +840,7 @@ class GroupServerStore(SQLBaseStore): is_admin=False, content={}, local_attestation=None, remote_attestation=None, + is_publicised=False, ): """Registers that a local user is a member of a (local or remote) group. @@ -873,6 +874,7 @@ class GroupServerStore(SQLBaseStore): "user_id": user_id, "is_admin": is_admin, "membership": membership, + "is_publicised": is_publicised, "content": json.dumps(content), }, ) diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 92f3339c94..01ac0edc35 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -150,6 +150,7 @@ CREATE TABLE local_group_membership ( user_id TEXT NOT NULL, is_admin BOOLEAN NOT NULL, membership TEXT NOT NULL, + is_publicised TEXT NOT NULL, -- if the user is publicising their membership content TEXT NOT NULL ); -- cgit 1.5.1 From b880ff190a82d4f337b94115fc017d703e53878d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Aug 2017 14:19:07 +0100 Subject: Allow update group publicity --- synapse/rest/client/v2_alpha/groups.py | 28 ++++++++++++++++++++++++++++ synapse/storage/group_server.py | 15 +++++++++++++++ 2 files changed, 43 insertions(+) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 009cd70737..9b1116acee 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -557,6 +557,33 @@ class GroupSelfAcceptInviteServlet(RestServlet): defer.returnValue((200, result)) +class GroupSelfUpdatePublicityServlet(RestServlet): + """Update whether we publicise a users membership of a group + """ + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/self/update_publicity$" + ) + + def __init__(self, hs): + super(GroupSelfUpdatePublicityServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + publicise = content["publicise"] + yield self.store.update_group_publicity( + group_id, requester_user_id, publicise, + ) + + defer.returnValue((200, {})) + + class GroupsForUserServlet(RestServlet): """Get all groups the logged in user is joined to """ @@ -598,4 +625,5 @@ def register_servlets(hs, http_server): GroupSummaryRoomsCatServlet(hs).register(http_server) GroupRoleServlet(hs).register(http_server) GroupRolesServlet(hs).register(http_server) + GroupSelfUpdatePublicityServlet(hs).register(http_server) GroupSummaryUsersRoleServlet(hs).register(http_server) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 31514f3cdb..10e757e975 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -835,6 +835,21 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + def update_group_publicity(self, group_id, user_id, publicise): + """Update whether the user is publicising their membership of the group + """ + return self._simple_update_one( + table="local_group_membership", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + updatevalues={ + "is_publicised": publicise, + }, + desc="update_group_publicity" + ) + @defer.inlineCallbacks def register_user_group_membership(self, group_id, user_id, membership, is_admin=False, content={}, -- cgit 1.5.1 From ef8e5786770ff285ebdf1fce420b5aa86437673c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 Aug 2017 13:36:22 +0100 Subject: Add bulk group publicised lookup API --- synapse/federation/transport/client.py | 15 ++++++++++ synapse/federation/transport/server.py | 17 +++++++++++ synapse/handlers/groups_local.py | 42 ++++++++++++++++++++++++++ synapse/rest/client/v2_alpha/groups.py | 54 ++++++++++++++++++++++++++++++++++ synapse/storage/group_server.py | 14 +++++++++ 5 files changed, 142 insertions(+) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 073d3abb2a..ce68cc4937 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -812,3 +812,18 @@ class TransportLayerClient(object): args={"requester_user_id": requester_user_id}, ignore_backoff=True, ) + + def bulk_get_publicised_groups(self, destination, user_ids): + """Get the groups a list of users are publicising + """ + + path = PREFIX + "/get_groups_publicised" + + content = {"user_ids": user_ids} + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index e04750fd2a..b5f07c50bf 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1050,6 +1050,22 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet): defer.returnValue((200, resp)) +class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): + """Get roles in a group + """ + PATH = ( + "/get_groups_publicised$" + ) + + @defer.inlineCallbacks + def on_POST(self, origin, content, query): + resp = yield self.handler.bulk_get_publicised_groups( + content["user_ids"], proxy=False, + ) + + defer.returnValue((200, resp)) + + FEDERATION_SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -1102,6 +1118,7 @@ GROUP_SERVER_SERVLET_CLASSES = ( GROUP_LOCAL_SERVLET_CLASSES = ( FederationGroupsLocalInviteServlet, FederationGroupsRemoveLocalUserServlet, + FederationGroupsBulkPublicisedServlet, ) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 3a738ef36f..c980623bbc 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -313,3 +313,45 @@ class GroupsLocalHandler(object): def get_joined_groups(self, user_id): group_ids = yield self.store.get_joined_groups(user_id) defer.returnValue({"groups": group_ids}) + + @defer.inlineCallbacks + def get_publicised_groups_for_user(self, user_id): + if self.hs.is_mine_id(user_id): + result = yield self.store.get_publicised_groups_for_user(user_id) + defer.returnValue({"groups": result}) + else: + result = yield self.transport_client.get_publicised_groups_for_user( + get_domain_from_id(user_id), user_id + ) + # TODO: Verify attestations + defer.returnValue(result) + + @defer.inlineCallbacks + def bulk_get_publicised_groups(self, user_ids, proxy=True): + destinations = {} + locals = [] + + for user_id in user_ids: + if self.hs.is_mine_id(user_id): + locals.append(user_id) + else: + destinations.setdefault( + get_domain_from_id(user_id), [] + ).append(user_id) + + if not proxy and destinations: + raise SynapseError(400, "Some user_ids are not local") + + results = {} + for destination, dest_user_ids in destinations.iteritems(): + r = yield self.transport_client.bulk_get_publicised_groups( + destination, dest_user_ids, + ) + results.update(r) + + for uid in locals: + results[uid] = yield self.store.get_publicised_groups_for_user( + uid + ) + + defer.returnValue({"users": results}) diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 9b1116acee..97d7948bb9 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -584,6 +584,59 @@ class GroupSelfUpdatePublicityServlet(RestServlet): defer.returnValue((200, {})) +class PublicisedGroupsForUserServlet(RestServlet): + """Get the list of groups a user is advertising + """ + PATTERNS = client_v2_patterns( + "/publicised_groups/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(PublicisedGroupsForUserServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastore() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, user_id): + yield self.auth.get_user_by_req(request) + + result = yield self.groups_handler.get_publicised_groups_for_user( + user_id + ) + + defer.returnValue((200, result)) + + +class PublicisedGroupsForUsersServlet(RestServlet): + """Get the list of groups a user is advertising + """ + PATTERNS = client_v2_patterns( + "/publicised_groups$" + ) + + def __init__(self, hs): + super(PublicisedGroupsForUsersServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastore() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_POST(self, request): + yield self.auth.get_user_by_req(request) + + content = parse_json_object_from_request(request) + user_ids = content["user_ids"] + + result = yield self.groups_handler.bulk_get_publicised_groups( + user_ids + ) + + defer.returnValue((200, result)) + + class GroupsForUserServlet(RestServlet): """Get all groups the logged in user is joined to """ @@ -627,3 +680,4 @@ def register_servlets(hs, http_server): GroupRolesServlet(hs).register(http_server) GroupSelfUpdatePublicityServlet(hs).register(http_server) GroupSummaryUsersRoleServlet(hs).register(http_server) + PublicisedGroupsForUserServlet(hs).register(http_server) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 10e757e975..0c35b03d2a 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -835,6 +835,20 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + def get_publicised_groups_for_user(self, user_id): + """Get all groups a user is publicising + """ + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + "is_publicised": True, + }, + retcol="group_id", + desc="get_publicised_groups_for_user", + ) + def update_group_publicity(self, group_id, user_id, publicise): """Update whether the user is publicising their membership of the group """ -- cgit 1.5.1 From 175a01f56c86a4c201e72d49f22663425656d81d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Aug 2017 14:45:56 +0100 Subject: Groups: Fix mising json.load in initial sync --- synapse/storage/group_server.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index f44e80b514..792a57deb5 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -1101,7 +1101,13 @@ class GroupServerStore(SQLBaseStore): LIMIT ? """ txn.execute(sql, (from_token, to_token, limit,)) - return txn.fetchall() + return [{ + "stream_id": stream_id, + "group_id": group_id, + "user_id": user_id, + "type": gtype, + "content": json.loads(content_json), + } for stream_id, group_id, user_id, gtype, content_json in txn] return self.runInteraction( "get_all_groups_changes", _get_all_groups_changes_txn, ) -- cgit 1.5.1 From 069ae2df126418b5be1c96727a578cfd1dd4e506 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Sep 2017 10:52:12 +0100 Subject: Fix initial sync --- synapse/storage/group_server.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 5433063507..b0399f8133 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -1085,7 +1085,15 @@ class GroupServerStore(SQLBaseStore): AND stream_id <= ? """ txn.execute(sql, (user_id, now_token,)) - return self.cursor_to_dict(txn) + return [ + { + "group_id": row[0], + "type": row[1], + "membership": row[2], + "content": json.loads(row[3]), + } + for row in txn + ] return self.runInteraction( "get_all_groups_for_user", _get_all_groups_for_user_txn, ) -- cgit 1.5.1 From 197d82dc070447b4a89a82816996f38f01ca7a04 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Sep 2017 11:12:11 +0100 Subject: Correctly return next token --- synapse/storage/group_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index b0399f8133..2afd689d83 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -966,10 +966,11 @@ class GroupServerStore(SQLBaseStore): return next_id with self._group_updates_id_gen.get_next() as next_id: - yield self.runInteraction( + res = yield self.runInteraction( "register_user_group_membership", _register_user_group_membership_txn, next_id, ) + defer.returnValue(res) @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, -- cgit 1.5.1 From e1dec2f1a797122b4d72ba883e09b2d1b9eafcc9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Sep 2017 16:09:57 +0100 Subject: Remove user from group summary when the leave the group --- synapse/storage/group_server.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 2afd689d83..d0b5ad231a 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -822,6 +822,14 @@ class GroupServerStore(SQLBaseStore): "user_id": user_id, }, ) + self._simple_delete_txn( + txn, + table="group_summary_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) return self.runInteraction("remove_user_from_group", _remove_user_from_group_txn) def add_room_to_group(self, group_id, room_id, is_public): -- cgit 1.5.1 From 17b8e2bd02ad0abbd25103b637eb8490f3a53507 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Sep 2017 15:52:41 +0100 Subject: Add remove room API --- synapse/federation/transport/client.py | 12 ++++++++++++ synapse/federation/transport/server.py | 14 +++++++++++++- synapse/groups/groups_server.py | 12 ++++++++++++ synapse/handlers/groups_local.py | 1 + synapse/rest/client/v2_alpha/groups.py | 11 +++++++++++ synapse/storage/group_server.py | 23 +++++++++++++++++++++++ 6 files changed, 72 insertions(+), 1 deletion(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ce68cc4937..36f6eb75e9 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -525,6 +525,18 @@ class TransportLayerClient(object): ignore_backoff=True, ) + def remove_room_from_group(self, destination, group_id, requester_user_id, room_id): + """Remove a room from a group + """ + path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + @log_function def get_users_in_group(self, destination, group_id, requester_user_id): """Get users in a group diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index b5f07c50bf..c7565e0737 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -674,7 +674,7 @@ class FederationGroupsRoomsServlet(BaseFederationServlet): class FederationGroupsAddRoomsServlet(BaseFederationServlet): - """Add room to group + """Add/remove room from group """ PATH = "/groups/(?P[^/]*)/room/(?)$" @@ -690,6 +690,18 @@ class FederationGroupsAddRoomsServlet(BaseFederationServlet): defer.returnValue((200, new_content)) + @defer.inlineCallbacks + def on_DELETE(self, origin, content, query, group_id, room_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.remove_room_from_group( + group_id, requester_user_id, room_id, + ) + + defer.returnValue((200, new_content)) + class FederationGroupsUsersServlet(BaseFederationServlet): """Get the users in a group on behalf of a user diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 699d8a5265..10bf61d178 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -472,6 +472,18 @@ class GroupsServerHandler(object): defer.returnValue({}) + @defer.inlineCallbacks + def remove_room_from_group(self, group_id, requester_user_id, room_id): + """Remove room from group + """ + yield self.check_group_is_ours( + group_id, and_exists=True, and_is_admin=requester_user_id + ) + + yield self.store.remove_room_from_group(group_id, room_id) + + defer.returnValue({}) + @defer.inlineCallbacks def invite_to_group(self, group_id, user_id, requester_user_id, content): """Invite user to group diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 14fdf06b58..a2bacbfc38 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -69,6 +69,7 @@ class GroupsLocalHandler(object): get_rooms_in_group = _create_rerouter("get_rooms_in_group") add_room_to_group = _create_rerouter("add_room_to_group") + remove_room_from_group = _create_rerouter("remove_room_from_group") update_group_summary_room = _create_rerouter("update_group_summary_room") delete_group_summary_room = _create_rerouter("delete_group_summary_room") diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index b469058e9d..8f3ce15b02 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -423,6 +423,17 @@ class GroupAdminRoomsServlet(RestServlet): defer.returnValue((200, result)) + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.remove_room_from_group( + group_id, user_id, room_id, + ) + + defer.returnValue((200, result)) + class GroupAdminUsersInviteServlet(RestServlet): """Invite a user to the group diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index d0b5ad231a..4fe9172adc 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -843,6 +843,29 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + def remove_room_from_group(self, group_id, room_id): + def _remove_room_from_group_txn(txn): + self._simple_delete_txn( + txn, + table="group_rooms", + keyvalues={ + "group_id": group_id, + "room_id": room_id, + }, + ) + + self._simple_delete_txn( + txn, + table="group_summary_rooms", + keyvalues={ + "group_id": group_id, + "room_id": room_id, + }, + ) + return self.runInteraction( + "remove_room_from_group", _remove_room_from_group_txn, + ) + def get_publicised_groups_for_user(self, user_id): """Get all groups a user is publicising """ -- cgit 1.5.1 From ea18996f54194f920dc506201a65eb3d36bb161d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Oct 2017 15:44:37 +0100 Subject: Fix group stream replication The stream update functions expect the storage function to return a list of tuples. --- synapse/storage/group_server.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 4fe9172adc..22a6bc6261 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -1172,13 +1172,13 @@ class GroupServerStore(SQLBaseStore): LIMIT ? """ txn.execute(sql, (from_token, to_token, limit,)) - return [{ - "stream_id": stream_id, - "group_id": group_id, - "user_id": user_id, - "type": gtype, - "content": json.loads(content_json), - } for stream_id, group_id, user_id, gtype, content_json in txn] + return [( + stream_id, + group_id, + user_id, + gtype, + json.loads(content_json), + ) for stream_id, group_id, user_id, gtype, content_json in txn] return self.runInteraction( "get_all_groups_changes", _get_all_groups_changes_txn, ) -- cgit 1.5.1 From 818b08d0e4be7571008af4590542fd652f028dcd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Oct 2017 15:54:00 +0100 Subject: peeeeeeeeep8888888888888888888888888888 --- synapse/storage/group_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 22a6bc6261..3af372de59 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -1178,7 +1178,7 @@ class GroupServerStore(SQLBaseStore): user_id, gtype, json.loads(content_json), - ) for stream_id, group_id, user_id, gtype, content_json in txn] + ) for stream_id, group_id, user_id, gtype, content_json in txn] return self.runInteraction( "get_all_groups_changes", _get_all_groups_changes_txn, ) -- cgit 1.5.1 From 2c5972f87f0541aaeff43846f7050ab91d11cf0e Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 16 Oct 2017 15:31:11 +0100 Subject: Implement GET /groups/$groupId/invited_users --- synapse/federation/transport/client.py | 13 +++++++++++++ synapse/federation/transport/server.py | 18 ++++++++++++++++- synapse/groups/groups_server.py | 35 ++++++++++++++++++++++++++++++++++ synapse/handlers/groups_local.py | 17 +++++++++++++++++ synapse/rest/client/v2_alpha/groups.py | 21 ++++++++++++++++++++ synapse/storage/group_server.py | 12 ++++++++++++ 6 files changed, 115 insertions(+), 1 deletion(-) (limited to 'synapse/storage/group_server.py') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index f96561c1fe..125d8f3598 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -550,6 +550,19 @@ class TransportLayerClient(object): ignore_backoff=True, ) + @log_function + def get_invited_users_in_group(self, destination, group_id, requester_user_id): + """Get users that have been invited to a group + """ + path = PREFIX + "/groups/%s/invited_users" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + @log_function def accept_group_invite(self, destination, group_id, user_id, content): """Accept a group invite diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index c7565e0737..625a2fe27f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -720,6 +720,22 @@ class FederationGroupsUsersServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsInvitedUsersServlet(BaseFederationServlet): + """Get the users that have been invited to a group + """ + PATH = "/groups/(?P[^/]*)/invited_users$" + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.get_invited_users_in_group( + group_id, requester_user_id + ) + + defer.returnValue((200, new_content)) class FederationGroupsInviteServlet(BaseFederationServlet): """Ask a group server to invite someone to the group @@ -1109,12 +1125,12 @@ ROOM_LIST_CLASSES = ( PublicRoomList, ) - GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsProfileServlet, FederationGroupsSummaryServlet, FederationGroupsRoomsServlet, FederationGroupsUsersServlet, + FederationGroupsInvitedUsersServlet, FederationGroupsInviteServlet, FederationGroupsAcceptInviteServlet, FederationGroupsRemoveUserServlet, diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 1083bc2990..bfa46b7cb2 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -420,6 +420,41 @@ class GroupsServerHandler(object): "total_user_count_estimate": len(user_results), }) + @defer.inlineCallbacks + def get_invited_users_in_group(self, group_id, requester_user_id): + """Get the users that have been invited to a group as seen by requester_user_id. + + The ordering is arbitrary at the moment + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) + + if not is_user_in_group: + raise SynapseError(403, "User not in group") + + invited_users = yield self.store.get_invited_users_in_group(group_id) + + user_profiles = [] + + for user_id in invited_users: + user_profile = { + "user_id": user_id + } + try: + profile = yield self.profile_handler.get_profile_from_cache(user) + user_profile.update(profile) + except Exception as e: + pass + user_profiles.append(user_profile) + + defer.returnValue({ + "chunk": user_profiles, + "total_user_count_estimate": len(invited_users), + }) + + @defer.inlineCallbacks def get_rooms_in_group(self, group_id, requester_user_id): """Get the rooms in group as seen by requester_user_id diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 97a20f2b04..5263e769bb 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -219,6 +219,23 @@ class GroupsLocalHandler(object): defer.returnValue(res) + @defer.inlineCallbacks + def get_invited_users_in_group(self, group_id, requester_user_id): + """Get users invited to a group + """ + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_invited_users_in_group( + group_id, requester_user_id + ) + defer.returnValue(res) + + group_server_name = get_domain_from_id(group_id) + + res = yield self.transport_client.get_users_in_group( + get_domain_from_id(group_id), group_id, requester_user_id, + ) + defer.returnValue(res) + @defer.inlineCallbacks def join_group(self, group_id, user_id, content): """Request to join a group diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 8f3ce15b02..4532112cfc 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -370,6 +370,26 @@ class GroupUsersServlet(RestServlet): defer.returnValue((200, result)) +class GroupInvitedUsersServlet(RestServlet): + """Get users invited to a group + """ + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/invited_users$") + + def __init__(self, hs): + super(GroupInvitedUsersServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_invited_users_in_group(group_id, user_id) + + defer.returnValue((200, result)) + class GroupCreateServlet(RestServlet): """Create a group @@ -674,6 +694,7 @@ class GroupsForUserServlet(RestServlet): def register_servlets(hs, http_server): GroupServlet(hs).register(http_server) GroupSummaryServlet(hs).register(http_server) + GroupInvitedUsersServlet(hs).register(http_server) GroupUsersServlet(hs).register(http_server) GroupRoomServlet(hs).register(http_server) GroupCreateServlet(hs).register(http_server) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 3af372de59..9e63db5c6c 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -56,6 +56,18 @@ class GroupServerStore(SQLBaseStore): desc="get_users_in_group", ) + def get_invited_users_in_group(self, group_id): + # TODO: Pagination + + return self._simple_select_onecol( + table="group_invites", + keyvalues={ + "group_id": group_id, + }, + retcol="user_id", + desc="get_invited_users_in_group", + ) + def get_rooms_in_group(self, group_id, include_private=False): # TODO: Pagination -- cgit 1.5.1