diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index d0f8da7516..ea340e345c 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -473,6 +473,72 @@ class TransportLayerClient(object):
defer.returnValue(content)
@log_function
+ def get_group_profile(self, destination, group_id, requester_user_id):
+ path = PREFIX + "/groups/%s/profile" % (group_id,)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def get_group_summary(self, destination, group_id, requester_user_id):
+ path = PREFIX + "/groups/%s/summary" % (group_id,)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def get_group_rooms(self, destination, group_id, requester_user_id):
+ path = PREFIX + "/groups/%s/rooms" % (group_id,)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def get_group_users(self, destination, group_id, requester_user_id):
+ path = PREFIX + "/groups/%s/users" % (group_id,)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def accept_group_invite(self, destination, group_id, user_id, content):
+ path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def invite_to_group(self, destination, group_id, user_id, content):
+ path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
def invite_to_group_notification(self, destination, group_id, user_id, content):
"""Sent by group server to inform a user's server that they have been
invited.
@@ -488,6 +554,17 @@ class TransportLayerClient(object):
)
@log_function
+ def remove_user_from_group(self, destination, group_id, user_id, content):
+ path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
def remove_user_from_group_notification(self, destination, group_id, user_id,
content):
"""Sent by group server to inform a user's server that they have been
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 4f7d2546cf..0f08334f33 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -715,6 +715,21 @@ class FederationGroupsInviteServlet(BaseFederationServlet):
defer.returnValue((200, new_content))
+class FederationGroupsLocalInviteServlet(BaseFederationServlet):
+ PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$"
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, user_id):
+ if get_domain_from_id(group_id) != origin:
+ raise SynapseError(403, "group_id doesn't match origin")
+
+ new_content = yield self.handler.on_invite(
+ group_id, user_id, content,
+ )
+
+ defer.returnValue((200, new_content))
+
+
class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
"""Accept an invitation from the group server
"""
@@ -750,6 +765,21 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet):
defer.returnValue((200, new_content))
+class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet):
+ PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$"
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, user_id):
+ if get_domain_from_id(group_id) != origin:
+ raise SynapseError(403, "user_id doesn't match origin")
+
+ new_content = yield self.handler.user_removed_from_group(
+ group_id, user_id, content,
+ )
+
+ defer.returnValue((200, new_content))
+
+
class FederationGroupsRenewAttestaionServlet(BaseFederationServlet):
"""A group or user's server renews their attestation
"""
@@ -1053,6 +1083,12 @@ GROUP_SERVER_SERVLET_CLASSES = (
)
+GROUP_LOCAL_SERVLET_CLASSES = (
+ FederationGroupsLocalInviteServlet,
+ FederationGroupsRemoveLocalUserServlet,
+)
+
+
GROUP_ATTESTATION_SERVLET_CLASSES = (
FederationGroupsRenewAttestaionServlet,
)
@@ -1083,6 +1119,14 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
server_name=hs.hostname,
).register(resource)
+ for servletclass in GROUP_LOCAL_SERVLET_CLASSES:
+ servletclass(
+ handler=hs.get_groups_local_handler(),
+ authenticator=authenticator,
+ ratelimiter=ratelimiter,
+ server_name=hs.hostname,
+ ).register(resource)
+
for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES:
servletclass(
handler=hs.get_groups_attestation_renewer(),
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index a00bafe3af..c8559577f7 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -462,7 +462,9 @@ class GroupsServerHandler(object):
}
if self.hs.is_mine_id(user_id):
- raise NotImplementedError()
+ groups_local = self.hs.get_groups_local_handler()
+ res = yield groups_local.on_invite(group_id, user_id, content)
+ local_attestation = None
else:
local_attestation = self.attestations.create_attestation(group_id, user_id)
content.update({
@@ -590,7 +592,8 @@ class GroupsServerHandler(object):
if is_kick:
if self.hs.is_mine_id(user_id):
- raise NotImplementedError()
+ groups_local = self.hs.get_groups_local_handler()
+ yield groups_local.user_removed_from_group(group_id, user_id, {})
else:
yield self.transport_client.remove_user_from_group_notification(
get_domain_from_id(user_id), group_id, user_id, {}
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
new file mode 100644
index 0000000000..3df255b05a
--- /dev/null
+++ b/synapse/handlers/groups_local.py
@@ -0,0 +1,278 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+# TODO: Validate attestations
+# TODO: Allow users to "knock" or simpkly join depending on rules
+# TODO: is_priveged flag to users and is_public to users and rooms
+# TODO: Roles
+# TODO: Audit log for admins (profile updates, membership changes, users who tried
+# to join but were rejected, etc)
+# TODO: Flairs
+# TODO: Add group memebership /sync
+
+
+def _create_rerouter(name):
+ def f(self, group_id, *args, **kwargs):
+ if self.is_mine_id(group_id):
+ return getattr(self.groups_server_handler, name)(
+ group_id, *args, **kwargs
+ )
+
+ repl_layer = self.hs.get_replication_layer()
+ return getattr(repl_layer, name)(group_id, *args, **kwargs)
+ return f
+
+
+class GroupsLocalHandler(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.store = hs.get_datastore()
+ self.room_list_handler = hs.get_room_list_handler()
+ self.groups_server_handler = hs.get_groups_server_handler()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.keyring = hs.get_keyring()
+ self.is_mine_id = hs.is_mine_id
+ self.signing_key = hs.config.signing_key[0]
+ self.server_name = hs.hostname
+ self.attestations = hs.get_groups_attestation_signing()
+
+ # Ensure attestations get renewed
+ hs.get_groups_attestation_renewer()
+
+ get_group_profile = _create_rerouter("get_group_profile")
+ get_rooms_in_group = _create_rerouter("get_rooms_in_group")
+
+ update_group_summary_room = _create_rerouter("update_group_summary_room")
+ delete_group_summary_room = _create_rerouter("delete_group_summary_room")
+
+ update_group_category = _create_rerouter("update_group_category")
+ delete_group_category = _create_rerouter("delete_group_category")
+ get_group_category = _create_rerouter("get_group_category")
+ get_group_categories = _create_rerouter("get_group_categories")
+
+ update_group_summary_user = _create_rerouter("update_group_summary_user")
+ delete_group_summary_user = _create_rerouter("delete_group_summary_user")
+
+ update_group_role = _create_rerouter("update_group_role")
+ delete_group_role = _create_rerouter("delete_group_role")
+ get_group_role = _create_rerouter("get_group_role")
+ get_group_roles = _create_rerouter("get_group_roles")
+
+ @defer.inlineCallbacks
+ def get_group_summary(self, group_id, requester_user_id):
+ if self.is_mine_id(group_id):
+ res = yield self.groups_server_handler.get_group_summary(
+ group_id, requester_user_id
+ )
+ defer.returnValue(res)
+
+ repl_layer = self.hs.get_replication_layer()
+ res = yield repl_layer.get_group_summary(group_id, requester_user_id)
+
+ chunk = res["users_section"]["users"]
+ valid_users = []
+ for entry in chunk:
+ g_user_id = entry["user_id"]
+ attestation = entry.pop("attestation")
+ try:
+ yield self.attestations.verify_attestation(
+ attestation,
+ group_id=group_id,
+ user_id=g_user_id,
+ )
+ valid_users.append(entry)
+ except Exception as e:
+ logger.info("Failed to verify user is in group: %s", e)
+
+ res["users_section"]["users"] = valid_users
+
+ res["users_section"]["users"].sort(key=lambda e: e.get("order", 0))
+ res["rooms_section"]["rooms"].sort(key=lambda e: e.get("order", 0))
+
+ defer.returnValue(res)
+
+ def create_group(self, group_id, user_id, content):
+ logger.info("Asking to create group with ID: %r", group_id)
+
+ if self.is_mine_id(group_id):
+ return self.groups_server_handler.create_group(
+ group_id, user_id, content
+ )
+
+ repl_layer = self.hs.get_replication_layer()
+ return repl_layer.create_group(group_id, user_id, content) # TODO
+
+ def add_room(self, group_id, user_id, room_id, content):
+ if self.is_mine_id(group_id):
+ return self.groups_server_handler.add_room(
+ group_id, user_id, room_id, content
+ )
+
+ repl_layer = self.hs.get_replication_layer()
+ return repl_layer.add_room_to_group(group_id, user_id, room_id, content) # TODO
+
+ @defer.inlineCallbacks
+ def get_users_in_group(self, group_id, requester_user_id):
+ if self.is_mine_id(group_id):
+ res = yield self.groups_server_handler.get_users_in_group(
+ group_id, requester_user_id
+ )
+ defer.returnValue(res)
+
+ repl_layer = self.hs.get_replication_layer()
+ res = yield repl_layer.get_users_in_group(group_id, requester_user_id) # TODO
+
+ chunk = res["chunk"]
+ valid_entries = []
+ for entry in chunk:
+ g_user_id = entry["user_id"]
+ attestation = entry.pop("attestation")
+ try:
+ yield self.attestations.verify_attestation(
+ attestation,
+ group_id=group_id,
+ user_id=g_user_id,
+ )
+ valid_entries.append(entry)
+ except Exception as e:
+ logger.info("Failed to verify user is in group: %s", e)
+
+ res["chunk"] = valid_entries
+
+ defer.returnValue(res)
+
+ @defer.inlineCallbacks
+ def join_group(self, group_id, user_id, content):
+ raise NotImplementedError() # TODO
+
+ @defer.inlineCallbacks
+ def accept_invite(self, group_id, user_id, content):
+ if self.is_mine_id(group_id):
+ yield self.groups_server_handler.accept_invite(
+ group_id, user_id, content
+ )
+ local_attestation = None
+ remote_attestation = None
+ else:
+ local_attestation = self.attestations.create_attestation(group_id, user_id)
+ content["attestation"] = local_attestation
+
+ repl_layer = self.hs.get_replication_layer()
+ res = yield repl_layer.accept_group_invite(group_id, user_id, content)
+
+ remote_attestation = res["attestation"]
+
+ yield self.attestations.verify_attestation(
+ remote_attestation,
+ group_id=group_id,
+ user_id=user_id,
+ )
+
+ yield self.store.register_user_group_membership(
+ group_id, user_id,
+ membership="join",
+ is_admin=False,
+ local_attestation=local_attestation,
+ remote_attestation=remote_attestation,
+ )
+
+ defer.returnValue({})
+
+ @defer.inlineCallbacks
+ def invite(self, group_id, user_id, requester_user_id, config):
+ content = {
+ "requester_user_id": requester_user_id,
+ "config": config,
+ }
+ if self.is_mine_id(group_id):
+ res = yield self.groups_server_handler.invite_to_group(
+ group_id, user_id, requester_user_id, content,
+ )
+ else:
+ repl_layer = self.hs.get_replication_layer()
+ res = yield repl_layer.invite_to_group(
+ group_id, user_id, content,
+ )
+
+ defer.returnValue(res)
+
+ @defer.inlineCallbacks
+ def on_invite(self, group_id, user_id, content):
+ # TODO: Support auto join and rejection
+
+ if not self.is_mine_id(user_id):
+ raise SynapseError(400, "User not on this server")
+
+ local_profile = {}
+ if "profile" in content:
+ if "name" in content["profile"]:
+ local_profile["name"] = content["profile"]["name"]
+ if "avatar_url" in content["profile"]:
+ local_profile["avatar_url"] = content["profile"]["avatar_url"]
+
+ yield self.store.register_user_group_membership(
+ group_id, user_id,
+ membership="invite",
+ content={"profile": local_profile, "inviter": content["inviter"]},
+ )
+
+ defer.returnValue({"state": "invite"})
+
+ @defer.inlineCallbacks
+ def remove_user_from_group(self, group_id, user_id, requester_user_id, content):
+ if user_id == requester_user_id:
+ yield self.store.register_user_group_membership(
+ group_id, user_id,
+ membership="leave",
+ )
+
+ # TODO: Should probably remember that we tried to leave so that we can
+ # retry if the group server is currently down.
+
+ if self.is_mine_id(group_id):
+ res = yield self.groups_server_handler.remove_user_from_group(
+ group_id, user_id, requester_user_id, content,
+ )
+ else:
+ content["requester_user_id"] = requester_user_id
+ repl_layer = self.hs.get_replication_layer()
+ res = yield repl_layer.remove_user_from_group(
+ group_id, user_id, content
+ ) # TODO
+
+ defer.returnValue(res)
+
+ @defer.inlineCallbacks
+ def user_removed_from_group(self, group_id, user_id, content):
+ # TODO: Check if user in group
+ yield self.store.register_user_group_membership(
+ group_id, user_id,
+ membership="leave",
+ )
+
+ @defer.inlineCallbacks
+ def get_joined_groups(self, user_id):
+ group_ids = yield self.store.get_joined_groups(user_id)
+ defer.returnValue({"groups": group_ids})
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 3d809d181b..16f5a73b95 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -52,6 +52,7 @@ from synapse.rest.client.v2_alpha import (
thirdparty,
sendtodevice,
user_directory,
+ groups,
)
from synapse.http.server import JsonResource
@@ -102,3 +103,4 @@ class ClientRestResource(JsonResource):
thirdparty.register_servlets(hs, client_resource)
sendtodevice.register_servlets(hs, client_resource)
user_directory.register_servlets(hs, client_resource)
+ groups.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py
new file mode 100644
index 0000000000..255552c365
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/groups.py
@@ -0,0 +1,642 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.types import GroupID
+
+from ._base import client_v2_patterns
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class GroupServlet(RestServlet):
+ PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/profile$")
+
+ def __init__(self, hs):
+ super(GroupServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, group_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ group_description = yield self.groups_handler.get_group_profile(group_id, user_id)
+
+ defer.returnValue((200, group_description))
+
+
+class GroupSummaryServlet(RestServlet):
+ PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/summary$")
+
+ def __init__(self, hs):
+ super(GroupSummaryServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, group_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ get_group_summary = yield self.groups_handler.get_group_summary(group_id, user_id)
+
+ defer.returnValue((200, get_group_summary))
+
+
+class GroupSummaryRoomsServlet(RestServlet):
+ PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/summary/rooms$")
+
+ def __init__(self, hs):
+ super(GroupSummaryServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, group_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ get_group_summary = yield self.groups_handler.get_group_summary(group_id, user_id)
+
+ defer.returnValue((200, get_group_summary))
+
+
+class GroupSummaryRoomsDefaultCatServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/summary/rooms/(?P<room_id>[^/]*)$"
+ )
+
+ def __init__(self, hs):
+ super(GroupSummaryRoomsDefaultCatServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ resp = yield self.groups_handler.update_group_summary_room(
+ group_id, user_id,
+ room_id=room_id,
+ category_id=None,
+ content=content,
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, group_id, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ resp = yield self.groups_handler.delete_group_summary_room(
+ group_id, user_id,
+ room_id=room_id,
+ category_id=None,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class GroupSummaryRoomsCatServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/summary"
+ "/categories/(?P<category_id>[^/]+)/rooms/(?P<room_id>[^/]+)$"
+ )
+
+ def __init__(self, hs):
+ super(GroupSummaryRoomsCatServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id, category_id, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ resp = yield self.groups_handler.update_group_summary_room(
+ group_id, user_id,
+ room_id=room_id,
+ category_id=category_id,
+ content=content,
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, group_id, category_id, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ resp = yield self.groups_handler.delete_group_summary_room(
+ group_id, user_id,
+ room_id=room_id,
+ category_id=category_id,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class GroupCategoryServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)$"
+ )
+
+ def __init__(self, hs):
+ super(GroupCategoryServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, group_id, category_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ category = yield self.groups_handler.get_group_category(
+ group_id, user_id,
+ category_id=category_id,
+ )
+
+ defer.returnValue((200, category))
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id, category_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ resp = yield self.groups_handler.update_group_category(
+ group_id, user_id,
+ category_id=category_id,
+ content=content,
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, group_id, category_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ resp = yield self.groups_handler.delete_group_category(
+ group_id, user_id,
+ category_id=category_id,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class GroupCategoriesServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/categories/$"
+ )
+
+ def __init__(self, hs):
+ super(GroupCategoriesServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, group_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ category = yield self.groups_handler.get_group_categories(
+ group_id, user_id,
+ )
+
+ defer.returnValue((200, category))
+
+
+class GroupRoleServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)$"
+ )
+
+ def __init__(self, hs):
+ super(GroupRoleServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, group_id, role_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ category = yield self.groups_handler.get_group_role(
+ group_id, user_id,
+ role_id=role_id,
+ )
+
+ defer.returnValue((200, category))
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id, role_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ resp = yield self.groups_handler.update_group_role(
+ group_id, user_id,
+ role_id=role_id,
+ content=content,
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, group_id, role_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ resp = yield self.groups_handler.delete_group_role(
+ group_id, user_id,
+ role_id=role_id,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class GroupRolesServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/roles/$"
+ )
+
+ def __init__(self, hs):
+ super(GroupRolesServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, group_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ category = yield self.groups_handler.get_group_roles(
+ group_id, user_id,
+ )
+
+ defer.returnValue((200, category))
+
+
+class GroupSummaryUsersDefaultRoleServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/summary/users/(?P<user_id>[^/]*)$"
+ )
+
+ def __init__(self, hs):
+ super(GroupSummaryUsersDefaultRoleServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id, user_id):
+ requester = yield self.auth.get_user_by_req(request)
+ requester_user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ resp = yield self.groups_handler.update_group_summary_user(
+ group_id, requester_user_id,
+ user_id=user_id,
+ role_id=None,
+ content=content,
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, group_id, user_id):
+ requester = yield self.auth.get_user_by_req(request)
+ requester_user_id = requester.user.to_string()
+
+ resp = yield self.groups_handler.delete_group_summary_user(
+ group_id, requester_user_id,
+ user_id=user_id,
+ role_id=None,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class GroupSummaryUsersRoleServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/summary"
+ "/roles/(?P<role_id>[^/]+)/users/(?P<user_id>[^/]+)$"
+ )
+
+ def __init__(self, hs):
+ super(GroupSummaryUsersRoleServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id, role_id, user_id):
+ requester = yield self.auth.get_user_by_req(request)
+ requester_user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ resp = yield self.groups_handler.update_group_summary_user(
+ group_id, requester_user_id,
+ user_id=user_id,
+ role_id=role_id,
+ content=content,
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, group_id, role_id, user_id):
+ requester = yield self.auth.get_user_by_req(request)
+ requester_user_id = requester.user.to_string()
+
+ resp = yield self.groups_handler.delete_group_summary_user(
+ group_id, requester_user_id,
+ user_id=user_id,
+ role_id=role_id,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class GroupRoomServlet(RestServlet):
+ PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/rooms$")
+
+ def __init__(self, hs):
+ super(GroupRoomServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, group_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ result = yield self.groups_handler.get_rooms_in_group(group_id, user_id)
+
+ defer.returnValue((200, result))
+
+
+class GroupUsersServlet(RestServlet):
+ PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/users$")
+
+ def __init__(self, hs):
+ super(GroupUsersServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, group_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ result = yield self.groups_handler.get_users_in_group(group_id, user_id)
+
+ defer.returnValue((200, result))
+
+
+class GroupCreateServlet(RestServlet):
+ PATTERNS = client_v2_patterns("/create_group$")
+
+ def __init__(self, hs):
+ super(GroupCreateServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+ self.server_name = hs.hostname
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ # TODO: Create group on remote server
+ content = parse_json_object_from_request(request)
+ localpart = content.pop("localpart")
+ group_id = GroupID.create(localpart, self.server_name).to_string()
+
+ result = yield self.groups_handler.create_group(group_id, user_id, content)
+
+ defer.returnValue((200, result))
+
+
+class GroupAdminRoomsServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/admin/rooms/(?P<room_id>[^/]*)$"
+ )
+
+ def __init__(self, hs):
+ super(GroupAdminRoomsServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ result = yield self.groups_handler.add_room(group_id, user_id, room_id, content)
+
+ defer.returnValue((200, result))
+
+
+class GroupAdminUsersInviteServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/admin/users/invite/(?P<user_id>[^/]*)$"
+ )
+
+ def __init__(self, hs):
+ super(GroupAdminUsersInviteServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+ self.store = hs.get_datastore()
+ self.is_mine_id = hs.is_mine_id
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id, user_id):
+ requester = yield self.auth.get_user_by_req(request)
+ requester_user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ config = content.get("config", {})
+ result = yield self.groups_handler.invite(
+ group_id, user_id, requester_user_id, config,
+ )
+
+ defer.returnValue((200, result))
+
+
+class GroupAdminUsersKickServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/admin/users/remove/(?P<user_id>[^/]*)$"
+ )
+
+ def __init__(self, hs):
+ super(GroupAdminUsersKickServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id, user_id):
+ requester = yield self.auth.get_user_by_req(request)
+ requester_user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ result = yield self.groups_handler.remove_user_from_group(
+ group_id, user_id, requester_user_id, content,
+ )
+
+ defer.returnValue((200, result))
+
+
+class GroupSelfLeaveServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/self/leave$"
+ )
+
+ def __init__(self, hs):
+ super(GroupSelfLeaveServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id):
+ requester = yield self.auth.get_user_by_req(request)
+ requester_user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ result = yield self.groups_handler.remove_user_from_group(
+ group_id, requester_user_id, requester_user_id, content,
+ )
+
+ defer.returnValue((200, result))
+
+
+class GroupSelfJoinServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/self/join$"
+ )
+
+ def __init__(self, hs):
+ super(GroupSelfJoinServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id):
+ requester = yield self.auth.get_user_by_req(request)
+ requester_user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ result = yield self.groups_handler.join_group(
+ group_id, requester_user_id, content,
+ )
+
+ defer.returnValue((200, result))
+
+
+class GroupSelfAcceptInviteServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/groups/(?P<group_id>[^/]*)/self/accept_invite$"
+ )
+
+ def __init__(self, hs):
+ super(GroupSelfAcceptInviteServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id):
+ requester = yield self.auth.get_user_by_req(request)
+ requester_user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+ result = yield self.groups_handler.accept_invite(
+ group_id, requester_user_id, content,
+ )
+
+ defer.returnValue((200, result))
+
+
+class GroupsForUserServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/joined_groups$"
+ )
+
+ def __init__(self, hs):
+ super(GroupsForUserServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ result = yield self.groups_handler.get_joined_groups(user_id)
+
+ defer.returnValue((200, result))
+
+
+def register_servlets(hs, http_server):
+ GroupServlet(hs).register(http_server)
+ GroupSummaryServlet(hs).register(http_server)
+ GroupUsersServlet(hs).register(http_server)
+ GroupRoomServlet(hs).register(http_server)
+ GroupCreateServlet(hs).register(http_server)
+ GroupAdminRoomsServlet(hs).register(http_server)
+ GroupAdminUsersInviteServlet(hs).register(http_server)
+ GroupAdminUsersKickServlet(hs).register(http_server)
+ GroupSelfLeaveServlet(hs).register(http_server)
+ GroupSelfJoinServlet(hs).register(http_server)
+ GroupSelfAcceptInviteServlet(hs).register(http_server)
+ GroupsForUserServlet(hs).register(http_server)
+ GroupSummaryRoomsDefaultCatServlet(hs).register(http_server)
+ GroupCategoryServlet(hs).register(http_server)
+ GroupCategoriesServlet(hs).register(http_server)
+ GroupSummaryRoomsCatServlet(hs).register(http_server)
+ GroupRoleServlet(hs).register(http_server)
+ GroupRolesServlet(hs).register(http_server)
+ GroupSummaryUsersDefaultRoleServlet(hs).register(http_server)
+ GroupSummaryUsersRoleServlet(hs).register(http_server)
diff --git a/synapse/server.py b/synapse/server.py
index d857cca848..d0a6272766 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -50,6 +50,7 @@ from synapse.handlers.initial_sync import InitialSyncHandler
from synapse.handlers.receipts import ReceiptsHandler
from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.user_directory import UserDirectoyHandler
+from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.groups.groups_server import GroupsServerHandler
from synapse.groups.attestations import GroupAttestionRenewer, GroupAttestationSigning
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
@@ -141,6 +142,7 @@ class HomeServer(object):
'read_marker_handler',
'action_generator',
'user_directory_handler',
+ 'groups_local_handler',
'groups_server_handler',
'groups_attestation_signing',
'groups_attestation_renewer',
@@ -314,6 +316,9 @@ class HomeServer(object):
def build_user_directory_handler(self):
return UserDirectoyHandler(self)
+ def build_groups_local_handler(self):
+ return GroupsLocalHandler(self)
+
def build_groups_server_handler(self):
return GroupsServerHandler(self)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index fdee9f1ad5..594566eb38 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -136,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore,
db_conn, "pushers", "id",
extra_tables=[("deleted_pushers", "stream_id")],
)
+ self._group_updates_id_gen = StreamIdGenerator(
+ db_conn, "local_group_updates", "stream_id",
+ )
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator(
@@ -236,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=curr_state_delta_prefill,
)
+ _group_updates_prefill, min_group_updates_id = self._get_cache_dict(
+ db_conn, "local_group_updates",
+ entity_column="user_id",
+ stream_column="stream_id",
+ max_value=self._group_updates_id_gen.get_current_token(),
+ limit=1000,
+ )
+ self._group_updates_stream_cache = StreamChangeCache(
+ "_group_updates_stream_cache", min_group_updates_id,
+ prefilled_cache=_group_updates_prefill,
+ )
+
cur = LoggingTransaction(
db_conn.cursor(),
name="_find_stream_orderings_for_times_txn",
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index e8a799d8c7..036549d437 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -757,6 +757,103 @@ class GroupServerStore(SQLBaseStore):
)
@defer.inlineCallbacks
+ def register_user_group_membership(self, group_id, user_id, membership,
+ is_admin=False, content={},
+ local_attestation=None,
+ remote_attestation=None,
+ ):
+ def _register_user_group_membership_txn(txn, next_id):
+ # TODO: Upsert?
+ self._simple_delete_txn(
+ txn,
+ table="local_group_membership",
+ keyvalues={
+ "group_id": group_id,
+ "user_id": user_id,
+ },
+ )
+ self._simple_insert_txn(
+ txn,
+ table="local_group_membership",
+ values={
+ "group_id": group_id,
+ "user_id": user_id,
+ "is_admin": is_admin,
+ "membership": membership,
+ "content": json.dumps(content),
+ },
+ )
+ self._simple_delete_txn(
+ txn,
+ table="local_group_updates",
+ keyvalues={
+ "group_id": group_id,
+ "user_id": user_id,
+ "type": "membership",
+ },
+ )
+ self._simple_insert_txn(
+ txn,
+ table="local_group_updates",
+ values={
+ "stream_id": next_id,
+ "group_id": group_id,
+ "user_id": user_id,
+ "type": "membership",
+ "content": json.dumps({"membership": membership, "content": content}),
+ }
+ )
+ self._group_updates_stream_cache.entity_has_changed(user_id, next_id)
+
+ # TODO: Insert profile to ensuer it comes down stream if its a join.
+
+ if membership == "join":
+ if local_attestation:
+ self._simple_insert_txn(
+ txn,
+ table="group_attestations_renewals",
+ values={
+ "group_id": group_id,
+ "user_id": user_id,
+ "valid_until_ms": local_attestation["valid_until_ms"],
+ }
+ )
+ if remote_attestation:
+ self._simple_insert_txn(
+ txn,
+ table="group_attestations_remote",
+ values={
+ "group_id": group_id,
+ "user_id": user_id,
+ "valid_until_ms": remote_attestation["valid_until_ms"],
+ "attestation": json.dumps(remote_attestation),
+ }
+ )
+ else:
+ self._simple_delete_txn(
+ txn,
+ table="group_attestations_renewals",
+ keyvalues={
+ "group_id": group_id,
+ "user_id": user_id,
+ },
+ )
+ self._simple_delete_txn(
+ txn,
+ table="group_attestations_remote",
+ keyvalues={
+ "group_id": group_id,
+ "user_id": user_id,
+ },
+ )
+
+ with self._group_updates_id_gen.get_next() as next_id:
+ yield self.runInteraction(
+ "register_user_group_membership",
+ _register_user_group_membership_txn, next_id,
+ )
+
+ @defer.inlineCallbacks
def create_group(self, group_id, user_id, name, avatar_url, short_description,
long_description,):
yield self._simple_insert(
@@ -771,6 +868,61 @@ class GroupServerStore(SQLBaseStore):
desc="create_group",
)
+ def get_joined_groups(self, user_id):
+ return self._simple_select_onecol(
+ table="local_group_membership",
+ keyvalues={
+ "user_id": user_id,
+ "membership": "join",
+ },
+ retcol="group_id",
+ desc="get_joined_groups",
+ )
+
+ def get_all_groups_for_user(self, user_id, now_token):
+ def _get_all_groups_for_user_txn(txn):
+ sql = """
+ SELECT group_id, type, membership, u.content
+ FROM local_group_updates AS u
+ INNER JOIN local_group_membership USING (group_id, user_id)
+ WHERE user_id = ? AND membership != 'leave'
+ AND stream_id <= ?
+ """
+ txn.execute(sql, (user_id, now_token,))
+ return self.cursor_to_dict(txn)
+ return self.runInteraction(
+ "get_all_groups_for_user", _get_all_groups_for_user_txn,
+ )
+
+ def get_groups_changes_for_user(self, user_id, from_token, to_token):
+ from_token = int(from_token)
+ has_changed = self._group_updates_stream_cache.has_entity_changed(
+ user_id, from_token,
+ )
+ if not has_changed:
+ return []
+
+ def _get_groups_changes_for_user_txn(txn):
+ sql = """
+ SELECT group_id, membership, type, u.content
+ FROM local_group_updates AS u
+ INNER JOIN local_group_membership USING (group_id, user_id)
+ WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
+ """
+ txn.execute(sql, (user_id, from_token, to_token,))
+ return [{
+ "group_id": group_id,
+ "membership": membership,
+ "type": gtype,
+ "content": json.loads(content_json),
+ } for group_id, membership, gtype, content_json in txn]
+ return self.runInteraction(
+ "get_groups_changes_for_user", _get_groups_changes_for_user_txn,
+ )
+
+ def get_group_stream_token(self):
+ return self._group_updates_id_gen.get_current_token()
+
def get_attestations_need_renewals(self, valid_until_ms):
"""Get all attestations that need to be renewed until givent time
"""
diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql
index 472aab0a78..e32db8b313 100644
--- a/synapse/storage/schema/delta/43/group_server.sql
+++ b/synapse/storage/schema/delta/43/group_server.sql
@@ -142,3 +142,31 @@ CREATE TABLE group_attestations_remote (
CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id);
CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id);
CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms);
+
+
+CREATE TABLE local_group_membership (
+ group_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ is_admin BOOLEAN NOT NULL,
+ membership TEXT NOT NULL,
+ content TEXT NOT NULL
+);
+
+CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id);
+CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id);
+
+
+CREATE TABLE local_group_updates (
+ stream_id BIGINT NOT NULL,
+ group_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ type TEXT NOT NULL,
+ content TEXT NOT NULL
+);
+
+
+CREATE TABLE local_group_profiles (
+ group_id TEXT NOT NULL,
+ name TEXT,
+ avatar_url TEXT
+);
|