From b8ca494ee9e42e5b1aca8958088bd35cc5707437 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 15:44:15 +0100 Subject: Initial group server implementation --- synapse/storage/__init__.py | 3 +- synapse/storage/group_server.py | 280 +++++++++++++++++++++++ synapse/storage/schema/delta/43/group_server.sql | 77 +++++++ 3 files changed, 359 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/group_server.py create mode 100644 synapse/storage/schema/delta/43/group_server.sql (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b92472df33..fdee9f1ad5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -37,7 +37,7 @@ from .media_repository import MediaRepositoryStore from .rejections import RejectionsStore from .event_push_actions import EventPushActionsStore from .deviceinbox import DeviceInboxStore - +from .group_server import GroupServerStore from .state import StateStore from .signatures import SignatureStore from .filtering import FilteringStore @@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore, DeviceStore, DeviceInboxStore, UserDirectoryStore, + GroupServerStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py new file mode 100644 index 0000000000..01d9a982c8 --- /dev/null +++ b/synapse/storage/group_server.py @@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import SQLBaseStore + +import ujson as json + + +class GroupServerStore(SQLBaseStore): + def get_group(self, group_id): + return self._simple_select_one( + table="groups", + keyvalues={ + "group_id": group_id, + }, + retcols=("name", "short_description", "long_description", "avatar_url",), + allow_none=True, + desc="is_user_in_group", + ) + + def get_users_in_group(self, group_id, include_private=False): + # TODO: Pagination + + keyvalues = { + "group_id": group_id, + } + if not include_private: + keyvalues["is_public"] = True + + return self._simple_select_list( + table="group_users", + keyvalues=keyvalues, + retcols=("user_id", "is_public",), + desc="get_users_in_group", + ) + + def get_rooms_in_group(self, group_id, include_private=False): + # TODO: Pagination + + keyvalues = { + "group_id": group_id, + } + if not include_private: + keyvalues["is_public"] = True + + return self._simple_select_list( + table="group_rooms", + keyvalues=keyvalues, + retcols=("room_id", "is_public",), + desc="get_rooms_in_group", + ) + + def is_user_in_group(self, user_id, group_id): + return self._simple_select_one_onecol( + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="user_id", + allow_none=True, + desc="is_user_in_group", + ).addCallback(lambda r: bool(r)) + + def is_user_admin_in_group(self, group_id, user_id): + return self._simple_select_one_onecol( + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="is_admin", + allow_none=True, + desc="is_user_adim_in_group", + ) + + def add_group_invite(self, group_id, user_id): + return self._simple_insert( + table="group_invites", + values={ + "group_id": group_id, + "user_id": user_id, + }, + desc="add_group_invite", + ) + + def is_user_invited_to_local_group(self, group_id, user_id): + return self._simple_select_one_onecol( + table="group_invites", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="user_id", + desc="is_user_invited_to_local_group", + allow_none=True, + ) + + def add_user_to_group(self, group_id, user_id, is_admin=False, is_public=True, + local_attestation=None, remote_attestation=None): + def _add_user_to_group_txn(txn): + self._simple_insert_txn( + txn, + table="group_users", + values={ + "group_id": group_id, + "user_id": user_id, + "is_admin": is_admin, + "is_public": is_public, + }, + ) + + self._simple_delete_txn( + txn, + table="group_invites", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + + if local_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_renewals", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": local_attestation["valid_until_ms"], + }, + ) + if remote_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_remote", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": remote_attestation["valid_until_ms"], + "attestation": json.dumps(remote_attestation), + }, + ) + + return self.runInteraction( + "add_user_to_group", _add_user_to_group_txn + ) + + def remove_user_to_group(self, group_id, user_id): + def _remove_user_to_group_txn(txn): + self._simple_delete_txn( + txn, + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_invites", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + return self.runInteraction("remove_user_to_group", _remove_user_to_group_txn) + + def add_room_to_group(self, group_id, room_id, is_public): + return self._simple_insert( + table="group_rooms", + values={ + "group_id": group_id, + "room_id": room_id, + "is_public": is_public, + }, + desc="add_room_to_group", + ) + + @defer.inlineCallbacks + def create_group(self, group_id, user_id, name, avatar_url, short_description, + long_description,): + yield self._simple_insert( + table="groups", + values={ + "group_id": group_id, + "name": name, + "avatar_url": avatar_url, + "short_description": short_description, + "long_description": long_description, + }, + desc="create_group", + ) + + def get_attestations_need_renewals(self, valid_until_ms): + def _get_attestations_need_renewals_txn(txn): + sql = """ + SELECT group_id, user_id FROM group_attestations_renewals + WHERE valid_until_ms <= ? + """ + txn.execute(sql, (valid_until_ms,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_attestations_need_renewals", _get_attestations_need_renewals_txn + ) + + def update_attestation_renewal(self, group_id, user_id, attestation): + return self._simple_update_one( + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + updatevalues={ + "valid_until_ms": attestation["valid_until_ms"], + }, + desc="update_attestation_renewal", + ) + + def update_remote_attestion(self, group_id, user_id, attestation): + return self._simple_update_one( + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + updatevalues={ + "valid_until_ms": attestation["valid_until_ms"], + "attestation": json.dumps(attestation) + }, + desc="update_remote_attestion", + ) + + @defer.inlineCallbacks + def get_remote_attestation(self, group_id, user_id): + row = yield self._simple_select_one( + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcols=("valid_until_ms", "attestation"), + desc="get_remote_attestation", + allow_none=True, + ) + + now = int(self._clock.time_msec()) + if row and now < row["valid_until_ms"]: + defer.returnValue(json.loads(row["attestation"])) + + defer.returnValue(None) diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql new file mode 100644 index 0000000000..6f1a941990 --- /dev/null +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -0,0 +1,77 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE groups ( + group_id TEXT NOT NULL, + name TEXT, + avatar_url TEXT, + short_description TEXT, + long_description TEXT +); + +CREATE UNIQUE INDEX groups_idx ON groups(group_id); + + +CREATE TABLE group_users ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + is_admin BOOLEAN NOT NULL, + is_public BOOLEAN NOT NULL +); + + +CREATE INDEX groups_users_g_idx ON group_users(group_id, user_id); +CREATE INDEX groups_users_u_idx ON group_users(user_id); + + +CREATE TABLE group_invites ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL +); + +CREATE INDEX groups_invites_g_idx ON group_invites(group_id, user_id); +CREATE INDEX groups_invites_u_idx ON group_invites(user_id); + + +CREATE TABLE group_rooms ( + group_id TEXT NOT NULL, + room_id TEXT NOT NULL, + is_public BOOLEAN NOT NULL +); + +CREATE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); +CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id); + + +CREATE TABLE group_attestations_renewals ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + valid_until_ms BIGINT NOT NULL +); + +CREATE INDEX group_attestations_renewals_g_idx ON group_attestations_renewals(group_id, user_id); +CREATE INDEX group_attestations_renewals_u_idx ON group_attestations_renewals(user_id); +CREATE INDEX group_attestations_renewals_v_idx ON group_attestations_renewals(valid_until_ms); + +CREATE TABLE group_attestations_remote ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + valid_until_ms BIGINT NOT NULL, + attestation TEXT NOT NULL +); + +CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); +CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id); +CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); -- cgit 1.5.1 From 83936293eb3ddb8998191b537153eaeec5e7afb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Jul 2017 09:58:59 +0100 Subject: Comments --- synapse/groups/attestations.py | 29 +++++- synapse/groups/groups_server.py | 108 +++++++++++++++-------- synapse/storage/group_server.py | 32 ++++++- synapse/storage/schema/delta/43/group_server.sql | 6 +- 4 files changed, 132 insertions(+), 43 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index d83076a9b3..6937fa44cf 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -28,6 +28,8 @@ UPDATE_ATTESTATION_TIME_MS = 1 * 24 * 60 * 60 * 1000 class GroupAttestationSigning(object): + """Creates and verifies group attestations. + """ def __init__(self, hs): self.keyring = hs.get_keyring() self.clock = hs.get_clock() @@ -36,11 +38,20 @@ class GroupAttestationSigning(object): @defer.inlineCallbacks def verify_attestation(self, attestation, group_id, user_id, server_name=None): + """Verifies that the given attestation matches the given paramaters. + + An optional server_name can be supplied to explicitly set which server's + signature is expected. Otherwise assumes that either the group_id or user_id + is local and uses the other's server as the one to check. + """ + if not server_name: if get_domain_from_id(group_id) == self.server_name: server_name = get_domain_from_id(user_id) - else: + elif get_domain_from_id(user_id) == self.server_name: server_name = get_domain_from_id(group_id) + else: + raise Exception("Expected eitehr group_id or user_id to be local") if user_id != attestation["user_id"]: raise SynapseError(400, "Attestation has incorrect user_id") @@ -48,6 +59,7 @@ class GroupAttestationSigning(object): if group_id != attestation["group_id"]: raise SynapseError(400, "Attestation has incorrect group_id") + # TODO: valid_until_ms = attestation["valid_until_ms"] if valid_until_ms - self.clock.time_msec() < MIN_ATTESTATION_LENGTH_MS: raise SynapseError(400, "Attestation not valid for long enough") @@ -55,6 +67,9 @@ class GroupAttestationSigning(object): yield self.keyring.verify_json_for_server(server_name, attestation) def create_attestation(self, group_id, user_id): + """Create an attestation for the group_id and user_id with default + validity length. + """ return sign_json({ "group_id": group_id, "user_id": user_id, @@ -63,11 +78,15 @@ class GroupAttestationSigning(object): class GroupAttestionRenewer(object): + """Responsible for sending and receiving attestation updates. + """ + def __init__(self, hs): self.clock = hs.get_clock() self.store = hs.get_datastore() self.assestations = hs.get_groups_attestation_signing() self.transport_client = hs.get_federation_transport_client() + self.is_mine_id = hs.is_mind_id self._renew_attestations_loop = self.clock.looping_call( self._renew_attestations, 30 * 60 * 1000, @@ -75,8 +94,13 @@ class GroupAttestionRenewer(object): @defer.inlineCallbacks def on_renew_attestation(self, group_id, user_id, content): + """When a remote updates an attestation + """ attestation = content["attestation"] + if not self.is_mine_id(group_id) and not self.is_mine_id(user_id): + raise SynapseError(400, "Neither user not group are on this server") + yield self.attestations.verify_attestation( attestation, user_id=user_id, @@ -89,6 +113,9 @@ class GroupAttestionRenewer(object): @defer.inlineCallbacks def _renew_attestations(self): + """Called periodically to check if we need to update any of our attestations + """ + now = self.clock.time_msec() rows = yield self.store.get_attestations_need_renewals( diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 195f1eae54..44083100f7 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -19,7 +19,6 @@ from synapse.api.errors import SynapseError from synapse.types import UserID, get_domain_from_id -import functools import logging logger = logging.getLogger(__name__) @@ -33,28 +32,6 @@ logger = logging.getLogger(__name__) # TODO: Flairs -UPDATE_ATTESTATION_TIME_MS = 1 * 24 * 60 * 60 * 1000 - - -def check_group_is_ours(and_exists=False): - def g(func): - @functools.wraps(func) - @defer.inlineCallbacks - def h(self, group_id, *args, **kwargs): - if not self.is_mine_id(group_id): - raise SynapseError(400, "Group not on this server") - if and_exists: - group = yield self.store.get_group(group_id) - if not group: - raise SynapseError(404, "Unknown group") - - res = yield func(self, group_id, *args, **kwargs) - defer.returnValue(res) - - return h - return g - - class GroupsServerHandler(object): def __init__(self, hs): self.hs = hs @@ -72,9 +49,28 @@ class GroupsServerHandler(object): # Ensure attestations get renewed hs.get_groups_attestation_renewer() - @check_group_is_ours() + @defer.inlineCallbacks + def check_group_is_ours(self, group_id, and_exists=False): + """Check that the group is ours, and optionally if it exists. + + If group does exist then return group. + """ + if not self.is_mine_id(group_id): + raise SynapseError(400, "Group not on this server") + + group = yield self.store.get_group(group_id) + if and_exists and not group: + raise SynapseError(404, "Unknown group") + + defer.returnValue(group) + @defer.inlineCallbacks def get_group_profile(self, group_id, requester_user_id): + """Get the group profile as seen by requester_user_id + """ + + yield self.check_group_is_ours(group_id) + group_description = yield self.store.get_group(group_id) if group_description: @@ -82,9 +78,13 @@ class GroupsServerHandler(object): else: raise SynapseError(404, "Unknown group") - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): + """Get the users in group as seen by requester_user_id + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) user_results = yield self.store.get_users_in_group( @@ -123,9 +123,13 @@ class GroupsServerHandler(object): "total_user_count_estimate": len(user_results), }) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def get_rooms_in_group(self, group_id, requester_user_id): + """Get the rooms in group as seen by requester_user_id + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) room_results = yield self.store.get_rooms_in_group( @@ -158,9 +162,13 @@ class GroupsServerHandler(object): "total_room_count_estimate": len(room_results), }) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def add_room(self, group_id, requester_user_id, room_id, content): + """Add room to group + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_admin = yield self.store.is_user_admin_in_group(group_id, requester_user_id) if not is_admin: raise SynapseError(403, "User is not admin in group") @@ -182,9 +190,13 @@ class GroupsServerHandler(object): defer.returnValue({}) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def invite_to_group(self, group_id, user_id, requester_user_id, content): + """Invite user to group + """ + + group = yield self.check_group_is_ours(group_id, and_exists=True) + is_admin = yield self.store.is_user_admin_in_group( group_id, requester_user_id ) @@ -194,7 +206,6 @@ class GroupsServerHandler(object): # TODO: Check if user knocked # TODO: Check if user is already invited - group = yield self.store.get_group(group_id) content = { "profile": { "name": group["name"], @@ -248,9 +259,16 @@ class GroupsServerHandler(object): else: raise SynapseError(502, "Unknown state returned by HS") - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def accept_invite(self, group_id, user_id, content): + """User tries to accept an invite to the group. + + This is different from them asking to join, and so should error if no + invite exists (and they're not a member of the group) + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + if not self.store.is_user_invited_to_local_group(group_id, user_id): raise SynapseError(403, "User not invited to group") @@ -291,19 +309,33 @@ class GroupsServerHandler(object): "attestation": local_attestation, }) - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def knock(self, group_id, user_id, content): - pass + """A user requests becoming a member of the group + """ + yield self.check_group_is_ours(group_id, and_exists=True) + + raise NotImplementedError() - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def accept_knock(self, group_id, user_id, content): - pass + """Accept a users knock to the room. + + Errors if the user hasn't knocked, rather than inviting them. + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + + raise NotImplementedError() - @check_group_is_ours(and_exists=True) @defer.inlineCallbacks def remove_user_from_group(self, group_id, user_id, requester_user_id, content): + """Remove a user from the group; either a user is leaving or and admin + kicked htem. + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + is_kick = False if requester_user_id != user_id: is_admin = yield self.store.is_user_admin_in_group( @@ -314,7 +346,7 @@ class GroupsServerHandler(object): is_kick = True - yield self.store.remove_user_to_group( + yield self.store.remove_user_from_group( group_id, user_id, ) @@ -328,11 +360,11 @@ class GroupsServerHandler(object): defer.returnValue({}) - @check_group_is_ours() @defer.inlineCallbacks def create_group(self, group_id, user_id, content): + group = yield self.check_group_is_ours(group_id) + logger.info("Attempting to create group with ID: %r", group_id) - group = yield self.store.get_group(group_id) if group: raise SynapseError(400, "Group already exists") diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 01d9a982c8..327d770862 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -89,6 +89,8 @@ class GroupServerStore(SQLBaseStore): ) def add_group_invite(self, group_id, user_id): + """Record that the group server has invited a user + """ return self._simple_insert( table="group_invites", values={ @@ -99,6 +101,8 @@ class GroupServerStore(SQLBaseStore): ) def is_user_invited_to_local_group(self, group_id, user_id): + """Has the group server invited a user? + """ return self._simple_select_one_onecol( table="group_invites", keyvalues={ @@ -112,6 +116,19 @@ class GroupServerStore(SQLBaseStore): def add_user_to_group(self, group_id, user_id, is_admin=False, is_public=True, local_attestation=None, remote_attestation=None): + """Add a user to the group server. + + Args: + group_id (str) + user_id (str) + is_admin (bool) + is_public (bool) + local_attestation (dict): The attestation the GS created to give + to the remote server. Optional if the user and group are on the + same server + remote_attestation (dict): The attestation given to GS by remote + server. Optional if the user and group are on the same server + """ def _add_user_to_group_txn(txn): self._simple_insert_txn( txn, @@ -159,8 +176,8 @@ class GroupServerStore(SQLBaseStore): "add_user_to_group", _add_user_to_group_txn ) - def remove_user_to_group(self, group_id, user_id): - def _remove_user_to_group_txn(txn): + def remove_user_from_group(self, group_id, user_id): + def _remove_user_from_group_txn(txn): self._simple_delete_txn( txn, table="group_users", @@ -193,7 +210,7 @@ class GroupServerStore(SQLBaseStore): "user_id": user_id, }, ) - return self.runInteraction("remove_user_to_group", _remove_user_to_group_txn) + return self.runInteraction("remove_user_from_group", _remove_user_from_group_txn) def add_room_to_group(self, group_id, room_id, is_public): return self._simple_insert( @@ -222,6 +239,8 @@ class GroupServerStore(SQLBaseStore): ) def get_attestations_need_renewals(self, valid_until_ms): + """Get all attestations that need to be renewed until givent time + """ def _get_attestations_need_renewals_txn(txn): sql = """ SELECT group_id, user_id FROM group_attestations_renewals @@ -234,6 +253,8 @@ class GroupServerStore(SQLBaseStore): ) def update_attestation_renewal(self, group_id, user_id, attestation): + """Update an attestation that we have renewed + """ return self._simple_update_one( table="group_attestations_renewals", keyvalues={ @@ -247,6 +268,8 @@ class GroupServerStore(SQLBaseStore): ) def update_remote_attestion(self, group_id, user_id, attestation): + """Update an attestation that a remote has renewed + """ return self._simple_update_one( table="group_attestations_remote", keyvalues={ @@ -262,6 +285,9 @@ class GroupServerStore(SQLBaseStore): @defer.inlineCallbacks def get_remote_attestation(self, group_id, user_id): + """Get the attestation that proves the remote agrees that the user is + in the group. + """ row = yield self._simple_select_one( table="group_attestations_remote", keyvalues={ diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 6f1a941990..5dc7a497e2 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -24,6 +24,7 @@ CREATE TABLE groups ( CREATE UNIQUE INDEX groups_idx ON groups(group_id); +-- list of users the group server thinks are joined CREATE TABLE group_users ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -35,7 +36,7 @@ CREATE TABLE group_users ( CREATE INDEX groups_users_g_idx ON group_users(group_id, user_id); CREATE INDEX groups_users_u_idx ON group_users(user_id); - +-- list of users the group server thinks are invited CREATE TABLE group_invites ( group_id TEXT NOT NULL, user_id TEXT NOT NULL @@ -55,6 +56,7 @@ CREATE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id); +-- List of attestations we've given out and need to renew CREATE TABLE group_attestations_renewals ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -65,6 +67,8 @@ CREATE INDEX group_attestations_renewals_g_idx ON group_attestations_renewals(gr CREATE INDEX group_attestations_renewals_u_idx ON group_attestations_renewals(user_id); CREATE INDEX group_attestations_renewals_v_idx ON group_attestations_renewals(valid_until_ms); + +-- List of attestations we've received from remotes and are interested in. CREATE TABLE group_attestations_remote ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, -- cgit 1.5.1 From 0aac30d53b1dba2f399cad0044a905286d8c79d2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Jul 2017 14:23:50 +0100 Subject: Comments --- synapse/groups/attestations.py | 4 ++-- synapse/groups/groups_server.py | 6 +++++- synapse/storage/schema/delta/43/group_server.sql | 4 ++-- 3 files changed, 9 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 6937fa44cf..0741b55c1c 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -38,7 +38,7 @@ class GroupAttestationSigning(object): @defer.inlineCallbacks def verify_attestation(self, attestation, group_id, user_id, server_name=None): - """Verifies that the given attestation matches the given paramaters. + """Verifies that the given attestation matches the given parameters. An optional server_name can be supplied to explicitly set which server's signature is expected. Otherwise assumes that either the group_id or user_id @@ -51,7 +51,7 @@ class GroupAttestationSigning(object): elif get_domain_from_id(user_id) == self.server_name: server_name = get_domain_from_id(group_id) else: - raise Exception("Expected eitehr group_id or user_id to be local") + raise Exception("Expected either group_id or user_id to be local") if user_id != attestation["user_id"]: raise SynapseError(400, "Attestation has incorrect user_id") diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 61fe0d49d9..414c95e3fe 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -80,7 +80,9 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): - """Get the users in group as seen by requester_user_id + """Get the users in group as seen by requester_user_id. + + The ordering is arbitrary at the moment """ yield self.check_group_is_ours(group_id, and_exists=True) @@ -126,6 +128,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_rooms_in_group(self, group_id, requester_user_id): """Get the rooms in group as seen by requester_user_id + + This returns rooms in order of decreasing number of joined users """ yield self.check_group_is_ours(group_id, and_exists=True) diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 5dc7a497e2..bfe8c2ca4a 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -28,7 +28,7 @@ CREATE UNIQUE INDEX groups_idx ON groups(group_id); CREATE TABLE group_users ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, - is_admin BOOLEAN NOT NULL, + is_admin BOOLEAN NOT NULL, -- whether the users membership can be seen by everyone is_public BOOLEAN NOT NULL ); @@ -49,7 +49,7 @@ CREATE INDEX groups_invites_u_idx ON group_invites(user_id); CREATE TABLE group_rooms ( group_id TEXT NOT NULL, room_id TEXT NOT NULL, - is_public BOOLEAN NOT NULL + is_public BOOLEAN NOT NULL -- whether the room can be seen by everyone ); CREATE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); -- cgit 1.5.1 From e52c391cd452077fc219fad0db8b9e5499251e5b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Jul 2017 14:25:46 +0100 Subject: Rename column to attestation_json --- synapse/storage/group_server.py | 8 ++++---- synapse/storage/schema/delta/43/group_server.sql | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 327d770862..105ab9920e 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -168,7 +168,7 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, "user_id": user_id, "valid_until_ms": remote_attestation["valid_until_ms"], - "attestation": json.dumps(remote_attestation), + "attestation_json": json.dumps(remote_attestation), }, ) @@ -278,7 +278,7 @@ class GroupServerStore(SQLBaseStore): }, updatevalues={ "valid_until_ms": attestation["valid_until_ms"], - "attestation": json.dumps(attestation) + "attestation_json": json.dumps(attestation) }, desc="update_remote_attestion", ) @@ -294,13 +294,13 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, "user_id": user_id, }, - retcols=("valid_until_ms", "attestation"), + retcols=("valid_until_ms", "attestation_json"), desc="get_remote_attestation", allow_none=True, ) now = int(self._clock.time_msec()) if row and now < row["valid_until_ms"]: - defer.returnValue(json.loads(row["attestation"])) + defer.returnValue(json.loads(row["attestation_json"])) defer.returnValue(None) diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index bfe8c2ca4a..b55b0a8deb 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -73,7 +73,7 @@ CREATE TABLE group_attestations_remote ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, valid_until_ms BIGINT NOT NULL, - attestation TEXT NOT NULL + attestation_json TEXT NOT NULL ); CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); -- cgit 1.5.1 From 410b4e14a176293ee1f41f24a641db031c6192a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Jul 2017 15:44:18 +0100 Subject: Move comment --- synapse/storage/schema/delta/43/group_server.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index b55b0a8deb..cf0659c51d 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -28,8 +28,8 @@ CREATE UNIQUE INDEX groups_idx ON groups(group_id); CREATE TABLE group_users ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, - is_admin BOOLEAN NOT NULL, -- whether the users membership can be seen by everyone - is_public BOOLEAN NOT NULL + is_admin BOOLEAN NOT NULL, + is_public BOOLEAN NOT NULL -- whether the users membership can be seen by everyone ); -- cgit 1.5.1 From 6d586dc05c35f1c0159b1eff3d83d7e3973b425d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Jul 2017 09:58:37 +0100 Subject: Comment --- synapse/storage/schema/delta/43/group_server.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index cf0659c51d..c223ee275a 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -15,7 +15,7 @@ CREATE TABLE groups ( group_id TEXT NOT NULL, - name TEXT, + name TEXT, -- the display name of the room avatar_url TEXT, short_description TEXT, long_description TEXT -- cgit 1.5.1 From a62406aaa5c4ef3780e42c9de443a2cc1e82cd9a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 15:44:40 +0100 Subject: Add group summary APIs --- synapse/federation/transport/server.py | 17 + synapse/groups/groups_server.py | 256 ++++++++- synapse/storage/group_server.py | 643 +++++++++++++++++++++++ synapse/storage/schema/delta/43/group_server.sql | 56 ++ 4 files changed, 970 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 5d6ff79235..bbb66190e0 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -627,6 +627,22 @@ class FederationGroupsProfileServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsSummaryServlet(BaseFederationServlet): + PATH = "/groups/(?P[^/]*)/summary$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id): + requester_user_id = content["requester_user_id"] + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.get_group_summary( + group_id, requester_user_id + ) + + defer.returnValue((200, new_content)) + + class FederationGroupsRoomsServlet(BaseFederationServlet): """Get the rooms in a group on behalf of a user """ @@ -784,6 +800,7 @@ ROOM_LIST_CLASSES = ( GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsProfileServlet, + FederationGroupsSummaryServlet, FederationGroupsRoomsServlet, FederationGroupsUsersServlet, FederationGroupsInviteServlet, diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 414c95e3fe..29a911e18e 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -64,6 +64,255 @@ class GroupsServerHandler(object): defer.returnValue(group) + @defer.inlineCallbacks + def get_group_summary(self, group_id, requester_user_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) + + profile = yield self.get_group_profile(group_id, requester_user_id) + + users, roles = yield self.store.get_users_for_summary_by_role( + group_id, include_private=is_user_in_group, + ) + + # TODO: Add profiles to users + # TODO: Add assestations to users + + rooms, categories = yield self.store.get_rooms_for_summary_by_category( + group_id, include_private=is_user_in_group, + ) + + for room_entry in rooms: + room_id = room_entry["room_id"] + joined_users = yield self.store.get_users_in_room(room_id) + entry = yield self.room_list_handler.generate_room_entry( + room_id, len(joined_users), + with_alias=False, allow_private=True, + ) + entry.pop("room_id", None) + + room_entry["profile"] = entry + + rooms.sort(key=lambda e: e.get("order", 0)) + + for entry in users: + user_id = entry["user_id"] + + if not self.is_mine_id(requester_user_id): + attestation = yield self.store.get_remote_attestation(group_id, user_id) + if not attestation: + continue + + entry["attestation"] = attestation + else: + entry["attestation"] = self.attestations.create_attestation( + group_id, user_id, + ) + + users.sort(key=lambda e: e.get("order", 0)) + + defer.returnValue({ + "profile": profile, + "users_section": { + "users": users, + "roles": roles, + "total_user_count_estimate": 0, # TODO + }, + "rooms_section": { + "rooms": rooms, + "categories": categories, + "total_room_count_estimate": 0, # TODO + }, + }) + + @defer.inlineCallbacks + def update_group_summary_room(self, group_id, user_id, room_id, category_id, content): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + order = content.get("order", None) + + is_public = _parse_visibility_from_contents(content) + + yield self.store.add_room_to_summary( + group_id=group_id, + room_id=room_id, + category_id=category_id, + order=order, + is_public=is_public, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def delete_group_summary_room(self, group_id, user_id, room_id, category_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + yield self.store.remove_room_from_summary( + group_id=group_id, + room_id=room_id, + category_id=category_id, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def get_group_categories(self, group_id, user_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + categories = yield self.store.get_group_categories( + group_id=group_id, + ) + defer.returnValue({"categories": categories}) + + @defer.inlineCallbacks + def get_group_category(self, group_id, user_id, category_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + res = yield self.store.get_group_category( + group_id=group_id, + category_id=category_id, + ) + + defer.returnValue(res) + + @defer.inlineCallbacks + def update_group_category(self, group_id, user_id, category_id, content): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + is_public = _parse_visibility_from_contents(content) + profile = content.get("profile") + + yield self.store.upsert_group_category( + group_id=group_id, + category_id=category_id, + is_public=is_public, + profile=profile, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def delete_group_category(self, group_id, user_id, category_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + yield self.store.remove_group_category( + group_id=group_id, + category_id=category_id, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def get_group_roles(self, group_id, user_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + roles = yield self.store.get_group_roles( + group_id=group_id, + ) + defer.returnValue({"roles": roles}) + + @defer.inlineCallbacks + def get_group_role(self, group_id, user_id, role_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + res = yield self.store.get_group_role( + group_id=group_id, + role_id=role_id, + ) + defer.returnValue(res) + + @defer.inlineCallbacks + def update_group_role(self, group_id, user_id, role_id, content): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + is_public = _parse_visibility_from_contents(content) + + profile = content.get("profile") + + yield self.store.upsert_group_role( + group_id=group_id, + role_id=role_id, + is_public=is_public, + profile=profile, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def delete_group_role(self, group_id, user_id, role_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + yield self.store.remove_group_role( + group_id=group_id, + role_id=role_id, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def update_group_summary_user(self, group_id, requester_user_id, user_id, role_id, + content): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, requester_user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + order = content.get("order", None) + + is_public = _parse_visibility_from_contents(content) + + yield self.store.add_user_to_summary( + group_id=group_id, + user_id=user_id, + role_id=role_id, + order=order, + is_public=is_public, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id): + yield self.check_group_is_ours(group_id, and_exists=True) + + is_admin = yield self.store.is_user_admin_in_group(group_id, requester_user_id) + if not is_admin: + raise SynapseError(403, "User is not admin in group") + + yield self.store.remove_user_from_summary( + group_id=group_id, + user_id=user_id, + role_id=role_id, + ) + + defer.returnValue({}) + @defer.inlineCallbacks def get_group_profile(self, group_id, requester_user_id): """Get the group profile as seen by requester_user_id @@ -210,7 +459,9 @@ class GroupsServerHandler(object): } if self.hs.is_mine_id(user_id): - raise NotImplementedError() + groups_local = self.hs.get_groups_local_handler() + res = yield groups_local.on_invite(group_id, user_id, content) + local_attestation = None else: local_attestation = self.attestations.create_attestation(group_id, user_id) content.update({ @@ -338,7 +589,8 @@ class GroupsServerHandler(object): if is_kick: if self.hs.is_mine_id(user_id): - raise NotImplementedError() + groups_local = self.hs.get_groups_local_handler() + yield groups_local.user_removed_from_group(group_id, user_id, {}) else: yield self.transport_client.remove_user_from_group_notification( get_domain_from_id(user_id), group_id, user_id, {} diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 105ab9920e..f4818ff174 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -15,11 +15,16 @@ from twisted.internet import defer +from synapse.api.errors import SynapseError + from ._base import SQLBaseStore import ujson as json +_DEFAULT_CATEGORY_ID = "default" + + class GroupServerStore(SQLBaseStore): def get_group(self, group_id): return self._simple_select_one( @@ -64,6 +69,492 @@ class GroupServerStore(SQLBaseStore): desc="get_rooms_in_group", ) + def get_rooms_for_summary_by_category(self, group_id, include_private=False): + def _get_rooms_for_summary_txn(txn): + keyvalues = { + "group_id": group_id, + } + if not include_private: + keyvalues["is_public"] = True + + sql = """ + SELECT room_id, is_public, category_id, room_order + FROM group_summary_rooms + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + rooms = [ + { + "room_id": row[0], + "is_public": row[1], + "category_id": row[2] if row[2] != _DEFAULT_CATEGORY_ID else None, + "order": row[3], + } + for row in txn + ] + + sql = """ + SELECT category_id, is_public, profile, cat_order + FROM group_summary_room_categories + INNER JOIN group_room_categories USING (group_id, category_id) + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + categories = { + row[0]: { + "is_public": row[1], + "profile": json.loads(row[2]), + "order": row[3], + } + for row in txn + } + + return rooms, categories + return self.runInteraction( + "get_rooms_for_summary", _get_rooms_for_summary_txn + ) + + def add_room_to_summary(self, group_id, room_id, category_id, order, is_public): + return self.runInteraction( + "add_room_to_summary", self._add_room_to_summary_txn, + group_id, room_id, category_id, order, is_public, + ) + + def _add_room_to_summary_txn(self, txn, group_id, room_id, category_id, order, + is_public): + if category_id is None: + category_id = _DEFAULT_CATEGORY_ID + else: + cat_exists = self._simple_select_one_onecol_txn( + txn, + table="group_room_categories", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + }, + retcol="group_id", + allow_none=True, + ) + if not cat_exists: + raise SynapseError(400, "Category doesn't exist") + + # TODO: Check room is part of group already + cat_exists = self._simple_select_one_onecol_txn( + txn, + table="group_summary_room_categories", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + }, + retcol="group_id", + allow_none=True, + ) + if not cat_exists: + txn.execute(""" + INSERT INTO group_summary_room_categories + (group_id, category_id, cat_order) + SELECT ?, ?, COALESCE(MAX(cat_order), 1) + FROM group_summary_room_categories + WHERE group_id = ? AND category_id = ? + """, (group_id, category_id, group_id, category_id)) + + existing = self._simple_select_one_txn( + txn, + table="group_summary_rooms", + keyvalues={ + "group_id": group_id, + "room_id": room_id, + "category_id": category_id, + }, + retcols=("room_order", "is_public",), + allow_none=True, + ) + + if order is not None: + sql = """ + UPDATE group_summary_rooms SET room_order = room_order + 1 + WHERE group_id = ? AND category_id = ? AND room_order >= ? + """ + txn.execute(sql, (group_id, category_id, order,)) + elif not existing: + sql = """ + SELECT COALESCE(MAX(room_order), 0) + 1 FROM group_summary_rooms + WHERE group_id = ? AND category_id = ? + """ + txn.execute(sql, (group_id, category_id,)) + order, = txn.fetchone() + + if existing: + to_update = {} + if order is not None: + to_update["room_order"] = order + if is_public is not None: + to_update["is_public"] = is_public + self._simple_update_txn( + txn, + table="group_summary_rooms", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + "room_id": room_id, + }, + values=to_update, + ) + else: + if is_public is None: + is_public = True + + self._simple_insert_txn( + txn, + table="group_summary_rooms", + values={ + "group_id": group_id, + "category_id": category_id, + "room_id": room_id, + "room_order": order, + "is_public": is_public, + }, + ) + + def remove_room_from_summary(self, group_id, room_id, category_id): + if category_id is None: + category_id = _DEFAULT_CATEGORY_ID + + return self._simple_delete( + table="group_summary_rooms", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + "room_id": room_id, + }, + desc="remove_room_from_summary", + ) + + @defer.inlineCallbacks + def get_group_categories(self, group_id): + rows = yield self._simple_select_list( + table="group_room_categories", + keyvalues={ + "group_id": group_id, + }, + retcols=("category_id", "is_public", "profile"), + desc="get_group_categories", + ) + + defer.returnValue({ + row["category_id"]: { + "is_public": row["is_public"], + "profile": json.loads(row["profile"]), + } + for row in rows + }) + + @defer.inlineCallbacks + def get_group_category(self, group_id, category_id): + category = yield self._simple_select_one( + table="group_room_categories", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + }, + retcols=("is_public", "profile"), + desc="get_group_category", + ) + + category["profile"] = json.loads(category["profile"]) + + defer.returnValue(category) + + def upsert_group_category(self, group_id, category_id, profile, is_public): + insertion_values = {} + update_values = {"category_id": category_id} # This cannot be empty + + if profile is None: + insertion_values["profile"] = "{}" + else: + update_values["profile"] = json.dumps(profile) + + if is_public is None: + insertion_values["is_public"] = True + else: + update_values["is_public"] = is_public + + return self._simple_upsert( + table="group_room_categories", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + }, + values=update_values, + insertion_values=insertion_values, + desc="upsert_group_category", + ) + + def remove_group_category(self, group_id, category_id): + return self._simple_delete( + table="group_room_categories", + keyvalues={ + "group_id": group_id, + "category_id": category_id, + }, + desc="remove_group_category", + ) + + @defer.inlineCallbacks + def get_group_roles(self, group_id): + rows = yield self._simple_select_list( + table="group_roles", + keyvalues={ + "group_id": group_id, + }, + retcols=("role_id", "is_public", "profile"), + desc="get_group_roles", + ) + + defer.returnValue({ + row["role_id"]: { + "is_public": row["is_public"], + "profile": json.loads(row["profile"]), + } + for row in rows + }) + + @defer.inlineCallbacks + def get_group_role(self, group_id, role_id): + role = yield self._simple_select_one( + table="group_roles", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + }, + retcols=("is_public", "profile"), + desc="get_group_role", + ) + + role["profile"] = json.loads(role["profile"]) + + defer.returnValue(role) + + def upsert_group_role(self, group_id, role_id, profile, is_public): + insertion_values = {} + update_values = {"role_id": role_id} # This cannot be empty + + if profile is None: + insertion_values["profile"] = "{}" + else: + update_values["profile"] = json.dumps(profile) + + if is_public is None: + insertion_values["is_public"] = True + else: + update_values["is_public"] = is_public + + return self._simple_upsert( + table="group_roles", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + }, + values=update_values, + insertion_values=insertion_values, + desc="upsert_group_role", + ) + + def remove_group_role(self, group_id, role_id): + return self._simple_delete( + table="group_roles", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + }, + desc="remove_group_role", + ) + + def add_user_to_summary(self, group_id, user_id, role_id, order, is_public): + return self.runInteraction( + "add_user_to_summary", self._add_user_to_summary_txn, + group_id, user_id, role_id, order, is_public, + ) + + def _add_user_to_summary_txn(self, txn, group_id, user_id, role_id, order, + is_public): + if role_id is None: + role_id = _DEFAULT_CATEGORY_ID + else: + role_exists = self._simple_select_one_onecol_txn( + txn, + table="group_roles", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + }, + retcol="group_id", + allow_none=True, + ) + if not role_exists: + raise SynapseError(400, "Role doesn't exist") + + # TODO: Check room is part of group already + role_exists = self._simple_select_one_onecol_txn( + txn, + table="group_summary_roles", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + }, + retcol="group_id", + allow_none=True, + ) + if not role_exists: + txn.execute(""" + INSERT INTO group_summary_roles + (group_id, role_id, role_order) + SELECT ?, ?, COALESCE(MAX(role_order), 1) + FROM group_summary_roles + WHERE group_id = ? AND role_id = ? + """, (group_id, role_id, group_id, role_id)) + + existing = self._simple_select_one_txn( + txn, + table="group_summary_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + "role_id": role_id, + }, + retcols=("user_order", "is_public",), + allow_none=True, + ) + + if order is not None: + sql = """ + UPDATE group_summary_users SET user_order = user_order + 1 + WHERE group_id = ? AND role_id = ? AND user_order >= ? + """ + txn.execute(sql, (group_id, role_id, order,)) + elif not existing: + sql = """ + SELECT COALESCE(MAX(user_order), 0) + 1 FROM group_summary_users + WHERE group_id = ? AND role_id = ? + """ + txn.execute(sql, (group_id, role_id,)) + order, = txn.fetchone() + + if existing: + to_update = {} + if order is not None: + to_update["user_order"] = order + if is_public is not None: + to_update["is_public"] = is_public + self._simple_update_txn( + txn, + table="group_summary_users", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + "user_id": user_id, + }, + values=to_update, + ) + else: + if is_public is None: + is_public = True + + self._simple_insert_txn( + txn, + table="group_summary_users", + values={ + "group_id": group_id, + "role_id": role_id, + "user_id": user_id, + "user_order": order, + "is_public": is_public, + }, + ) + + def remove_user_from_summary(self, group_id, user_id, role_id): + if role_id is None: + role_id = _DEFAULT_CATEGORY_ID + + return self._simple_delete( + table="group_summary_users", + keyvalues={ + "group_id": group_id, + "role_id": role_id, + "user_id": user_id, + }, + desc="remove_user_from_summary", + ) + + def get_users_for_summary_by_role(self, group_id, include_private=False): + def _get_users_for_summary_txn(txn): + keyvalues = { + "group_id": group_id, + } + if not include_private: + keyvalues["is_public"] = True + + sql = """ + SELECT user_id, is_public, role_id, user_order + FROM group_summary_users + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + users = [ + { + "user_id": row[0], + "is_public": row[1], + "role_id": row[2] if row[2] != _DEFAULT_CATEGORY_ID else None, + "order": row[3], + } + for row in txn + ] + + sql = """ + SELECT role_id, is_public, profile, role_order + FROM group_summary_roles + INNER JOIN group_roles USING (group_id, role_id) + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + roles = { + row[0]: { + "is_public": row[1], + "profile": json.loads(row[2]), + "order": row[3], + } + for row in txn + } + + return users, roles + return self.runInteraction( + "get_users_for_summary_by_role", _get_users_for_summary_txn + ) + def is_user_in_group(self, user_id, group_id): return self._simple_select_one_onecol( table="group_users", @@ -223,6 +714,103 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + @defer.inlineCallbacks + def register_user_group_membership(self, group_id, user_id, membership, + is_admin=False, content={}, + local_attestation=None, + remote_attestation=None, + ): + def _register_user_group_membership_txn(txn, next_id): + # TODO: Upsert? + self._simple_delete_txn( + txn, + table="local_group_membership", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_insert_txn( + txn, + table="local_group_membership", + values={ + "group_id": group_id, + "user_id": user_id, + "is_admin": is_admin, + "membership": membership, + "content": json.dumps(content), + }, + ) + self._simple_delete_txn( + txn, + table="local_group_updates", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + "type": "membership", + }, + ) + self._simple_insert_txn( + txn, + table="local_group_updates", + values={ + "stream_id": next_id, + "group_id": group_id, + "user_id": user_id, + "type": "membership", + "content": json.dumps({"membership": membership, "content": content}), + } + ) + self._group_updates_stream_cache.entity_has_changed(user_id, next_id) + + # TODO: Insert profile to ensuer it comes down stream if its a join. + + if membership == "join": + if local_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_renewals", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": local_attestation["valid_until_ms"], + } + ) + if remote_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_remote", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": remote_attestation["valid_until_ms"], + "attestation": json.dumps(remote_attestation), + } + ) + else: + self._simple_delete_txn( + txn, + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + + with self._group_updates_id_gen.get_next() as next_id: + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, next_id, + ) + @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, long_description,): @@ -238,6 +826,61 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) + def get_joined_groups(self, user_id): + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + }, + retcol="group_id", + desc="get_joined_groups", + ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn, + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token, + ) + if not has_changed: + return [] + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token,)) + return [{ + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } for group_id, membership, gtype, content_json in txn] + return self.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn, + ) + + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() + def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index c223ee275a..3013b89b7e 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -56,6 +56,62 @@ CREATE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id); +CREATE TABLE group_summary_rooms ( + group_id TEXT NOT NULL, + room_id TEXT NOT NULL, + category_id TEXT NOT NULL, + room_order BIGINT NOT NULL, + is_public BOOLEAN NOT NULL, + UNIQUE (group_id, category_id, room_id, room_order), + CHECK (room_order > 0) +); + +CREATE UNIQUE INDEX group_summary_rooms_g_idx ON group_summary_rooms(group_id, room_id, category_id); + +CREATE TABLE group_summary_room_categories ( + group_id TEXT NOT NULL, + category_id TEXT NOT NULL, + cat_order BIGINT NOT NULL, + UNIQUE (group_id, category_id, cat_order), + CHECK (cat_order > 0) +); + +CREATE TABLE group_room_categories ( + group_id TEXT NOT NULL, + category_id TEXT NOT NULL, + profile TEXT NOT NULL, + is_public BOOLEAN NOT NULL, + UNIQUE (group_id, category_id) +); + + +CREATE TABLE group_summary_users ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + role_id TEXT NOT NULL, + user_order BIGINT NOT NULL, + is_public BOOLEAN NOT NULL +); + +CREATE INDEX group_summary_users_g_idx ON group_summary_users(group_id); + +CREATE TABLE group_summary_roles ( + group_id TEXT NOT NULL, + role_id TEXT NOT NULL, + role_order BIGINT NOT NULL, + UNIQUE (group_id, role_id, role_order), + CHECK (role_order > 0) +); + +CREATE TABLE group_roles ( + group_id TEXT NOT NULL, + role_id TEXT NOT NULL, + profile TEXT NOT NULL, + is_public BOOLEAN NOT NULL, + UNIQUE (group_id, role_id) +); + + -- List of attestations we've given out and need to renew CREATE TABLE group_attestations_renewals ( group_id TEXT NOT NULL, -- cgit 1.5.1 From 26451a09eb938e6a72be3d77ff8c9e3fd2b33539 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Jul 2017 14:11:59 +0100 Subject: Comments --- synapse/groups/groups_server.py | 38 ++++++++++++++++++++++++ synapse/storage/group_server.py | 29 ++++++++++++++++++ synapse/storage/schema/delta/43/group_server.sql | 17 +++++++---- 3 files changed, 79 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index ec45da2d7a..83dfcd0fd4 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -54,6 +54,12 @@ class GroupsServerHandler(object): """Check that the group is ours, and optionally if it exists. If group does exist then return group. + + Args: + group_id (str) + and_exists (bool): whether to also check if group exists + and_is_admin (str): whether to also check if given str is a user_id + that is an admin """ if not self.is_mine_id(group_id): raise SynapseError(400, "Group not on this server") @@ -71,6 +77,14 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_group_summary(self, group_id, requester_user_id): + """Get the summary for a group as seen by requester_user_id. + + The group summary consists of the profile of the room, and a curated + list of users and rooms. These list *may* be organised by role/category. + The roles/categories are ordered, and so are the users/rooms within them. + + A user/room may appear in multiple roles/categories. + """ yield self.check_group_is_ours(group_id, and_exists=True) is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) @@ -133,6 +147,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def update_group_summary_room(self, group_id, user_id, room_id, category_id, content): + """Add/update a room to the group summary + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) order = content.get("order", None) @@ -151,6 +167,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def delete_group_summary_room(self, group_id, user_id, room_id, category_id): + """Remove a room from the summary + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) yield self.store.remove_room_from_summary( @@ -163,6 +181,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_group_categories(self, group_id, user_id): + """Get all categories in a group (as seen by user) + """ yield self.check_group_is_ours(group_id, and_exists=True) categories = yield self.store.get_group_categories( @@ -172,6 +192,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_group_category(self, group_id, user_id, category_id): + """Get a specific category in a group (as seen by user) + """ yield self.check_group_is_ours(group_id, and_exists=True) res = yield self.store.get_group_category( @@ -183,6 +205,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def update_group_category(self, group_id, user_id, category_id, content): + """Add/Update a group category + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) is_public = _parse_visibility_from_contents(content) @@ -199,6 +223,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def delete_group_category(self, group_id, user_id, category_id): + """Delete a group category + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) yield self.store.remove_group_category( @@ -210,6 +236,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_group_roles(self, group_id, user_id): + """Get all roles in a group (as seen by user) + """ yield self.check_group_is_ours(group_id, and_exists=True) roles = yield self.store.get_group_roles( @@ -219,6 +247,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def get_group_role(self, group_id, user_id, role_id): + """Get a specific role in a group (as seen by user) + """ yield self.check_group_is_ours(group_id, and_exists=True) res = yield self.store.get_group_role( @@ -229,6 +259,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def update_group_role(self, group_id, user_id, role_id, content): + """Add/update a role in a group + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) is_public = _parse_visibility_from_contents(content) @@ -246,6 +278,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def delete_group_role(self, group_id, user_id, role_id): + """Remove role from group + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) yield self.store.remove_group_role( @@ -258,6 +292,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def update_group_summary_user(self, group_id, requester_user_id, user_id, role_id, content): + """Add/update a users entry in the group summary + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) order = content.get("order", None) @@ -276,6 +312,8 @@ class GroupsServerHandler(object): @defer.inlineCallbacks def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id): + """Remove a user from the group summary + """ yield self.check_group_is_ours(group_id, and_exists=True, and_is_admin=user_id) yield self.store.remove_user_from_summary( diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index f4818ff174..18bfaeda6e 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -22,6 +22,8 @@ from ._base import SQLBaseStore import ujson as json +# The category ID for the "default" category. We don't store as null in the +# database to avoid the fun of null != null _DEFAULT_CATEGORY_ID = "default" @@ -70,6 +72,10 @@ class GroupServerStore(SQLBaseStore): ) def get_rooms_for_summary_by_category(self, group_id, include_private=False): + """Get the rooms and categories that should be included in a summary request + + Returns ([rooms], [categories]) + """ def _get_rooms_for_summary_txn(txn): keyvalues = { "group_id": group_id, @@ -134,6 +140,14 @@ class GroupServerStore(SQLBaseStore): def _add_room_to_summary_txn(self, txn, group_id, room_id, category_id, order, is_public): + """Add room to summary. + + This automatically adds the room to the end of the list of rooms to be + included in the summary response. If a role is given then user will + be added under that category (the category will automatically be added tothe + the summary if a user is listed under that role in the summary). + """ + if category_id is None: category_id = _DEFAULT_CATEGORY_ID else: @@ -278,6 +292,8 @@ class GroupServerStore(SQLBaseStore): defer.returnValue(category) def upsert_group_category(self, group_id, category_id, profile, is_public): + """Add/update room category for group + """ insertion_values = {} update_values = {"category_id": category_id} # This cannot be empty @@ -348,6 +364,8 @@ class GroupServerStore(SQLBaseStore): defer.returnValue(role) def upsert_group_role(self, group_id, role_id, profile, is_public): + """Add/remove user role + """ insertion_values = {} update_values = {"role_id": role_id} # This cannot be empty @@ -390,6 +408,13 @@ class GroupServerStore(SQLBaseStore): def _add_user_to_summary_txn(self, txn, group_id, user_id, role_id, order, is_public): + """Add user to summary. + + This automatically adds the user to the end of the list of users to be + included in the summary response. If a role is given then user will + be added under that role (the role will automatically be added to the + summary if a user is listed under that role in the summary). + """ if role_id is None: role_id = _DEFAULT_CATEGORY_ID else: @@ -499,6 +524,10 @@ class GroupServerStore(SQLBaseStore): ) def get_users_for_summary_by_role(self, group_id, include_private=False): + """Get the users and roles that should be included in a summary request + + Returns ([users], [roles]) + """ def _get_users_for_summary_txn(txn): keyvalues = { "group_id": group_id, diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 3013b89b7e..472aab0a78 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -56,18 +56,21 @@ CREATE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id); +-- Rooms to include in the summary CREATE TABLE group_summary_rooms ( group_id TEXT NOT NULL, room_id TEXT NOT NULL, category_id TEXT NOT NULL, room_order BIGINT NOT NULL, - is_public BOOLEAN NOT NULL, + is_public BOOLEAN NOT NULL, -- whether the room should be show to everyone UNIQUE (group_id, category_id, room_id, room_order), CHECK (room_order > 0) ); CREATE UNIQUE INDEX group_summary_rooms_g_idx ON group_summary_rooms(group_id, room_id, category_id); + +-- Categories to include in the summary CREATE TABLE group_summary_room_categories ( group_id TEXT NOT NULL, category_id TEXT NOT NULL, @@ -76,25 +79,27 @@ CREATE TABLE group_summary_room_categories ( CHECK (cat_order > 0) ); +-- The categories in the group CREATE TABLE group_room_categories ( group_id TEXT NOT NULL, category_id TEXT NOT NULL, profile TEXT NOT NULL, - is_public BOOLEAN NOT NULL, + is_public BOOLEAN NOT NULL, -- whether the category should be show to everyone UNIQUE (group_id, category_id) ); - +-- The users to include in the group summary CREATE TABLE group_summary_users ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, role_id TEXT NOT NULL, user_order BIGINT NOT NULL, - is_public BOOLEAN NOT NULL + is_public BOOLEAN NOT NULL -- whether the user should be show to everyone ); CREATE INDEX group_summary_users_g_idx ON group_summary_users(group_id); +-- The roles to include in the group summary CREATE TABLE group_summary_roles ( group_id TEXT NOT NULL, role_id TEXT NOT NULL, @@ -103,11 +108,13 @@ CREATE TABLE group_summary_roles ( CHECK (role_order > 0) ); + +-- The roles in a groups CREATE TABLE group_roles ( group_id TEXT NOT NULL, role_id TEXT NOT NULL, profile TEXT NOT NULL, - is_public BOOLEAN NOT NULL, + is_public BOOLEAN NOT NULL, -- whether the role should be show to everyone UNIQUE (group_id, role_id) ); -- cgit 1.5.1 From 8575e3160f98a0b33cd0ec6080389701dcb535e8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Jul 2017 13:32:40 +0100 Subject: Comments --- synapse/federation/transport/server.py | 8 ++++++++ synapse/storage/group_server.py | 36 ++++++++++++++++++++++------------ 2 files changed, 32 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 1ea2b37ce8..304c2a2a4c 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -810,6 +810,8 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): class FederationGroupsCategoriesServlet(BaseFederationServlet): + """Get all categories for a group + """ PATH = ( "/groups/(?P[^/]*)/categories/$" ) @@ -828,6 +830,8 @@ class FederationGroupsCategoriesServlet(BaseFederationServlet): class FederationGroupsCategoryServlet(BaseFederationServlet): + """Add/remove/get a category in a group + """ PATH = ( "/groups/(?P[^/]*)/categories/(?P[^/]+)$" ) @@ -870,6 +874,8 @@ class FederationGroupsCategoryServlet(BaseFederationServlet): class FederationGroupsRolesServlet(BaseFederationServlet): + """Get roles in a group + """ PATH = ( "/groups/(?P[^/]*)/roles/$" ) @@ -888,6 +894,8 @@ class FederationGroupsRolesServlet(BaseFederationServlet): class FederationGroupsRoleServlet(BaseFederationServlet): + """Add/remove/get a role in a group + """ PATH = ( "/groups/(?P[^/]*)/roles/(?P[^/]+)$" ) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 18bfaeda6e..b328ef8bc4 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -140,12 +140,16 @@ class GroupServerStore(SQLBaseStore): def _add_room_to_summary_txn(self, txn, group_id, room_id, category_id, order, is_public): - """Add room to summary. + """Add (or update) room's entry in summary. - This automatically adds the room to the end of the list of rooms to be - included in the summary response. If a role is given then user will - be added under that category (the category will automatically be added tothe - the summary if a user is listed under that role in the summary). + Args: + group_id (str) + room_id (str) + category_id (str): If not None then adds the category to the end of + the summary if its not already there. [Optional] + order (int): If not None inserts the room at that position, e.g. + an order of 1 will put the room first. Otherwise, the room gets + added to the end. """ if category_id is None: @@ -164,7 +168,7 @@ class GroupServerStore(SQLBaseStore): if not cat_exists: raise SynapseError(400, "Category doesn't exist") - # TODO: Check room is part of group already + # TODO: Check category is part of summary already cat_exists = self._simple_select_one_onecol_txn( txn, table="group_summary_room_categories", @@ -176,6 +180,7 @@ class GroupServerStore(SQLBaseStore): allow_none=True, ) if not cat_exists: + # If not, add it with an order larger than all others txn.execute(""" INSERT INTO group_summary_room_categories (group_id, category_id, cat_order) @@ -197,6 +202,7 @@ class GroupServerStore(SQLBaseStore): ) if order is not None: + # Shuffle other room orders that come after the given order sql = """ UPDATE group_summary_rooms SET room_order = room_order + 1 WHERE group_id = ? AND category_id = ? AND room_order >= ? @@ -408,12 +414,16 @@ class GroupServerStore(SQLBaseStore): def _add_user_to_summary_txn(self, txn, group_id, user_id, role_id, order, is_public): - """Add user to summary. + """Add (or update) user's entry in summary. - This automatically adds the user to the end of the list of users to be - included in the summary response. If a role is given then user will - be added under that role (the role will automatically be added to the - summary if a user is listed under that role in the summary). + Args: + group_id (str) + user_id (str) + role_id (str): If not None then adds the role to the end of + the summary if its not already there. [Optional] + order (int): If not None inserts the user at that position, e.g. + an order of 1 will put the user first. Otherwise, the user gets + added to the end. """ if role_id is None: role_id = _DEFAULT_CATEGORY_ID @@ -431,7 +441,7 @@ class GroupServerStore(SQLBaseStore): if not role_exists: raise SynapseError(400, "Role doesn't exist") - # TODO: Check room is part of group already + # TODO: Check role is part of the summary already role_exists = self._simple_select_one_onecol_txn( txn, table="group_summary_roles", @@ -443,6 +453,7 @@ class GroupServerStore(SQLBaseStore): allow_none=True, ) if not role_exists: + # If not, add it with an order larger than all others txn.execute(""" INSERT INTO group_summary_roles (group_id, role_id, role_order) @@ -464,6 +475,7 @@ class GroupServerStore(SQLBaseStore): ) if order is not None: + # Shuffle other users orders that come after the given order sql = """ UPDATE group_summary_users SET user_order = user_order + 1 WHERE group_id = ? AND role_id = ? AND user_order >= ? -- cgit 1.5.1 From 3b0470dba59274c65e69a4eab8909eaa55393a2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Jul 2017 13:53:21 +0100 Subject: Remove unused functions --- synapse/storage/group_server.py | 152 ---------------------------------------- 1 file changed, 152 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index b328ef8bc4..2e05c23fd7 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -755,103 +755,6 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) - @defer.inlineCallbacks - def register_user_group_membership(self, group_id, user_id, membership, - is_admin=False, content={}, - local_attestation=None, - remote_attestation=None, - ): - def _register_user_group_membership_txn(txn, next_id): - # TODO: Upsert? - self._simple_delete_txn( - txn, - table="local_group_membership", - keyvalues={ - "group_id": group_id, - "user_id": user_id, - }, - ) - self._simple_insert_txn( - txn, - table="local_group_membership", - values={ - "group_id": group_id, - "user_id": user_id, - "is_admin": is_admin, - "membership": membership, - "content": json.dumps(content), - }, - ) - self._simple_delete_txn( - txn, - table="local_group_updates", - keyvalues={ - "group_id": group_id, - "user_id": user_id, - "type": "membership", - }, - ) - self._simple_insert_txn( - txn, - table="local_group_updates", - values={ - "stream_id": next_id, - "group_id": group_id, - "user_id": user_id, - "type": "membership", - "content": json.dumps({"membership": membership, "content": content}), - } - ) - self._group_updates_stream_cache.entity_has_changed(user_id, next_id) - - # TODO: Insert profile to ensuer it comes down stream if its a join. - - if membership == "join": - if local_attestation: - self._simple_insert_txn( - txn, - table="group_attestations_renewals", - values={ - "group_id": group_id, - "user_id": user_id, - "valid_until_ms": local_attestation["valid_until_ms"], - } - ) - if remote_attestation: - self._simple_insert_txn( - txn, - table="group_attestations_remote", - values={ - "group_id": group_id, - "user_id": user_id, - "valid_until_ms": remote_attestation["valid_until_ms"], - "attestation": json.dumps(remote_attestation), - } - ) - else: - self._simple_delete_txn( - txn, - table="group_attestations_renewals", - keyvalues={ - "group_id": group_id, - "user_id": user_id, - }, - ) - self._simple_delete_txn( - txn, - table="group_attestations_remote", - keyvalues={ - "group_id": group_id, - "user_id": user_id, - }, - ) - - with self._group_updates_id_gen.get_next() as next_id: - yield self.runInteraction( - "register_user_group_membership", - _register_user_group_membership_txn, next_id, - ) - @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, long_description,): @@ -867,61 +770,6 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) - def get_joined_groups(self, user_id): - return self._simple_select_onecol( - table="local_group_membership", - keyvalues={ - "user_id": user_id, - "membership": "join", - }, - retcol="group_id", - desc="get_joined_groups", - ) - - def get_all_groups_for_user(self, user_id, now_token): - def _get_all_groups_for_user_txn(txn): - sql = """ - SELECT group_id, type, membership, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND membership != 'leave' - AND stream_id <= ? - """ - txn.execute(sql, (user_id, now_token,)) - return self.cursor_to_dict(txn) - return self.runInteraction( - "get_all_groups_for_user", _get_all_groups_for_user_txn, - ) - - def get_groups_changes_for_user(self, user_id, from_token, to_token): - from_token = int(from_token) - has_changed = self._group_updates_stream_cache.has_entity_changed( - user_id, from_token, - ) - if not has_changed: - return [] - - def _get_groups_changes_for_user_txn(txn): - sql = """ - SELECT group_id, membership, type, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND ? < stream_id AND stream_id <= ? - """ - txn.execute(sql, (user_id, from_token, to_token,)) - return [{ - "group_id": group_id, - "membership": membership, - "type": gtype, - "content": json.loads(content_json), - } for group_id, membership, gtype, content_json in txn] - return self.runInteraction( - "get_groups_changes_for_user", _get_groups_changes_for_user_txn, - ) - - def get_group_stream_token(self): - return self._group_updates_id_gen.get_current_token() - def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ -- cgit 1.5.1 From 4b203bdba51a314abef56ccee4d77e1945d16735 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Jul 2017 14:02:00 +0100 Subject: Correctly increment orders --- synapse/storage/group_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 2e05c23fd7..c23dc79ca5 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -184,7 +184,7 @@ class GroupServerStore(SQLBaseStore): txn.execute(""" INSERT INTO group_summary_room_categories (group_id, category_id, cat_order) - SELECT ?, ?, COALESCE(MAX(cat_order), 1) + SELECT ?, ?, COALESCE(MAX(cat_order), 0) + 1 FROM group_summary_room_categories WHERE group_id = ? AND category_id = ? """, (group_id, category_id, group_id, category_id)) @@ -457,7 +457,7 @@ class GroupServerStore(SQLBaseStore): txn.execute(""" INSERT INTO group_summary_roles (group_id, role_id, role_order) - SELECT ?, ?, COALESCE(MAX(role_order), 1) + SELECT ?, ?, COALESCE(MAX(role_order), 0) + 1 FROM group_summary_roles WHERE group_id = ? AND role_id = ? """, (group_id, role_id, group_id, role_id)) -- cgit 1.5.1 From 85fda57208bb79e54fe473fda64351f04ffe1cda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Jul 2017 14:03:54 +0100 Subject: Add DEFAULT_ROLE_ID --- synapse/storage/group_server.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index c23dc79ca5..e8a799d8c7 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -24,7 +24,8 @@ import ujson as json # The category ID for the "default" category. We don't store as null in the # database to avoid the fun of null != null -_DEFAULT_CATEGORY_ID = "default" +_DEFAULT_CATEGORY_ID = "" +_DEFAULT_ROLE_ID = "" class GroupServerStore(SQLBaseStore): @@ -426,7 +427,7 @@ class GroupServerStore(SQLBaseStore): added to the end. """ if role_id is None: - role_id = _DEFAULT_CATEGORY_ID + role_id = _DEFAULT_ROLE_ID else: role_exists = self._simple_select_one_onecol_txn( txn, @@ -523,7 +524,7 @@ class GroupServerStore(SQLBaseStore): def remove_user_from_summary(self, group_id, user_id, role_id): if role_id is None: - role_id = _DEFAULT_CATEGORY_ID + role_id = _DEFAULT_ROLE_ID return self._simple_delete( table="group_summary_users", @@ -563,7 +564,7 @@ class GroupServerStore(SQLBaseStore): { "user_id": row[0], "is_public": row[1], - "role_id": row[2] if row[2] != _DEFAULT_CATEGORY_ID else None, + "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None, "order": row[3], } for row in txn -- cgit 1.5.1 From 2f9eafdd369796d8b7731b24ab8cf6a98ad19e29 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 14:52:27 +0100 Subject: Add local group server support --- synapse/federation/transport/client.py | 77 +++ synapse/federation/transport/server.py | 44 ++ synapse/groups/groups_server.py | 7 +- synapse/handlers/groups_local.py | 278 ++++++++++ synapse/rest/__init__.py | 2 + synapse/rest/client/v2_alpha/groups.py | 642 +++++++++++++++++++++++ synapse/server.py | 5 + synapse/storage/__init__.py | 15 + synapse/storage/group_server.py | 152 ++++++ synapse/storage/schema/delta/43/group_server.sql | 28 + 10 files changed, 1248 insertions(+), 2 deletions(-) create mode 100644 synapse/handlers/groups_local.py create mode 100644 synapse/rest/client/v2_alpha/groups.py (limited to 'synapse/storage') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index d0f8da7516..ea340e345c 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -472,6 +472,72 @@ class TransportLayerClient(object): defer.returnValue(content) + @log_function + def get_group_profile(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/profile" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_summary(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/summary" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_rooms(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/rooms" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_users(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/users" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def accept_group_invite(self, destination, group_id, user_id, content): + path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + + @log_function + def invite_to_group(self, destination, group_id, user_id, content): + path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + @log_function def invite_to_group_notification(self, destination, group_id, user_id, content): """Sent by group server to inform a user's server that they have been @@ -487,6 +553,17 @@ class TransportLayerClient(object): ignore_backoff=True, ) + @log_function + def remove_user_from_group(self, destination, group_id, user_id, content): + path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + @log_function def remove_user_from_group_notification(self, destination, group_id, user_id, content): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 4f7d2546cf..0f08334f33 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -715,6 +715,21 @@ class FederationGroupsInviteServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsLocalInviteServlet(BaseFederationServlet): + PATH = "/groups/local/(?P[^/]*)/users/(?P[^/]*)/invite$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(group_id) != origin: + raise SynapseError(403, "group_id doesn't match origin") + + new_content = yield self.handler.on_invite( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + class FederationGroupsAcceptInviteServlet(BaseFederationServlet): """Accept an invitation from the group server """ @@ -750,6 +765,21 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): + PATH = "/groups/local/(?P[^/]*)/users/(?P[^/]*)/remove$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(group_id) != origin: + raise SynapseError(403, "user_id doesn't match origin") + + new_content = yield self.handler.user_removed_from_group( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + class FederationGroupsRenewAttestaionServlet(BaseFederationServlet): """A group or user's server renews their attestation """ @@ -1053,6 +1083,12 @@ GROUP_SERVER_SERVLET_CLASSES = ( ) +GROUP_LOCAL_SERVLET_CLASSES = ( + FederationGroupsLocalInviteServlet, + FederationGroupsRemoveLocalUserServlet, +) + + GROUP_ATTESTATION_SERVLET_CLASSES = ( FederationGroupsRenewAttestaionServlet, ) @@ -1083,6 +1119,14 @@ def register_servlets(hs, resource, authenticator, ratelimiter): server_name=hs.hostname, ).register(resource) + for servletclass in GROUP_LOCAL_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_local_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES: servletclass( handler=hs.get_groups_attestation_renewer(), diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index a00bafe3af..c8559577f7 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -462,7 +462,9 @@ class GroupsServerHandler(object): } if self.hs.is_mine_id(user_id): - raise NotImplementedError() + groups_local = self.hs.get_groups_local_handler() + res = yield groups_local.on_invite(group_id, user_id, content) + local_attestation = None else: local_attestation = self.attestations.create_attestation(group_id, user_id) content.update({ @@ -590,7 +592,8 @@ class GroupsServerHandler(object): if is_kick: if self.hs.is_mine_id(user_id): - raise NotImplementedError() + groups_local = self.hs.get_groups_local_handler() + yield groups_local.user_removed_from_group(group_id, user_id, {}) else: yield self.transport_client.remove_user_from_group_notification( get_domain_from_id(user_id), group_id, user_id, {} diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py new file mode 100644 index 0000000000..3df255b05a --- /dev/null +++ b/synapse/handlers/groups_local.py @@ -0,0 +1,278 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import SynapseError + +import logging + +logger = logging.getLogger(__name__) + + +# TODO: Validate attestations +# TODO: Allow users to "knock" or simpkly join depending on rules +# TODO: is_priveged flag to users and is_public to users and rooms +# TODO: Roles +# TODO: Audit log for admins (profile updates, membership changes, users who tried +# to join but were rejected, etc) +# TODO: Flairs +# TODO: Add group memebership /sync + + +def _create_rerouter(name): + def f(self, group_id, *args, **kwargs): + if self.is_mine_id(group_id): + return getattr(self.groups_server_handler, name)( + group_id, *args, **kwargs + ) + + repl_layer = self.hs.get_replication_layer() + return getattr(repl_layer, name)(group_id, *args, **kwargs) + return f + + +class GroupsLocalHandler(object): + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + self.room_list_handler = hs.get_room_list_handler() + self.groups_server_handler = hs.get_groups_server_handler() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.keyring = hs.get_keyring() + self.is_mine_id = hs.is_mine_id + self.signing_key = hs.config.signing_key[0] + self.server_name = hs.hostname + self.attestations = hs.get_groups_attestation_signing() + + # Ensure attestations get renewed + hs.get_groups_attestation_renewer() + + get_group_profile = _create_rerouter("get_group_profile") + get_rooms_in_group = _create_rerouter("get_rooms_in_group") + + update_group_summary_room = _create_rerouter("update_group_summary_room") + delete_group_summary_room = _create_rerouter("delete_group_summary_room") + + update_group_category = _create_rerouter("update_group_category") + delete_group_category = _create_rerouter("delete_group_category") + get_group_category = _create_rerouter("get_group_category") + get_group_categories = _create_rerouter("get_group_categories") + + update_group_summary_user = _create_rerouter("update_group_summary_user") + delete_group_summary_user = _create_rerouter("delete_group_summary_user") + + update_group_role = _create_rerouter("update_group_role") + delete_group_role = _create_rerouter("delete_group_role") + get_group_role = _create_rerouter("get_group_role") + get_group_roles = _create_rerouter("get_group_roles") + + @defer.inlineCallbacks + def get_group_summary(self, group_id, requester_user_id): + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_group_summary( + group_id, requester_user_id + ) + defer.returnValue(res) + + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.get_group_summary(group_id, requester_user_id) + + chunk = res["users_section"]["users"] + valid_users = [] + for entry in chunk: + g_user_id = entry["user_id"] + attestation = entry.pop("attestation") + try: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + ) + valid_users.append(entry) + except Exception as e: + logger.info("Failed to verify user is in group: %s", e) + + res["users_section"]["users"] = valid_users + + res["users_section"]["users"].sort(key=lambda e: e.get("order", 0)) + res["rooms_section"]["rooms"].sort(key=lambda e: e.get("order", 0)) + + defer.returnValue(res) + + def create_group(self, group_id, user_id, content): + logger.info("Asking to create group with ID: %r", group_id) + + if self.is_mine_id(group_id): + return self.groups_server_handler.create_group( + group_id, user_id, content + ) + + repl_layer = self.hs.get_replication_layer() + return repl_layer.create_group(group_id, user_id, content) # TODO + + def add_room(self, group_id, user_id, room_id, content): + if self.is_mine_id(group_id): + return self.groups_server_handler.add_room( + group_id, user_id, room_id, content + ) + + repl_layer = self.hs.get_replication_layer() + return repl_layer.add_room_to_group(group_id, user_id, room_id, content) # TODO + + @defer.inlineCallbacks + def get_users_in_group(self, group_id, requester_user_id): + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_users_in_group( + group_id, requester_user_id + ) + defer.returnValue(res) + + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.get_users_in_group(group_id, requester_user_id) # TODO + + chunk = res["chunk"] + valid_entries = [] + for entry in chunk: + g_user_id = entry["user_id"] + attestation = entry.pop("attestation") + try: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + ) + valid_entries.append(entry) + except Exception as e: + logger.info("Failed to verify user is in group: %s", e) + + res["chunk"] = valid_entries + + defer.returnValue(res) + + @defer.inlineCallbacks + def join_group(self, group_id, user_id, content): + raise NotImplementedError() # TODO + + @defer.inlineCallbacks + def accept_invite(self, group_id, user_id, content): + if self.is_mine_id(group_id): + yield self.groups_server_handler.accept_invite( + group_id, user_id, content + ) + local_attestation = None + remote_attestation = None + else: + local_attestation = self.attestations.create_attestation(group_id, user_id) + content["attestation"] = local_attestation + + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.accept_group_invite(group_id, user_id, content) + + remote_attestation = res["attestation"] + + yield self.attestations.verify_attestation( + remote_attestation, + group_id=group_id, + user_id=user_id, + ) + + yield self.store.register_user_group_membership( + group_id, user_id, + membership="join", + is_admin=False, + local_attestation=local_attestation, + remote_attestation=remote_attestation, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def invite(self, group_id, user_id, requester_user_id, config): + content = { + "requester_user_id": requester_user_id, + "config": config, + } + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.invite_to_group( + group_id, user_id, requester_user_id, content, + ) + else: + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.invite_to_group( + group_id, user_id, content, + ) + + defer.returnValue(res) + + @defer.inlineCallbacks + def on_invite(self, group_id, user_id, content): + # TODO: Support auto join and rejection + + if not self.is_mine_id(user_id): + raise SynapseError(400, "User not on this server") + + local_profile = {} + if "profile" in content: + if "name" in content["profile"]: + local_profile["name"] = content["profile"]["name"] + if "avatar_url" in content["profile"]: + local_profile["avatar_url"] = content["profile"]["avatar_url"] + + yield self.store.register_user_group_membership( + group_id, user_id, + membership="invite", + content={"profile": local_profile, "inviter": content["inviter"]}, + ) + + defer.returnValue({"state": "invite"}) + + @defer.inlineCallbacks + def remove_user_from_group(self, group_id, user_id, requester_user_id, content): + if user_id == requester_user_id: + yield self.store.register_user_group_membership( + group_id, user_id, + membership="leave", + ) + + # TODO: Should probably remember that we tried to leave so that we can + # retry if the group server is currently down. + + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.remove_user_from_group( + group_id, user_id, requester_user_id, content, + ) + else: + content["requester_user_id"] = requester_user_id + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.remove_user_from_group( + group_id, user_id, content + ) # TODO + + defer.returnValue(res) + + @defer.inlineCallbacks + def user_removed_from_group(self, group_id, user_id, content): + # TODO: Check if user in group + yield self.store.register_user_group_membership( + group_id, user_id, + membership="leave", + ) + + @defer.inlineCallbacks + def get_joined_groups(self, user_id): + group_ids = yield self.store.get_joined_groups(user_id) + defer.returnValue({"groups": group_ids}) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 3d809d181b..16f5a73b95 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -52,6 +52,7 @@ from synapse.rest.client.v2_alpha import ( thirdparty, sendtodevice, user_directory, + groups, ) from synapse.http.server import JsonResource @@ -102,3 +103,4 @@ class ClientRestResource(JsonResource): thirdparty.register_servlets(hs, client_resource) sendtodevice.register_servlets(hs, client_resource) user_directory.register_servlets(hs, client_resource) + groups.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py new file mode 100644 index 0000000000..255552c365 --- /dev/null +++ b/synapse/rest/client/v2_alpha/groups.py @@ -0,0 +1,642 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.types import GroupID + +from ._base import client_v2_patterns + +import logging + +logger = logging.getLogger(__name__) + + +class GroupServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/profile$") + + def __init__(self, hs): + super(GroupServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + group_description = yield self.groups_handler.get_group_profile(group_id, user_id) + + defer.returnValue((200, group_description)) + + +class GroupSummaryServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/summary$") + + def __init__(self, hs): + super(GroupSummaryServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + get_group_summary = yield self.groups_handler.get_group_summary(group_id, user_id) + + defer.returnValue((200, get_group_summary)) + + +class GroupSummaryRoomsServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/summary/rooms$") + + def __init__(self, hs): + super(GroupSummaryServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + get_group_summary = yield self.groups_handler.get_group_summary(group_id, user_id) + + defer.returnValue((200, get_group_summary)) + + +class GroupSummaryRoomsDefaultCatServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary/rooms/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupSummaryRoomsDefaultCatServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=None, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=None, + ) + + defer.returnValue((200, resp)) + + +class GroupSummaryRoomsCatServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary" + "/categories/(?P[^/]+)/rooms/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupSummaryRoomsCatServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, category_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=category_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, category_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=category_id, + ) + + defer.returnValue((200, resp)) + + +class GroupCategoryServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/categories/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupCategoryServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id, category_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_category( + group_id, user_id, + category_id=category_id, + ) + + defer.returnValue((200, category)) + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, category_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_category( + group_id, user_id, + category_id=category_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, category_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_category( + group_id, user_id, + category_id=category_id, + ) + + defer.returnValue((200, resp)) + + +class GroupCategoriesServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/categories/$" + ) + + def __init__(self, hs): + super(GroupCategoriesServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_categories( + group_id, user_id, + ) + + defer.returnValue((200, category)) + + +class GroupRoleServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/roles/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupRoleServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id, role_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_role( + group_id, user_id, + role_id=role_id, + ) + + defer.returnValue((200, category)) + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, role_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_role( + group_id, user_id, + role_id=role_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, role_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_role( + group_id, user_id, + role_id=role_id, + ) + + defer.returnValue((200, resp)) + + +class GroupRolesServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/roles/$" + ) + + def __init__(self, hs): + super(GroupRolesServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_roles( + group_id, user_id, + ) + + defer.returnValue((200, category)) + + +class GroupSummaryUsersDefaultRoleServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary/users/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupSummaryUsersDefaultRoleServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=None, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=None, + ) + + defer.returnValue((200, resp)) + + +class GroupSummaryUsersRoleServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary" + "/roles/(?P[^/]+)/users/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupSummaryUsersRoleServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, role_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=role_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, role_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=role_id, + ) + + defer.returnValue((200, resp)) + + +class GroupRoomServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/rooms$") + + def __init__(self, hs): + super(GroupRoomServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_rooms_in_group(group_id, user_id) + + defer.returnValue((200, result)) + + +class GroupUsersServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/users$") + + def __init__(self, hs): + super(GroupUsersServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_users_in_group(group_id, user_id) + + defer.returnValue((200, result)) + + +class GroupCreateServlet(RestServlet): + PATTERNS = client_v2_patterns("/create_group$") + + def __init__(self, hs): + super(GroupCreateServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + self.server_name = hs.hostname + + @defer.inlineCallbacks + def on_POST(self, request): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + # TODO: Create group on remote server + content = parse_json_object_from_request(request) + localpart = content.pop("localpart") + group_id = GroupID.create(localpart, self.server_name).to_string() + + result = yield self.groups_handler.create_group(group_id, user_id, content) + + defer.returnValue((200, result)) + + +class GroupAdminRoomsServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/admin/rooms/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupAdminRoomsServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.add_room(group_id, user_id, room_id, content) + + defer.returnValue((200, result)) + + +class GroupAdminUsersInviteServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/admin/users/invite/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupAdminUsersInviteServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + self.store = hs.get_datastore() + self.is_mine_id = hs.is_mine_id + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + config = content.get("config", {}) + result = yield self.groups_handler.invite( + group_id, user_id, requester_user_id, config, + ) + + defer.returnValue((200, result)) + + +class GroupAdminUsersKickServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/admin/users/remove/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupAdminUsersKickServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.remove_user_from_group( + group_id, user_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupSelfLeaveServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/self/leave$" + ) + + def __init__(self, hs): + super(GroupSelfLeaveServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.remove_user_from_group( + group_id, requester_user_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupSelfJoinServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/self/join$" + ) + + def __init__(self, hs): + super(GroupSelfJoinServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.join_group( + group_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupSelfAcceptInviteServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/self/accept_invite$" + ) + + def __init__(self, hs): + super(GroupSelfAcceptInviteServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.accept_invite( + group_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupsForUserServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/joined_groups$" + ) + + def __init__(self, hs): + super(GroupsForUserServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_joined_groups(user_id) + + defer.returnValue((200, result)) + + +def register_servlets(hs, http_server): + GroupServlet(hs).register(http_server) + GroupSummaryServlet(hs).register(http_server) + GroupUsersServlet(hs).register(http_server) + GroupRoomServlet(hs).register(http_server) + GroupCreateServlet(hs).register(http_server) + GroupAdminRoomsServlet(hs).register(http_server) + GroupAdminUsersInviteServlet(hs).register(http_server) + GroupAdminUsersKickServlet(hs).register(http_server) + GroupSelfLeaveServlet(hs).register(http_server) + GroupSelfJoinServlet(hs).register(http_server) + GroupSelfAcceptInviteServlet(hs).register(http_server) + GroupsForUserServlet(hs).register(http_server) + GroupSummaryRoomsDefaultCatServlet(hs).register(http_server) + GroupCategoryServlet(hs).register(http_server) + GroupCategoriesServlet(hs).register(http_server) + GroupSummaryRoomsCatServlet(hs).register(http_server) + GroupRoleServlet(hs).register(http_server) + GroupRolesServlet(hs).register(http_server) + GroupSummaryUsersDefaultRoleServlet(hs).register(http_server) + GroupSummaryUsersRoleServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index d857cca848..d0a6272766 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -50,6 +50,7 @@ from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.user_directory import UserDirectoyHandler +from synapse.handlers.groups_local import GroupsLocalHandler from synapse.groups.groups_server import GroupsServerHandler from synapse.groups.attestations import GroupAttestionRenewer, GroupAttestationSigning from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory @@ -141,6 +142,7 @@ class HomeServer(object): 'read_marker_handler', 'action_generator', 'user_directory_handler', + 'groups_local_handler', 'groups_server_handler', 'groups_attestation_signing', 'groups_attestation_renewer', @@ -314,6 +316,9 @@ class HomeServer(object): def build_user_directory_handler(self): return UserDirectoyHandler(self) + def build_groups_local_handler(self): + return GroupsLocalHandler(self) + def build_groups_server_handler(self): return GroupsServerHandler(self) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index fdee9f1ad5..594566eb38 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -136,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) + self._group_updates_id_gen = StreamIdGenerator( + db_conn, "local_group_updates", "stream_id", + ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( @@ -236,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=curr_state_delta_prefill, ) + _group_updates_prefill, min_group_updates_id = self._get_cache_dict( + db_conn, "local_group_updates", + entity_column="user_id", + stream_column="stream_id", + max_value=self._group_updates_id_gen.get_current_token(), + limit=1000, + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", min_group_updates_id, + prefilled_cache=_group_updates_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index e8a799d8c7..036549d437 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -756,6 +756,103 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + @defer.inlineCallbacks + def register_user_group_membership(self, group_id, user_id, membership, + is_admin=False, content={}, + local_attestation=None, + remote_attestation=None, + ): + def _register_user_group_membership_txn(txn, next_id): + # TODO: Upsert? + self._simple_delete_txn( + txn, + table="local_group_membership", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_insert_txn( + txn, + table="local_group_membership", + values={ + "group_id": group_id, + "user_id": user_id, + "is_admin": is_admin, + "membership": membership, + "content": json.dumps(content), + }, + ) + self._simple_delete_txn( + txn, + table="local_group_updates", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + "type": "membership", + }, + ) + self._simple_insert_txn( + txn, + table="local_group_updates", + values={ + "stream_id": next_id, + "group_id": group_id, + "user_id": user_id, + "type": "membership", + "content": json.dumps({"membership": membership, "content": content}), + } + ) + self._group_updates_stream_cache.entity_has_changed(user_id, next_id) + + # TODO: Insert profile to ensuer it comes down stream if its a join. + + if membership == "join": + if local_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_renewals", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": local_attestation["valid_until_ms"], + } + ) + if remote_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_remote", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": remote_attestation["valid_until_ms"], + "attestation": json.dumps(remote_attestation), + } + ) + else: + self._simple_delete_txn( + txn, + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + + with self._group_updates_id_gen.get_next() as next_id: + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, next_id, + ) + @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, long_description,): @@ -771,6 +868,61 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) + def get_joined_groups(self, user_id): + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + }, + retcol="group_id", + desc="get_joined_groups", + ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn, + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token, + ) + if not has_changed: + return [] + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token,)) + return [{ + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } for group_id, membership, gtype, content_json in txn] + return self.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn, + ) + + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() + def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 472aab0a78..e32db8b313 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -142,3 +142,31 @@ CREATE TABLE group_attestations_remote ( CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id); CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); + + +CREATE TABLE local_group_membership ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + is_admin BOOLEAN NOT NULL, + membership TEXT NOT NULL, + content TEXT NOT NULL +); + +CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); +CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); + + +CREATE TABLE local_group_updates ( + stream_id BIGINT NOT NULL, + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, + content TEXT NOT NULL +); + + +CREATE TABLE local_group_profiles ( + group_id TEXT NOT NULL, + name TEXT, + avatar_url TEXT +); -- cgit 1.5.1 From e96ee95a7e84e7d75ac57395bb64e1c3428596e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 09:33:16 +0100 Subject: Remove sync stuff --- synapse/storage/__init__.py | 15 ----------- synapse/storage/group_server.py | 55 ----------------------------------------- 2 files changed, 70 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 594566eb38..fdee9f1ad5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -136,9 +136,6 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) - self._group_updates_id_gen = StreamIdGenerator( - db_conn, "local_group_updates", "stream_id", - ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( @@ -239,18 +236,6 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=curr_state_delta_prefill, ) - _group_updates_prefill, min_group_updates_id = self._get_cache_dict( - db_conn, "local_group_updates", - entity_column="user_id", - stream_column="stream_id", - max_value=self._group_updates_id_gen.get_current_token(), - limit=1000, - ) - self._group_updates_stream_cache = StreamChangeCache( - "_group_updates_stream_cache", min_group_updates_id, - prefilled_cache=_group_updates_prefill, - ) - cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 036549d437..2dcdcbfdfc 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -868,61 +868,6 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) - def get_joined_groups(self, user_id): - return self._simple_select_onecol( - table="local_group_membership", - keyvalues={ - "user_id": user_id, - "membership": "join", - }, - retcol="group_id", - desc="get_joined_groups", - ) - - def get_all_groups_for_user(self, user_id, now_token): - def _get_all_groups_for_user_txn(txn): - sql = """ - SELECT group_id, type, membership, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND membership != 'leave' - AND stream_id <= ? - """ - txn.execute(sql, (user_id, now_token,)) - return self.cursor_to_dict(txn) - return self.runInteraction( - "get_all_groups_for_user", _get_all_groups_for_user_txn, - ) - - def get_groups_changes_for_user(self, user_id, from_token, to_token): - from_token = int(from_token) - has_changed = self._group_updates_stream_cache.has_entity_changed( - user_id, from_token, - ) - if not has_changed: - return [] - - def _get_groups_changes_for_user_txn(txn): - sql = """ - SELECT group_id, membership, type, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND ? < stream_id AND stream_id <= ? - """ - txn.execute(sql, (user_id, from_token, to_token,)) - return [{ - "group_id": group_id, - "membership": membership, - "type": gtype, - "content": json.loads(content_json), - } for group_id, membership, gtype, content_json in txn] - return self.runInteraction( - "get_groups_changes_for_user", _get_groups_changes_for_user_txn, - ) - - def get_group_stream_token(self): - return self._group_updates_id_gen.get_current_token() - def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ -- cgit 1.5.1 From 45407301111e55d04f46957a824d73eab69796de Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 09:36:17 +0100 Subject: Remove unused tables --- synapse/storage/schema/delta/43/group_server.sql | 7 ------- 1 file changed, 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index e32db8b313..f9e11c9146 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -163,10 +163,3 @@ CREATE TABLE local_group_updates ( type TEXT NOT NULL, content TEXT NOT NULL ); - - -CREATE TABLE local_group_profiles ( - group_id TEXT NOT NULL, - name TEXT, - avatar_url TEXT -); -- cgit 1.5.1 From 508460f24077da74d8a3d3ce891c0b55ebbce2e8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 09:55:46 +0100 Subject: Remove sync stuff --- synapse/storage/group_server.py | 20 -------------------- synapse/storage/schema/delta/43/group_server.sql | 10 +--------- 2 files changed, 1 insertion(+), 29 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 2dcdcbfdfc..bff2324cc7 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -783,26 +783,6 @@ class GroupServerStore(SQLBaseStore): "content": json.dumps(content), }, ) - self._simple_delete_txn( - txn, - table="local_group_updates", - keyvalues={ - "group_id": group_id, - "user_id": user_id, - "type": "membership", - }, - ) - self._simple_insert_txn( - txn, - table="local_group_updates", - values={ - "stream_id": next_id, - "group_id": group_id, - "user_id": user_id, - "type": "membership", - "content": json.dumps({"membership": membership, "content": content}), - } - ) self._group_updates_stream_cache.entity_has_changed(user_id, next_id) # TODO: Insert profile to ensuer it comes down stream if its a join. diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index f9e11c9146..e1fd47aa7f 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -144,6 +144,7 @@ CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_i CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); +-- The group membership for the HS's users CREATE TABLE local_group_membership ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -154,12 +155,3 @@ CREATE TABLE local_group_membership ( CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); - - -CREATE TABLE local_group_updates ( - stream_id BIGINT NOT NULL, - group_id TEXT NOT NULL, - user_id TEXT NOT NULL, - type TEXT NOT NULL, - content TEXT NOT NULL -); -- cgit 1.5.1 From 3e703eb04e1b30dc2bce03d3895ac79ac24a063d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 10:17:25 +0100 Subject: Comment --- synapse/storage/group_server.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index bff2324cc7..3c6ee7df68 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -762,6 +762,20 @@ class GroupServerStore(SQLBaseStore): local_attestation=None, remote_attestation=None, ): + """Registers that a local user is a member of a (local or remote) group. + + Args: + group_id (str) + user_id (str) + membership (str) + is_admin (bool) + content (dict): Content of the membership, e.g. includes the inviter + if the user has been invited. + local_attestation (dict): If remote group then store the fact that we + have given out an attestation, else None. + remote_attestation (dict): If remote group then store the remote + attestation from the group, else None. + """ def _register_user_group_membership_txn(txn, next_id): # TODO: Upsert? self._simple_delete_txn( -- cgit 1.5.1 From 94ecd871a047707da5998f83440c039d064de8aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 16:38:54 +0100 Subject: Fix typos --- synapse/federation/transport/client.py | 4 ++-- synapse/handlers/groups_local.py | 5 +++-- synapse/storage/group_server.py | 25 +++++++++++++++++-------- 3 files changed, 22 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 500f3622a2..e4d84c06c1 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -495,7 +495,7 @@ class TransportLayerClient(object): ) @log_function - def get_group_rooms(self, destination, group_id, requester_user_id): + def get_rooms_in_group(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/rooms" % (group_id,) return self.client.get_json( @@ -518,7 +518,7 @@ class TransportLayerClient(object): ) @log_function - def get_group_users(self, destination, group_id, requester_user_id): + def get_users_in_group(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/users" % (group_id,) return self.client.get_json( diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 7d7fc5d976..50f7fce885 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -159,7 +159,7 @@ class GroupsLocalHandler(object): ) defer.returnValue(res) - res = yield self.transport_client.get_group_users( + res = yield self.transport_client.get_users_in_group( get_domain_from_id(group_id), group_id, requester_user_id, ) @@ -278,7 +278,8 @@ class GroupsLocalHandler(object): else: content["requester_user_id"] = requester_user_id res = yield self.transport_client.remove_user_from_group( - get_domain_from_id(group_id), group_id, user_id, content + get_domain_from_id(group_id), group_id, requester_user_id, + user_id, content, ) defer.returnValue(res) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 3c6ee7df68..0a69e0f501 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore): remote_attestation (dict): If remote group then store the remote attestation from the group, else None. """ - def _register_user_group_membership_txn(txn, next_id): + def _register_user_group_membership_txn(txn): # TODO: Upsert? self._simple_delete_txn( txn, @@ -797,7 +797,6 @@ class GroupServerStore(SQLBaseStore): "content": json.dumps(content), }, ) - self._group_updates_stream_cache.entity_has_changed(user_id, next_id) # TODO: Insert profile to ensuer it comes down stream if its a join. @@ -820,7 +819,7 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, "user_id": user_id, "valid_until_ms": remote_attestation["valid_until_ms"], - "attestation": json.dumps(remote_attestation), + "attestation_json": json.dumps(remote_attestation), } ) else: @@ -841,11 +840,10 @@ class GroupServerStore(SQLBaseStore): }, ) - with self._group_updates_id_gen.get_next() as next_id: - yield self.runInteraction( - "register_user_group_membership", - _register_user_group_membership_txn, next_id, - ) + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, + ) @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, @@ -928,3 +926,14 @@ class GroupServerStore(SQLBaseStore): defer.returnValue(json.loads(row["attestation_json"])) defer.returnValue(None) + + def get_joined_groups(self, user_id): + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + }, + retcol="group_id", + desc="get_joined_groups", + ) -- cgit 1.5.1 From 6f443a74cf6ab0bfe452289f9888580725987765 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 09:46:33 +0100 Subject: Add update group profile API --- synapse/federation/transport/server.py | 12 ++++++++++++ synapse/groups/groups_server.py | 16 ++++++++++++++++ synapse/handlers/groups_local.py | 1 + synapse/rest/client/v2_alpha/groups.py | 12 ++++++++++++ synapse/storage/group_server.py | 11 +++++++++++ 5 files changed, 52 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 1332b49f35..e04750fd2a 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -642,6 +642,18 @@ class FederationGroupsSummaryServlet(BaseFederationServlet): defer.returnValue((200, new_content)) + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.update_group_profile( + group_id, requester_user_id, content + ) + + defer.returnValue((200, new_content)) + class FederationGroupsRoomsServlet(BaseFederationServlet): """Get the rooms in a group on behalf of a user diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 1b6e354ca3..322aad2a6f 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -341,6 +341,22 @@ class GroupsServerHandler(object): else: raise SynapseError(404, "Unknown group") + @defer.inlineCallbacks + def update_group_profile(self, group_id, requester_user_id, content): + """Update the group profile + """ + yield self.check_group_is_ours( + group_id, and_exists=True, and_is_admin=requester_user_id, + ) + + profile = {} + for keyname in ("name", "avatar_url", "short_description", + "long_description"): + if keyname in content: + profile[keyname] = content[keyname] + + yield self.store.update_group_profile(group_id, profile) + @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): """Get the users in group as seen by requester_user_id. diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 0b80348c82..b2c920da38 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -72,6 +72,7 @@ class GroupsLocalHandler(object): # or federation depending on if the group is local or remote get_group_profile = _create_rerouter("get_group_profile") + update_group_profile = _create_rerouter("update_group_profile") get_rooms_in_group = _create_rerouter("get_rooms_in_group") add_room_to_group = _create_rerouter("add_room_to_group") diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index f937d856fd..64d803d489 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -45,6 +45,18 @@ class GroupServlet(RestServlet): defer.returnValue((200, group_description)) + @defer.inlineCallbacks + def on_POST(self, request, group_id, content): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + yield self.groups_handler.update_group_profile( + group_id, user_id, content, + ) + + defer.returnValue((200, {})) + class GroupSummaryServlet(RestServlet): """Get the full group summary diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 0a69e0f501..4197d22d88 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -860,6 +860,17 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) + @defer.inlineCallbacks + def update_group_profile(self, group_id, profile,): + yield self._simple_update_one( + table="groups", + keyvalues={ + "group_id": group_id, + }, + updatevalues=profile, + desc="create_group", + ) + def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ -- cgit 1.5.1 From 57826d645bd62ab534dbcab8d66a98daec145459 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 13:15:22 +0100 Subject: Fix typo --- synapse/storage/group_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 0a69e0f501..a2e7aa47d8 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -798,7 +798,7 @@ class GroupServerStore(SQLBaseStore): }, ) - # TODO: Insert profile to ensuer it comes down stream if its a join. + # TODO: Insert profile to ensure it comes down stream if its a join. if membership == "join": if local_attestation: -- cgit 1.5.1 From 8209b5f033417ab018fdd1114170b89fb0b18aa9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 16:22:22 +0100 Subject: Fix a storage desc --- synapse/storage/group_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 4197d22d88..2331ec79bd 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -868,7 +868,7 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, }, updatevalues=profile, - desc="create_group", + desc="update_group_profile", ) def get_attestations_need_renewals(self, valid_until_ms): -- cgit 1.5.1 From c544188ee3644c85a97a3c4e09e63ad4e3c6f0cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 14:53:19 +0100 Subject: Add groups to sync stream --- synapse/handlers/sync.py | 64 +++++++++++++++++++++- synapse/rest/client/v2_alpha/sync.py | 5 ++ synapse/storage/__init__.py | 15 ++++++ synapse/storage/group_server.py | 68 ++++++++++++++++++++++-- synapse/storage/schema/delta/43/group_server.sql | 9 ++++ synapse/streams/events.py | 2 + synapse/types.py | 2 + tests/rest/client/v1/test_rooms.py | 4 +- 8 files changed, 161 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 91c6c6be3c..c01fcd3d59 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -108,6 +108,17 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ return True +class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ + "join", + "invite", + "leave", +])): + __slots__ = [] + + def __nonzero__(self): + return self.join or self.invite or self.leave + + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. @@ -119,6 +130,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ "device_lists", # List of user_ids whose devices have chanegd "device_one_time_keys_count", # Dict of algorithm to count for one time keys # for this device + "groups", ])): __slots__ = [] @@ -134,7 +146,8 @@ class SyncResult(collections.namedtuple("SyncResult", [ self.archived or self.account_data or self.to_device or - self.device_lists + self.device_lists or + self.groups ) @@ -560,6 +573,8 @@ class SyncHandler(object): user_id, device_id ) + yield self._generate_sync_entry_for_groups(sync_result_builder) + defer.returnValue(SyncResult( presence=sync_result_builder.presence, account_data=sync_result_builder.account_data, @@ -568,10 +583,56 @@ class SyncHandler(object): archived=sync_result_builder.archived, to_device=sync_result_builder.to_device, device_lists=device_lists, + groups=sync_result_builder.groups, device_one_time_keys_count=one_time_key_counts, next_batch=sync_result_builder.now_token, )) + @measure_func("_generate_sync_entry_for_groups") + @defer.inlineCallbacks + def _generate_sync_entry_for_groups(self, sync_result_builder): + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + + if since_token and since_token.groups_key: + results = yield self.store.get_groups_changes_for_user( + user_id, since_token.groups_key, now_token.groups_key, + ) + else: + results = yield self.store.get_all_groups_for_user( + user_id, now_token.groups_key, + ) + + invited = {} + joined = {} + left = {} + for result in results: + membership = result["membership"] + group_id = result["group_id"] + gtype = result["type"] + content = result["content"] + + if membership == "join": + if gtype == "membership": + content.pop("membership", None) + invited[group_id] = content["content"] + else: + joined.setdefault(group_id, {})[gtype] = content + elif membership == "invite": + if gtype == "membership": + content.pop("membership", None) + invited[group_id] = content["content"] + else: + if gtype == "membership": + left[group_id] = content["content"] + + sync_result_builder.groups = GroupsSyncResult( + join=joined, + invite=invited, + leave=left, + ) + @measure_func("_generate_sync_entry_for_device_list") @defer.inlineCallbacks def _generate_sync_entry_for_device_list(self, sync_result_builder): @@ -1260,6 +1321,7 @@ class SyncResultBuilder(object): self.invited = [] self.archived = [] self.device = [] + self.groups = None class RoomSyncResultBuilder(object): diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 6dcc407451..5f208a4c1c 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -199,6 +199,11 @@ class SyncRestServlet(RestServlet): "invite": invited, "leave": archived, }, + "groups": { + "join": sync_result.groups.join, + "invite": sync_result.groups.invite, + "leave": sync_result.groups.leave, + }, "device_one_time_keys_count": sync_result.device_one_time_keys_count, "next_batch": sync_result.next_batch.to_string(), } diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index fdee9f1ad5..594566eb38 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -136,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) + self._group_updates_id_gen = StreamIdGenerator( + db_conn, "local_group_updates", "stream_id", + ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( @@ -236,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=curr_state_delta_prefill, ) + _group_updates_prefill, min_group_updates_id = self._get_cache_dict( + db_conn, "local_group_updates", + entity_column="user_id", + stream_column="stream_id", + max_value=self._group_updates_id_gen.get_current_token(), + limit=1000, + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", min_group_updates_id, + prefilled_cache=_group_updates_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index a2e7aa47d8..45f0a4c599 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore): remote_attestation (dict): If remote group then store the remote attestation from the group, else None. """ - def _register_user_group_membership_txn(txn): + def _register_user_group_membership_txn(txn, next_id): # TODO: Upsert? self._simple_delete_txn( txn, @@ -798,6 +798,19 @@ class GroupServerStore(SQLBaseStore): }, ) + self._simple_insert_txn( + txn, + table="local_group_updates", + values={ + "stream_id": next_id, + "group_id": group_id, + "user_id": user_id, + "type": "membership", + "content": json.dumps({"membership": membership, "content": content}), + } + ) + self._group_updates_stream_cache.entity_has_changed(user_id, next_id) + # TODO: Insert profile to ensure it comes down stream if its a join. if membership == "join": @@ -840,10 +853,11 @@ class GroupServerStore(SQLBaseStore): }, ) - yield self.runInteraction( - "register_user_group_membership", - _register_user_group_membership_txn, - ) + with self._group_updates_id_gen.get_next() as next_id: + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, next_id, + ) @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, @@ -937,3 +951,47 @@ class GroupServerStore(SQLBaseStore): retcol="group_id", desc="get_joined_groups", ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn, + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token, + ) + if not has_changed: + return [] + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token,)) + return [{ + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } for group_id, membership, gtype, content_json in txn] + return self.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn, + ) + + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index e1fd47aa7f..92f3339c94 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -155,3 +155,12 @@ CREATE TABLE local_group_membership ( CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); + + +CREATE TABLE local_group_updates ( + stream_id BIGINT NOT NULL, + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, + content TEXT NOT NULL +); diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 91a59b0bae..e2be500815 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -45,6 +45,7 @@ class EventSources(object): push_rules_key, _ = self.store.get_push_rules_stream_token() to_device_key = self.store.get_to_device_stream_token() device_list_key = self.store.get_device_stream_token() + groups_key = self.store.get_group_stream_token() token = StreamToken( room_key=( @@ -65,6 +66,7 @@ class EventSources(object): push_rules_key=push_rules_key, to_device_key=to_device_key, device_list_key=device_list_key, + groups_key=groups_key, ) defer.returnValue(token) diff --git a/synapse/types.py b/synapse/types.py index b32c0e360d..37d5fa7f9f 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -171,6 +171,7 @@ class StreamToken( "push_rules_key", "to_device_key", "device_list_key", + "groups_key", )) ): _SEPARATOR = "_" @@ -209,6 +210,7 @@ class StreamToken( or (int(other.push_rules_key) < int(self.push_rules_key)) or (int(other.to_device_key) < int(self.to_device_key)) or (int(other.device_list_key) < int(self.device_list_key)) + or (int(other.groups_key) < int(self.groups_key)) ) def copy_and_advance(self, key, new_value): diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index d746ea8568..de376fb514 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -1032,7 +1032,7 @@ class RoomMessageListTestCase(RestTestCase): @defer.inlineCallbacks def test_topo_token_is_accepted(self): - token = "t1-0_0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0_0" (code, response) = yield self.mock_resource.trigger_get( "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)) @@ -1044,7 +1044,7 @@ class RoomMessageListTestCase(RestTestCase): @defer.inlineCallbacks def test_stream_token_is_accepted_for_fwd_pagianation(self): - token = "s0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0" (code, response) = yield self.mock_resource.trigger_get( "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)) -- cgit 1.5.1 From 2cc998fed879357376edb35d5088d88a078dd576 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 17:13:18 +0100 Subject: Fix replication. And notify --- synapse/app/synchrotron.py | 6 ++++ synapse/handlers/groups_local.py | 20 ++++++++--- synapse/replication/slave/storage/groups.py | 54 +++++++++++++++++++++++++++++ synapse/replication/tcp/streams.py | 20 +++++++++++ synapse/storage/group_server.py | 23 ++++++++++++ 5 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 synapse/replication/slave/storage/groups.py (limited to 'synapse/storage') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 4bdd99a966..d06a05acd9 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -41,6 +41,7 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -75,6 +76,7 @@ class SynchrotronSlavedStore( SlavedRegistrationStore, SlavedFilteringStore, SlavedPresenceStore, + SlavedGroupServerStore, SlavedDeviceInboxStore, SlavedDeviceStore, SlavedClientIpStore, @@ -409,6 +411,10 @@ class SyncReplicationHandler(ReplicationClientHandler): ) elif stream_name == "presence": yield self.presence_handler.process_replication_rows(token, rows) + elif stream_name == "receipts": + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows], + ) def start(config_options): diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 0b80348c82..4182ea5afa 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -211,13 +211,16 @@ class GroupsLocalHandler(object): user_id=user_id, ) - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="join", is_admin=False, local_attestation=local_attestation, remote_attestation=remote_attestation, ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) defer.returnValue({}) @@ -257,11 +260,14 @@ class GroupsLocalHandler(object): if "avatar_url" in content["profile"]: local_profile["avatar_url"] = content["profile"]["avatar_url"] - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="invite", content={"profile": local_profile, "inviter": content["inviter"]}, ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) defer.returnValue({"state": "invite"}) @@ -270,10 +276,13 @@ class GroupsLocalHandler(object): """Remove a user from a group """ if user_id == requester_user_id: - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="leave", ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) # TODO: Should probably remember that we tried to leave so that we can # retry if the group server is currently down. @@ -296,10 +305,13 @@ class GroupsLocalHandler(object): """One of our users was removed/kicked from a group """ # TODO: Check if user in group - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="leave", ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) @defer.inlineCallbacks def get_joined_groups(self, user_id): diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py new file mode 100644 index 0000000000..0bc4bce5b0 --- /dev/null +++ b/synapse/replication/slave/storage/groups.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + + +class SlavedGroupServerStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedGroupServerStore, self).__init__(db_conn, hs) + + self.hs = hs + + self._group_updates_id_gen = SlavedIdTracker( + db_conn, "local_group_updates", "stream_id", + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(), + ) + + get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__ + get_group_stream_token = DataStore.get_group_stream_token.__func__ + get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__ + + def stream_positions(self): + result = super(SlavedGroupServerStore, self).stream_positions() + result["groups"] = self._group_updates_id_gen.get_current_token() + return result + + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "groups": + self._group_updates_id_gen.advance(token) + for row in rows: + self._group_updates_stream_cache.entity_has_changed( + row.user_id, token + ) + + return super(SlavedGroupServerStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index fbafe12cc2..4c60bf79f9 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -118,6 +118,12 @@ CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( "state_key", # str "event_id", # str, optional )) +GroupsStreamRow = namedtuple("GroupsStreamRow", ( + "group_id", # str + "user_id", # str + "type", # str + "content", # dict +)) class Stream(object): @@ -464,6 +470,19 @@ class CurrentStateDeltaStream(Stream): super(CurrentStateDeltaStream, self).__init__(hs) +class GroupServerStream(Stream): + NAME = "groups" + ROW_TYPE = GroupsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_group_stream_token + self.update_function = store.get_all_groups_changes + + super(GroupServerStream, self).__init__(hs) + + STREAMS_MAP = { stream.NAME: stream for stream in ( @@ -482,5 +501,6 @@ STREAMS_MAP = { TagAccountDataStream, AccountDataStream, CurrentStateDeltaStream, + GroupServerStream, ) } diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 45f0a4c599..5006ac863f 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -853,6 +853,8 @@ class GroupServerStore(SQLBaseStore): }, ) + return next_id + with self._group_updates_id_gen.get_next() as next_id: yield self.runInteraction( "register_user_group_membership", @@ -993,5 +995,26 @@ class GroupServerStore(SQLBaseStore): "get_groups_changes_for_user", _get_groups_changes_for_user_txn, ) + def get_all_groups_changes(self, from_token, to_token, limit): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_any_entity_changed( + from_token, + ) + if not has_changed: + return [] + + def _get_all_groups_changes_txn(txn): + sql = """ + SELECT stream_id, group_id, user_id, type, content + FROM local_group_updates + WHERE ? < stream_id AND stream_id <= ? + LIMIT ? + """ + txn.execute(sql, (from_token, to_token, limit,)) + return txn.fetchall() + return self.runInteraction( + "get_all_groups_changes", _get_all_groups_changes_txn, + ) + def get_group_stream_token(self): return self._group_updates_id_gen.get_current_token() -- cgit 1.5.1 From 851aeae7c796b127dddf7ca6df882df6104ee5e7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2017 13:40:56 +0100 Subject: Check users/rooms are in group before adding to summary --- synapse/storage/group_server.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index d42e215b26..258c3168aa 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -152,6 +152,18 @@ class GroupServerStore(SQLBaseStore): an order of 1 will put the room first. Otherwise, the room gets added to the end. """ + room_in_group = self._simple_select_one_onecol_txn( + txn, + table="group_rooms", + keyvalues={ + "group_id": group_id, + "room_id": room_id, + }, + retcol="room_id", + allow_none=True, + ) + if not room_in_group: + raise SynapseError(400, "room not in group") if category_id is None: category_id = _DEFAULT_CATEGORY_ID @@ -426,6 +438,19 @@ class GroupServerStore(SQLBaseStore): an order of 1 will put the user first. Otherwise, the user gets added to the end. """ + user_in_group = self._simple_select_one_onecol_txn( + txn, + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="user_id", + allow_none=True, + ) + if not user_in_group: + raise SynapseError(400, "user not in group") + if role_id is None: role_id = _DEFAULT_ROLE_ID else: -- cgit 1.5.1 From b76ef6ccb8e00610c791a78b940d396da82fb1ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2017 13:55:39 +0100 Subject: Include users membership in group in summary API --- synapse/groups/groups_server.py | 5 ++++ synapse/storage/group_server.py | 55 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index b1ee43ef90..f25f327eb9 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -130,6 +130,10 @@ class GroupsServerHandler(object): users.sort(key=lambda e: e.get("order", 0)) + membership_info = yield self.store.get_users_membership_info_in_group( + group_id, requester_user_id, + ) + defer.returnValue({ "profile": profile, "users_section": { @@ -142,6 +146,7 @@ class GroupsServerHandler(object): "categories": categories, "total_room_count_estimate": 0, # TODO }, + "user": membership_info, }) @defer.inlineCallbacks diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 258c3168aa..989a10eea6 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -672,6 +672,61 @@ class GroupServerStore(SQLBaseStore): allow_none=True, ) + @defer.inlineCallbacks + def get_users_membership_info_in_group(self, group_id, user_id): + """Get a dict describing the memebrship of a user in a group. + + Example if joined: + + { + "memebrship": "joined", + "is_public": True, + "is_privileged": False, + } + + Returns an empty dict if the user is not joined/invited/etc + """ + def _get_users_membership_in_group_txn(txn): + row = self._simple_select_one_txn( + table="group_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcols=("is_admin", "is_public"), + allow_none=True, + desc="is_user_adim_in_group", + ) + + if row: + return { + "memebrship": "joined", + "is_public": row["is_public"], + "is_privileged": row["is_admin"], + } + + row = self._simple_select_one_onecol_txn( + table="group_invites", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + retcol="user_id", + desc="is_user_invited_to_local_group", + allow_none=True, + ) + + if row: + return { + "memebrship": "invited", + } + + return {} + + return self.runInteraction( + "get_users_membership_info_in_group", _get_users_membership_in_group_txn, + ) + def add_user_to_group(self, group_id, user_id, is_admin=False, is_public=True, local_attestation=None, remote_attestation=None): """Add a user to the group server. -- cgit 1.5.1 From ed666d396985c1fa2b9acb1c69199dd55670a88f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2017 14:05:09 +0100 Subject: Fix all the typos --- synapse/storage/group_server.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 989a10eea6..357111e305 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -643,7 +643,7 @@ class GroupServerStore(SQLBaseStore): }, retcol="is_admin", allow_none=True, - desc="is_user_adim_in_group", + desc="is_user_admin_in_group", ) def add_group_invite(self, group_id, user_id): @@ -672,14 +672,13 @@ class GroupServerStore(SQLBaseStore): allow_none=True, ) - @defer.inlineCallbacks def get_users_membership_info_in_group(self, group_id, user_id): - """Get a dict describing the memebrship of a user in a group. + """Get a dict describing the membership of a user in a group. Example if joined: { - "memebrship": "joined", + "membership": "joined", "is_public": True, "is_privileged": False, } @@ -688,6 +687,7 @@ class GroupServerStore(SQLBaseStore): """ def _get_users_membership_in_group_txn(txn): row = self._simple_select_one_txn( + txn, table="group_users", keyvalues={ "group_id": group_id, @@ -695,30 +695,29 @@ class GroupServerStore(SQLBaseStore): }, retcols=("is_admin", "is_public"), allow_none=True, - desc="is_user_adim_in_group", ) if row: return { - "memebrship": "joined", + "membership": "joined", "is_public": row["is_public"], "is_privileged": row["is_admin"], } row = self._simple_select_one_onecol_txn( + txn, table="group_invites", keyvalues={ "group_id": group_id, "user_id": user_id, }, retcol="user_id", - desc="is_user_invited_to_local_group", allow_none=True, ) if row: return { - "memebrship": "invited", + "membership": "invited", } return {} -- cgit 1.5.1 From 629cdfb124c013c07b50116386d05162e40871aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2017 14:54:05 +0100 Subject: Use join rather than joined, etc. --- synapse/storage/group_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 357111e305..9c55e10e77 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -699,7 +699,7 @@ class GroupServerStore(SQLBaseStore): if row: return { - "membership": "joined", + "membership": "join", "is_public": row["is_public"], "is_privileged": row["is_admin"], } @@ -717,7 +717,7 @@ class GroupServerStore(SQLBaseStore): if row: return { - "membership": "invited", + "membership": "invite", } return {} -- cgit 1.5.1 From 966a70f1fa74192866ff5b0dbae67ee8f490d97d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2017 17:49:39 +0100 Subject: Update comment --- synapse/storage/group_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 9c55e10e77..f44e80b514 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -678,12 +678,12 @@ class GroupServerStore(SQLBaseStore): Example if joined: { - "membership": "joined", + "membership": "join", "is_public": True, "is_privileged": False, } - Returns an empty dict if the user is not joined/invited/etc + Returns an empty dict if the user is not join/invite/etc """ def _get_users_membership_in_group_txn(txn): row = self._simple_select_one_txn( -- cgit 1.5.1 From 05e21285aae4a0411a9ec1151ce006297fa3ca91 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Aug 2017 11:50:09 +0100 Subject: Store whether the user wants to publicise their membership of a group --- synapse/handlers/groups_local.py | 4 ++++ synapse/storage/group_server.py | 2 ++ synapse/storage/schema/delta/43/group_server.sql | 1 + 3 files changed, 7 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index b8b1e754c7..3a738ef36f 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -203,12 +203,16 @@ class GroupsLocalHandler(object): user_id=user_id, ) + # TODO: Check that the group is public and we're being added publically + is_publicised = content.get("publicise", False) + token = yield self.store.register_user_group_membership( group_id, user_id, membership="join", is_admin=False, local_attestation=local_attestation, remote_attestation=remote_attestation, + is_publicised=is_publicised, ) self.notifier.on_new_event( "groups_key", token, users=[user_id], diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index f44e80b514..31514f3cdb 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -840,6 +840,7 @@ class GroupServerStore(SQLBaseStore): is_admin=False, content={}, local_attestation=None, remote_attestation=None, + is_publicised=False, ): """Registers that a local user is a member of a (local or remote) group. @@ -873,6 +874,7 @@ class GroupServerStore(SQLBaseStore): "user_id": user_id, "is_admin": is_admin, "membership": membership, + "is_publicised": is_publicised, "content": json.dumps(content), }, ) diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 92f3339c94..01ac0edc35 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -150,6 +150,7 @@ CREATE TABLE local_group_membership ( user_id TEXT NOT NULL, is_admin BOOLEAN NOT NULL, membership TEXT NOT NULL, + is_publicised TEXT NOT NULL, -- if the user is publicising their membership content TEXT NOT NULL ); -- cgit 1.5.1 From b880ff190a82d4f337b94115fc017d703e53878d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Aug 2017 14:19:07 +0100 Subject: Allow update group publicity --- synapse/rest/client/v2_alpha/groups.py | 28 ++++++++++++++++++++++++++++ synapse/storage/group_server.py | 15 +++++++++++++++ 2 files changed, 43 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 009cd70737..9b1116acee 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -557,6 +557,33 @@ class GroupSelfAcceptInviteServlet(RestServlet): defer.returnValue((200, result)) +class GroupSelfUpdatePublicityServlet(RestServlet): + """Update whether we publicise a users membership of a group + """ + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/self/update_publicity$" + ) + + def __init__(self, hs): + super(GroupSelfUpdatePublicityServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + publicise = content["publicise"] + yield self.store.update_group_publicity( + group_id, requester_user_id, publicise, + ) + + defer.returnValue((200, {})) + + class GroupsForUserServlet(RestServlet): """Get all groups the logged in user is joined to """ @@ -598,4 +625,5 @@ def register_servlets(hs, http_server): GroupSummaryRoomsCatServlet(hs).register(http_server) GroupRoleServlet(hs).register(http_server) GroupRolesServlet(hs).register(http_server) + GroupSelfUpdatePublicityServlet(hs).register(http_server) GroupSummaryUsersRoleServlet(hs).register(http_server) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 31514f3cdb..10e757e975 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -835,6 +835,21 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + def update_group_publicity(self, group_id, user_id, publicise): + """Update whether the user is publicising their membership of the group + """ + return self._simple_update_one( + table="local_group_membership", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + updatevalues={ + "is_publicised": publicise, + }, + desc="update_group_publicity" + ) + @defer.inlineCallbacks def register_user_group_membership(self, group_id, user_id, membership, is_admin=False, content={}, -- cgit 1.5.1 From ef8e5786770ff285ebdf1fce420b5aa86437673c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 Aug 2017 13:36:22 +0100 Subject: Add bulk group publicised lookup API --- synapse/federation/transport/client.py | 15 ++++++++++ synapse/federation/transport/server.py | 17 +++++++++++ synapse/handlers/groups_local.py | 42 ++++++++++++++++++++++++++ synapse/rest/client/v2_alpha/groups.py | 54 ++++++++++++++++++++++++++++++++++ synapse/storage/group_server.py | 14 +++++++++ 5 files changed, 142 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 073d3abb2a..ce68cc4937 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -812,3 +812,18 @@ class TransportLayerClient(object): args={"requester_user_id": requester_user_id}, ignore_backoff=True, ) + + def bulk_get_publicised_groups(self, destination, user_ids): + """Get the groups a list of users are publicising + """ + + path = PREFIX + "/get_groups_publicised" + + content = {"user_ids": user_ids} + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index e04750fd2a..b5f07c50bf 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1050,6 +1050,22 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet): defer.returnValue((200, resp)) +class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): + """Get roles in a group + """ + PATH = ( + "/get_groups_publicised$" + ) + + @defer.inlineCallbacks + def on_POST(self, origin, content, query): + resp = yield self.handler.bulk_get_publicised_groups( + content["user_ids"], proxy=False, + ) + + defer.returnValue((200, resp)) + + FEDERATION_SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -1102,6 +1118,7 @@ GROUP_SERVER_SERVLET_CLASSES = ( GROUP_LOCAL_SERVLET_CLASSES = ( FederationGroupsLocalInviteServlet, FederationGroupsRemoveLocalUserServlet, + FederationGroupsBulkPublicisedServlet, ) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 3a738ef36f..c980623bbc 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -313,3 +313,45 @@ class GroupsLocalHandler(object): def get_joined_groups(self, user_id): group_ids = yield self.store.get_joined_groups(user_id) defer.returnValue({"groups": group_ids}) + + @defer.inlineCallbacks + def get_publicised_groups_for_user(self, user_id): + if self.hs.is_mine_id(user_id): + result = yield self.store.get_publicised_groups_for_user(user_id) + defer.returnValue({"groups": result}) + else: + result = yield self.transport_client.get_publicised_groups_for_user( + get_domain_from_id(user_id), user_id + ) + # TODO: Verify attestations + defer.returnValue(result) + + @defer.inlineCallbacks + def bulk_get_publicised_groups(self, user_ids, proxy=True): + destinations = {} + locals = [] + + for user_id in user_ids: + if self.hs.is_mine_id(user_id): + locals.append(user_id) + else: + destinations.setdefault( + get_domain_from_id(user_id), [] + ).append(user_id) + + if not proxy and destinations: + raise SynapseError(400, "Some user_ids are not local") + + results = {} + for destination, dest_user_ids in destinations.iteritems(): + r = yield self.transport_client.bulk_get_publicised_groups( + destination, dest_user_ids, + ) + results.update(r) + + for uid in locals: + results[uid] = yield self.store.get_publicised_groups_for_user( + uid + ) + + defer.returnValue({"users": results}) diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 9b1116acee..97d7948bb9 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -584,6 +584,59 @@ class GroupSelfUpdatePublicityServlet(RestServlet): defer.returnValue((200, {})) +class PublicisedGroupsForUserServlet(RestServlet): + """Get the list of groups a user is advertising + """ + PATTERNS = client_v2_patterns( + "/publicised_groups/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(PublicisedGroupsForUserServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastore() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, user_id): + yield self.auth.get_user_by_req(request) + + result = yield self.groups_handler.get_publicised_groups_for_user( + user_id + ) + + defer.returnValue((200, result)) + + +class PublicisedGroupsForUsersServlet(RestServlet): + """Get the list of groups a user is advertising + """ + PATTERNS = client_v2_patterns( + "/publicised_groups$" + ) + + def __init__(self, hs): + super(PublicisedGroupsForUsersServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastore() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_POST(self, request): + yield self.auth.get_user_by_req(request) + + content = parse_json_object_from_request(request) + user_ids = content["user_ids"] + + result = yield self.groups_handler.bulk_get_publicised_groups( + user_ids + ) + + defer.returnValue((200, result)) + + class GroupsForUserServlet(RestServlet): """Get all groups the logged in user is joined to """ @@ -627,3 +680,4 @@ def register_servlets(hs, http_server): GroupRolesServlet(hs).register(http_server) GroupSelfUpdatePublicityServlet(hs).register(http_server) GroupSummaryUsersRoleServlet(hs).register(http_server) + PublicisedGroupsForUserServlet(hs).register(http_server) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 10e757e975..0c35b03d2a 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -835,6 +835,20 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + def get_publicised_groups_for_user(self, user_id): + """Get all groups a user is publicising + """ + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + "is_publicised": True, + }, + retcol="group_id", + desc="get_publicised_groups_for_user", + ) + def update_group_publicity(self, group_id, user_id, publicise): """Update whether the user is publicising their membership of the group """ -- cgit 1.5.1 From 175a01f56c86a4c201e72d49f22663425656d81d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Aug 2017 14:45:56 +0100 Subject: Groups: Fix mising json.load in initial sync --- synapse/storage/group_server.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index f44e80b514..792a57deb5 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -1101,7 +1101,13 @@ class GroupServerStore(SQLBaseStore): LIMIT ? """ txn.execute(sql, (from_token, to_token, limit,)) - return txn.fetchall() + return [{ + "stream_id": stream_id, + "group_id": group_id, + "user_id": user_id, + "type": gtype, + "content": json.loads(content_json), + } for stream_id, group_id, user_id, gtype, content_json in txn] return self.runInteraction( "get_all_groups_changes", _get_all_groups_changes_txn, ) -- cgit 1.5.1 From 8b50fe5330249fd24d50fa97385cd88ef6703d79 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Aug 2017 13:18:23 +0100 Subject: Use BOOLEAN rather than TEXT type --- synapse/storage/schema/delta/43/group_server.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 01ac0edc35..e74554381f 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -150,7 +150,7 @@ CREATE TABLE local_group_membership ( user_id TEXT NOT NULL, is_admin BOOLEAN NOT NULL, membership TEXT NOT NULL, - is_publicised TEXT NOT NULL, -- if the user is publicising their membership + is_publicised BOOLEAN NOT NULL, -- if the user is publicising their membership content TEXT NOT NULL ); -- cgit 1.5.1 From 97c544f91f562f33b9655e7c8c8f980bac5ac658 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Aug 2017 11:11:37 +0100 Subject: Add _simple_update --- synapse/storage/_base.py | 51 ++++++++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 19 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 6f54036d67..5124a833a5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -743,6 +743,33 @@ class SQLBaseStore(object): txn.execute(sql, values) return cls.cursor_to_dict(txn) + def _simple_update(self, table, keyvalues, updatevalues, desc): + return self.runInteraction( + desc, + self._simple_update_txn, + table, keyvalues, updatevalues, + ) + + @staticmethod + def _simple_update_txn(txn, table, keyvalues, updatevalues): + if keyvalues: + where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) + else: + where = "" + + update_sql = "UPDATE %s SET %s %s" % ( + table, + ", ".join("%s = ?" % (k,) for k in updatevalues), + where, + ) + + txn.execute( + update_sql, + updatevalues.values() + keyvalues.values() + ) + + return txn.rowcount + def _simple_update_one(self, table, keyvalues, updatevalues, desc="_simple_update_one"): """Executes an UPDATE query on the named table, setting new values for @@ -768,27 +795,13 @@ class SQLBaseStore(object): table, keyvalues, updatevalues, ) - @staticmethod - def _simple_update_one_txn(txn, table, keyvalues, updatevalues): - if keyvalues: - where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) - else: - where = "" - - update_sql = "UPDATE %s SET %s %s" % ( - table, - ", ".join("%s = ?" % (k,) for k in updatevalues), - where, - ) - - txn.execute( - update_sql, - updatevalues.values() + keyvalues.values() - ) + @classmethod + def _simple_update_one_txn(cls, txn, table, keyvalues, updatevalues): + rowcount = cls._simple_update_txn(txn, table, keyvalues, updatevalues) - if txn.rowcount == 0: + if rowcount == 0: raise StoreError(404, "No row found") - if txn.rowcount > 1: + if rowcount > 1: raise StoreError(500, "More than one row matched") @staticmethod -- cgit 1.5.1 From 27ebc5c8f299488ccc0a6f100ec3b248cd81a058 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Aug 2017 11:21:34 +0100 Subject: Add remote profile cache --- synapse/groups/groups_server.py | 18 +++++ synapse/handlers/groups_local.py | 17 +++- synapse/handlers/profile.py | 81 ++++++++++++++++++- synapse/storage/profile.py | 98 +++++++++++++++++++++++ synapse/storage/schema/delta/43/profile_cache.sql | 28 +++++++ 5 files changed, 237 insertions(+), 5 deletions(-) create mode 100644 synapse/storage/schema/delta/43/profile_cache.sql (limited to 'synapse/storage') diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index f25f327eb9..6bccae4bfb 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -503,6 +503,13 @@ class GroupsServerHandler(object): get_domain_from_id(user_id), group_id, user_id, content ) + user_profile = res.get("user_profile", {}) + yield self.store.add_remote_profile_cache( + user_id, + displayname=user_profile.get("displayname"), + avatar_url=user_profile.get("avatar_url"), + ) + if res["state"] == "join": if not self.hs.is_mine_id(user_id): remote_attestation = res["attestation"] @@ -627,6 +634,9 @@ class GroupsServerHandler(object): get_domain_from_id(user_id), group_id, user_id, {} ) + if not self.hs.is_mine_id(user_id): + yield self.store.maybe_delete_remote_profile_cache(user_id) + defer.returnValue({}) @defer.inlineCallbacks @@ -647,6 +657,7 @@ class GroupsServerHandler(object): avatar_url = profile.get("avatar_url") short_description = profile.get("short_description") long_description = profile.get("long_description") + user_profile = content.get("user_profile", {}) yield self.store.create_group( group_id, @@ -679,6 +690,13 @@ class GroupsServerHandler(object): remote_attestation=remote_attestation, ) + if not self.hs.is_mine_id(user_id): + yield self.store.add_remote_profile_cache( + user_id, + displayname=user_profile.get("displayname"), + avatar_url=user_profile.get("avatar_url"), + ) + defer.returnValue({ "group_id": group_id, }) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 274fed9278..bfa10bde5a 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -56,6 +56,9 @@ class GroupsLocalHandler(object): self.notifier = hs.get_notifier() self.attestations = hs.get_groups_attestation_signing() + handlers = hs.get_handlers() + self.profile_handler = handlers.profile_handler + # Ensure attestations get renewed hs.get_groups_attestation_renewer() @@ -123,6 +126,7 @@ class GroupsLocalHandler(object): defer.returnValue(res) + @defer.inlineCallbacks def create_group(self, group_id, user_id, content): """Create a group """ @@ -130,13 +134,16 @@ class GroupsLocalHandler(object): logger.info("Asking to create group with ID: %r", group_id) if self.is_mine_id(group_id): - return self.groups_server_handler.create_group( + res = yield self.groups_server_handler.create_group( group_id, user_id, content ) + defer.returnValue(res) - return self.transport_client.create_group( + content["user_profile"] = yield self.profile_handler.get_profile(user_id) + res = yield self.transport_client.create_group( get_domain_from_id(group_id), group_id, user_id, content, - ) # TODO + ) + defer.returnValue(res) @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): @@ -265,7 +272,9 @@ class GroupsLocalHandler(object): "groups_key", token, users=[user_id], ) - defer.returnValue({"state": "invite"}) + user_profile = yield self.profile_handler.get_profile(user_id) + + defer.returnValue({"state": "invite", "user_profile": user_profile}) @defer.inlineCallbacks def remove_user_from_group(self, group_id, user_id, requester_user_id, content): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 7abee98dea..57e22edb0d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -19,7 +19,7 @@ from twisted.internet import defer import synapse.types from synapse.api.errors import SynapseError, AuthError, CodeMessageException -from synapse.types import UserID +from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler @@ -27,15 +27,53 @@ logger = logging.getLogger(__name__) class ProfileHandler(BaseHandler): + PROFILE_UPDATE_MS = 60 * 1000 + PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 def __init__(self, hs): super(ProfileHandler, self).__init__(hs) + self.clock = hs.get_clock() + self.federation = hs.get_replication_layer() self.federation.register_query_handler( "profile", self.on_profile_query ) + self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS) + + @defer.inlineCallbacks + def get_profile(self, user_id): + target_user = UserID.from_string(user_id) + if self.hs.is_mine(target_user): + displayname = yield self.store.get_profile_displayname( + target_user.localpart + ) + avatar_url = yield self.store.get_profile_avatar_url( + target_user.localpart + ) + + defer.returnValue({ + "displayname": displayname, + "avatar_url": avatar_url, + }) + else: + try: + result = yield self.federation.make_query( + destination=target_user.domain, + query_type="profile", + args={ + "user_id": user_id, + }, + ignore_backoff=True, + ) + defer.returnValue(result) + except CodeMessageException as e: + if e.code != 404: + logger.exception("Failed to get displayname") + + raise + @defer.inlineCallbacks def get_displayname(self, target_user): if self.hs.is_mine(target_user): @@ -182,3 +220,44 @@ class ProfileHandler(BaseHandler): "Failed to update join event for room %s - %s", room_id, str(e.message) ) + + def _update_remote_profile_cache(self): + """Called periodically to check profiles of remote users we havent' + checked in a while. + """ + entries = yield self.store.get_remote_profile_cache_entries_that_expire( + last_checked=self.clock.time_msec() - self.PROFILE_UPDATE_EVERY_MS + ) + + for user_id, displayname, avatar_url in entries: + is_subcscribed = yield self.store.is_subscribed_remote_profile_for_user( + user_id, + ) + if not is_subcscribed: + yield self.store.maybe_delete_remote_profile_cache(user_id) + continue + + try: + profile = yield self.federation.make_query( + destination=get_domain_from_id(user_id), + query_type="profile", + args={ + "user_id": user_id, + }, + ignore_backoff=True, + ) + except: + logger.exception("Failed to get avatar_url") + + yield self.store.update_remote_profile_cache( + user_id, displayname, avatar_url + ) + continue + + new_name = profile.get("displayname") + new_avatar = profile.get("avatar_url") + + # We always hit update to update the last_check timestamp + yield self.store.update_remote_profile_cache( + user_id, new_name, new_avatar + ) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 26a40905ae..dca6af8a77 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore @@ -55,3 +57,99 @@ class ProfileStore(SQLBaseStore): updatevalues={"avatar_url": new_avatar_url}, desc="set_profile_avatar_url", ) + + def get_from_remote_profile_cache(self, user_id): + return self._simple_select_one( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + retcols=("displayname", "avatar_url", "last_check"), + allow_none=True, + desc="get_from_remote_profile_cache", + ) + + def add_remote_profile_cache(self, user_id, displayname, avatar_url): + """Ensure we are caching the remote user's profiles. + + This should only be called when `is_subscribed_remote_profile_for_user` + would return true for the user. + """ + return self._simple_upsert( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + values={ + "displayname": displayname, + "avatar_url": avatar_url, + "last_check": self._clock.time_msec(), + }, + desc="add_remote_profile_cache", + ) + + def update_remote_profile_cache(self, user_id, displayname, avatar_url): + return self._simple_update( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + values={ + "displayname": displayname, + "avatar_url": avatar_url, + "last_check": self._clock.time_msec(), + }, + desc="update_remote_profile_cache", + ) + + @defer.inlineCallbacks + def maybe_delete_remote_profile_cache(self, user_id): + """Check if we still care about the remote user's profile, and if we + don't then remove their profile from the cache + """ + subscribed = yield self.is_subscribed_remote_profile_for_user(user_id) + if not subscribed: + yield self._simple_delete( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + desc="delete_remote_profile_cache", + ) + + def get_remote_profile_cache_entries_that_expire(self, last_checked): + """Get all users who haven't been checked since `last_checked` + """ + def _get_remote_profile_cache_entries_that_expire_txn(txn): + sql = """ + SELECT user_id, displayname, avatar_url + FROM remote_profile_cache + WHERE last_check < ? + """ + + txn.execute(sql, (last_checked,)) + + return self.cursor_to_dict(txn) + + return self.runInteraction( + "get_remote_profile_cache_entries_that_expire", + _get_remote_profile_cache_entries_that_expire_txn, + ) + + @defer.inlineCallbacks + def is_subscribed_remote_profile_for_user(self, user_id): + """Check whether we are interested in a remote user's profile. + """ + res = yield self._simple_select_one_onecol( + table="group_users", + keyvalues={"user_id": user_id}, + retcol="user_id", + allow_none=True, + desc="should_update_remote_profile_cache_for_user", + ) + + if res: + defer.returnValue(True) + + res = yield self._simple_select_one_onecol( + table="group_invites", + keyvalues={"user_id": user_id}, + retcol="user_id", + allow_none=True, + desc="should_update_remote_profile_cache_for_user", + ) + + if res: + defer.returnValue(True) diff --git a/synapse/storage/schema/delta/43/profile_cache.sql b/synapse/storage/schema/delta/43/profile_cache.sql new file mode 100644 index 0000000000..e5ddc84df0 --- /dev/null +++ b/synapse/storage/schema/delta/43/profile_cache.sql @@ -0,0 +1,28 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- A subset of remote users whose profiles we have cached. +-- Whether a user is in this table or not is defined by the storage function +-- `is_subscribed_remote_profile_for_user` +CREATE TABLE remote_profile_cache ( + user_id TEXT NOT NULL, + displayname TEXT, + avatar_url TEXT, + last_check BIGINT NOT NULL +); + +CREATE UNIQUE INDEX remote_profile_cache_user_id ON remote_profile_cache(user_id); +CREATE INDEX remote_profile_cache_time ON remote_profile_cache(last_check); -- cgit 1.5.1 From 4a9b1cf25300eedf66aaefcb36e23f5fadf2b57a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Aug 2017 16:23:58 +0100 Subject: Add user profiles to summary from group server --- synapse/groups/groups_server.py | 7 ++++++- synapse/handlers/profile.py | 23 +++++++++++++++++++++++ synapse/storage/profile.py | 2 +- 3 files changed, 30 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 6bccae4bfb..94cf9788bb 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -45,6 +45,7 @@ class GroupsServerHandler(object): self.server_name = hs.hostname self.attestations = hs.get_groups_attestation_signing() self.transport_client = hs.get_federation_transport_client() + self.profile_handler = hs.get_profile_handler() # Ensure attestations get renewed hs.get_groups_attestation_renewer() @@ -128,6 +129,9 @@ class GroupsServerHandler(object): group_id, user_id, ) + user_profile = yield self.profile_handler.get_profile_from_cache(user_id) + entry.update(user_profile) + users.sort(key=lambda e: e.get("order", 0)) membership_info = yield self.store.get_users_membership_info_in_group( @@ -387,7 +391,8 @@ class GroupsServerHandler(object): entry = {"user_id": g_user_id} - # TODO: Get profile information + profile = yield self.profile_handler.get_profile_from_cache(g_user_id) + entry.update(profile) if not is_public: entry["is_public"] = False diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index c3cee38a43..e56e0a52bf 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -71,6 +71,29 @@ class ProfileHandler(BaseHandler): raise + @defer.inlineCallbacks + def get_profile_from_cache(self, user_id): + """Get the profile information from our local cache. If the user is + ours then the profile information will always be corect. Otherwise, + it may be out of date/missing. + """ + target_user = UserID.from_string(user_id) + if self.hs.is_mine(target_user): + displayname = yield self.store.get_profile_displayname( + target_user.localpart + ) + avatar_url = yield self.store.get_profile_avatar_url( + target_user.localpart + ) + + defer.returnValue({ + "displayname": displayname, + "avatar_url": avatar_url, + }) + else: + profile = yield self.store.get_from_remote_profile_cache(user_id) + defer.returnValue(profile or {}) + @defer.inlineCallbacks def get_displayname(self, target_user): if self.hs.is_mine(target_user): diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index dca6af8a77..beea3102fc 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -62,7 +62,7 @@ class ProfileStore(SQLBaseStore): return self._simple_select_one( table="remote_profile_cache", keyvalues={"user_id": user_id}, - retcols=("displayname", "avatar_url", "last_check"), + retcols=("displayname", "avatar_url",), allow_none=True, desc="get_from_remote_profile_cache", ) -- cgit 1.5.1 From 069ae2df126418b5be1c96727a578cfd1dd4e506 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Sep 2017 10:52:12 +0100 Subject: Fix initial sync --- synapse/storage/group_server.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 5433063507..b0399f8133 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -1085,7 +1085,15 @@ class GroupServerStore(SQLBaseStore): AND stream_id <= ? """ txn.execute(sql, (user_id, now_token,)) - return self.cursor_to_dict(txn) + return [ + { + "group_id": row[0], + "type": row[1], + "membership": row[2], + "content": json.loads(row[3]), + } + for row in txn + ] return self.runInteraction( "get_all_groups_for_user", _get_all_groups_for_user_txn, ) -- cgit 1.5.1 From 197d82dc070447b4a89a82816996f38f01ca7a04 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Sep 2017 11:12:11 +0100 Subject: Correctly return next token --- synapse/storage/group_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index b0399f8133..2afd689d83 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -966,10 +966,11 @@ class GroupServerStore(SQLBaseStore): return next_id with self._group_updates_id_gen.get_next() as next_id: - yield self.runInteraction( + res = yield self.runInteraction( "register_user_group_membership", _register_user_group_membership_txn, next_id, ) + defer.returnValue(res) @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, -- cgit 1.5.1 From e1dec2f1a797122b4d72ba883e09b2d1b9eafcc9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Sep 2017 16:09:57 +0100 Subject: Remove user from group summary when the leave the group --- synapse/storage/group_server.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 2afd689d83..d0b5ad231a 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -822,6 +822,14 @@ class GroupServerStore(SQLBaseStore): "user_id": user_id, }, ) + self._simple_delete_txn( + txn, + table="group_summary_users", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) return self.runInteraction("remove_user_from_group", _remove_user_from_group_txn) def add_room_to_group(self, group_id, room_id, is_public): -- cgit 1.5.1 From a8e2a3df32f3584d728021d5feafecf78b0f37d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Sep 2017 15:39:13 +0100 Subject: Add unique index to group_rooms table --- synapse/groups/groups_server.py | 2 -- synapse/storage/schema/delta/43/group_server.sql | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 94cf9788bb..699d8a5265 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -466,8 +466,6 @@ class GroupsServerHandler(object): group_id, and_exists=True, and_is_admin=requester_user_id ) - # TODO: Check if room has already been added - is_public = _parse_visibility_from_contents(content) yield self.store.add_room_to_group(group_id, room_id, is_public=is_public) diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index e74554381f..b2333848a0 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -52,7 +52,7 @@ CREATE TABLE group_rooms ( is_public BOOLEAN NOT NULL -- whether the room can be seen by everyone ); -CREATE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); +CREATE UNIQUE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id); -- cgit 1.5.1 From 17b8e2bd02ad0abbd25103b637eb8490f3a53507 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Sep 2017 15:52:41 +0100 Subject: Add remove room API --- synapse/federation/transport/client.py | 12 ++++++++++++ synapse/federation/transport/server.py | 14 +++++++++++++- synapse/groups/groups_server.py | 12 ++++++++++++ synapse/handlers/groups_local.py | 1 + synapse/rest/client/v2_alpha/groups.py | 11 +++++++++++ synapse/storage/group_server.py | 23 +++++++++++++++++++++++ 6 files changed, 72 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ce68cc4937..36f6eb75e9 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -525,6 +525,18 @@ class TransportLayerClient(object): ignore_backoff=True, ) + def remove_room_from_group(self, destination, group_id, requester_user_id, room_id): + """Remove a room from a group + """ + path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + @log_function def get_users_in_group(self, destination, group_id, requester_user_id): """Get users in a group diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index b5f07c50bf..c7565e0737 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -674,7 +674,7 @@ class FederationGroupsRoomsServlet(BaseFederationServlet): class FederationGroupsAddRoomsServlet(BaseFederationServlet): - """Add room to group + """Add/remove room from group """ PATH = "/groups/(?P[^/]*)/room/(?)$" @@ -690,6 +690,18 @@ class FederationGroupsAddRoomsServlet(BaseFederationServlet): defer.returnValue((200, new_content)) + @defer.inlineCallbacks + def on_DELETE(self, origin, content, query, group_id, room_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.remove_room_from_group( + group_id, requester_user_id, room_id, + ) + + defer.returnValue((200, new_content)) + class FederationGroupsUsersServlet(BaseFederationServlet): """Get the users in a group on behalf of a user diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 699d8a5265..10bf61d178 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -472,6 +472,18 @@ class GroupsServerHandler(object): defer.returnValue({}) + @defer.inlineCallbacks + def remove_room_from_group(self, group_id, requester_user_id, room_id): + """Remove room from group + """ + yield self.check_group_is_ours( + group_id, and_exists=True, and_is_admin=requester_user_id + ) + + yield self.store.remove_room_from_group(group_id, room_id) + + defer.returnValue({}) + @defer.inlineCallbacks def invite_to_group(self, group_id, user_id, requester_user_id, content): """Invite user to group diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 14fdf06b58..a2bacbfc38 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -69,6 +69,7 @@ class GroupsLocalHandler(object): get_rooms_in_group = _create_rerouter("get_rooms_in_group") add_room_to_group = _create_rerouter("add_room_to_group") + remove_room_from_group = _create_rerouter("remove_room_from_group") update_group_summary_room = _create_rerouter("update_group_summary_room") delete_group_summary_room = _create_rerouter("delete_group_summary_room") diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index b469058e9d..8f3ce15b02 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -423,6 +423,17 @@ class GroupAdminRoomsServlet(RestServlet): defer.returnValue((200, result)) + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.remove_room_from_group( + group_id, user_id, room_id, + ) + + defer.returnValue((200, result)) + class GroupAdminUsersInviteServlet(RestServlet): """Invite a user to the group diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index d0b5ad231a..4fe9172adc 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -843,6 +843,29 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + def remove_room_from_group(self, group_id, room_id): + def _remove_room_from_group_txn(txn): + self._simple_delete_txn( + txn, + table="group_rooms", + keyvalues={ + "group_id": group_id, + "room_id": room_id, + }, + ) + + self._simple_delete_txn( + txn, + table="group_summary_rooms", + keyvalues={ + "group_id": group_id, + "room_id": room_id, + }, + ) + return self.runInteraction( + "remove_room_from_group", _remove_room_from_group_txn, + ) + def get_publicised_groups_for_user(self, user_id): """Get all groups a user is publicising """ -- cgit 1.5.1 From 9ccb4226ba0824a1468c4e8f0abe91aca3381862 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 12:18:06 +0100 Subject: Delete expired url cache data --- synapse/rest/media/v1/filepath.py | 43 ++++++++++- synapse/rest/media/v1/preview_url_resource.py | 90 +++++++++++++++++++++- synapse/storage/media_repository.py | 61 +++++++++++++++ synapse/storage/prepare_database.py | 2 +- .../storage/schema/delta/44/expire_url_cache.sql | 17 ++++ 5 files changed, 208 insertions(+), 5 deletions(-) create mode 100644 synapse/storage/schema/delta/44/expire_url_cache.sql (limited to 'synapse/storage') diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index d92b7ff337..c5d43209f9 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -73,19 +73,58 @@ class MediaFilePaths(object): ) def url_cache_filepath(self, media_id): + # Media id is of the form + # E.g.: 2017-09-28-fsdRDt24DS234dsf return os.path.join( self.base_path, "url_cache", - media_id[0:2], media_id[2:4], media_id[4:] + media_id[:10], media_id[11:] ) + def url_cache_filepath_dirs_to_delete(self, media_id): + "The dirs to try and remove if we delete the media_id file" + return [ + os.path.join( + self.base_path, "url_cache", + media_id[:10], + ), + ] + def url_cache_thumbnail(self, media_id, width, height, content_type, method): + # Media id is of the form + # E.g.: 2017-09-28-fsdRDt24DS234dsf + top_level_type, sub_type = content_type.split("/") file_name = "%i-%i-%s-%s-%s" % ( width, height, top_level_type, sub_type, method ) + return os.path.join( self.base_path, "url_cache_thumbnails", - media_id[0:2], media_id[2:4], media_id[4:], + media_id[:10], media_id[11:], file_name ) + + def url_cache_thumbnail_directory(self, media_id): + # Media id is of the form + # E.g.: 2017-09-28-fsdRDt24DS234dsf + + return os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], media_id[11:], + ) + + def url_cache_thumbnail_dirs_to_delete(self, media_id): + "The dirs to try and remove if we delete the media_id thumbnails" + # Media id is of the form + # E.g.: 2017-09-28-fsdRDt24DS234dsf + return [ + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], media_id[11:], + ), + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], + ), + ] diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index b81a336c5d..c5ba83ddfd 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -36,6 +36,9 @@ import cgi import ujson as json import urlparse import itertools +import datetime +import errno +import shutil import logging logger = logging.getLogger(__name__) @@ -70,6 +73,10 @@ class PreviewUrlResource(Resource): self.downloads = {} + self._cleaner_loop = self.clock.looping_call( + self._expire_url_cache_data, 30 * 10000 + ) + def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET @@ -253,8 +260,7 @@ class PreviewUrlResource(Resource): # we're most likely being explicitly triggered by a human rather than a # bot, so are we really a robot? - # XXX: horrible duplication with base_resource's _download_remote_file() - file_id = random_string(24) + file_id = datetime.date.today().isoformat() + '_' + random_string(16) fname = self.filepaths.url_cache_filepath(file_id) self.media_repo._makedirs(fname) @@ -328,6 +334,86 @@ class PreviewUrlResource(Resource): "etag": headers["ETag"][0] if "ETag" in headers else None, }) + @defer.inlineCallbacks + def _expire_url_cache_data(self): + """Clean up expired url cache content, media and thumbnails. + """ + now = self.clock.time_msec() + + # First we delete expired url cache entries + media_ids = yield self.store.get_expired_url_cache(now) + + removed_media = [] + for media_id in media_ids: + fname = self.filepaths.url_cache_filepath(media_id) + try: + os.remove(fname) + except OSError as e: + # If the path doesn't exist, meh + if e.errno != errno.ENOENT: + logger.warn("Failed to remove media: %r: %s", media_id, e) + continue + + removed_media.append(media_id) + + try: + dirs = self.filepaths.url_cache_filepath_dirs_to_delete(media_id) + for dir in dirs: + os.rmdir(dir) + except: + pass + + yield self.store.delete_url_cache(removed_media) + + logger.info("Deleted %d entries from url cache", len(removed_media)) + + # Now we delete old images associated with the url cache. + # These may be cached for a bit on the client (i.e., they + # may have a room open with a preview url thing open). + # So we wait a couple of days before deleting, just in case. + expire_before = now - 2 * 24 * 60 * 60 * 1000 + yield self.store.get_url_cache_media_before(expire_before) + + removed_media = [] + for media_id in media_ids: + fname = self.filepaths.url_cache_filepath(media_id) + try: + os.remove(fname) + except OSError as e: + # If the path doesn't exist, meh + if e.errno != errno.ENOENT: + logger.warn("Failed to remove media: %r: %s", media_id, e) + continue + + try: + dirs = self.filepaths.url_cache_filepath_dirs_to_delete(media_id) + for dir in dirs: + os.rmdir(dir) + except: + pass + + thumbnail_dir = self.filepaths.url_cache_thumbnail_directory(media_id) + try: + shutil.rmtree(thumbnail_dir) + except OSError as e: + # If the path doesn't exist, meh + if e.errno != errno.ENOENT: + logger.warn("Failed to remove media: %r: %s", media_id, e) + continue + + removed_media.append(media_id) + + try: + dirs = self.filepaths.url_cache_thumbnail_dirs_to_delete(media_id) + for dir in dirs: + os.rmdir(dir) + except: + pass + + yield self.store.delete_url_cache_media(removed_media) + + logger.info("Deleted %d media from url cache", len(removed_media)) + def decode_and_calc_og(body, media_uri, request_encoding=None): from lxml import etree diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 82bb61b811..5cca14ccb2 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -238,3 +238,64 @@ class MediaRepositoryStore(SQLBaseStore): }, ) return self.runInteraction("delete_remote_media", delete_remote_media_txn) + + def get_expired_url_cache(self, now_ts): + sql = ( + "SELECT media_id FROM local_media_repository_url_cache" + " WHERE download_ts + expires < ?" + " ORDER BY download_ts + expires ASC" + " LIMIT 100" + ) + + def _get_expired_url_cache_txn(txn): + txn.execute(sql, (now_ts,)) + return [row[0] for row in txn] + + return self.runInteraction("get_expired_url_cache", _get_expired_url_cache_txn) + + def delete_url_cache(self, media_ids): + sql = ( + "DELETE FROM local_media_repository_url_cache" + " WHERE media_id = ?" + ) + + def _delete_url_cache_txn(txn): + txn.executemany(sql, [(media_id) for media_id in media_ids]) + + return self.runInteraction("delete_url_cache", _delete_url_cache_txn) + + def get_url_cache_media_before(self, before_ts): + sql = ( + "SELECT media_id FROM local_media_repository" + " WHERE created_ts < ?" + " ORDER BY created_ts ASC" + " LIMIT 100" + ) + + def _get_url_cache_media_before_txn(txn): + txn.execute(sql, (before_ts,)) + return [row[0] for row in txn] + + return self.runInteraction( + "get_url_cache_media_before", _get_url_cache_media_before_txn, + ) + + def delete_url_cache_media(self, media_ids): + def _delete_url_cache_media_txn(txn): + sql = ( + "DELETE FROM local_media_repository" + " WHERE media_id = ?" + ) + + txn.executemany(sql, [(media_id) for media_id in media_ids]) + + sql = ( + "DELETE FROM local_media_repository_thumbnails" + " WHERE media_id = ?" + ) + + txn.executemany(sql, [(media_id) for media_id in media_ids]) + + return self.runInteraction( + "delete_url_cache_media", _delete_url_cache_media_txn, + ) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 72b670b83b..a0af8456f5 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 43 +SCHEMA_VERSION = 44 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql new file mode 100644 index 0000000000..96202bd2a6 --- /dev/null +++ b/synapse/storage/schema/delta/44/expire_url_cache.sql @@ -0,0 +1,17 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL; +CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(download_ts + expires); -- cgit 1.5.1 From 77f1d24de3c696f52bc1ba6d0f61e82f03a9de7a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 12:23:15 +0100 Subject: More brackets --- synapse/storage/schema/delta/44/expire_url_cache.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql index 96202bd2a6..997e790b6d 100644 --- a/synapse/storage/schema/delta/44/expire_url_cache.sql +++ b/synapse/storage/schema/delta/44/expire_url_cache.sql @@ -14,4 +14,4 @@ */ CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL; -CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(download_ts + expires); +CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache((download_ts + expires)); -- cgit 1.5.1 From ae79764fe55ab15156b4f28658326bd2c9c0b937 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 12:37:53 +0100 Subject: Change expires column to expires_ts --- synapse/rest/media/v1/preview_url_resource.py | 4 ++-- synapse/storage/media_repository.py | 14 +++++++------- .../storage/schema/delta/44/expire_url_cache.sql | 21 ++++++++++++++++++++- 3 files changed, 29 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index c5ba83ddfd..6f896ffb53 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -137,7 +137,7 @@ class PreviewUrlResource(Resource): cache_result = yield self.store.get_url_cache(url, ts) if ( cache_result and - cache_result["download_ts"] + cache_result["expires"] > ts and + cache_result["expires_ts"] > ts and cache_result["response_code"] / 100 == 2 ): respond_with_json_bytes( @@ -246,7 +246,7 @@ class PreviewUrlResource(Resource): url, media_info["response_code"], media_info["etag"], - media_info["expires"], + media_info["expires"] + media_info["created_ts"], json.dumps(og), media_info["filesystem_id"], media_info["created_ts"], diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 5cca14ccb2..b8a0dd0762 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -62,7 +62,7 @@ class MediaRepositoryStore(SQLBaseStore): def get_url_cache_txn(txn): # get the most recently cached result (relative to the given ts) sql = ( - "SELECT response_code, etag, expires, og, media_id, download_ts" + "SELECT response_code, etag, expires_ts, og, media_id, download_ts" " FROM local_media_repository_url_cache" " WHERE url = ? AND download_ts <= ?" " ORDER BY download_ts DESC LIMIT 1" @@ -74,7 +74,7 @@ class MediaRepositoryStore(SQLBaseStore): # ...or if we've requested a timestamp older than the oldest # copy in the cache, return the oldest copy (if any) sql = ( - "SELECT response_code, etag, expires, og, media_id, download_ts" + "SELECT response_code, etag, expires_ts, og, media_id, download_ts" " FROM local_media_repository_url_cache" " WHERE url = ? AND download_ts > ?" " ORDER BY download_ts ASC LIMIT 1" @@ -86,14 +86,14 @@ class MediaRepositoryStore(SQLBaseStore): return None return dict(zip(( - 'response_code', 'etag', 'expires', 'og', 'media_id', 'download_ts' + 'response_code', 'etag', 'expires_ts', 'og', 'media_id', 'download_ts' ), row)) return self.runInteraction( "get_url_cache", get_url_cache_txn ) - def store_url_cache(self, url, response_code, etag, expires, og, media_id, + def store_url_cache(self, url, response_code, etag, expires_ts, og, media_id, download_ts): return self._simple_insert( "local_media_repository_url_cache", @@ -101,7 +101,7 @@ class MediaRepositoryStore(SQLBaseStore): "url": url, "response_code": response_code, "etag": etag, - "expires": expires, + "expires_ts": expires_ts, "og": og, "media_id": media_id, "download_ts": download_ts, @@ -242,8 +242,8 @@ class MediaRepositoryStore(SQLBaseStore): def get_expired_url_cache(self, now_ts): sql = ( "SELECT media_id FROM local_media_repository_url_cache" - " WHERE download_ts + expires < ?" - " ORDER BY download_ts + expires ASC" + " WHERE expires_ts < ?" + " ORDER BY expires_ts ASC" " LIMIT 100" ) diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql index 997e790b6d..9475d53e84 100644 --- a/synapse/storage/schema/delta/44/expire_url_cache.sql +++ b/synapse/storage/schema/delta/44/expire_url_cache.sql @@ -14,4 +14,23 @@ */ CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL; -CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache((download_ts + expires)); + +-- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support +-- indices on expressions until 3.9. +CREATE TABLE local_media_repository_url_cache_new( + url TEXT, + response_code INTEGER, + etag TEXT, + expires_ts BIGINT, + og TEXT, + media_id TEXT, + download_ts BIGINT +); + +INSERT INTO local_media_repository_url_cache_new + SELECT url, response_code, etag, expires + download_ts, og, media_id, download_ts FROM local_media_repository_url_cache; + +DROP TABLE local_media_repository_url_cache; +ALTER TABLE local_media_repository_url_cache_new RENAME TO local_media_repository_url_cache; + +CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(expires_ts); -- cgit 1.5.1 From 7a44c01d894d85a0eb829b4a82d1aeaff9a39ec9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 12:46:04 +0100 Subject: Fix typo --- synapse/storage/media_repository.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index b8a0dd0762..5e39daa210 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -260,7 +260,7 @@ class MediaRepositoryStore(SQLBaseStore): ) def _delete_url_cache_txn(txn): - txn.executemany(sql, [(media_id) for media_id in media_ids]) + txn.executemany(sql, [(media_id,) for media_id in media_ids]) return self.runInteraction("delete_url_cache", _delete_url_cache_txn) @@ -287,14 +287,14 @@ class MediaRepositoryStore(SQLBaseStore): " WHERE media_id = ?" ) - txn.executemany(sql, [(media_id) for media_id in media_ids]) + txn.executemany(sql, [(media_id,) for media_id in media_ids]) sql = ( "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?" ) - txn.executemany(sql, [(media_id) for media_id in media_ids]) + txn.executemany(sql, [(media_id,) for media_id in media_ids]) return self.runInteraction( "delete_url_cache_media", _delete_url_cache_media_txn, -- cgit 1.5.1 From 93247a424a5068b088567fa98b6990e47608b7cb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 13:48:14 +0100 Subject: Only pull out local media that were for url cache --- synapse/storage/media_repository.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 5e39daa210..1f2eab98e3 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -267,7 +267,7 @@ class MediaRepositoryStore(SQLBaseStore): def get_url_cache_media_before(self, before_ts): sql = ( "SELECT media_id FROM local_media_repository" - " WHERE created_ts < ?" + " WHERE created_ts < ? AND url_cache IS NOT NULL" " ORDER BY created_ts ASC" " LIMIT 100" ) -- cgit 1.5.1 From 4dc07e93a85f0f6e09a6763a7833ef935be1c417 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 14:10:33 +0100 Subject: Add old indices --- synapse/storage/schema/delta/44/expire_url_cache.sql | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql index 9475d53e84..e2b775f038 100644 --- a/synapse/storage/schema/delta/44/expire_url_cache.sql +++ b/synapse/storage/schema/delta/44/expire_url_cache.sql @@ -34,3 +34,5 @@ DROP TABLE local_media_repository_url_cache; ALTER TABLE local_media_repository_url_cache_new RENAME TO local_media_repository_url_cache; CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(expires_ts); +CREATE INDEX local_media_repository_url_cache_by_url_download_ts ON local_media_repository_url_cache(url, download_ts); +CREATE INDEX local_media_repository_url_cache_media_idx ON local_media_repository_url_cache(media_id); -- cgit 1.5.1 From 768f00dedbee83dd6bfb7c37bfadc511f7aeb10e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 14:27:27 +0100 Subject: Up the limits on number of url cache entries to delete at one time --- synapse/storage/media_repository.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 1f2eab98e3..7110a71279 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -244,7 +244,7 @@ class MediaRepositoryStore(SQLBaseStore): "SELECT media_id FROM local_media_repository_url_cache" " WHERE expires_ts < ?" " ORDER BY expires_ts ASC" - " LIMIT 100" + " LIMIT 500" ) def _get_expired_url_cache_txn(txn): @@ -269,7 +269,7 @@ class MediaRepositoryStore(SQLBaseStore): "SELECT media_id FROM local_media_repository" " WHERE created_ts < ? AND url_cache IS NOT NULL" " ORDER BY created_ts ASC" - " LIMIT 100" + " LIMIT 500" ) def _get_url_cache_media_before_txn(txn): -- cgit 1.5.1 From 30848c0fcd34aaf0b2db7b65c91648ae49c480a2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Oct 2017 11:09:51 +0100 Subject: Ignore incoming events for rooms that we have left When synapse receives an event for a room its not in over federation, it double checks with the remote server to see if it is in fact in the room. This is done so that if the server has forgotten about the room (usually as a result of the database being dropped) it can recover from it. However, in the presence of state resets in large rooms, this can cause a lot of work for servers that have legitimately left. As a hacky solution that supports both cases we drop incoming events for rooms that we have explicitly left. This means that we no longer support the case of servers having forgotten that they've rejoined a room, but that is sufficiently rare that we're not going to support it for now. --- synapse/handlers/federation.py | 23 +++++++++++++++++++++++ synapse/storage/roommember.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 18f87cad67..b160ff1684 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -125,6 +125,29 @@ class FederationHandler(BaseHandler): self.room_queues[pdu.room_id].append((pdu, origin)) return + # If we're no longer in the room just ditch the event entirely. This + # is probably an old server that has come back and thinks we're still + # in the room. + # + # If we were never in the room then maybe our database got vaped and + # we should check if we *are* in fact in the room. If we are then we + # can magically rejoin the room. + is_in_room = yield self.auth.check_host_in_room( + pdu.room_id, + self.server_name + ) + if not is_in_room: + was_in_room = yield self.store.was_host_joined( + pdu.room_id, self.server_name, + + ) + if was_in_room: + logger.info( + "Ignoring PDU %s for room %s from %s as we've left the room!", + pdu.event_id, pdu.room_id, origin, + ) + return + state = None auth_chain = [] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 457ca288d0..cb0791e591 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -533,6 +533,38 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(True) + @cachedInlineCallbacks() + def was_host_joined(self, room_id, host): + """Check whether the server is or ever was in the room. + """ + if '%' in host or '_' in host: + raise Exception("Invalid host name") + + sql = """ + SELECT user_id FROM room_memberships + WHERE room_id = ? + AND user_id LIKE ? + AND membership = 'join' + LIMIT 1 + """ + + # We do need to be careful to ensure that host doesn't have any wild cards + # in it, but we checked above for known ones and we'll check below that + # the returned user actually has the correct domain. + like_clause = "%:" + host + + rows = yield self._execute("was_host_joined", None, sql, room_id, like_clause) + + if not rows: + defer.returnValue(False) + + user_id = rows[0][0] + if get_domain_from_id(user_id) != host: + # This can only happen if the host name has something funky in it + raise Exception("Invalid host name") + + defer.returnValue(True) + def get_joined_hosts(self, room_id, state_entry): state_group = state_entry.state_group if not state_group: -- cgit 1.5.1 From e4ab96021e84ad9cccb2c3e0dea6347cce4e6149 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Oct 2017 14:10:21 +0100 Subject: Update comments --- synapse/handlers/federation.py | 2 +- synapse/storage/roommember.py | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7456b23005..77dd0ae1e2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -127,7 +127,7 @@ class FederationHandler(BaseHandler): # If we're no longer in the room just ditch the event entirely. This # is probably an old server that has come back and thinks we're still - # in the room. + # in the room (or we've been rejoined to the room by a state reset). # # If we were never in the room then maybe our database got vaped and # we should check if we *are* in fact in the room. If we are then we diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index cb0791e591..63f6115ba9 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -536,6 +536,13 @@ class RoomMemberStore(SQLBaseStore): @cachedInlineCallbacks() def was_host_joined(self, room_id, host): """Check whether the server is or ever was in the room. + + Args: + room_id (str) + host (str) + + Returns: + bool: whether the host is/was in the room or not """ if '%' in host or '_' in host: raise Exception("Invalid host name") -- cgit 1.5.1 From 11d62f43c9a28de7efd00a534cfbf05f254bfc3e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Oct 2017 14:12:28 +0100 Subject: Invalidate cache --- synapse/storage/events.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7002b3752e..4f0b43c36d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -784,6 +784,9 @@ class EventsStore(SQLBaseStore): self._invalidate_cache_and_stream( txn, self.is_host_joined, (room_id, host) ) + self._invalidate_cache_and_stream( + txn, self.was_host_joined, (room_id, host) + ) self._invalidate_cache_and_stream( txn, self.get_users_in_room, (room_id,) -- cgit 1.5.1 From e8496efe8467568f488c6f53056be4bf69fd56e1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Oct 2017 15:17:34 +0100 Subject: Fix up comment --- synapse/storage/roommember.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 63f6115ba9..a0fc9a6867 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -542,7 +542,8 @@ class RoomMemberStore(SQLBaseStore): host (str) Returns: - bool: whether the host is/was in the room or not + Deferred: Resolves to True if the host is/was in the room, otherwise + False. """ if '%' in host or '_' in host: raise Exception("Invalid host name") -- cgit 1.5.1 From c2c47550f9b85fda1a24964f053d03e459bb8436 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Oct 2017 13:23:15 +0100 Subject: Fix schema delta versions --- synapse/storage/prepare_database.py | 2 +- synapse/storage/schema/delta/43/group_server.sql | 167 ---------------------- synapse/storage/schema/delta/43/profile_cache.sql | 28 ---- synapse/storage/schema/delta/45/group_server.sql | 167 ++++++++++++++++++++++ synapse/storage/schema/delta/45/profile_cache.sql | 28 ++++ 5 files changed, 196 insertions(+), 196 deletions(-) delete mode 100644 synapse/storage/schema/delta/43/group_server.sql delete mode 100644 synapse/storage/schema/delta/43/profile_cache.sql create mode 100644 synapse/storage/schema/delta/45/group_server.sql create mode 100644 synapse/storage/schema/delta/45/profile_cache.sql (limited to 'synapse/storage') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index a0af8456f5..ccaaabcfa0 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 44 +SCHEMA_VERSION = 45 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql deleted file mode 100644 index b2333848a0..0000000000 --- a/synapse/storage/schema/delta/43/group_server.sql +++ /dev/null @@ -1,167 +0,0 @@ -/* Copyright 2017 Vector Creations Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE TABLE groups ( - group_id TEXT NOT NULL, - name TEXT, -- the display name of the room - avatar_url TEXT, - short_description TEXT, - long_description TEXT -); - -CREATE UNIQUE INDEX groups_idx ON groups(group_id); - - --- list of users the group server thinks are joined -CREATE TABLE group_users ( - group_id TEXT NOT NULL, - user_id TEXT NOT NULL, - is_admin BOOLEAN NOT NULL, - is_public BOOLEAN NOT NULL -- whether the users membership can be seen by everyone -); - - -CREATE INDEX groups_users_g_idx ON group_users(group_id, user_id); -CREATE INDEX groups_users_u_idx ON group_users(user_id); - --- list of users the group server thinks are invited -CREATE TABLE group_invites ( - group_id TEXT NOT NULL, - user_id TEXT NOT NULL -); - -CREATE INDEX groups_invites_g_idx ON group_invites(group_id, user_id); -CREATE INDEX groups_invites_u_idx ON group_invites(user_id); - - -CREATE TABLE group_rooms ( - group_id TEXT NOT NULL, - room_id TEXT NOT NULL, - is_public BOOLEAN NOT NULL -- whether the room can be seen by everyone -); - -CREATE UNIQUE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); -CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id); - - --- Rooms to include in the summary -CREATE TABLE group_summary_rooms ( - group_id TEXT NOT NULL, - room_id TEXT NOT NULL, - category_id TEXT NOT NULL, - room_order BIGINT NOT NULL, - is_public BOOLEAN NOT NULL, -- whether the room should be show to everyone - UNIQUE (group_id, category_id, room_id, room_order), - CHECK (room_order > 0) -); - -CREATE UNIQUE INDEX group_summary_rooms_g_idx ON group_summary_rooms(group_id, room_id, category_id); - - --- Categories to include in the summary -CREATE TABLE group_summary_room_categories ( - group_id TEXT NOT NULL, - category_id TEXT NOT NULL, - cat_order BIGINT NOT NULL, - UNIQUE (group_id, category_id, cat_order), - CHECK (cat_order > 0) -); - --- The categories in the group -CREATE TABLE group_room_categories ( - group_id TEXT NOT NULL, - category_id TEXT NOT NULL, - profile TEXT NOT NULL, - is_public BOOLEAN NOT NULL, -- whether the category should be show to everyone - UNIQUE (group_id, category_id) -); - --- The users to include in the group summary -CREATE TABLE group_summary_users ( - group_id TEXT NOT NULL, - user_id TEXT NOT NULL, - role_id TEXT NOT NULL, - user_order BIGINT NOT NULL, - is_public BOOLEAN NOT NULL -- whether the user should be show to everyone -); - -CREATE INDEX group_summary_users_g_idx ON group_summary_users(group_id); - --- The roles to include in the group summary -CREATE TABLE group_summary_roles ( - group_id TEXT NOT NULL, - role_id TEXT NOT NULL, - role_order BIGINT NOT NULL, - UNIQUE (group_id, role_id, role_order), - CHECK (role_order > 0) -); - - --- The roles in a groups -CREATE TABLE group_roles ( - group_id TEXT NOT NULL, - role_id TEXT NOT NULL, - profile TEXT NOT NULL, - is_public BOOLEAN NOT NULL, -- whether the role should be show to everyone - UNIQUE (group_id, role_id) -); - - --- List of attestations we've given out and need to renew -CREATE TABLE group_attestations_renewals ( - group_id TEXT NOT NULL, - user_id TEXT NOT NULL, - valid_until_ms BIGINT NOT NULL -); - -CREATE INDEX group_attestations_renewals_g_idx ON group_attestations_renewals(group_id, user_id); -CREATE INDEX group_attestations_renewals_u_idx ON group_attestations_renewals(user_id); -CREATE INDEX group_attestations_renewals_v_idx ON group_attestations_renewals(valid_until_ms); - - --- List of attestations we've received from remotes and are interested in. -CREATE TABLE group_attestations_remote ( - group_id TEXT NOT NULL, - user_id TEXT NOT NULL, - valid_until_ms BIGINT NOT NULL, - attestation_json TEXT NOT NULL -); - -CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); -CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id); -CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); - - --- The group membership for the HS's users -CREATE TABLE local_group_membership ( - group_id TEXT NOT NULL, - user_id TEXT NOT NULL, - is_admin BOOLEAN NOT NULL, - membership TEXT NOT NULL, - is_publicised BOOLEAN NOT NULL, -- if the user is publicising their membership - content TEXT NOT NULL -); - -CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); -CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); - - -CREATE TABLE local_group_updates ( - stream_id BIGINT NOT NULL, - group_id TEXT NOT NULL, - user_id TEXT NOT NULL, - type TEXT NOT NULL, - content TEXT NOT NULL -); diff --git a/synapse/storage/schema/delta/43/profile_cache.sql b/synapse/storage/schema/delta/43/profile_cache.sql deleted file mode 100644 index e5ddc84df0..0000000000 --- a/synapse/storage/schema/delta/43/profile_cache.sql +++ /dev/null @@ -1,28 +0,0 @@ -/* Copyright 2017 New Vector Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - --- A subset of remote users whose profiles we have cached. --- Whether a user is in this table or not is defined by the storage function --- `is_subscribed_remote_profile_for_user` -CREATE TABLE remote_profile_cache ( - user_id TEXT NOT NULL, - displayname TEXT, - avatar_url TEXT, - last_check BIGINT NOT NULL -); - -CREATE UNIQUE INDEX remote_profile_cache_user_id ON remote_profile_cache(user_id); -CREATE INDEX remote_profile_cache_time ON remote_profile_cache(last_check); diff --git a/synapse/storage/schema/delta/45/group_server.sql b/synapse/storage/schema/delta/45/group_server.sql new file mode 100644 index 0000000000..b2333848a0 --- /dev/null +++ b/synapse/storage/schema/delta/45/group_server.sql @@ -0,0 +1,167 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE groups ( + group_id TEXT NOT NULL, + name TEXT, -- the display name of the room + avatar_url TEXT, + short_description TEXT, + long_description TEXT +); + +CREATE UNIQUE INDEX groups_idx ON groups(group_id); + + +-- list of users the group server thinks are joined +CREATE TABLE group_users ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + is_admin BOOLEAN NOT NULL, + is_public BOOLEAN NOT NULL -- whether the users membership can be seen by everyone +); + + +CREATE INDEX groups_users_g_idx ON group_users(group_id, user_id); +CREATE INDEX groups_users_u_idx ON group_users(user_id); + +-- list of users the group server thinks are invited +CREATE TABLE group_invites ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL +); + +CREATE INDEX groups_invites_g_idx ON group_invites(group_id, user_id); +CREATE INDEX groups_invites_u_idx ON group_invites(user_id); + + +CREATE TABLE group_rooms ( + group_id TEXT NOT NULL, + room_id TEXT NOT NULL, + is_public BOOLEAN NOT NULL -- whether the room can be seen by everyone +); + +CREATE UNIQUE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id); +CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id); + + +-- Rooms to include in the summary +CREATE TABLE group_summary_rooms ( + group_id TEXT NOT NULL, + room_id TEXT NOT NULL, + category_id TEXT NOT NULL, + room_order BIGINT NOT NULL, + is_public BOOLEAN NOT NULL, -- whether the room should be show to everyone + UNIQUE (group_id, category_id, room_id, room_order), + CHECK (room_order > 0) +); + +CREATE UNIQUE INDEX group_summary_rooms_g_idx ON group_summary_rooms(group_id, room_id, category_id); + + +-- Categories to include in the summary +CREATE TABLE group_summary_room_categories ( + group_id TEXT NOT NULL, + category_id TEXT NOT NULL, + cat_order BIGINT NOT NULL, + UNIQUE (group_id, category_id, cat_order), + CHECK (cat_order > 0) +); + +-- The categories in the group +CREATE TABLE group_room_categories ( + group_id TEXT NOT NULL, + category_id TEXT NOT NULL, + profile TEXT NOT NULL, + is_public BOOLEAN NOT NULL, -- whether the category should be show to everyone + UNIQUE (group_id, category_id) +); + +-- The users to include in the group summary +CREATE TABLE group_summary_users ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + role_id TEXT NOT NULL, + user_order BIGINT NOT NULL, + is_public BOOLEAN NOT NULL -- whether the user should be show to everyone +); + +CREATE INDEX group_summary_users_g_idx ON group_summary_users(group_id); + +-- The roles to include in the group summary +CREATE TABLE group_summary_roles ( + group_id TEXT NOT NULL, + role_id TEXT NOT NULL, + role_order BIGINT NOT NULL, + UNIQUE (group_id, role_id, role_order), + CHECK (role_order > 0) +); + + +-- The roles in a groups +CREATE TABLE group_roles ( + group_id TEXT NOT NULL, + role_id TEXT NOT NULL, + profile TEXT NOT NULL, + is_public BOOLEAN NOT NULL, -- whether the role should be show to everyone + UNIQUE (group_id, role_id) +); + + +-- List of attestations we've given out and need to renew +CREATE TABLE group_attestations_renewals ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + valid_until_ms BIGINT NOT NULL +); + +CREATE INDEX group_attestations_renewals_g_idx ON group_attestations_renewals(group_id, user_id); +CREATE INDEX group_attestations_renewals_u_idx ON group_attestations_renewals(user_id); +CREATE INDEX group_attestations_renewals_v_idx ON group_attestations_renewals(valid_until_ms); + + +-- List of attestations we've received from remotes and are interested in. +CREATE TABLE group_attestations_remote ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + valid_until_ms BIGINT NOT NULL, + attestation_json TEXT NOT NULL +); + +CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); +CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id); +CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); + + +-- The group membership for the HS's users +CREATE TABLE local_group_membership ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + is_admin BOOLEAN NOT NULL, + membership TEXT NOT NULL, + is_publicised BOOLEAN NOT NULL, -- if the user is publicising their membership + content TEXT NOT NULL +); + +CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); +CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); + + +CREATE TABLE local_group_updates ( + stream_id BIGINT NOT NULL, + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, + content TEXT NOT NULL +); diff --git a/synapse/storage/schema/delta/45/profile_cache.sql b/synapse/storage/schema/delta/45/profile_cache.sql new file mode 100644 index 0000000000..e5ddc84df0 --- /dev/null +++ b/synapse/storage/schema/delta/45/profile_cache.sql @@ -0,0 +1,28 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- A subset of remote users whose profiles we have cached. +-- Whether a user is in this table or not is defined by the storage function +-- `is_subscribed_remote_profile_for_user` +CREATE TABLE remote_profile_cache ( + user_id TEXT NOT NULL, + displayname TEXT, + avatar_url TEXT, + last_check BIGINT NOT NULL +); + +CREATE UNIQUE INDEX remote_profile_cache_user_id ON remote_profile_cache(user_id); +CREATE INDEX remote_profile_cache_time ON remote_profile_cache(last_check); -- cgit 1.5.1 From ea18996f54194f920dc506201a65eb3d36bb161d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Oct 2017 15:44:37 +0100 Subject: Fix group stream replication The stream update functions expect the storage function to return a list of tuples. --- synapse/storage/group_server.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 4fe9172adc..22a6bc6261 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -1172,13 +1172,13 @@ class GroupServerStore(SQLBaseStore): LIMIT ? """ txn.execute(sql, (from_token, to_token, limit,)) - return [{ - "stream_id": stream_id, - "group_id": group_id, - "user_id": user_id, - "type": gtype, - "content": json.loads(content_json), - } for stream_id, group_id, user_id, gtype, content_json in txn] + return [( + stream_id, + group_id, + user_id, + gtype, + json.loads(content_json), + ) for stream_id, group_id, user_id, gtype, content_json in txn] return self.runInteraction( "get_all_groups_changes", _get_all_groups_changes_txn, ) -- cgit 1.5.1 From 818b08d0e4be7571008af4590542fd652f028dcd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Oct 2017 15:54:00 +0100 Subject: peeeeeeeeep8888888888888888888888888888 --- synapse/storage/group_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 22a6bc6261..3af372de59 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -1178,7 +1178,7 @@ class GroupServerStore(SQLBaseStore): user_id, gtype, json.loads(content_json), - ) for stream_id, group_id, user_id, gtype, content_json in txn] + ) for stream_id, group_id, user_id, gtype, content_json in txn] return self.runInteraction( "get_all_groups_changes", _get_all_groups_changes_txn, ) -- cgit 1.5.1 From 2c5972f87f0541aaeff43846f7050ab91d11cf0e Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 16 Oct 2017 15:31:11 +0100 Subject: Implement GET /groups/$groupId/invited_users --- synapse/federation/transport/client.py | 13 +++++++++++++ synapse/federation/transport/server.py | 18 ++++++++++++++++- synapse/groups/groups_server.py | 35 ++++++++++++++++++++++++++++++++++ synapse/handlers/groups_local.py | 17 +++++++++++++++++ synapse/rest/client/v2_alpha/groups.py | 21 ++++++++++++++++++++ synapse/storage/group_server.py | 12 ++++++++++++ 6 files changed, 115 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index f96561c1fe..125d8f3598 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -550,6 +550,19 @@ class TransportLayerClient(object): ignore_backoff=True, ) + @log_function + def get_invited_users_in_group(self, destination, group_id, requester_user_id): + """Get users that have been invited to a group + """ + path = PREFIX + "/groups/%s/invited_users" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + @log_function def accept_group_invite(self, destination, group_id, user_id, content): """Accept a group invite diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index c7565e0737..625a2fe27f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -720,6 +720,22 @@ class FederationGroupsUsersServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsInvitedUsersServlet(BaseFederationServlet): + """Get the users that have been invited to a group + """ + PATH = "/groups/(?P[^/]*)/invited_users$" + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.get_invited_users_in_group( + group_id, requester_user_id + ) + + defer.returnValue((200, new_content)) class FederationGroupsInviteServlet(BaseFederationServlet): """Ask a group server to invite someone to the group @@ -1109,12 +1125,12 @@ ROOM_LIST_CLASSES = ( PublicRoomList, ) - GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsProfileServlet, FederationGroupsSummaryServlet, FederationGroupsRoomsServlet, FederationGroupsUsersServlet, + FederationGroupsInvitedUsersServlet, FederationGroupsInviteServlet, FederationGroupsAcceptInviteServlet, FederationGroupsRemoveUserServlet, diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 1083bc2990..bfa46b7cb2 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -420,6 +420,41 @@ class GroupsServerHandler(object): "total_user_count_estimate": len(user_results), }) + @defer.inlineCallbacks + def get_invited_users_in_group(self, group_id, requester_user_id): + """Get the users that have been invited to a group as seen by requester_user_id. + + The ordering is arbitrary at the moment + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) + + if not is_user_in_group: + raise SynapseError(403, "User not in group") + + invited_users = yield self.store.get_invited_users_in_group(group_id) + + user_profiles = [] + + for user_id in invited_users: + user_profile = { + "user_id": user_id + } + try: + profile = yield self.profile_handler.get_profile_from_cache(user) + user_profile.update(profile) + except Exception as e: + pass + user_profiles.append(user_profile) + + defer.returnValue({ + "chunk": user_profiles, + "total_user_count_estimate": len(invited_users), + }) + + @defer.inlineCallbacks def get_rooms_in_group(self, group_id, requester_user_id): """Get the rooms in group as seen by requester_user_id diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 97a20f2b04..5263e769bb 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -219,6 +219,23 @@ class GroupsLocalHandler(object): defer.returnValue(res) + @defer.inlineCallbacks + def get_invited_users_in_group(self, group_id, requester_user_id): + """Get users invited to a group + """ + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_invited_users_in_group( + group_id, requester_user_id + ) + defer.returnValue(res) + + group_server_name = get_domain_from_id(group_id) + + res = yield self.transport_client.get_users_in_group( + get_domain_from_id(group_id), group_id, requester_user_id, + ) + defer.returnValue(res) + @defer.inlineCallbacks def join_group(self, group_id, user_id, content): """Request to join a group diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 8f3ce15b02..4532112cfc 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -370,6 +370,26 @@ class GroupUsersServlet(RestServlet): defer.returnValue((200, result)) +class GroupInvitedUsersServlet(RestServlet): + """Get users invited to a group + """ + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/invited_users$") + + def __init__(self, hs): + super(GroupInvitedUsersServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_invited_users_in_group(group_id, user_id) + + defer.returnValue((200, result)) + class GroupCreateServlet(RestServlet): """Create a group @@ -674,6 +694,7 @@ class GroupsForUserServlet(RestServlet): def register_servlets(hs, http_server): GroupServlet(hs).register(http_server) GroupSummaryServlet(hs).register(http_server) + GroupInvitedUsersServlet(hs).register(http_server) GroupUsersServlet(hs).register(http_server) GroupRoomServlet(hs).register(http_server) GroupCreateServlet(hs).register(http_server) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 3af372de59..9e63db5c6c 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -56,6 +56,18 @@ class GroupServerStore(SQLBaseStore): desc="get_users_in_group", ) + def get_invited_users_in_group(self, group_id): + # TODO: Pagination + + return self._simple_select_onecol( + table="group_invites", + keyvalues={ + "group_id": group_id, + }, + retcol="user_id", + desc="get_invited_users_in_group", + ) + def get_rooms_in_group(self, group_id, include_private=False): # TODO: Pagination -- cgit 1.5.1 From 2e9f5ea31a9c66eceb6276c5241cc6537cb0ae4c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 17 Oct 2017 10:59:30 +0100 Subject: Fix logcontext handling for persist_events * don't use preserve_context_over_deferred, which is known broken. * remove a redundant preserve_fn. * add/improve some comments --- synapse/storage/events.py | 24 +++++++++++++++++------- synapse/util/async.py | 5 +++++ 2 files changed, 22 insertions(+), 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4f0b43c36d..637640ec2a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -21,7 +21,7 @@ from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, preserve_context_over_deferred + preserve_fn, PreserveLoggingContext, make_deferred_yieldable ) from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -88,13 +88,23 @@ class _EventPeristenceQueue(object): def add_to_queue(self, room_id, events_and_contexts, backfilled): """Add events to the queue, with the given persist_event options. + NB: due to the normal usage pattern of this method, it does *not* + follow the synapse logcontext rules, and leaves the logcontext in + place whether or not the returned deferred is ready. + Args: room_id (str): events_and_contexts (list[(EventBase, EventContext)]): backfilled (bool): + + Returns: + defer.Deferred: a deferred which will resolve once the events are + persisted. Runs its callbacks *without* a logcontext. """ queue = self._event_persist_queues.setdefault(room_id, deque()) if queue: + # if the last item in the queue has the same `backfilled` setting, + # we can just add these new events to that item. end_item = queue[-1] if end_item.backfilled == backfilled: end_item.events_and_contexts.extend(events_and_contexts) @@ -113,11 +123,11 @@ class _EventPeristenceQueue(object): def handle_queue(self, room_id, per_item_callback): """Attempts to handle the queue for a room if not already being handled. - The given callback will be invoked with for each item in the queue,1 + The given callback will be invoked with for each item in the queue, of type _EventPersistQueueItem. The per_item_callback will continuously be called with new items, unless the queue becomnes empty. The return value of the function will be given to the deferreds waiting on the item, - exceptions will be passed to the deferres as well. + exceptions will be passed to the deferreds as well. This function should therefore be called whenever anything is added to the queue. @@ -233,7 +243,7 @@ class EventsStore(SQLBaseStore): deferreds = [] for room_id, evs_ctxs in partitioned.iteritems(): - d = preserve_fn(self._event_persist_queue.add_to_queue)( + d = self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled, ) @@ -242,7 +252,7 @@ class EventsStore(SQLBaseStore): for room_id in partitioned: self._maybe_start_persisting(room_id) - return preserve_context_over_deferred( + return make_deferred_yieldable( defer.gatherResults(deferreds, consumeErrors=True) ) @@ -267,7 +277,7 @@ class EventsStore(SQLBaseStore): self._maybe_start_persisting(event.room_id) - yield preserve_context_over_deferred(deferred) + yield make_deferred_yieldable(deferred) max_persisted_id = yield self._stream_id_gen.get_current_token() defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) @@ -1526,7 +1536,7 @@ class EventsStore(SQLBaseStore): if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield preserve_context_over_deferred(defer.gatherResults( + res = yield make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], diff --git a/synapse/util/async.py b/synapse/util/async.py index 0fd5b42523..a0a9039475 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -53,6 +53,11 @@ class ObservableDeferred(object): Cancelling or otherwise resolving an observer will not affect the original ObservableDeferred. + + NB that it does not attempt to do anything with logcontexts; in general + you should probably make_deferred_yieldable the deferreds + returned by `observe`, and ensure that the original deferred runs its + callbacks in the sentinel logcontext. """ __slots__ = ["_deferred", "_observers", "_result"] -- cgit 1.5.1