From b8ca494ee9e42e5b1aca8958088bd35cc5707437 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 15:44:15 +0100 Subject: Initial group server implementation --- synapse/handlers/room_list.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 516cd9a6ac..41e1781df7 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -276,13 +276,14 @@ class RoomListHandler(BaseHandler): # We've already got enough, so lets just drop it. return - result = yield self._generate_room_entry(room_id, num_joined_users) + result = yield self.generate_room_entry(room_id, num_joined_users) if result and _matches_room_entry(result, search_filter): chunk.append(result) @cachedInlineCallbacks(num_args=1, cache_context=True) - def _generate_room_entry(self, room_id, num_joined_users, cache_context): + def generate_room_entry(self, room_id, num_joined_users, cache_context, + with_alias=True, allow_private=False): """Returns the entry for a room """ result = { @@ -316,14 +317,15 @@ class RoomListHandler(BaseHandler): join_rules_event = current_state.get((EventTypes.JoinRules, "")) if join_rules_event: join_rule = join_rules_event.content.get("join_rule", None) - if join_rule and join_rule != JoinRules.PUBLIC: + if not allow_private and join_rule and join_rule != JoinRules.PUBLIC: defer.returnValue(None) - aliases = yield self.store.get_aliases_for_room( - room_id, on_invalidate=cache_context.invalidate - ) - if aliases: - result["aliases"] = aliases + if with_alias: + aliases = yield self.store.get_aliases_for_room( + room_id, on_invalidate=cache_context.invalidate + ) + if aliases: + result["aliases"] = aliases name_event = yield current_state.get((EventTypes.Name, "")) if name_event: -- cgit 1.4.1 From 2f9eafdd369796d8b7731b24ab8cf6a98ad19e29 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 14:52:27 +0100 Subject: Add local group server support --- synapse/federation/transport/client.py | 77 +++ synapse/federation/transport/server.py | 44 ++ synapse/groups/groups_server.py | 7 +- synapse/handlers/groups_local.py | 278 ++++++++++ synapse/rest/__init__.py | 2 + synapse/rest/client/v2_alpha/groups.py | 642 +++++++++++++++++++++++ synapse/server.py | 5 + synapse/storage/__init__.py | 15 + synapse/storage/group_server.py | 152 ++++++ synapse/storage/schema/delta/43/group_server.sql | 28 + 10 files changed, 1248 insertions(+), 2 deletions(-) create mode 100644 synapse/handlers/groups_local.py create mode 100644 synapse/rest/client/v2_alpha/groups.py (limited to 'synapse/handlers') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index d0f8da7516..ea340e345c 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -472,6 +472,72 @@ class TransportLayerClient(object): defer.returnValue(content) + @log_function + def get_group_profile(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/profile" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_summary(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/summary" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_rooms(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/rooms" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_users(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/users" % (group_id,) + + return self.client.post_json( + destination=destination, + path=path, + data={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def accept_group_invite(self, destination, group_id, user_id, content): + path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + + @log_function + def invite_to_group(self, destination, group_id, user_id, content): + path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + @log_function def invite_to_group_notification(self, destination, group_id, user_id, content): """Sent by group server to inform a user's server that they have been @@ -487,6 +553,17 @@ class TransportLayerClient(object): ignore_backoff=True, ) + @log_function + def remove_user_from_group(self, destination, group_id, user_id, content): + path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + @log_function def remove_user_from_group_notification(self, destination, group_id, user_id, content): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 4f7d2546cf..0f08334f33 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -715,6 +715,21 @@ class FederationGroupsInviteServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsLocalInviteServlet(BaseFederationServlet): + PATH = "/groups/local/(?P[^/]*)/users/(?P[^/]*)/invite$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(group_id) != origin: + raise SynapseError(403, "group_id doesn't match origin") + + new_content = yield self.handler.on_invite( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + class FederationGroupsAcceptInviteServlet(BaseFederationServlet): """Accept an invitation from the group server """ @@ -750,6 +765,21 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): + PATH = "/groups/local/(?P[^/]*)/users/(?P[^/]*)/remove$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(group_id) != origin: + raise SynapseError(403, "user_id doesn't match origin") + + new_content = yield self.handler.user_removed_from_group( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + class FederationGroupsRenewAttestaionServlet(BaseFederationServlet): """A group or user's server renews their attestation """ @@ -1053,6 +1083,12 @@ GROUP_SERVER_SERVLET_CLASSES = ( ) +GROUP_LOCAL_SERVLET_CLASSES = ( + FederationGroupsLocalInviteServlet, + FederationGroupsRemoveLocalUserServlet, +) + + GROUP_ATTESTATION_SERVLET_CLASSES = ( FederationGroupsRenewAttestaionServlet, ) @@ -1083,6 +1119,14 @@ def register_servlets(hs, resource, authenticator, ratelimiter): server_name=hs.hostname, ).register(resource) + for servletclass in GROUP_LOCAL_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_local_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES: servletclass( handler=hs.get_groups_attestation_renewer(), diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index a00bafe3af..c8559577f7 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -462,7 +462,9 @@ class GroupsServerHandler(object): } if self.hs.is_mine_id(user_id): - raise NotImplementedError() + groups_local = self.hs.get_groups_local_handler() + res = yield groups_local.on_invite(group_id, user_id, content) + local_attestation = None else: local_attestation = self.attestations.create_attestation(group_id, user_id) content.update({ @@ -590,7 +592,8 @@ class GroupsServerHandler(object): if is_kick: if self.hs.is_mine_id(user_id): - raise NotImplementedError() + groups_local = self.hs.get_groups_local_handler() + yield groups_local.user_removed_from_group(group_id, user_id, {}) else: yield self.transport_client.remove_user_from_group_notification( get_domain_from_id(user_id), group_id, user_id, {} diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py new file mode 100644 index 0000000000..3df255b05a --- /dev/null +++ b/synapse/handlers/groups_local.py @@ -0,0 +1,278 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import SynapseError + +import logging + +logger = logging.getLogger(__name__) + + +# TODO: Validate attestations +# TODO: Allow users to "knock" or simpkly join depending on rules +# TODO: is_priveged flag to users and is_public to users and rooms +# TODO: Roles +# TODO: Audit log for admins (profile updates, membership changes, users who tried +# to join but were rejected, etc) +# TODO: Flairs +# TODO: Add group memebership /sync + + +def _create_rerouter(name): + def f(self, group_id, *args, **kwargs): + if self.is_mine_id(group_id): + return getattr(self.groups_server_handler, name)( + group_id, *args, **kwargs + ) + + repl_layer = self.hs.get_replication_layer() + return getattr(repl_layer, name)(group_id, *args, **kwargs) + return f + + +class GroupsLocalHandler(object): + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + self.room_list_handler = hs.get_room_list_handler() + self.groups_server_handler = hs.get_groups_server_handler() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.keyring = hs.get_keyring() + self.is_mine_id = hs.is_mine_id + self.signing_key = hs.config.signing_key[0] + self.server_name = hs.hostname + self.attestations = hs.get_groups_attestation_signing() + + # Ensure attestations get renewed + hs.get_groups_attestation_renewer() + + get_group_profile = _create_rerouter("get_group_profile") + get_rooms_in_group = _create_rerouter("get_rooms_in_group") + + update_group_summary_room = _create_rerouter("update_group_summary_room") + delete_group_summary_room = _create_rerouter("delete_group_summary_room") + + update_group_category = _create_rerouter("update_group_category") + delete_group_category = _create_rerouter("delete_group_category") + get_group_category = _create_rerouter("get_group_category") + get_group_categories = _create_rerouter("get_group_categories") + + update_group_summary_user = _create_rerouter("update_group_summary_user") + delete_group_summary_user = _create_rerouter("delete_group_summary_user") + + update_group_role = _create_rerouter("update_group_role") + delete_group_role = _create_rerouter("delete_group_role") + get_group_role = _create_rerouter("get_group_role") + get_group_roles = _create_rerouter("get_group_roles") + + @defer.inlineCallbacks + def get_group_summary(self, group_id, requester_user_id): + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_group_summary( + group_id, requester_user_id + ) + defer.returnValue(res) + + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.get_group_summary(group_id, requester_user_id) + + chunk = res["users_section"]["users"] + valid_users = [] + for entry in chunk: + g_user_id = entry["user_id"] + attestation = entry.pop("attestation") + try: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + ) + valid_users.append(entry) + except Exception as e: + logger.info("Failed to verify user is in group: %s", e) + + res["users_section"]["users"] = valid_users + + res["users_section"]["users"].sort(key=lambda e: e.get("order", 0)) + res["rooms_section"]["rooms"].sort(key=lambda e: e.get("order", 0)) + + defer.returnValue(res) + + def create_group(self, group_id, user_id, content): + logger.info("Asking to create group with ID: %r", group_id) + + if self.is_mine_id(group_id): + return self.groups_server_handler.create_group( + group_id, user_id, content + ) + + repl_layer = self.hs.get_replication_layer() + return repl_layer.create_group(group_id, user_id, content) # TODO + + def add_room(self, group_id, user_id, room_id, content): + if self.is_mine_id(group_id): + return self.groups_server_handler.add_room( + group_id, user_id, room_id, content + ) + + repl_layer = self.hs.get_replication_layer() + return repl_layer.add_room_to_group(group_id, user_id, room_id, content) # TODO + + @defer.inlineCallbacks + def get_users_in_group(self, group_id, requester_user_id): + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_users_in_group( + group_id, requester_user_id + ) + defer.returnValue(res) + + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.get_users_in_group(group_id, requester_user_id) # TODO + + chunk = res["chunk"] + valid_entries = [] + for entry in chunk: + g_user_id = entry["user_id"] + attestation = entry.pop("attestation") + try: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + ) + valid_entries.append(entry) + except Exception as e: + logger.info("Failed to verify user is in group: %s", e) + + res["chunk"] = valid_entries + + defer.returnValue(res) + + @defer.inlineCallbacks + def join_group(self, group_id, user_id, content): + raise NotImplementedError() # TODO + + @defer.inlineCallbacks + def accept_invite(self, group_id, user_id, content): + if self.is_mine_id(group_id): + yield self.groups_server_handler.accept_invite( + group_id, user_id, content + ) + local_attestation = None + remote_attestation = None + else: + local_attestation = self.attestations.create_attestation(group_id, user_id) + content["attestation"] = local_attestation + + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.accept_group_invite(group_id, user_id, content) + + remote_attestation = res["attestation"] + + yield self.attestations.verify_attestation( + remote_attestation, + group_id=group_id, + user_id=user_id, + ) + + yield self.store.register_user_group_membership( + group_id, user_id, + membership="join", + is_admin=False, + local_attestation=local_attestation, + remote_attestation=remote_attestation, + ) + + defer.returnValue({}) + + @defer.inlineCallbacks + def invite(self, group_id, user_id, requester_user_id, config): + content = { + "requester_user_id": requester_user_id, + "config": config, + } + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.invite_to_group( + group_id, user_id, requester_user_id, content, + ) + else: + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.invite_to_group( + group_id, user_id, content, + ) + + defer.returnValue(res) + + @defer.inlineCallbacks + def on_invite(self, group_id, user_id, content): + # TODO: Support auto join and rejection + + if not self.is_mine_id(user_id): + raise SynapseError(400, "User not on this server") + + local_profile = {} + if "profile" in content: + if "name" in content["profile"]: + local_profile["name"] = content["profile"]["name"] + if "avatar_url" in content["profile"]: + local_profile["avatar_url"] = content["profile"]["avatar_url"] + + yield self.store.register_user_group_membership( + group_id, user_id, + membership="invite", + content={"profile": local_profile, "inviter": content["inviter"]}, + ) + + defer.returnValue({"state": "invite"}) + + @defer.inlineCallbacks + def remove_user_from_group(self, group_id, user_id, requester_user_id, content): + if user_id == requester_user_id: + yield self.store.register_user_group_membership( + group_id, user_id, + membership="leave", + ) + + # TODO: Should probably remember that we tried to leave so that we can + # retry if the group server is currently down. + + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.remove_user_from_group( + group_id, user_id, requester_user_id, content, + ) + else: + content["requester_user_id"] = requester_user_id + repl_layer = self.hs.get_replication_layer() + res = yield repl_layer.remove_user_from_group( + group_id, user_id, content + ) # TODO + + defer.returnValue(res) + + @defer.inlineCallbacks + def user_removed_from_group(self, group_id, user_id, content): + # TODO: Check if user in group + yield self.store.register_user_group_membership( + group_id, user_id, + membership="leave", + ) + + @defer.inlineCallbacks + def get_joined_groups(self, user_id): + group_ids = yield self.store.get_joined_groups(user_id) + defer.returnValue({"groups": group_ids}) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 3d809d181b..16f5a73b95 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -52,6 +52,7 @@ from synapse.rest.client.v2_alpha import ( thirdparty, sendtodevice, user_directory, + groups, ) from synapse.http.server import JsonResource @@ -102,3 +103,4 @@ class ClientRestResource(JsonResource): thirdparty.register_servlets(hs, client_resource) sendtodevice.register_servlets(hs, client_resource) user_directory.register_servlets(hs, client_resource) + groups.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py new file mode 100644 index 0000000000..255552c365 --- /dev/null +++ b/synapse/rest/client/v2_alpha/groups.py @@ -0,0 +1,642 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.types import GroupID + +from ._base import client_v2_patterns + +import logging + +logger = logging.getLogger(__name__) + + +class GroupServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/profile$") + + def __init__(self, hs): + super(GroupServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + group_description = yield self.groups_handler.get_group_profile(group_id, user_id) + + defer.returnValue((200, group_description)) + + +class GroupSummaryServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/summary$") + + def __init__(self, hs): + super(GroupSummaryServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + get_group_summary = yield self.groups_handler.get_group_summary(group_id, user_id) + + defer.returnValue((200, get_group_summary)) + + +class GroupSummaryRoomsServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/summary/rooms$") + + def __init__(self, hs): + super(GroupSummaryServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + get_group_summary = yield self.groups_handler.get_group_summary(group_id, user_id) + + defer.returnValue((200, get_group_summary)) + + +class GroupSummaryRoomsDefaultCatServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary/rooms/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupSummaryRoomsDefaultCatServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=None, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=None, + ) + + defer.returnValue((200, resp)) + + +class GroupSummaryRoomsCatServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary" + "/categories/(?P[^/]+)/rooms/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupSummaryRoomsCatServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, category_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=category_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, category_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_room( + group_id, user_id, + room_id=room_id, + category_id=category_id, + ) + + defer.returnValue((200, resp)) + + +class GroupCategoryServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/categories/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupCategoryServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id, category_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_category( + group_id, user_id, + category_id=category_id, + ) + + defer.returnValue((200, category)) + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, category_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_category( + group_id, user_id, + category_id=category_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, category_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_category( + group_id, user_id, + category_id=category_id, + ) + + defer.returnValue((200, resp)) + + +class GroupCategoriesServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/categories/$" + ) + + def __init__(self, hs): + super(GroupCategoriesServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_categories( + group_id, user_id, + ) + + defer.returnValue((200, category)) + + +class GroupRoleServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/roles/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupRoleServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id, role_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_role( + group_id, user_id, + role_id=role_id, + ) + + defer.returnValue((200, category)) + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, role_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_role( + group_id, user_id, + role_id=role_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, role_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_role( + group_id, user_id, + role_id=role_id, + ) + + defer.returnValue((200, resp)) + + +class GroupRolesServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/roles/$" + ) + + def __init__(self, hs): + super(GroupRolesServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + category = yield self.groups_handler.get_group_roles( + group_id, user_id, + ) + + defer.returnValue((200, category)) + + +class GroupSummaryUsersDefaultRoleServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary/users/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupSummaryUsersDefaultRoleServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=None, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=None, + ) + + defer.returnValue((200, resp)) + + +class GroupSummaryUsersRoleServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/summary" + "/roles/(?P[^/]+)/users/(?P[^/]+)$" + ) + + def __init__(self, hs): + super(GroupSummaryUsersRoleServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, role_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + resp = yield self.groups_handler.update_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=role_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, role_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + resp = yield self.groups_handler.delete_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=role_id, + ) + + defer.returnValue((200, resp)) + + +class GroupRoomServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/rooms$") + + def __init__(self, hs): + super(GroupRoomServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_rooms_in_group(group_id, user_id) + + defer.returnValue((200, result)) + + +class GroupUsersServlet(RestServlet): + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/users$") + + def __init__(self, hs): + super(GroupUsersServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_users_in_group(group_id, user_id) + + defer.returnValue((200, result)) + + +class GroupCreateServlet(RestServlet): + PATTERNS = client_v2_patterns("/create_group$") + + def __init__(self, hs): + super(GroupCreateServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + self.server_name = hs.hostname + + @defer.inlineCallbacks + def on_POST(self, request): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + # TODO: Create group on remote server + content = parse_json_object_from_request(request) + localpart = content.pop("localpart") + group_id = GroupID.create(localpart, self.server_name).to_string() + + result = yield self.groups_handler.create_group(group_id, user_id, content) + + defer.returnValue((200, result)) + + +class GroupAdminRoomsServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/admin/rooms/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupAdminRoomsServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.add_room(group_id, user_id, room_id, content) + + defer.returnValue((200, result)) + + +class GroupAdminUsersInviteServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/admin/users/invite/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupAdminUsersInviteServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + self.store = hs.get_datastore() + self.is_mine_id = hs.is_mine_id + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + config = content.get("config", {}) + result = yield self.groups_handler.invite( + group_id, user_id, requester_user_id, config, + ) + + defer.returnValue((200, result)) + + +class GroupAdminUsersKickServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/admin/users/remove/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(GroupAdminUsersKickServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id, user_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.remove_user_from_group( + group_id, user_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupSelfLeaveServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/self/leave$" + ) + + def __init__(self, hs): + super(GroupSelfLeaveServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.remove_user_from_group( + group_id, requester_user_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupSelfJoinServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/self/join$" + ) + + def __init__(self, hs): + super(GroupSelfJoinServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.join_group( + group_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupSelfAcceptInviteServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/groups/(?P[^/]*)/self/accept_invite$" + ) + + def __init__(self, hs): + super(GroupSelfAcceptInviteServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + requester_user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + result = yield self.groups_handler.accept_invite( + group_id, requester_user_id, content, + ) + + defer.returnValue((200, result)) + + +class GroupsForUserServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/joined_groups$" + ) + + def __init__(self, hs): + super(GroupsForUserServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_joined_groups(user_id) + + defer.returnValue((200, result)) + + +def register_servlets(hs, http_server): + GroupServlet(hs).register(http_server) + GroupSummaryServlet(hs).register(http_server) + GroupUsersServlet(hs).register(http_server) + GroupRoomServlet(hs).register(http_server) + GroupCreateServlet(hs).register(http_server) + GroupAdminRoomsServlet(hs).register(http_server) + GroupAdminUsersInviteServlet(hs).register(http_server) + GroupAdminUsersKickServlet(hs).register(http_server) + GroupSelfLeaveServlet(hs).register(http_server) + GroupSelfJoinServlet(hs).register(http_server) + GroupSelfAcceptInviteServlet(hs).register(http_server) + GroupsForUserServlet(hs).register(http_server) + GroupSummaryRoomsDefaultCatServlet(hs).register(http_server) + GroupCategoryServlet(hs).register(http_server) + GroupCategoriesServlet(hs).register(http_server) + GroupSummaryRoomsCatServlet(hs).register(http_server) + GroupRoleServlet(hs).register(http_server) + GroupRolesServlet(hs).register(http_server) + GroupSummaryUsersDefaultRoleServlet(hs).register(http_server) + GroupSummaryUsersRoleServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index d857cca848..d0a6272766 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -50,6 +50,7 @@ from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.user_directory import UserDirectoyHandler +from synapse.handlers.groups_local import GroupsLocalHandler from synapse.groups.groups_server import GroupsServerHandler from synapse.groups.attestations import GroupAttestionRenewer, GroupAttestationSigning from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory @@ -141,6 +142,7 @@ class HomeServer(object): 'read_marker_handler', 'action_generator', 'user_directory_handler', + 'groups_local_handler', 'groups_server_handler', 'groups_attestation_signing', 'groups_attestation_renewer', @@ -314,6 +316,9 @@ class HomeServer(object): def build_user_directory_handler(self): return UserDirectoyHandler(self) + def build_groups_local_handler(self): + return GroupsLocalHandler(self) + def build_groups_server_handler(self): return GroupsServerHandler(self) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index fdee9f1ad5..594566eb38 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -136,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) + self._group_updates_id_gen = StreamIdGenerator( + db_conn, "local_group_updates", "stream_id", + ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( @@ -236,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=curr_state_delta_prefill, ) + _group_updates_prefill, min_group_updates_id = self._get_cache_dict( + db_conn, "local_group_updates", + entity_column="user_id", + stream_column="stream_id", + max_value=self._group_updates_id_gen.get_current_token(), + limit=1000, + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", min_group_updates_id, + prefilled_cache=_group_updates_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index e8a799d8c7..036549d437 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -756,6 +756,103 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + @defer.inlineCallbacks + def register_user_group_membership(self, group_id, user_id, membership, + is_admin=False, content={}, + local_attestation=None, + remote_attestation=None, + ): + def _register_user_group_membership_txn(txn, next_id): + # TODO: Upsert? + self._simple_delete_txn( + txn, + table="local_group_membership", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_insert_txn( + txn, + table="local_group_membership", + values={ + "group_id": group_id, + "user_id": user_id, + "is_admin": is_admin, + "membership": membership, + "content": json.dumps(content), + }, + ) + self._simple_delete_txn( + txn, + table="local_group_updates", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + "type": "membership", + }, + ) + self._simple_insert_txn( + txn, + table="local_group_updates", + values={ + "stream_id": next_id, + "group_id": group_id, + "user_id": user_id, + "type": "membership", + "content": json.dumps({"membership": membership, "content": content}), + } + ) + self._group_updates_stream_cache.entity_has_changed(user_id, next_id) + + # TODO: Insert profile to ensuer it comes down stream if its a join. + + if membership == "join": + if local_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_renewals", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": local_attestation["valid_until_ms"], + } + ) + if remote_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_remote", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": remote_attestation["valid_until_ms"], + "attestation": json.dumps(remote_attestation), + } + ) + else: + self._simple_delete_txn( + txn, + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + + with self._group_updates_id_gen.get_next() as next_id: + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, next_id, + ) + @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, long_description,): @@ -771,6 +868,61 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) + def get_joined_groups(self, user_id): + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + }, + retcol="group_id", + desc="get_joined_groups", + ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn, + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token, + ) + if not has_changed: + return [] + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token,)) + return [{ + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } for group_id, membership, gtype, content_json in txn] + return self.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn, + ) + + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() + def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 472aab0a78..e32db8b313 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -142,3 +142,31 @@ CREATE TABLE group_attestations_remote ( CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id); CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); + + +CREATE TABLE local_group_membership ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + is_admin BOOLEAN NOT NULL, + membership TEXT NOT NULL, + content TEXT NOT NULL +); + +CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); +CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); + + +CREATE TABLE local_group_updates ( + stream_id BIGINT NOT NULL, + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, + content TEXT NOT NULL +); + + +CREATE TABLE local_group_profiles ( + group_id TEXT NOT NULL, + name TEXT, + avatar_url TEXT +); -- cgit 1.4.1 From 68f34e85cebcacef428d1f38942990c43c3cdd01 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 10:29:57 +0100 Subject: Use transport client directly --- synapse/handlers/groups_local.py | 43 +++++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 18 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 3df255b05a..e0f53120be 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -32,15 +32,17 @@ logger = logging.getLogger(__name__) # TODO: Add group memebership /sync -def _create_rerouter(name): +def _create_rerouter(func_name): + """Returns a function that looks at the group id and calls the function + on federation or the local group server if the group is local + """ def f(self, group_id, *args, **kwargs): if self.is_mine_id(group_id): - return getattr(self.groups_server_handler, name)( + return getattr(self.groups_server_handler, func_name)( group_id, *args, **kwargs ) - repl_layer = self.hs.get_replication_layer() - return getattr(repl_layer, name)(group_id, *args, **kwargs) + return getattr(self.transport_client, func_name)(group_id, *args, **kwargs) return f @@ -50,6 +52,7 @@ class GroupsLocalHandler(object): self.store = hs.get_datastore() self.room_list_handler = hs.get_room_list_handler() self.groups_server_handler = hs.get_groups_server_handler() + self.transport_client = hs.get_federation_transport_client() self.auth = hs.get_auth() self.clock = hs.get_clock() self.keyring = hs.get_keyring() @@ -82,15 +85,19 @@ class GroupsLocalHandler(object): @defer.inlineCallbacks def get_group_summary(self, group_id, requester_user_id): + """Get the group summary for a group. + + If the group is remote we check that the users have valid attestations. + """ if self.is_mine_id(group_id): res = yield self.groups_server_handler.get_group_summary( group_id, requester_user_id ) defer.returnValue(res) - repl_layer = self.hs.get_replication_layer() - res = yield repl_layer.get_group_summary(group_id, requester_user_id) + res = yield self.transport_client.get_group_summary(group_id, requester_user_id) + # Loop through the users and validate the attestations. chunk = res["users_section"]["users"] valid_users = [] for entry in chunk: @@ -121,8 +128,7 @@ class GroupsLocalHandler(object): group_id, user_id, content ) - repl_layer = self.hs.get_replication_layer() - return repl_layer.create_group(group_id, user_id, content) # TODO + return self.transport_client.create_group(group_id, user_id, content) # TODO def add_room(self, group_id, user_id, room_id, content): if self.is_mine_id(group_id): @@ -130,8 +136,9 @@ class GroupsLocalHandler(object): group_id, user_id, room_id, content ) - repl_layer = self.hs.get_replication_layer() - return repl_layer.add_room_to_group(group_id, user_id, room_id, content) # TODO + return self.transport_client.add_room_to_group( + group_id, user_id, room_id, content, + ) # TODO @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): @@ -141,8 +148,9 @@ class GroupsLocalHandler(object): ) defer.returnValue(res) - repl_layer = self.hs.get_replication_layer() - res = yield repl_layer.get_users_in_group(group_id, requester_user_id) # TODO + res = yield self.transport_client.get_users_in_group( + group_id, requester_user_id, + ) # TODO chunk = res["chunk"] valid_entries = [] @@ -179,8 +187,9 @@ class GroupsLocalHandler(object): local_attestation = self.attestations.create_attestation(group_id, user_id) content["attestation"] = local_attestation - repl_layer = self.hs.get_replication_layer() - res = yield repl_layer.accept_group_invite(group_id, user_id, content) + res = yield self.transport_client.accept_group_invite( + group_id, user_id, content, + ) remote_attestation = res["attestation"] @@ -211,8 +220,7 @@ class GroupsLocalHandler(object): group_id, user_id, requester_user_id, content, ) else: - repl_layer = self.hs.get_replication_layer() - res = yield repl_layer.invite_to_group( + res = yield self.transport_client.invite_to_group( group_id, user_id, content, ) @@ -257,8 +265,7 @@ class GroupsLocalHandler(object): ) else: content["requester_user_id"] = requester_user_id - repl_layer = self.hs.get_replication_layer() - res = yield repl_layer.remove_user_from_group( + res = yield self.transport_client.remove_user_from_group( group_id, user_id, content ) # TODO -- cgit 1.4.1 From cccfcfa7b9e2af28bb56d6f970754fe5aa07ad56 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 10:31:59 +0100 Subject: Comments --- synapse/federation/transport/server.py | 34 +++++++++++++++++++--------------- synapse/handlers/groups_local.py | 3 +++ 2 files changed, 22 insertions(+), 15 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 0f08334f33..d68e90d2f7 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -715,21 +715,6 @@ class FederationGroupsInviteServlet(BaseFederationServlet): defer.returnValue((200, new_content)) -class FederationGroupsLocalInviteServlet(BaseFederationServlet): - PATH = "/groups/local/(?P[^/]*)/users/(?P[^/]*)/invite$" - - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, user_id): - if get_domain_from_id(group_id) != origin: - raise SynapseError(403, "group_id doesn't match origin") - - new_content = yield self.handler.on_invite( - group_id, user_id, content, - ) - - defer.returnValue((200, new_content)) - - class FederationGroupsAcceptInviteServlet(BaseFederationServlet): """Accept an invitation from the group server """ @@ -765,7 +750,26 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsLocalInviteServlet(BaseFederationServlet): + """A group server has invited a local user + """ + PATH = "/groups/local/(?P[^/]*)/users/(?P[^/]*)/invite$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(group_id) != origin: + raise SynapseError(403, "group_id doesn't match origin") + + new_content = yield self.handler.on_invite( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): + """A group server has removed a local user + """ PATH = "/groups/local/(?P[^/]*)/users/(?P[^/]*)/remove$" @defer.inlineCallbacks diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index e0f53120be..0857b14c7a 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -64,6 +64,9 @@ class GroupsLocalHandler(object): # Ensure attestations get renewed hs.get_groups_attestation_renewer() + # The following functions merely route the query to the local groups server + # or federation depending on if the group is local or remote + get_group_profile = _create_rerouter("get_group_profile") get_rooms_in_group = _create_rerouter("get_rooms_in_group") -- cgit 1.4.1 From e5ea6dd021ea71f3b5bc9a37fb896c351ee550b1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 14:37:06 +0100 Subject: Add client apis --- synapse/federation/transport/client.py | 196 +++++++++++++++++++++++++++++++-- synapse/handlers/groups_local.py | 2 +- 2 files changed, 188 insertions(+), 10 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ea340e345c..500f3622a2 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -476,10 +476,10 @@ class TransportLayerClient(object): def get_group_profile(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/profile" % (group_id,) - return self.client.post_json( + return self.client.get_json( destination=destination, path=path, - data={"requester_user_id": requester_user_id}, + args={"requester_user_id": requester_user_id}, ignore_backoff=True, ) @@ -487,10 +487,10 @@ class TransportLayerClient(object): def get_group_summary(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/summary" % (group_id,) - return self.client.post_json( + return self.client.get_json( destination=destination, path=path, - data={"requester_user_id": requester_user_id}, + args={"requester_user_id": requester_user_id}, ignore_backoff=True, ) @@ -498,10 +498,22 @@ class TransportLayerClient(object): def get_group_rooms(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/rooms" % (group_id,) + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + def add_room_to_group(self, destination, group_id, requester_user_id, room_id, + content): + path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,) + return self.client.post_json( destination=destination, path=path, - data={"requester_user_id": requester_user_id}, + args={"requester_user_id": requester_user_id}, + data=content, ignore_backoff=True, ) @@ -509,10 +521,10 @@ class TransportLayerClient(object): def get_group_users(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/users" % (group_id,) - return self.client.post_json( + return self.client.get_json( destination=destination, path=path, - data={"requester_user_id": requester_user_id}, + args={"requester_user_id": requester_user_id}, ignore_backoff=True, ) @@ -528,12 +540,13 @@ class TransportLayerClient(object): ) @log_function - def invite_to_group(self, destination, group_id, user_id, content): + def invite_to_group(self, destination, group_id, user_id, requester_user_id, content): path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id) return self.client.post_json( destination=destination, path=path, + args=requester_user_id, data=content, ignore_backoff=True, ) @@ -554,12 +567,14 @@ class TransportLayerClient(object): ) @log_function - def remove_user_from_group(self, destination, group_id, user_id, content): + def remove_user_from_group(self, destination, group_id, requester_user_id, + user_id, content): path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id) return self.client.post_json( destination=destination, path=path, + args={"requester_user_id": requester_user_id}, data=content, ignore_backoff=True, ) @@ -594,3 +609,166 @@ class TransportLayerClient(object): data=content, ignore_backoff=True, ) + + @log_function + def update_group_summary_room(self, destination, group_id, user_id, room_id, + category_id, content): + if category_id: + path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % ( + group_id, category_id, room_id, + ) + else: + path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,) + + return self.client.post_json( + destination=destination, + path=path, + args={"requester_user_id": user_id}, + data=content, + ignore_backoff=True, + ) + + @log_function + def delete_group_summary_room(self, destination, group_id, user_id, room_id, + category_id): + if category_id: + path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % ( + group_id, category_id, room_id, + ) + else: + path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_categories(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/categories" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_category(self, destination, group_id, requester_user_id, category_id): + path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def update_group_category(self, destination, group_id, requester_user_id, category_id, + content): + path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,) + + return self.client.post_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + data=content, + ignore_backoff=True, + ) + + @log_function + def delete_group_category(self, destination, group_id, requester_user_id, + category_id): + path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_roles(self, destination, group_id, requester_user_id): + path = PREFIX + "/groups/%s/roles" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_role(self, destination, group_id, requester_user_id, role_id): + path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def update_group_role(self, destination, group_id, requester_user_id, role_id, + content): + path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,) + + return self.client.post_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + data=content, + ignore_backoff=True, + ) + + @log_function + def delete_group_role(self, destination, group_id, requester_user_id, role_id): + path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def update_group_summary_user(self, destination, group_id, requester_user_id, + user_id, role_id, content): + if role_id: + path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % ( + group_id, role_id, user_id, + ) + else: + path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,) + + return self.client.post_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + data=content, + ignore_backoff=True, + ) + + @log_function + def delete_group_summary_user(self, destination, group_id, requester_user_id, + user_id, role_id): + if role_id: + path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % ( + group_id, role_id, user_id, + ) + else: + path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 0857b14c7a..6962210526 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -224,7 +224,7 @@ class GroupsLocalHandler(object): ) else: res = yield self.transport_client.invite_to_group( - group_id, user_id, content, + group_id, user_id, requester_user_id, content, ) defer.returnValue(res) -- cgit 1.4.1 From 332839f6ea4f4ae6e89c383bfb334b6ddecd3e53 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 14:45:37 +0100 Subject: Update federation client pokes --- synapse/handlers/groups_local.py | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 6962210526..7d7fc5d976 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -16,6 +16,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.types import get_domain_from_id import logging @@ -41,8 +42,11 @@ def _create_rerouter(func_name): return getattr(self.groups_server_handler, func_name)( group_id, *args, **kwargs ) - - return getattr(self.transport_client, func_name)(group_id, *args, **kwargs) + else: + destination = get_domain_from_id(group_id) + return getattr(self.transport_client, func_name)( + destination, group_id, *args, **kwargs + ) return f @@ -98,7 +102,9 @@ class GroupsLocalHandler(object): ) defer.returnValue(res) - res = yield self.transport_client.get_group_summary(group_id, requester_user_id) + res = yield self.transport_client.get_group_summary( + get_domain_from_id(group_id), group_id, requester_user_id, + ) # Loop through the users and validate the attestations. chunk = res["users_section"]["users"] @@ -131,7 +137,9 @@ class GroupsLocalHandler(object): group_id, user_id, content ) - return self.transport_client.create_group(group_id, user_id, content) # TODO + return self.transport_client.create_group( + get_domain_from_id(group_id), group_id, user_id, content, + ) # TODO def add_room(self, group_id, user_id, room_id, content): if self.is_mine_id(group_id): @@ -140,8 +148,8 @@ class GroupsLocalHandler(object): ) return self.transport_client.add_room_to_group( - group_id, user_id, room_id, content, - ) # TODO + get_domain_from_id(group_id), group_id, user_id, room_id, content, + ) @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): @@ -151,9 +159,9 @@ class GroupsLocalHandler(object): ) defer.returnValue(res) - res = yield self.transport_client.get_users_in_group( - group_id, requester_user_id, - ) # TODO + res = yield self.transport_client.get_group_users( + get_domain_from_id(group_id), group_id, requester_user_id, + ) chunk = res["chunk"] valid_entries = [] @@ -191,7 +199,7 @@ class GroupsLocalHandler(object): content["attestation"] = local_attestation res = yield self.transport_client.accept_group_invite( - group_id, user_id, content, + get_domain_from_id(group_id), group_id, user_id, content, ) remote_attestation = res["attestation"] @@ -224,7 +232,8 @@ class GroupsLocalHandler(object): ) else: res = yield self.transport_client.invite_to_group( - group_id, user_id, requester_user_id, content, + get_domain_from_id(group_id), group_id, user_id, requester_user_id, + content, ) defer.returnValue(res) @@ -269,8 +278,8 @@ class GroupsLocalHandler(object): else: content["requester_user_id"] = requester_user_id res = yield self.transport_client.remove_user_from_group( - group_id, user_id, content - ) # TODO + get_domain_from_id(group_id), group_id, user_id, content + ) defer.returnValue(res) -- cgit 1.4.1 From 94ecd871a047707da5998f83440c039d064de8aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 16:38:54 +0100 Subject: Fix typos --- synapse/federation/transport/client.py | 4 ++-- synapse/handlers/groups_local.py | 5 +++-- synapse/storage/group_server.py | 25 +++++++++++++++++-------- 3 files changed, 22 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 500f3622a2..e4d84c06c1 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -495,7 +495,7 @@ class TransportLayerClient(object): ) @log_function - def get_group_rooms(self, destination, group_id, requester_user_id): + def get_rooms_in_group(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/rooms" % (group_id,) return self.client.get_json( @@ -518,7 +518,7 @@ class TransportLayerClient(object): ) @log_function - def get_group_users(self, destination, group_id, requester_user_id): + def get_users_in_group(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/users" % (group_id,) return self.client.get_json( diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 7d7fc5d976..50f7fce885 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -159,7 +159,7 @@ class GroupsLocalHandler(object): ) defer.returnValue(res) - res = yield self.transport_client.get_group_users( + res = yield self.transport_client.get_users_in_group( get_domain_from_id(group_id), group_id, requester_user_id, ) @@ -278,7 +278,8 @@ class GroupsLocalHandler(object): else: content["requester_user_id"] = requester_user_id res = yield self.transport_client.remove_user_from_group( - get_domain_from_id(group_id), group_id, user_id, content + get_domain_from_id(group_id), group_id, requester_user_id, + user_id, content, ) defer.returnValue(res) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 3c6ee7df68..0a69e0f501 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore): remote_attestation (dict): If remote group then store the remote attestation from the group, else None. """ - def _register_user_group_membership_txn(txn, next_id): + def _register_user_group_membership_txn(txn): # TODO: Upsert? self._simple_delete_txn( txn, @@ -797,7 +797,6 @@ class GroupServerStore(SQLBaseStore): "content": json.dumps(content), }, ) - self._group_updates_stream_cache.entity_has_changed(user_id, next_id) # TODO: Insert profile to ensuer it comes down stream if its a join. @@ -820,7 +819,7 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, "user_id": user_id, "valid_until_ms": remote_attestation["valid_until_ms"], - "attestation": json.dumps(remote_attestation), + "attestation_json": json.dumps(remote_attestation), } ) else: @@ -841,11 +840,10 @@ class GroupServerStore(SQLBaseStore): }, ) - with self._group_updates_id_gen.get_next() as next_id: - yield self.runInteraction( - "register_user_group_membership", - _register_user_group_membership_txn, next_id, - ) + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, + ) @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, @@ -928,3 +926,14 @@ class GroupServerStore(SQLBaseStore): defer.returnValue(json.loads(row["attestation_json"])) defer.returnValue(None) + + def get_joined_groups(self, user_id): + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + }, + retcol="group_id", + desc="get_joined_groups", + ) -- cgit 1.4.1 From 14a34f12d755e7516dc81348d811d47dc51f026d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 17:28:42 +0100 Subject: Comments --- synapse/federation/transport/server.py | 2 +- synapse/groups/groups_server.py | 2 +- synapse/handlers/groups_local.py | 29 +++++++++++++++++++---------- synapse/rest/client/v2_alpha/groups.py | 4 +++- 4 files changed, 24 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 29e966ac29..1332b49f35 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -672,7 +672,7 @@ class FederationGroupsAddRoomsServlet(BaseFederationServlet): if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.add_room( + new_content = yield self.handler.add_room_to_group( group_id, requester_user_id, room_id, content ) diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index b9ad9507f4..1b6e354ca3 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -430,7 +430,7 @@ class GroupsServerHandler(object): }) @defer.inlineCallbacks - def add_room(self, group_id, requester_user_id, room_id, content): + def add_room_to_group(self, group_id, requester_user_id, room_id, content): """Add room to group """ yield self.check_group_is_ours( diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 50f7fce885..0b80348c82 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -74,6 +74,8 @@ class GroupsLocalHandler(object): get_group_profile = _create_rerouter("get_group_profile") get_rooms_in_group = _create_rerouter("get_rooms_in_group") + add_room_to_group = _create_rerouter("add_room_to_group") + update_group_summary_room = _create_rerouter("update_group_summary_room") delete_group_summary_room = _create_rerouter("delete_group_summary_room") @@ -130,6 +132,9 @@ class GroupsLocalHandler(object): defer.returnValue(res) def create_group(self, group_id, user_id, content): + """Create a group + """ + logger.info("Asking to create group with ID: %r", group_id) if self.is_mine_id(group_id): @@ -141,18 +146,10 @@ class GroupsLocalHandler(object): get_domain_from_id(group_id), group_id, user_id, content, ) # TODO - def add_room(self, group_id, user_id, room_id, content): - if self.is_mine_id(group_id): - return self.groups_server_handler.add_room( - group_id, user_id, room_id, content - ) - - return self.transport_client.add_room_to_group( - get_domain_from_id(group_id), group_id, user_id, room_id, content, - ) - @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): + """Get users in a group + """ if self.is_mine_id(group_id): res = yield self.groups_server_handler.get_users_in_group( group_id, requester_user_id @@ -184,10 +181,14 @@ class GroupsLocalHandler(object): @defer.inlineCallbacks def join_group(self, group_id, user_id, content): + """Request to join a group + """ raise NotImplementedError() # TODO @defer.inlineCallbacks def accept_invite(self, group_id, user_id, content): + """Accept an invite to a group + """ if self.is_mine_id(group_id): yield self.groups_server_handler.accept_invite( group_id, user_id, content @@ -222,6 +223,8 @@ class GroupsLocalHandler(object): @defer.inlineCallbacks def invite(self, group_id, user_id, requester_user_id, config): + """Invite a user to a group + """ content = { "requester_user_id": requester_user_id, "config": config, @@ -240,6 +243,8 @@ class GroupsLocalHandler(object): @defer.inlineCallbacks def on_invite(self, group_id, user_id, content): + """One of our users were invited to a group + """ # TODO: Support auto join and rejection if not self.is_mine_id(user_id): @@ -262,6 +267,8 @@ class GroupsLocalHandler(object): @defer.inlineCallbacks def remove_user_from_group(self, group_id, user_id, requester_user_id, content): + """Remove a user from a group + """ if user_id == requester_user_id: yield self.store.register_user_group_membership( group_id, user_id, @@ -286,6 +293,8 @@ class GroupsLocalHandler(object): @defer.inlineCallbacks def user_removed_from_group(self, group_id, user_id, content): + """One of our users was removed/kicked from a group + """ # TODO: Check if user in group yield self.store.register_user_group_membership( group_id, user_id, diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 787967c3a2..f937d856fd 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -405,7 +405,9 @@ class GroupAdminRoomsServlet(RestServlet): user_id = requester.user.to_string() content = parse_json_object_from_request(request) - result = yield self.groups_handler.add_room(group_id, user_id, room_id, content) + result = yield self.groups_handler.add_room_to_group( + group_id, user_id, room_id, content, + ) defer.returnValue((200, result)) -- cgit 1.4.1 From 6f443a74cf6ab0bfe452289f9888580725987765 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 09:46:33 +0100 Subject: Add update group profile API --- synapse/federation/transport/server.py | 12 ++++++++++++ synapse/groups/groups_server.py | 16 ++++++++++++++++ synapse/handlers/groups_local.py | 1 + synapse/rest/client/v2_alpha/groups.py | 12 ++++++++++++ synapse/storage/group_server.py | 11 +++++++++++ 5 files changed, 52 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 1332b49f35..e04750fd2a 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -642,6 +642,18 @@ class FederationGroupsSummaryServlet(BaseFederationServlet): defer.returnValue((200, new_content)) + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.update_group_profile( + group_id, requester_user_id, content + ) + + defer.returnValue((200, new_content)) + class FederationGroupsRoomsServlet(BaseFederationServlet): """Get the rooms in a group on behalf of a user diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 1b6e354ca3..322aad2a6f 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -341,6 +341,22 @@ class GroupsServerHandler(object): else: raise SynapseError(404, "Unknown group") + @defer.inlineCallbacks + def update_group_profile(self, group_id, requester_user_id, content): + """Update the group profile + """ + yield self.check_group_is_ours( + group_id, and_exists=True, and_is_admin=requester_user_id, + ) + + profile = {} + for keyname in ("name", "avatar_url", "short_description", + "long_description"): + if keyname in content: + profile[keyname] = content[keyname] + + yield self.store.update_group_profile(group_id, profile) + @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): """Get the users in group as seen by requester_user_id. diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 0b80348c82..b2c920da38 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -72,6 +72,7 @@ class GroupsLocalHandler(object): # or federation depending on if the group is local or remote get_group_profile = _create_rerouter("get_group_profile") + update_group_profile = _create_rerouter("update_group_profile") get_rooms_in_group = _create_rerouter("get_rooms_in_group") add_room_to_group = _create_rerouter("add_room_to_group") diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index f937d856fd..64d803d489 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -45,6 +45,18 @@ class GroupServlet(RestServlet): defer.returnValue((200, group_description)) + @defer.inlineCallbacks + def on_POST(self, request, group_id, content): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + content = parse_json_object_from_request(request) + yield self.groups_handler.update_group_profile( + group_id, user_id, content, + ) + + defer.returnValue((200, {})) + class GroupSummaryServlet(RestServlet): """Get the full group summary diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 0a69e0f501..4197d22d88 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -860,6 +860,17 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) + @defer.inlineCallbacks + def update_group_profile(self, group_id, profile,): + yield self._simple_update_one( + table="groups", + keyvalues={ + "group_id": group_id, + }, + updatevalues=profile, + desc="create_group", + ) + def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ -- cgit 1.4.1 From c544188ee3644c85a97a3c4e09e63ad4e3c6f0cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 14:53:19 +0100 Subject: Add groups to sync stream --- synapse/handlers/sync.py | 64 +++++++++++++++++++++- synapse/rest/client/v2_alpha/sync.py | 5 ++ synapse/storage/__init__.py | 15 ++++++ synapse/storage/group_server.py | 68 ++++++++++++++++++++++-- synapse/storage/schema/delta/43/group_server.sql | 9 ++++ synapse/streams/events.py | 2 + synapse/types.py | 2 + tests/rest/client/v1/test_rooms.py | 4 +- 8 files changed, 161 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 91c6c6be3c..c01fcd3d59 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -108,6 +108,17 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ return True +class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ + "join", + "invite", + "leave", +])): + __slots__ = [] + + def __nonzero__(self): + return self.join or self.invite or self.leave + + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. @@ -119,6 +130,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ "device_lists", # List of user_ids whose devices have chanegd "device_one_time_keys_count", # Dict of algorithm to count for one time keys # for this device + "groups", ])): __slots__ = [] @@ -134,7 +146,8 @@ class SyncResult(collections.namedtuple("SyncResult", [ self.archived or self.account_data or self.to_device or - self.device_lists + self.device_lists or + self.groups ) @@ -560,6 +573,8 @@ class SyncHandler(object): user_id, device_id ) + yield self._generate_sync_entry_for_groups(sync_result_builder) + defer.returnValue(SyncResult( presence=sync_result_builder.presence, account_data=sync_result_builder.account_data, @@ -568,10 +583,56 @@ class SyncHandler(object): archived=sync_result_builder.archived, to_device=sync_result_builder.to_device, device_lists=device_lists, + groups=sync_result_builder.groups, device_one_time_keys_count=one_time_key_counts, next_batch=sync_result_builder.now_token, )) + @measure_func("_generate_sync_entry_for_groups") + @defer.inlineCallbacks + def _generate_sync_entry_for_groups(self, sync_result_builder): + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + + if since_token and since_token.groups_key: + results = yield self.store.get_groups_changes_for_user( + user_id, since_token.groups_key, now_token.groups_key, + ) + else: + results = yield self.store.get_all_groups_for_user( + user_id, now_token.groups_key, + ) + + invited = {} + joined = {} + left = {} + for result in results: + membership = result["membership"] + group_id = result["group_id"] + gtype = result["type"] + content = result["content"] + + if membership == "join": + if gtype == "membership": + content.pop("membership", None) + invited[group_id] = content["content"] + else: + joined.setdefault(group_id, {})[gtype] = content + elif membership == "invite": + if gtype == "membership": + content.pop("membership", None) + invited[group_id] = content["content"] + else: + if gtype == "membership": + left[group_id] = content["content"] + + sync_result_builder.groups = GroupsSyncResult( + join=joined, + invite=invited, + leave=left, + ) + @measure_func("_generate_sync_entry_for_device_list") @defer.inlineCallbacks def _generate_sync_entry_for_device_list(self, sync_result_builder): @@ -1260,6 +1321,7 @@ class SyncResultBuilder(object): self.invited = [] self.archived = [] self.device = [] + self.groups = None class RoomSyncResultBuilder(object): diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 6dcc407451..5f208a4c1c 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -199,6 +199,11 @@ class SyncRestServlet(RestServlet): "invite": invited, "leave": archived, }, + "groups": { + "join": sync_result.groups.join, + "invite": sync_result.groups.invite, + "leave": sync_result.groups.leave, + }, "device_one_time_keys_count": sync_result.device_one_time_keys_count, "next_batch": sync_result.next_batch.to_string(), } diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index fdee9f1ad5..594566eb38 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -136,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) + self._group_updates_id_gen = StreamIdGenerator( + db_conn, "local_group_updates", "stream_id", + ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( @@ -236,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=curr_state_delta_prefill, ) + _group_updates_prefill, min_group_updates_id = self._get_cache_dict( + db_conn, "local_group_updates", + entity_column="user_id", + stream_column="stream_id", + max_value=self._group_updates_id_gen.get_current_token(), + limit=1000, + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", min_group_updates_id, + prefilled_cache=_group_updates_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index a2e7aa47d8..45f0a4c599 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore): remote_attestation (dict): If remote group then store the remote attestation from the group, else None. """ - def _register_user_group_membership_txn(txn): + def _register_user_group_membership_txn(txn, next_id): # TODO: Upsert? self._simple_delete_txn( txn, @@ -798,6 +798,19 @@ class GroupServerStore(SQLBaseStore): }, ) + self._simple_insert_txn( + txn, + table="local_group_updates", + values={ + "stream_id": next_id, + "group_id": group_id, + "user_id": user_id, + "type": "membership", + "content": json.dumps({"membership": membership, "content": content}), + } + ) + self._group_updates_stream_cache.entity_has_changed(user_id, next_id) + # TODO: Insert profile to ensure it comes down stream if its a join. if membership == "join": @@ -840,10 +853,11 @@ class GroupServerStore(SQLBaseStore): }, ) - yield self.runInteraction( - "register_user_group_membership", - _register_user_group_membership_txn, - ) + with self._group_updates_id_gen.get_next() as next_id: + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, next_id, + ) @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, @@ -937,3 +951,47 @@ class GroupServerStore(SQLBaseStore): retcol="group_id", desc="get_joined_groups", ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn, + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token, + ) + if not has_changed: + return [] + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token,)) + return [{ + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } for group_id, membership, gtype, content_json in txn] + return self.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn, + ) + + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index e1fd47aa7f..92f3339c94 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -155,3 +155,12 @@ CREATE TABLE local_group_membership ( CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); + + +CREATE TABLE local_group_updates ( + stream_id BIGINT NOT NULL, + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, + content TEXT NOT NULL +); diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 91a59b0bae..e2be500815 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -45,6 +45,7 @@ class EventSources(object): push_rules_key, _ = self.store.get_push_rules_stream_token() to_device_key = self.store.get_to_device_stream_token() device_list_key = self.store.get_device_stream_token() + groups_key = self.store.get_group_stream_token() token = StreamToken( room_key=( @@ -65,6 +66,7 @@ class EventSources(object): push_rules_key=push_rules_key, to_device_key=to_device_key, device_list_key=device_list_key, + groups_key=groups_key, ) defer.returnValue(token) diff --git a/synapse/types.py b/synapse/types.py index b32c0e360d..37d5fa7f9f 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -171,6 +171,7 @@ class StreamToken( "push_rules_key", "to_device_key", "device_list_key", + "groups_key", )) ): _SEPARATOR = "_" @@ -209,6 +210,7 @@ class StreamToken( or (int(other.push_rules_key) < int(self.push_rules_key)) or (int(other.to_device_key) < int(self.to_device_key)) or (int(other.device_list_key) < int(self.device_list_key)) + or (int(other.groups_key) < int(self.groups_key)) ) def copy_and_advance(self, key, new_value): diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index d746ea8568..de376fb514 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -1032,7 +1032,7 @@ class RoomMessageListTestCase(RestTestCase): @defer.inlineCallbacks def test_topo_token_is_accepted(self): - token = "t1-0_0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0_0" (code, response) = yield self.mock_resource.trigger_get( "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)) @@ -1044,7 +1044,7 @@ class RoomMessageListTestCase(RestTestCase): @defer.inlineCallbacks def test_stream_token_is_accepted_for_fwd_pagianation(self): - token = "s0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0" (code, response) = yield self.mock_resource.trigger_get( "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)) -- cgit 1.4.1 From 139fe30f47aabd3ca8cce0f8d8c961348188b90b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 16:47:35 +0100 Subject: Remember to cast to bool --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c01fcd3d59..600d0589fd 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -116,7 +116,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ __slots__ = [] def __nonzero__(self): - return self.join or self.invite or self.leave + return bool(self.join or self.invite or self.leave) class SyncResult(collections.namedtuple("SyncResult", [ -- cgit 1.4.1 From 2cc998fed879357376edb35d5088d88a078dd576 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 17:13:18 +0100 Subject: Fix replication. And notify --- synapse/app/synchrotron.py | 6 ++++ synapse/handlers/groups_local.py | 20 ++++++++--- synapse/replication/slave/storage/groups.py | 54 +++++++++++++++++++++++++++++ synapse/replication/tcp/streams.py | 20 +++++++++++ synapse/storage/group_server.py | 23 ++++++++++++ 5 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 synapse/replication/slave/storage/groups.py (limited to 'synapse/handlers') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 4bdd99a966..d06a05acd9 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -41,6 +41,7 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -75,6 +76,7 @@ class SynchrotronSlavedStore( SlavedRegistrationStore, SlavedFilteringStore, SlavedPresenceStore, + SlavedGroupServerStore, SlavedDeviceInboxStore, SlavedDeviceStore, SlavedClientIpStore, @@ -409,6 +411,10 @@ class SyncReplicationHandler(ReplicationClientHandler): ) elif stream_name == "presence": yield self.presence_handler.process_replication_rows(token, rows) + elif stream_name == "receipts": + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows], + ) def start(config_options): diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 0b80348c82..4182ea5afa 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -211,13 +211,16 @@ class GroupsLocalHandler(object): user_id=user_id, ) - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="join", is_admin=False, local_attestation=local_attestation, remote_attestation=remote_attestation, ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) defer.returnValue({}) @@ -257,11 +260,14 @@ class GroupsLocalHandler(object): if "avatar_url" in content["profile"]: local_profile["avatar_url"] = content["profile"]["avatar_url"] - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="invite", content={"profile": local_profile, "inviter": content["inviter"]}, ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) defer.returnValue({"state": "invite"}) @@ -270,10 +276,13 @@ class GroupsLocalHandler(object): """Remove a user from a group """ if user_id == requester_user_id: - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="leave", ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) # TODO: Should probably remember that we tried to leave so that we can # retry if the group server is currently down. @@ -296,10 +305,13 @@ class GroupsLocalHandler(object): """One of our users was removed/kicked from a group """ # TODO: Check if user in group - yield self.store.register_user_group_membership( + token = yield self.store.register_user_group_membership( group_id, user_id, membership="leave", ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) @defer.inlineCallbacks def get_joined_groups(self, user_id): diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py new file mode 100644 index 0000000000..0bc4bce5b0 --- /dev/null +++ b/synapse/replication/slave/storage/groups.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + + +class SlavedGroupServerStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedGroupServerStore, self).__init__(db_conn, hs) + + self.hs = hs + + self._group_updates_id_gen = SlavedIdTracker( + db_conn, "local_group_updates", "stream_id", + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(), + ) + + get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__ + get_group_stream_token = DataStore.get_group_stream_token.__func__ + get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__ + + def stream_positions(self): + result = super(SlavedGroupServerStore, self).stream_positions() + result["groups"] = self._group_updates_id_gen.get_current_token() + return result + + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "groups": + self._group_updates_id_gen.advance(token) + for row in rows: + self._group_updates_stream_cache.entity_has_changed( + row.user_id, token + ) + + return super(SlavedGroupServerStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index fbafe12cc2..4c60bf79f9 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -118,6 +118,12 @@ CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( "state_key", # str "event_id", # str, optional )) +GroupsStreamRow = namedtuple("GroupsStreamRow", ( + "group_id", # str + "user_id", # str + "type", # str + "content", # dict +)) class Stream(object): @@ -464,6 +470,19 @@ class CurrentStateDeltaStream(Stream): super(CurrentStateDeltaStream, self).__init__(hs) +class GroupServerStream(Stream): + NAME = "groups" + ROW_TYPE = GroupsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_group_stream_token + self.update_function = store.get_all_groups_changes + + super(GroupServerStream, self).__init__(hs) + + STREAMS_MAP = { stream.NAME: stream for stream in ( @@ -482,5 +501,6 @@ STREAMS_MAP = { TagAccountDataStream, AccountDataStream, CurrentStateDeltaStream, + GroupServerStream, ) } diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 45f0a4c599..5006ac863f 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -853,6 +853,8 @@ class GroupServerStore(SQLBaseStore): }, ) + return next_id + with self._group_updates_id_gen.get_next() as next_id: yield self.runInteraction( "register_user_group_membership", @@ -993,5 +995,26 @@ class GroupServerStore(SQLBaseStore): "get_groups_changes_for_user", _get_groups_changes_for_user_txn, ) + def get_all_groups_changes(self, from_token, to_token, limit): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_any_entity_changed( + from_token, + ) + if not has_changed: + return [] + + def _get_all_groups_changes_txn(txn): + sql = """ + SELECT stream_id, group_id, user_id, type, content + FROM local_group_updates + WHERE ? < stream_id AND stream_id <= ? + LIMIT ? + """ + txn.execute(sql, (from_token, to_token, limit,)) + return txn.fetchall() + return self.runInteraction( + "get_all_groups_changes", _get_all_groups_changes_txn, + ) + def get_group_stream_token(self): return self._group_updates_id_gen.get_current_token() -- cgit 1.4.1 From 960dae3340221168e1e019f250cbc2dd430a1aee Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 17:14:44 +0100 Subject: Add notifier --- synapse/handlers/groups_local.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 4182ea5afa..0c329e633d 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -63,6 +63,7 @@ class GroupsLocalHandler(object): self.is_mine_id = hs.is_mine_id self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname + self.notifier = hs.get_notifier() self.attestations = hs.get_groups_attestation_signing() # Ensure attestations get renewed -- cgit 1.4.1 From d5e32c843fb51f2b7c0247ca821fd38e5f2f25d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2017 13:31:26 +0100 Subject: Correctly add joins to correct segment --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 600d0589fd..d7b90a35ea 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -616,7 +616,7 @@ class SyncHandler(object): if membership == "join": if gtype == "membership": content.pop("membership", None) - invited[group_id] = content["content"] + joined[group_id] = content["content"] else: joined.setdefault(group_id, {})[gtype] = content elif membership == "invite": -- cgit 1.4.1 From a1e67bcb974e098cbbe2fbe6072bc7d7658936f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 4 Aug 2017 10:07:10 +0100 Subject: Remove stale TODO comments --- synapse/handlers/groups_local.py | 10 ---------- 1 file changed, 10 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index d0ed988224..b8b1e754c7 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -23,16 +23,6 @@ import logging logger = logging.getLogger(__name__) -# TODO: Validate attestations -# TODO: Allow users to "knock" or simpkly join depending on rules -# TODO: is_priveged flag to users and is_public to users and rooms -# TODO: Roles -# TODO: Audit log for admins (profile updates, membership changes, users who tried -# to join but were rejected, etc) -# TODO: Flairs -# TODO: Add group memebership /sync - - def _create_rerouter(func_name): """Returns a function that looks at the group id and calls the function on federation or the local group server if the group is local -- cgit 1.4.1 From 05e21285aae4a0411a9ec1151ce006297fa3ca91 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Aug 2017 11:50:09 +0100 Subject: Store whether the user wants to publicise their membership of a group --- synapse/handlers/groups_local.py | 4 ++++ synapse/storage/group_server.py | 2 ++ synapse/storage/schema/delta/43/group_server.sql | 1 + 3 files changed, 7 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index b8b1e754c7..3a738ef36f 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -203,12 +203,16 @@ class GroupsLocalHandler(object): user_id=user_id, ) + # TODO: Check that the group is public and we're being added publically + is_publicised = content.get("publicise", False) + token = yield self.store.register_user_group_membership( group_id, user_id, membership="join", is_admin=False, local_attestation=local_attestation, remote_attestation=remote_attestation, + is_publicised=is_publicised, ) self.notifier.on_new_event( "groups_key", token, users=[user_id], diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index f44e80b514..31514f3cdb 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -840,6 +840,7 @@ class GroupServerStore(SQLBaseStore): is_admin=False, content={}, local_attestation=None, remote_attestation=None, + is_publicised=False, ): """Registers that a local user is a member of a (local or remote) group. @@ -873,6 +874,7 @@ class GroupServerStore(SQLBaseStore): "user_id": user_id, "is_admin": is_admin, "membership": membership, + "is_publicised": is_publicised, "content": json.dumps(content), }, ) diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 92f3339c94..01ac0edc35 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -150,6 +150,7 @@ CREATE TABLE local_group_membership ( user_id TEXT NOT NULL, is_admin BOOLEAN NOT NULL, membership TEXT NOT NULL, + is_publicised TEXT NOT NULL, -- if the user is publicising their membership content TEXT NOT NULL ); -- cgit 1.4.1 From ef8e5786770ff285ebdf1fce420b5aa86437673c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 Aug 2017 13:36:22 +0100 Subject: Add bulk group publicised lookup API --- synapse/federation/transport/client.py | 15 ++++++++++ synapse/federation/transport/server.py | 17 +++++++++++ synapse/handlers/groups_local.py | 42 ++++++++++++++++++++++++++ synapse/rest/client/v2_alpha/groups.py | 54 ++++++++++++++++++++++++++++++++++ synapse/storage/group_server.py | 14 +++++++++ 5 files changed, 142 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 073d3abb2a..ce68cc4937 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -812,3 +812,18 @@ class TransportLayerClient(object): args={"requester_user_id": requester_user_id}, ignore_backoff=True, ) + + def bulk_get_publicised_groups(self, destination, user_ids): + """Get the groups a list of users are publicising + """ + + path = PREFIX + "/get_groups_publicised" + + content = {"user_ids": user_ids} + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index e04750fd2a..b5f07c50bf 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1050,6 +1050,22 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet): defer.returnValue((200, resp)) +class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): + """Get roles in a group + """ + PATH = ( + "/get_groups_publicised$" + ) + + @defer.inlineCallbacks + def on_POST(self, origin, content, query): + resp = yield self.handler.bulk_get_publicised_groups( + content["user_ids"], proxy=False, + ) + + defer.returnValue((200, resp)) + + FEDERATION_SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -1102,6 +1118,7 @@ GROUP_SERVER_SERVLET_CLASSES = ( GROUP_LOCAL_SERVLET_CLASSES = ( FederationGroupsLocalInviteServlet, FederationGroupsRemoveLocalUserServlet, + FederationGroupsBulkPublicisedServlet, ) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 3a738ef36f..c980623bbc 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -313,3 +313,45 @@ class GroupsLocalHandler(object): def get_joined_groups(self, user_id): group_ids = yield self.store.get_joined_groups(user_id) defer.returnValue({"groups": group_ids}) + + @defer.inlineCallbacks + def get_publicised_groups_for_user(self, user_id): + if self.hs.is_mine_id(user_id): + result = yield self.store.get_publicised_groups_for_user(user_id) + defer.returnValue({"groups": result}) + else: + result = yield self.transport_client.get_publicised_groups_for_user( + get_domain_from_id(user_id), user_id + ) + # TODO: Verify attestations + defer.returnValue(result) + + @defer.inlineCallbacks + def bulk_get_publicised_groups(self, user_ids, proxy=True): + destinations = {} + locals = [] + + for user_id in user_ids: + if self.hs.is_mine_id(user_id): + locals.append(user_id) + else: + destinations.setdefault( + get_domain_from_id(user_id), [] + ).append(user_id) + + if not proxy and destinations: + raise SynapseError(400, "Some user_ids are not local") + + results = {} + for destination, dest_user_ids in destinations.iteritems(): + r = yield self.transport_client.bulk_get_publicised_groups( + destination, dest_user_ids, + ) + results.update(r) + + for uid in locals: + results[uid] = yield self.store.get_publicised_groups_for_user( + uid + ) + + defer.returnValue({"users": results}) diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 9b1116acee..97d7948bb9 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -584,6 +584,59 @@ class GroupSelfUpdatePublicityServlet(RestServlet): defer.returnValue((200, {})) +class PublicisedGroupsForUserServlet(RestServlet): + """Get the list of groups a user is advertising + """ + PATTERNS = client_v2_patterns( + "/publicised_groups/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(PublicisedGroupsForUserServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastore() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, user_id): + yield self.auth.get_user_by_req(request) + + result = yield self.groups_handler.get_publicised_groups_for_user( + user_id + ) + + defer.returnValue((200, result)) + + +class PublicisedGroupsForUsersServlet(RestServlet): + """Get the list of groups a user is advertising + """ + PATTERNS = client_v2_patterns( + "/publicised_groups$" + ) + + def __init__(self, hs): + super(PublicisedGroupsForUsersServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastore() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_POST(self, request): + yield self.auth.get_user_by_req(request) + + content = parse_json_object_from_request(request) + user_ids = content["user_ids"] + + result = yield self.groups_handler.bulk_get_publicised_groups( + user_ids + ) + + defer.returnValue((200, result)) + + class GroupsForUserServlet(RestServlet): """Get all groups the logged in user is joined to """ @@ -627,3 +680,4 @@ def register_servlets(hs, http_server): GroupRolesServlet(hs).register(http_server) GroupSelfUpdatePublicityServlet(hs).register(http_server) GroupSummaryUsersRoleServlet(hs).register(http_server) + PublicisedGroupsForUserServlet(hs).register(http_server) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 10e757e975..0c35b03d2a 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -835,6 +835,20 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + def get_publicised_groups_for_user(self, user_id): + """Get all groups a user is publicising + """ + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + "is_publicised": True, + }, + retcol="group_id", + desc="get_publicised_groups_for_user", + ) + def update_group_publicity(self, group_id, user_id, publicise): """Update whether the user is publicising their membership of the group """ -- cgit 1.4.1 From ba3ff7918b54ae431aaaedb3d12650c93d366c04 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Aug 2017 13:42:42 +0100 Subject: Fixup --- synapse/handlers/groups_local.py | 22 +++++++++++++--------- synapse/rest/client/v2_alpha/groups.py | 1 + 2 files changed, 14 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index c980623bbc..274fed9278 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -329,27 +329,31 @@ class GroupsLocalHandler(object): @defer.inlineCallbacks def bulk_get_publicised_groups(self, user_ids, proxy=True): destinations = {} - locals = [] + local_users = set() for user_id in user_ids: if self.hs.is_mine_id(user_id): - locals.append(user_id) + local_users.add(user_id) else: destinations.setdefault( - get_domain_from_id(user_id), [] - ).append(user_id) + get_domain_from_id(user_id), set() + ).add(user_id) if not proxy and destinations: raise SynapseError(400, "Some user_ids are not local") results = {} + failed_results = [] for destination, dest_user_ids in destinations.iteritems(): - r = yield self.transport_client.bulk_get_publicised_groups( - destination, dest_user_ids, - ) - results.update(r) + try: + r = yield self.transport_client.bulk_get_publicised_groups( + destination, list(dest_user_ids), + ) + results.update(r["users"]) + except Exception: + failed_results.extend(dest_user_ids) - for uid in locals: + for uid in local_users: results[uid] = yield self.store.get_publicised_groups_for_user( uid ) diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 97d7948bb9..b469058e9d 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -681,3 +681,4 @@ def register_servlets(hs, http_server): GroupSelfUpdatePublicityServlet(hs).register(http_server) GroupSummaryUsersRoleServlet(hs).register(http_server) PublicisedGroupsForUserServlet(hs).register(http_server) + PublicisedGroupsForUsersServlet(hs).register(http_server) -- cgit 1.4.1 From 27ebc5c8f299488ccc0a6f100ec3b248cd81a058 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Aug 2017 11:21:34 +0100 Subject: Add remote profile cache --- synapse/groups/groups_server.py | 18 +++++ synapse/handlers/groups_local.py | 17 +++- synapse/handlers/profile.py | 81 ++++++++++++++++++- synapse/storage/profile.py | 98 +++++++++++++++++++++++ synapse/storage/schema/delta/43/profile_cache.sql | 28 +++++++ 5 files changed, 237 insertions(+), 5 deletions(-) create mode 100644 synapse/storage/schema/delta/43/profile_cache.sql (limited to 'synapse/handlers') diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index f25f327eb9..6bccae4bfb 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -503,6 +503,13 @@ class GroupsServerHandler(object): get_domain_from_id(user_id), group_id, user_id, content ) + user_profile = res.get("user_profile", {}) + yield self.store.add_remote_profile_cache( + user_id, + displayname=user_profile.get("displayname"), + avatar_url=user_profile.get("avatar_url"), + ) + if res["state"] == "join": if not self.hs.is_mine_id(user_id): remote_attestation = res["attestation"] @@ -627,6 +634,9 @@ class GroupsServerHandler(object): get_domain_from_id(user_id), group_id, user_id, {} ) + if not self.hs.is_mine_id(user_id): + yield self.store.maybe_delete_remote_profile_cache(user_id) + defer.returnValue({}) @defer.inlineCallbacks @@ -647,6 +657,7 @@ class GroupsServerHandler(object): avatar_url = profile.get("avatar_url") short_description = profile.get("short_description") long_description = profile.get("long_description") + user_profile = content.get("user_profile", {}) yield self.store.create_group( group_id, @@ -679,6 +690,13 @@ class GroupsServerHandler(object): remote_attestation=remote_attestation, ) + if not self.hs.is_mine_id(user_id): + yield self.store.add_remote_profile_cache( + user_id, + displayname=user_profile.get("displayname"), + avatar_url=user_profile.get("avatar_url"), + ) + defer.returnValue({ "group_id": group_id, }) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 274fed9278..bfa10bde5a 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -56,6 +56,9 @@ class GroupsLocalHandler(object): self.notifier = hs.get_notifier() self.attestations = hs.get_groups_attestation_signing() + handlers = hs.get_handlers() + self.profile_handler = handlers.profile_handler + # Ensure attestations get renewed hs.get_groups_attestation_renewer() @@ -123,6 +126,7 @@ class GroupsLocalHandler(object): defer.returnValue(res) + @defer.inlineCallbacks def create_group(self, group_id, user_id, content): """Create a group """ @@ -130,13 +134,16 @@ class GroupsLocalHandler(object): logger.info("Asking to create group with ID: %r", group_id) if self.is_mine_id(group_id): - return self.groups_server_handler.create_group( + res = yield self.groups_server_handler.create_group( group_id, user_id, content ) + defer.returnValue(res) - return self.transport_client.create_group( + content["user_profile"] = yield self.profile_handler.get_profile(user_id) + res = yield self.transport_client.create_group( get_domain_from_id(group_id), group_id, user_id, content, - ) # TODO + ) + defer.returnValue(res) @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): @@ -265,7 +272,9 @@ class GroupsLocalHandler(object): "groups_key", token, users=[user_id], ) - defer.returnValue({"state": "invite"}) + user_profile = yield self.profile_handler.get_profile(user_id) + + defer.returnValue({"state": "invite", "user_profile": user_profile}) @defer.inlineCallbacks def remove_user_from_group(self, group_id, user_id, requester_user_id, content): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 7abee98dea..57e22edb0d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -19,7 +19,7 @@ from twisted.internet import defer import synapse.types from synapse.api.errors import SynapseError, AuthError, CodeMessageException -from synapse.types import UserID +from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler @@ -27,15 +27,53 @@ logger = logging.getLogger(__name__) class ProfileHandler(BaseHandler): + PROFILE_UPDATE_MS = 60 * 1000 + PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 def __init__(self, hs): super(ProfileHandler, self).__init__(hs) + self.clock = hs.get_clock() + self.federation = hs.get_replication_layer() self.federation.register_query_handler( "profile", self.on_profile_query ) + self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS) + + @defer.inlineCallbacks + def get_profile(self, user_id): + target_user = UserID.from_string(user_id) + if self.hs.is_mine(target_user): + displayname = yield self.store.get_profile_displayname( + target_user.localpart + ) + avatar_url = yield self.store.get_profile_avatar_url( + target_user.localpart + ) + + defer.returnValue({ + "displayname": displayname, + "avatar_url": avatar_url, + }) + else: + try: + result = yield self.federation.make_query( + destination=target_user.domain, + query_type="profile", + args={ + "user_id": user_id, + }, + ignore_backoff=True, + ) + defer.returnValue(result) + except CodeMessageException as e: + if e.code != 404: + logger.exception("Failed to get displayname") + + raise + @defer.inlineCallbacks def get_displayname(self, target_user): if self.hs.is_mine(target_user): @@ -182,3 +220,44 @@ class ProfileHandler(BaseHandler): "Failed to update join event for room %s - %s", room_id, str(e.message) ) + + def _update_remote_profile_cache(self): + """Called periodically to check profiles of remote users we havent' + checked in a while. + """ + entries = yield self.store.get_remote_profile_cache_entries_that_expire( + last_checked=self.clock.time_msec() - self.PROFILE_UPDATE_EVERY_MS + ) + + for user_id, displayname, avatar_url in entries: + is_subcscribed = yield self.store.is_subscribed_remote_profile_for_user( + user_id, + ) + if not is_subcscribed: + yield self.store.maybe_delete_remote_profile_cache(user_id) + continue + + try: + profile = yield self.federation.make_query( + destination=get_domain_from_id(user_id), + query_type="profile", + args={ + "user_id": user_id, + }, + ignore_backoff=True, + ) + except: + logger.exception("Failed to get avatar_url") + + yield self.store.update_remote_profile_cache( + user_id, displayname, avatar_url + ) + continue + + new_name = profile.get("displayname") + new_avatar = profile.get("avatar_url") + + # We always hit update to update the last_check timestamp + yield self.store.update_remote_profile_cache( + user_id, new_name, new_avatar + ) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 26a40905ae..dca6af8a77 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore @@ -55,3 +57,99 @@ class ProfileStore(SQLBaseStore): updatevalues={"avatar_url": new_avatar_url}, desc="set_profile_avatar_url", ) + + def get_from_remote_profile_cache(self, user_id): + return self._simple_select_one( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + retcols=("displayname", "avatar_url", "last_check"), + allow_none=True, + desc="get_from_remote_profile_cache", + ) + + def add_remote_profile_cache(self, user_id, displayname, avatar_url): + """Ensure we are caching the remote user's profiles. + + This should only be called when `is_subscribed_remote_profile_for_user` + would return true for the user. + """ + return self._simple_upsert( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + values={ + "displayname": displayname, + "avatar_url": avatar_url, + "last_check": self._clock.time_msec(), + }, + desc="add_remote_profile_cache", + ) + + def update_remote_profile_cache(self, user_id, displayname, avatar_url): + return self._simple_update( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + values={ + "displayname": displayname, + "avatar_url": avatar_url, + "last_check": self._clock.time_msec(), + }, + desc="update_remote_profile_cache", + ) + + @defer.inlineCallbacks + def maybe_delete_remote_profile_cache(self, user_id): + """Check if we still care about the remote user's profile, and if we + don't then remove their profile from the cache + """ + subscribed = yield self.is_subscribed_remote_profile_for_user(user_id) + if not subscribed: + yield self._simple_delete( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + desc="delete_remote_profile_cache", + ) + + def get_remote_profile_cache_entries_that_expire(self, last_checked): + """Get all users who haven't been checked since `last_checked` + """ + def _get_remote_profile_cache_entries_that_expire_txn(txn): + sql = """ + SELECT user_id, displayname, avatar_url + FROM remote_profile_cache + WHERE last_check < ? + """ + + txn.execute(sql, (last_checked,)) + + return self.cursor_to_dict(txn) + + return self.runInteraction( + "get_remote_profile_cache_entries_that_expire", + _get_remote_profile_cache_entries_that_expire_txn, + ) + + @defer.inlineCallbacks + def is_subscribed_remote_profile_for_user(self, user_id): + """Check whether we are interested in a remote user's profile. + """ + res = yield self._simple_select_one_onecol( + table="group_users", + keyvalues={"user_id": user_id}, + retcol="user_id", + allow_none=True, + desc="should_update_remote_profile_cache_for_user", + ) + + if res: + defer.returnValue(True) + + res = yield self._simple_select_one_onecol( + table="group_invites", + keyvalues={"user_id": user_id}, + retcol="user_id", + allow_none=True, + desc="should_update_remote_profile_cache_for_user", + ) + + if res: + defer.returnValue(True) diff --git a/synapse/storage/schema/delta/43/profile_cache.sql b/synapse/storage/schema/delta/43/profile_cache.sql new file mode 100644 index 0000000000..e5ddc84df0 --- /dev/null +++ b/synapse/storage/schema/delta/43/profile_cache.sql @@ -0,0 +1,28 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- A subset of remote users whose profiles we have cached. +-- Whether a user is in this table or not is defined by the storage function +-- `is_subscribed_remote_profile_for_user` +CREATE TABLE remote_profile_cache ( + user_id TEXT NOT NULL, + displayname TEXT, + avatar_url TEXT, + last_check BIGINT NOT NULL +); + +CREATE UNIQUE INDEX remote_profile_cache_user_id ON remote_profile_cache(user_id); +CREATE INDEX remote_profile_cache_time ON remote_profile_cache(last_check); -- cgit 1.4.1 From bf81f3cf2c3e5b1d96953f3116c22aee05fb79b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Aug 2017 14:34:56 +0100 Subject: Split out profile handler to fix tests --- synapse/handlers/__init__.py | 2 -- synapse/handlers/groups_local.py | 3 +-- synapse/handlers/message.py | 3 ++- synapse/handlers/profile.py | 13 ++++++++----- synapse/handlers/register.py | 4 ++-- synapse/handlers/room_member.py | 4 +++- synapse/rest/client/v1/profile.py | 18 +++++++++--------- synapse/server.py | 5 +++++ tests/handlers/test_profile.py | 4 +--- tests/handlers/test_register.py | 5 +++-- tests/rest/client/v1/test_profile.py | 3 +-- 11 files changed, 35 insertions(+), 29 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 5ad408f549..53213cdccf 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -20,7 +20,6 @@ from .room import ( from .room_member import RoomMemberHandler from .message import MessageHandler from .federation import FederationHandler -from .profile import ProfileHandler from .directory import DirectoryHandler from .admin import AdminHandler from .identity import IdentityHandler @@ -52,7 +51,6 @@ class Handlers(object): self.room_creation_handler = RoomCreationHandler(hs) self.room_member_handler = RoomMemberHandler(hs) self.federation_handler = FederationHandler(hs) - self.profile_handler = ProfileHandler(hs) self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) self.identity_handler = IdentityHandler(hs) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index bfa10bde5a..1950c12f1f 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -56,8 +56,7 @@ class GroupsLocalHandler(object): self.notifier = hs.get_notifier() self.attestations = hs.get_groups_attestation_signing() - handlers = hs.get_handlers() - self.profile_handler = handlers.profile_handler + self.profile_handler = hs.get_profile_handler() # Ensure attestations get renewed hs.get_groups_attestation_renewer() diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index be4f123c54..5b8f20b73c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -47,6 +47,7 @@ class MessageHandler(BaseHandler): self.state = hs.get_state_handler() self.clock = hs.get_clock() self.validator = EventValidator() + self.profile_handler = hs.get_profile_handler() self.pagination_lock = ReadWriteLock() @@ -210,7 +211,7 @@ class MessageHandler(BaseHandler): if membership in {Membership.JOIN, Membership.INVITE}: # If event doesn't include a display name, add one. - profile = self.hs.get_handlers().profile_handler + profile = self.profile_handler content = builder.content try: diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 57e22edb0d..5e34501c7a 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -22,18 +22,21 @@ from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler - logger = logging.getLogger(__name__) -class ProfileHandler(BaseHandler): +class ProfileHandler(object): PROFILE_UPDATE_MS = 60 * 1000 PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 def __init__(self, hs): - super(ProfileHandler, self).__init__(hs) - + self.hs = hs + self.store = hs.get_datastore() self.clock = hs.get_clock() + self.ratelimiter = hs.get_ratelimiter() + + # AWFUL hack to get at BaseHandler.ratelimit + self.base_handler = BaseHandler(hs) self.federation = hs.get_replication_layer() self.federation.register_query_handler( @@ -194,7 +197,7 @@ class ProfileHandler(BaseHandler): if not self.hs.is_mine(user): return - yield self.ratelimit(requester) + yield self.base_handler.ratelimit(requester) room_ids = yield self.store.get_rooms_for_user( user.to_string(), diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index ee3a2269a8..560fb36254 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -36,6 +36,7 @@ class RegistrationHandler(BaseHandler): super(RegistrationHandler, self).__init__(hs) self.auth = hs.get_auth() + self.profile_handler = hs.get_profile_handler() self.captcha_client = CaptchaServerHttpClient(hs) self._next_generated_user_id = None @@ -423,8 +424,7 @@ class RegistrationHandler(BaseHandler): if displayname is not None: logger.info("setting user display name: %s -> %s", user_id, displayname) - profile_handler = self.hs.get_handlers().profile_handler - yield profile_handler.set_displayname( + yield self.profile_handler.set_displayname( user, requester, displayname, by_admin=True, ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index b3f979b246..dadc19d45b 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -45,6 +45,8 @@ class RoomMemberHandler(BaseHandler): def __init__(self, hs): super(RoomMemberHandler, self).__init__(hs) + self.profile_handler = hs.get_profile_handler() + self.member_linearizer = Linearizer(name="member") self.clock = hs.get_clock() @@ -255,7 +257,7 @@ class RoomMemberHandler(BaseHandler): content["membership"] = Membership.JOIN - profile = self.hs.get_handlers().profile_handler + profile = self.profile_handler if not content_specified: content["displayname"] = yield profile.get_displayname(target) content["avatar_url"] = yield profile.get_avatar_url(target) diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index 1a5045c9ec..d7edc34245 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -26,13 +26,13 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet): def __init__(self, hs): super(ProfileDisplaynameRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.profile_handler = hs.get_profile_handler() @defer.inlineCallbacks def on_GET(self, request, user_id): user = UserID.from_string(user_id) - displayname = yield self.handlers.profile_handler.get_displayname( + displayname = yield self.profile_handler.get_displayname( user, ) @@ -55,7 +55,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet): except: defer.returnValue((400, "Unable to parse name")) - yield self.handlers.profile_handler.set_displayname( + yield self.profile_handler.set_displayname( user, requester, new_name, is_admin) defer.returnValue((200, {})) @@ -69,13 +69,13 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet): def __init__(self, hs): super(ProfileAvatarURLRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.profile_handler = hs.get_profile_handler() @defer.inlineCallbacks def on_GET(self, request, user_id): user = UserID.from_string(user_id) - avatar_url = yield self.handlers.profile_handler.get_avatar_url( + avatar_url = yield self.profile_handler.get_avatar_url( user, ) @@ -97,7 +97,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet): except: defer.returnValue((400, "Unable to parse name")) - yield self.handlers.profile_handler.set_avatar_url( + yield self.profile_handler.set_avatar_url( user, requester, new_name, is_admin) defer.returnValue((200, {})) @@ -111,16 +111,16 @@ class ProfileRestServlet(ClientV1RestServlet): def __init__(self, hs): super(ProfileRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.profile_handler = hs.get_profile_handler() @defer.inlineCallbacks def on_GET(self, request, user_id): user = UserID.from_string(user_id) - displayname = yield self.handlers.profile_handler.get_displayname( + displayname = yield self.profile_handler.get_displayname( user, ) - avatar_url = yield self.handlers.profile_handler.get_avatar_url( + avatar_url = yield self.profile_handler.get_avatar_url( user, ) diff --git a/synapse/server.py b/synapse/server.py index d0a6272766..5b892cc390 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -51,6 +51,7 @@ from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.user_directory import UserDirectoyHandler from synapse.handlers.groups_local import GroupsLocalHandler +from synapse.handlers.profile import ProfileHandler from synapse.groups.groups_server import GroupsServerHandler from synapse.groups.attestations import GroupAttestionRenewer, GroupAttestationSigning from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory @@ -114,6 +115,7 @@ class HomeServer(object): 'application_service_scheduler', 'application_service_handler', 'device_message_handler', + 'profile_handler', 'notifier', 'distributor', 'client_resource', @@ -258,6 +260,9 @@ class HomeServer(object): def build_initial_sync_handler(self): return InitialSyncHandler(self) + def build_profile_handler(self): + return ProfileHandler(self) + def build_event_sources(self): return EventSources(self) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 2a203129ca..a5f47181d7 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -62,8 +62,6 @@ class ProfileTestCase(unittest.TestCase): self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) - hs.handlers = ProfileHandlers(hs) - self.store = hs.get_datastore() self.frank = UserID.from_string("@1234ABCD:test") @@ -72,7 +70,7 @@ class ProfileTestCase(unittest.TestCase): yield self.store.create_profile(self.frank.localpart) - self.handler = hs.get_handlers().profile_handler + self.handler = hs.get_profile_handler() @defer.inlineCallbacks def test_get_my_name(self): diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py index c8cf9a63ec..e990e45220 100644 --- a/tests/handlers/test_register.py +++ b/tests/handlers/test_register.py @@ -40,13 +40,14 @@ class RegistrationTestCase(unittest.TestCase): self.hs = yield setup_test_homeserver( handlers=None, http_client=None, - expire_access_token=True) + expire_access_token=True, + profile_handler=Mock(), + ) self.macaroon_generator = Mock( generate_access_token=Mock(return_value='secret')) self.hs.get_macaroon_generator = Mock(return_value=self.macaroon_generator) self.hs.handlers = RegistrationHandlers(self.hs) self.handler = self.hs.get_handlers().registration_handler - self.hs.get_handlers().profile_handler = Mock() @defer.inlineCallbacks def test_user_is_created_and_logged_in_if_doesnt_exist(self): diff --git a/tests/rest/client/v1/test_profile.py b/tests/rest/client/v1/test_profile.py index 1e95e97538..dddcf51b69 100644 --- a/tests/rest/client/v1/test_profile.py +++ b/tests/rest/client/v1/test_profile.py @@ -46,6 +46,7 @@ class ProfileTestCase(unittest.TestCase): resource_for_client=self.mock_resource, federation=Mock(), replication_layer=Mock(), + profile_handler=self.mock_handler ) def _get_user_by_req(request=None, allow_guest=False): @@ -53,8 +54,6 @@ class ProfileTestCase(unittest.TestCase): hs.get_v1auth().get_user_by_req = _get_user_by_req - hs.get_handlers().profile_handler = self.mock_handler - profile.register_servlets(hs, self.mock_resource) @defer.inlineCallbacks -- cgit 1.4.1 From 258409ef6156782d0c15cd5d1c1620b5734f379c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Aug 2017 14:45:20 +0100 Subject: Fix typos and reinherit --- synapse/handlers/profile.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 5e34501c7a..c3cee38a43 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -25,18 +25,12 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) -class ProfileHandler(object): +class ProfileHandler(BaseHandler): PROFILE_UPDATE_MS = 60 * 1000 PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 def __init__(self, hs): - self.hs = hs - self.store = hs.get_datastore() - self.clock = hs.get_clock() - self.ratelimiter = hs.get_ratelimiter() - - # AWFUL hack to get at BaseHandler.ratelimit - self.base_handler = BaseHandler(hs) + super(ProfileHandler, self).__init__(hs) self.federation = hs.get_replication_layer() self.federation.register_query_handler( @@ -197,7 +191,7 @@ class ProfileHandler(object): if not self.hs.is_mine(user): return - yield self.base_handler.ratelimit(requester) + yield self.ratelimit(requester) room_ids = yield self.store.get_rooms_for_user( user.to_string(), @@ -225,7 +219,7 @@ class ProfileHandler(object): ) def _update_remote_profile_cache(self): - """Called periodically to check profiles of remote users we havent' + """Called periodically to check profiles of remote users we haven't checked in a while. """ entries = yield self.store.get_remote_profile_cache_entries_that_expire( @@ -233,10 +227,10 @@ class ProfileHandler(object): ) for user_id, displayname, avatar_url in entries: - is_subcscribed = yield self.store.is_subscribed_remote_profile_for_user( + is_subscribed = yield self.store.is_subscribed_remote_profile_for_user( user_id, ) - if not is_subcscribed: + if not is_subscribed: yield self.store.maybe_delete_remote_profile_cache(user_id) continue -- cgit 1.4.1 From 4a9b1cf25300eedf66aaefcb36e23f5fadf2b57a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Aug 2017 16:23:58 +0100 Subject: Add user profiles to summary from group server --- synapse/groups/groups_server.py | 7 ++++++- synapse/handlers/profile.py | 23 +++++++++++++++++++++++ synapse/storage/profile.py | 2 +- 3 files changed, 30 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 6bccae4bfb..94cf9788bb 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -45,6 +45,7 @@ class GroupsServerHandler(object): self.server_name = hs.hostname self.attestations = hs.get_groups_attestation_signing() self.transport_client = hs.get_federation_transport_client() + self.profile_handler = hs.get_profile_handler() # Ensure attestations get renewed hs.get_groups_attestation_renewer() @@ -128,6 +129,9 @@ class GroupsServerHandler(object): group_id, user_id, ) + user_profile = yield self.profile_handler.get_profile_from_cache(user_id) + entry.update(user_profile) + users.sort(key=lambda e: e.get("order", 0)) membership_info = yield self.store.get_users_membership_info_in_group( @@ -387,7 +391,8 @@ class GroupsServerHandler(object): entry = {"user_id": g_user_id} - # TODO: Get profile information + profile = yield self.profile_handler.get_profile_from_cache(g_user_id) + entry.update(profile) if not is_public: entry["is_public"] = False diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index c3cee38a43..e56e0a52bf 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -71,6 +71,29 @@ class ProfileHandler(BaseHandler): raise + @defer.inlineCallbacks + def get_profile_from_cache(self, user_id): + """Get the profile information from our local cache. If the user is + ours then the profile information will always be corect. Otherwise, + it may be out of date/missing. + """ + target_user = UserID.from_string(user_id) + if self.hs.is_mine(target_user): + displayname = yield self.store.get_profile_displayname( + target_user.localpart + ) + avatar_url = yield self.store.get_profile_avatar_url( + target_user.localpart + ) + + defer.returnValue({ + "displayname": displayname, + "avatar_url": avatar_url, + }) + else: + profile = yield self.store.get_from_remote_profile_cache(user_id) + defer.returnValue(profile or {}) + @defer.inlineCallbacks def get_displayname(self, target_user): if self.hs.is_mine(target_user): diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index dca6af8a77..beea3102fc 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -62,7 +62,7 @@ class ProfileStore(SQLBaseStore): return self._simple_select_one( table="remote_profile_cache", keyvalues={"user_id": user_id}, - retcols=("displayname", "avatar_url", "last_check"), + retcols=("displayname", "avatar_url",), allow_none=True, desc="get_from_remote_profile_cache", ) -- cgit 1.4.1 From 93e504d04e5e92ec1d54b3c7c860adc2cfb0e9f4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Sep 2017 10:35:35 +0100 Subject: Ensure that creator of group sees group down /sync --- synapse/handlers/groups_local.py | 34 ++++++++++++++++++++++++++++++---- synapse/handlers/sync.py | 1 + 2 files changed, 31 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 1950c12f1f..b4833f8ef8 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -136,12 +136,38 @@ class GroupsLocalHandler(object): res = yield self.groups_server_handler.create_group( group_id, user_id, content ) - defer.returnValue(res) + local_attestation = None + remote_attestation = None + else: + local_attestation = self.attestations.create_attestation(group_id, user_id) + content["attestation"] = local_attestation + + content["user_profile"] = yield self.profile_handler.get_profile(user_id) + + res = yield self.transport_client.create_group( + get_domain_from_id(group_id), group_id, user_id, content, + ) + + remote_attestation = res["attestation"] + yield self.attestations.verify_attestation( + remote_attestation, + group_id=group_id, + user_id=user_id, + ) - content["user_profile"] = yield self.profile_handler.get_profile(user_id) - res = yield self.transport_client.create_group( - get_domain_from_id(group_id), group_id, user_id, content, + is_publicised = content.get("publicise", False) + token = yield self.store.register_user_group_membership( + group_id, user_id, + membership="join", + is_admin=True, + local_attestation=local_attestation, + remote_attestation=remote_attestation, + is_publicised=is_publicised, ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) + defer.returnValue(res) @defer.inlineCallbacks diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f2e4ffcec6..69c1bc189e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -637,6 +637,7 @@ class SyncHandler(object): if membership == "join": if gtype == "membership": + # TODO: Add profile content.pop("membership", None) joined[group_id] = content["content"] else: -- cgit 1.4.1 From 95298783bb86e13da56e07fa3f73b6e2de053c08 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Sep 2017 11:04:37 +0100 Subject: Add is_publicised to group summary --- synapse/handlers/groups_local.py | 56 ++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 25 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index b4833f8ef8..14fdf06b58 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -96,32 +96,38 @@ class GroupsLocalHandler(object): res = yield self.groups_server_handler.get_group_summary( group_id, requester_user_id ) - defer.returnValue(res) - - res = yield self.transport_client.get_group_summary( - get_domain_from_id(group_id), group_id, requester_user_id, - ) - - # Loop through the users and validate the attestations. - chunk = res["users_section"]["users"] - valid_users = [] - for entry in chunk: - g_user_id = entry["user_id"] - attestation = entry.pop("attestation") - try: - yield self.attestations.verify_attestation( - attestation, - group_id=group_id, - user_id=g_user_id, - ) - valid_users.append(entry) - except Exception as e: - logger.info("Failed to verify user is in group: %s", e) - - res["users_section"]["users"] = valid_users + else: + res = yield self.transport_client.get_group_summary( + get_domain_from_id(group_id), group_id, requester_user_id, + ) - res["users_section"]["users"].sort(key=lambda e: e.get("order", 0)) - res["rooms_section"]["rooms"].sort(key=lambda e: e.get("order", 0)) + # Loop through the users and validate the attestations. + chunk = res["users_section"]["users"] + valid_users = [] + for entry in chunk: + g_user_id = entry["user_id"] + attestation = entry.pop("attestation") + try: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + ) + valid_users.append(entry) + except Exception as e: + logger.info("Failed to verify user is in group: %s", e) + + res["users_section"]["users"] = valid_users + + res["users_section"]["users"].sort(key=lambda e: e.get("order", 0)) + res["rooms_section"]["rooms"].sort(key=lambda e: e.get("order", 0)) + + # Add `is_publicised` flag to indicate whether the user has publicised their + # membership of the group on their profile + result = yield self.store.get_publicised_groups_for_user(requester_user_id) + is_publicised = group_id in result + + res.setdefault("user", {})["is_publicised"] = is_publicised defer.returnValue(res) -- cgit 1.4.1 From 17b8e2bd02ad0abbd25103b637eb8490f3a53507 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Sep 2017 15:52:41 +0100 Subject: Add remove room API --- synapse/federation/transport/client.py | 12 ++++++++++++ synapse/federation/transport/server.py | 14 +++++++++++++- synapse/groups/groups_server.py | 12 ++++++++++++ synapse/handlers/groups_local.py | 1 + synapse/rest/client/v2_alpha/groups.py | 11 +++++++++++ synapse/storage/group_server.py | 23 +++++++++++++++++++++++ 6 files changed, 72 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ce68cc4937..36f6eb75e9 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -525,6 +525,18 @@ class TransportLayerClient(object): ignore_backoff=True, ) + def remove_room_from_group(self, destination, group_id, requester_user_id, room_id): + """Remove a room from a group + """ + path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + @log_function def get_users_in_group(self, destination, group_id, requester_user_id): """Get users in a group diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index b5f07c50bf..c7565e0737 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -674,7 +674,7 @@ class FederationGroupsRoomsServlet(BaseFederationServlet): class FederationGroupsAddRoomsServlet(BaseFederationServlet): - """Add room to group + """Add/remove room from group """ PATH = "/groups/(?P[^/]*)/room/(?)$" @@ -690,6 +690,18 @@ class FederationGroupsAddRoomsServlet(BaseFederationServlet): defer.returnValue((200, new_content)) + @defer.inlineCallbacks + def on_DELETE(self, origin, content, query, group_id, room_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.remove_room_from_group( + group_id, requester_user_id, room_id, + ) + + defer.returnValue((200, new_content)) + class FederationGroupsUsersServlet(BaseFederationServlet): """Get the users in a group on behalf of a user diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 699d8a5265..10bf61d178 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -472,6 +472,18 @@ class GroupsServerHandler(object): defer.returnValue({}) + @defer.inlineCallbacks + def remove_room_from_group(self, group_id, requester_user_id, room_id): + """Remove room from group + """ + yield self.check_group_is_ours( + group_id, and_exists=True, and_is_admin=requester_user_id + ) + + yield self.store.remove_room_from_group(group_id, room_id) + + defer.returnValue({}) + @defer.inlineCallbacks def invite_to_group(self, group_id, user_id, requester_user_id, content): """Invite user to group diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 14fdf06b58..a2bacbfc38 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -69,6 +69,7 @@ class GroupsLocalHandler(object): get_rooms_in_group = _create_rerouter("get_rooms_in_group") add_room_to_group = _create_rerouter("add_room_to_group") + remove_room_from_group = _create_rerouter("remove_room_from_group") update_group_summary_room = _create_rerouter("update_group_summary_room") delete_group_summary_room = _create_rerouter("delete_group_summary_room") diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index b469058e9d..8f3ce15b02 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -423,6 +423,17 @@ class GroupAdminRoomsServlet(RestServlet): defer.returnValue((200, result)) + @defer.inlineCallbacks + def on_DELETE(self, request, group_id, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.remove_room_from_group( + group_id, user_id, room_id, + ) + + defer.returnValue((200, result)) + class GroupAdminUsersInviteServlet(RestServlet): """Invite a user to the group diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index d0b5ad231a..4fe9172adc 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -843,6 +843,29 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + def remove_room_from_group(self, group_id, room_id): + def _remove_room_from_group_txn(txn): + self._simple_delete_txn( + txn, + table="group_rooms", + keyvalues={ + "group_id": group_id, + "room_id": room_id, + }, + ) + + self._simple_delete_txn( + txn, + table="group_summary_rooms", + keyvalues={ + "group_id": group_id, + "room_id": room_id, + }, + ) + return self.runInteraction( + "remove_room_from_group", _remove_room_from_group_txn, + ) + def get_publicised_groups_for_user(self, user_id): """Get all groups a user is publicising """ -- cgit 1.4.1 From 6cd5fcd5366cfef4959d107e818d0e20d78aa483 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 26 Sep 2017 19:20:23 +0100 Subject: Make the spam checker a module --- synapse/config/homeserver.py | 4 +++- synapse/events/spamcheck.py | 37 +++++++++++++++++++---------------- synapse/federation/federation_base.py | 5 ++--- synapse/handlers/message.py | 5 +++-- synapse/server.py | 5 +++++ 5 files changed, 33 insertions(+), 23 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index b22cacf8dc..3f9d9d5f8b 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -34,6 +34,7 @@ from .password_auth_providers import PasswordAuthProviderConfig from .emailconfig import EmailConfig from .workers import WorkerConfig from .push import PushConfig +from .spam_checker import SpamCheckerConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, @@ -41,7 +42,8 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, JWTConfig, PasswordConfig, EmailConfig, - WorkerConfig, PasswordAuthProviderConfig, PushConfig,): + WorkerConfig, PasswordAuthProviderConfig, PushConfig, + SpamCheckerConfig,): pass diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 56fa9e556e..7b22b3413a 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -13,26 +13,29 @@ # See the License for the specific language governing permissions and # limitations under the License. +class SpamChecker(object): + def __init__(self, hs): + self.spam_checker = None -def check_event_for_spam(event): - """Checks if a given event is considered "spammy" by this server. + if hs.config.spam_checker is not None: + module, config = hs.config.spam_checker + print("cfg %r", config) + self.spam_checker = module(config=config) - If the server considers an event spammy, then it will be rejected if - sent by a local user. If it is sent by a user on another server, then - users receive a blank event. + def check_event_for_spam(self, event): + """Checks if a given event is considered "spammy" by this server. - Args: - event (synapse.events.EventBase): the event to be checked + If the server considers an event spammy, then it will be rejected if + sent by a local user. If it is sent by a user on another server, then + users receive a blank event. - Returns: - bool: True if the event is spammy. - """ - if not hasattr(event, "content") or "body" not in event.content: - return False + Args: + event (synapse.events.EventBase): the event to be checked - # for example: - # - # if "the third flower is green" in event.content["body"]: - # return True + Returns: + bool: True if the event is spammy. + """ + if self.spam_checker is None: + return False - return False + return self.spam_checker.check_event_for_spam(event) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index babd9ea078..a0f5d40eb3 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -16,7 +16,6 @@ import logging from synapse.api.errors import SynapseError from synapse.crypto.event_signing import check_event_content_hash -from synapse.events import spamcheck from synapse.events.utils import prune_event from synapse.util import unwrapFirstError, logcontext from twisted.internet import defer @@ -26,7 +25,7 @@ logger = logging.getLogger(__name__) class FederationBase(object): def __init__(self, hs): - pass + self.spam_checker = hs.get_spam_checker() @defer.inlineCallbacks def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, @@ -144,7 +143,7 @@ class FederationBase(object): ) return redacted - if spamcheck.check_event_for_spam(pdu): + if self.spam_checker.check_event_for_spam(pdu): logger.warn( "Event contains spam, redacting %s: %s", pdu.event_id, pdu.get_pdu_json() diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index da18bf23db..37f0a2772a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse.events import spamcheck from twisted.internet import defer from synapse.api.constants import EventTypes, Membership @@ -58,6 +57,8 @@ class MessageHandler(BaseHandler): self.action_generator = hs.get_action_generator() + self.spam_checker = hs.get_spam_checker() + @defer.inlineCallbacks def purge_history(self, room_id, event_id): event = yield self.store.get_event(event_id) @@ -322,7 +323,7 @@ class MessageHandler(BaseHandler): txn_id=txn_id ) - if spamcheck.check_event_for_spam(event): + if self.spam_checker.check_event_for_spam(event): raise SynapseError( 403, "Spam is not permitted here", Codes.FORBIDDEN ) diff --git a/synapse/server.py b/synapse/server.py index a38e5179e0..4d44af745e 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -31,6 +31,7 @@ from synapse.appservice.api import ApplicationServiceApi from synapse.appservice.scheduler import ApplicationServiceScheduler from synapse.crypto.keyring import Keyring from synapse.events.builder import EventBuilderFactory +from synapse.events.spamcheck import SpamChecker from synapse.federation import initialize_http_replication from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.transport.client import TransportLayerClient @@ -139,6 +140,7 @@ class HomeServer(object): 'read_marker_handler', 'action_generator', 'user_directory_handler', + 'spam_checker', ] def __init__(self, hostname, **kwargs): @@ -309,6 +311,9 @@ class HomeServer(object): def build_user_directory_handler(self): return UserDirectoyHandler(self) + def build_spam_checker(self): + return SpamChecker(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) -- cgit 1.4.1 From adec03395d1c9a8e237a74ea420966bae8ea0002 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Sep 2017 15:01:25 +0100 Subject: Fix bug where /joined_members didn't check user was in room --- synapse/handlers/message.py | 31 +++++++++++++++++++++++++++++++ synapse/rest/client/v1/room.py | 17 +++++++---------- 2 files changed, 38 insertions(+), 10 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 37f0a2772a..f6740544c1 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -419,6 +419,37 @@ class MessageHandler(BaseHandler): [serialize_event(c, now) for c in room_state.values()] ) + @defer.inlineCallbacks + def get_joined_members(self, user_id, room_id): + """Get all the joined members in the room and their profile information. + + If the user has left the room return the state events from when they left. + + Args: + user_id(str): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A dict of user_id to profile info + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + users_with_profile = yield self.state.get_current_user_in_room(room_id) + else: + raise NotImplementedError( + "Getting joined members after leaving is not implemented" + ) + + defer.returnValue({ + user_id: { + "avatar_url": profile.avatar_url, + "display_name": profile.display_name, + } + for user_id, profile in users_with_profile.iteritems() + }) + @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index cd388770c8..4be0fee38d 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -398,22 +398,19 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet): def __init__(self, hs): super(JoinedRoomMemberListRestServlet, self).__init__(hs) - self.state = hs.get_state_handler() + self.message_handler = hs.get_handlers().message_handler @defer.inlineCallbacks def on_GET(self, request, room_id): - yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() - users_with_profile = yield self.state.get_current_user_in_room(room_id) + users_with_profile = yield self.message_handler.get_joined_members( + user_id, room_id, + ) defer.returnValue((200, { - "joined": { - user_id: { - "avatar_url": profile.avatar_url, - "display_name": profile.display_name, - } - for user_id, profile in users_with_profile.iteritems() - } + "joined": users_with_profile, })) -- cgit 1.4.1 From 8090fd4664de87bad636ace6774dad8c33bd5276 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Sep 2017 10:09:32 +0100 Subject: Fix /joined_members to work with AS users --- synapse/handlers/message.py | 36 +++++++++++++++++++++++++----------- synapse/rest/client/v1/room.py | 3 +-- 2 files changed, 26 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f6740544c1..ca8c6c55bb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -420,27 +420,41 @@ class MessageHandler(BaseHandler): ) @defer.inlineCallbacks - def get_joined_members(self, user_id, room_id): + def get_joined_members(self, requester, room_id): """Get all the joined members in the room and their profile information. If the user has left the room return the state events from when they left. Args: - user_id(str): The user requesting state events. + requester(Requester): The user requesting state events. room_id(str): The room ID to get all state events from. Returns: A dict of user_id to profile info """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - users_with_profile = yield self.state.get_current_user_in_room(room_id) - else: - raise NotImplementedError( - "Getting joined members after leaving is not implemented" + user_id = requester.user.to_string() + if not requester.app_service: + # We check AS auth after fetching the room membership, as it + # requires us to pull out all joined members anyway. + membership, _ = yield self._check_in_room_or_world_readable( + room_id, user_id ) + if membership != Membership.JOIN: + raise NotImplementedError( + "Getting joined members after leaving is not implemented" + ) + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + # If this is an AS, double check that they are allowed to see the members. + # This can either be because the AS user is in the room or becuase there + # is a user in the room that the AS is "interested in" + if requester.app_service and user_id not in users_with_profile: + for uid in users_with_profile: + if requester.app_service.is_interested_in_user(uid): + break + else: + # Loop fell through, AS has no interested users in room + raise AuthError(403, "Appservice not in room") defer.returnValue({ user_id: { diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 4be0fee38d..6c379d53ac 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -403,10 +403,9 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id): requester = yield self.auth.get_user_by_req(request) - user_id = requester.user.to_string() users_with_profile = yield self.message_handler.get_joined_members( - user_id, room_id, + requester, room_id, ) defer.returnValue((200, { -- cgit 1.4.1 From cafb8de132999507e9b05c751fbb32d199e7de50 Mon Sep 17 00:00:00 2001 From: Jeremy Cline Date: Sat, 30 Sep 2017 11:22:37 -0400 Subject: Unfreeze event before serializing with ujson In newer versions of https://github.com/esnme/ultrajson, ujson does not serialize frozendicts (introduced in esnme/ultrajson@53f85b1). Although the PyPI version is still 1.35, Fedora ships with a build from commit esnme/ultrajson@2f1d487. This causes the serialization to fail if the distribution-provided package is used. This runs the event through the unfreeze utility before serializing it. Thanks to @ignatenkobrain for tracking down the root cause. fixes #2351 Signed-off-by: Jeremy Cline --- synapse/handlers/message.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index be4f123c54..fe9d8848bc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -26,6 +26,7 @@ from synapse.types import ( from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter from synapse.util.logcontext import preserve_fn from synapse.util.metrics import measure_func +from synapse.util.frozenutils import unfreeze from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -503,7 +504,7 @@ class MessageHandler(BaseHandler): # Ensure that we can round trip before trying to persist in db try: - dump = ujson.dumps(event.content) + dump = ujson.dumps(unfreeze(event.content)) ujson.loads(dump) except: logger.exception("Failed to encode content: %r", event.content) -- cgit 1.4.1 From 30848c0fcd34aaf0b2db7b65c91648ae49c480a2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Oct 2017 11:09:51 +0100 Subject: Ignore incoming events for rooms that we have left When synapse receives an event for a room its not in over federation, it double checks with the remote server to see if it is in fact in the room. This is done so that if the server has forgotten about the room (usually as a result of the database being dropped) it can recover from it. However, in the presence of state resets in large rooms, this can cause a lot of work for servers that have legitimately left. As a hacky solution that supports both cases we drop incoming events for rooms that we have explicitly left. This means that we no longer support the case of servers having forgotten that they've rejoined a room, but that is sufficiently rare that we're not going to support it for now. --- synapse/handlers/federation.py | 23 +++++++++++++++++++++++ synapse/storage/roommember.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 18f87cad67..b160ff1684 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -125,6 +125,29 @@ class FederationHandler(BaseHandler): self.room_queues[pdu.room_id].append((pdu, origin)) return + # If we're no longer in the room just ditch the event entirely. This + # is probably an old server that has come back and thinks we're still + # in the room. + # + # If we were never in the room then maybe our database got vaped and + # we should check if we *are* in fact in the room. If we are then we + # can magically rejoin the room. + is_in_room = yield self.auth.check_host_in_room( + pdu.room_id, + self.server_name + ) + if not is_in_room: + was_in_room = yield self.store.was_host_joined( + pdu.room_id, self.server_name, + + ) + if was_in_room: + logger.info( + "Ignoring PDU %s for room %s from %s as we've left the room!", + pdu.event_id, pdu.room_id, origin, + ) + return + state = None auth_chain = [] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 457ca288d0..cb0791e591 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -533,6 +533,38 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(True) + @cachedInlineCallbacks() + def was_host_joined(self, room_id, host): + """Check whether the server is or ever was in the room. + """ + if '%' in host or '_' in host: + raise Exception("Invalid host name") + + sql = """ + SELECT user_id FROM room_memberships + WHERE room_id = ? + AND user_id LIKE ? + AND membership = 'join' + LIMIT 1 + """ + + # We do need to be careful to ensure that host doesn't have any wild cards + # in it, but we checked above for known ones and we'll check below that + # the returned user actually has the correct domain. + like_clause = "%:" + host + + rows = yield self._execute("was_host_joined", None, sql, room_id, like_clause) + + if not rows: + defer.returnValue(False) + + user_id = rows[0][0] + if get_domain_from_id(user_id) != host: + # This can only happen if the host name has something funky in it + raise Exception("Invalid host name") + + defer.returnValue(True) + def get_joined_hosts(self, room_id, state_entry): state_group = state_entry.state_group if not state_group: -- cgit 1.4.1 From f2da6df568178aa542209f2b9413f4fcff5484fd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Oct 2017 11:31:06 +0100 Subject: Remove spurious line feed --- synapse/handlers/federation.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b160ff1684..7456b23005 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -139,7 +139,6 @@ class FederationHandler(BaseHandler): if not is_in_room: was_in_room = yield self.store.was_host_joined( pdu.room_id, self.server_name, - ) if was_in_room: logger.info( -- cgit 1.4.1 From 84716d267c6d93cfe759e8da336efb3136dc1560 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 3 Oct 2017 13:53:09 +0100 Subject: Allow spam checker to reject invites too --- synapse/handlers/federation.py | 4 ++++ synapse/handlers/room_member.py | 20 ++++++++++++++------ 2 files changed, 18 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 18f87cad67..32078fde3c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -77,6 +77,7 @@ class FederationHandler(BaseHandler): self.action_generator = hs.get_action_generator() self.is_mine_id = hs.is_mine_id self.pusher_pool = hs.get_pusherpool() + self.spam_checker = hs.get_spam_checker() self.replication_layer.set_handler(self) @@ -1077,6 +1078,9 @@ class FederationHandler(BaseHandler): if self.hs.config.block_non_admin_invites: raise SynapseError(403, "This server does not accept room invites") + if not self.spam_checker.user_may_invite(requester.user): + raise SynapseError(403, "This user is not permitted to send invites to this server") + membership = event.content.get("membership") if event.type != EventTypes.Member or membership != Membership.INVITE: raise SynapseError(400, "The event was not an m.room.member invite event") diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 9a498c2d3e..61b0140e69 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -48,6 +48,7 @@ class RoomMemberHandler(BaseHandler): self.member_linearizer = Linearizer(name="member") self.clock = hs.get_clock() + self.spam_checker = hs.get_spam_checker() self.distributor = hs.get_distributor() self.distributor.declare("user_joined_room") @@ -210,12 +211,19 @@ class RoomMemberHandler(BaseHandler): if is_blocked: raise SynapseError(403, "This room has been blocked on this server") - if (effective_membership_state == "invite" and - self.hs.config.block_non_admin_invites): - is_requester_admin = yield self.auth.is_server_admin( - requester.user, - ) - if not is_requester_admin: + if effective_membership_state == "invite": + block_invite = False + if self.hs.config.block_non_admin_invites: + is_requester_admin = yield self.auth.is_server_admin( + requester.user, + ) + if not is_requester_admin: + block_invite = True + + if not self.spam_checker.user_may_invite(requester.user): + block_invite = True + + if block_invite: raise SynapseError( 403, "Invites have been disabled on this server", ) -- cgit 1.4.1 From 2a7ed700d51f0a81f563298c78cd4566994ddbab Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 3 Oct 2017 14:04:10 +0100 Subject: Fix param name & lint --- synapse/handlers/federation.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 32078fde3c..8571350cc8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1078,8 +1078,10 @@ class FederationHandler(BaseHandler): if self.hs.config.block_non_admin_invites: raise SynapseError(403, "This server does not accept room invites") - if not self.spam_checker.user_may_invite(requester.user): - raise SynapseError(403, "This user is not permitted to send invites to this server") + if not self.spam_checker.user_may_invite(event.sender): + raise SynapseError( + 403, "This user is not permitted to send invites to this server" + ) membership = event.content.get("membership") if event.type != EventTypes.Member or membership != Membership.INVITE: -- cgit 1.4.1 From e4ab96021e84ad9cccb2c3e0dea6347cce4e6149 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Oct 2017 14:10:21 +0100 Subject: Update comments --- synapse/handlers/federation.py | 2 +- synapse/storage/roommember.py | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7456b23005..77dd0ae1e2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -127,7 +127,7 @@ class FederationHandler(BaseHandler): # If we're no longer in the room just ditch the event entirely. This # is probably an old server that has come back and thinks we're still - # in the room. + # in the room (or we've been rejoined to the room by a state reset). # # If we were never in the room then maybe our database got vaped and # we should check if we *are* in fact in the room. If we are then we diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index cb0791e591..63f6115ba9 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -536,6 +536,13 @@ class RoomMemberStore(SQLBaseStore): @cachedInlineCallbacks() def was_host_joined(self, room_id, host): """Check whether the server is or ever was in the room. + + Args: + room_id (str) + host (str) + + Returns: + bool: whether the host is/was in the room or not """ if '%' in host or '_' in host: raise Exception("Invalid host name") -- cgit 1.4.1 From 41fd9989a28cfd6cc0b401677be61270f3959cfa Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 3 Oct 2017 14:17:44 +0100 Subject: Skip spam check for admin users --- synapse/handlers/room_member.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 61b0140e69..e88ba0e3a6 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -213,16 +213,16 @@ class RoomMemberHandler(BaseHandler): if effective_membership_state == "invite": block_invite = False - if self.hs.config.block_non_admin_invites: - is_requester_admin = yield self.auth.is_server_admin( - requester.user, - ) - if not is_requester_admin: + is_requester_admin = yield self.auth.is_server_admin( + requester.user, + ) + if not is_requester_admin: + if ( + self.hs.config.block_non_admin_invites or + not self.spam_checker.user_may_invite(requester.user) + ): block_invite = True - if not self.spam_checker.user_may_invite(requester.user): - block_invite = True - if block_invite: raise SynapseError( 403, "Invites have been disabled on this server", -- cgit 1.4.1 From bd769a81e12ea710accdebbaa296db1c1a625f75 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 3 Oct 2017 15:16:40 +0100 Subject: better logging --- synapse/handlers/room_member.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index e88ba0e3a6..76e46d93fe 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -217,10 +217,15 @@ class RoomMemberHandler(BaseHandler): requester.user, ) if not is_requester_admin: - if ( - self.hs.config.block_non_admin_invites or - not self.spam_checker.user_may_invite(requester.user) - ): + if self.hs.config.block_non_admin_invites: + logger.debug( + "Blocking invite: user is not admin and non-admin " + "invites disabled" + ) + block_invite = True + + if not self.spam_checker.user_may_invite(requester.user): + logger.debug("Blocking invite due to spam checker") block_invite = True if block_invite: -- cgit 1.4.1 From c46a0d7eb4704c6532a611040a591633dac02b1a Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 3 Oct 2017 15:20:14 +0100 Subject: this shouldn't be debug --- synapse/handlers/room_member.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 76e46d93fe..77e5b95e8a 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -218,14 +218,14 @@ class RoomMemberHandler(BaseHandler): ) if not is_requester_admin: if self.hs.config.block_non_admin_invites: - logger.debug( + logger.info( "Blocking invite: user is not admin and non-admin " "invites disabled" ) block_invite = True if not self.spam_checker.user_may_invite(requester.user): - logger.debug("Blocking invite due to spam checker") + logger.info("Blocking invite due to spam checker") block_invite = True if block_invite: -- cgit 1.4.1 From c2c188b699e555376912dfea49c42b02c4168270 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 3 Oct 2017 15:46:19 +0100 Subject: Federation was passing strings anyway so pass string everywhere --- synapse/handlers/room_member.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 77e5b95e8a..a33a8ad42b 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -224,7 +224,7 @@ class RoomMemberHandler(BaseHandler): ) block_invite = True - if not self.spam_checker.user_may_invite(requester.user): + if not self.spam_checker.user_may_invite(requester.user.to_string()): logger.info("Blocking invite due to spam checker") block_invite = True -- cgit 1.4.1 From 1e375468de914fdefc7c0b4b65217c4ec95784a4 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 3 Oct 2017 17:13:14 +0100 Subject: pass room id too --- synapse/events/spamcheck.py | 4 ++-- synapse/handlers/federation.py | 2 +- synapse/handlers/room_member.py | 4 +++- 3 files changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 605261f4b5..fe2d22a6f2 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -46,7 +46,7 @@ class SpamChecker(object): return self.spam_checker.check_event_for_spam(event) - def user_may_invite(self, userid): + def user_may_invite(self, userid, roomid): """Checks if a given user may send an invite If this method returns false, the invite will be rejected. @@ -60,4 +60,4 @@ class SpamChecker(object): if self.spam_checker is None: return True - return self.spam_checker.user_may_invite(userid) + return self.spam_checker.user_may_invite(userid, roomid) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8571350cc8..737fe518ef 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1078,7 +1078,7 @@ class FederationHandler(BaseHandler): if self.hs.config.block_non_admin_invites: raise SynapseError(403, "This server does not accept room invites") - if not self.spam_checker.user_may_invite(event.sender): + if not self.spam_checker.user_may_invite(event.sender, event.room_id): raise SynapseError( 403, "This user is not permitted to send invites to this server" ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index a33a8ad42b..37985fa1f9 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -224,7 +224,9 @@ class RoomMemberHandler(BaseHandler): ) block_invite = True - if not self.spam_checker.user_may_invite(requester.user.to_string()): + if not self.spam_checker.user_may_invite( + requester.user.to_string(), room_id, + ): logger.info("Blocking invite due to spam checker") block_invite = True -- cgit 1.4.1 From 197c14dbcfa9bc5bb281833a91ee035cb154216d Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 4 Oct 2017 10:47:54 +0100 Subject: Add room creation checks to spam checker Lets the spam checker deny attempts to create rooms and add aliases to them. --- synapse/events/spamcheck.py | 32 ++++++++++++++++++++++++++++++++ synapse/handlers/directory.py | 7 +++++++ synapse/handlers/room.py | 8 ++++++++ 3 files changed, 47 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 8b01c091e9..7cb3468df4 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -61,3 +61,35 @@ class SpamChecker(object): return True return self.spam_checker.user_may_invite(userid, room_id) + + def user_may_create_room(self, userid): + """Checks if a given user may create a room + + If this method returns false, the creation request will be rejected. + + Args: + userid (string): The sender's user ID + + Returns: + bool: True if the user may create a room, otherwise False + """ + if self.spam_checker is None: + return True + + return self.spam_checker.user_may_create_room(userid) + + def user_may_create_room_alias(self, userid, room_alias): + """Checks if a given user may create a room alias + + If this method returns false, the association request will be rejected. + + Args: + userid (string): The sender's user ID + + Returns: + bool: True if the user may create a room alias, otherwise False + """ + if self.spam_checker is None: + return True + + return self.spam_checker.user_may_create_room_alias(userid, room_alias) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 943554ce98..ed18bb20bb 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -40,6 +40,8 @@ class DirectoryHandler(BaseHandler): "directory", self.on_directory_query ) + self.spam_checker = hs.get_spam_checker() + @defer.inlineCallbacks def _create_association(self, room_alias, room_id, servers=None, creator=None): # general association creation for both human users and app services @@ -73,6 +75,11 @@ class DirectoryHandler(BaseHandler): # association creation for human users # TODO(erikj): Do user auth. + if not self.spam_checker.user_may_create_room_alias(user_id, room_alias): + raise SynapseError( + 403, "This user is not permitted to create this alias", + ) + can_create = yield self.can_modify_alias( room_alias, user_id=user_id diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5698d28088..f909ea04f0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -60,6 +60,11 @@ class RoomCreationHandler(BaseHandler): }, } + def __init__(self, hs): + super(RoomCreationHandler, self).__init__(hs) + + self.spam_checker = hs.get_spam_checker() + @defer.inlineCallbacks def create_room(self, requester, config, ratelimit=True): """ Creates a new room. @@ -75,6 +80,9 @@ class RoomCreationHandler(BaseHandler): """ user_id = requester.user.to_string() + if not self.spam_checker.user_may_create_room(user_id): + raise SynapseError(403, "You are not permitted to create rooms") + if ratelimit: yield self.ratelimit(requester) -- cgit 1.4.1 From 78d4ced82941ba249b7b16ea72684ade69c6a0d2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 4 Oct 2017 12:44:27 +0100 Subject: un-double indent --- synapse/handlers/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f909ea04f0..535ba9517c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -81,7 +81,7 @@ class RoomCreationHandler(BaseHandler): user_id = requester.user.to_string() if not self.spam_checker.user_may_create_room(user_id): - raise SynapseError(403, "You are not permitted to create rooms") + raise SynapseError(403, "You are not permitted to create rooms") if ratelimit: yield self.ratelimit(requester) -- cgit 1.4.1 From d8ce68b09b0966330b4da720eeb41719c7c61be6 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 4 Oct 2017 14:29:33 +0100 Subject: spam check room publishing --- synapse/events/spamcheck.py | 18 ++++++++++++++++++ synapse/handlers/directory.py | 8 ++++++++ 2 files changed, 26 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 7cb3468df4..595b1760f8 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -85,6 +85,7 @@ class SpamChecker(object): Args: userid (string): The sender's user ID + room_alias (string): The alias to be created Returns: bool: True if the user may create a room alias, otherwise False @@ -93,3 +94,20 @@ class SpamChecker(object): return True return self.spam_checker.user_may_create_room_alias(userid, room_alias) + + def user_may_publish_room(self, userid, room_id): + """Checks if a given user may publish a room to the directory + + If this method returns false, the publish request will be rejected. + + Args: + userid (string): The sender's user ID + room_id (string): The ID of the room that would be published + + Returns: + bool: True if the user may publish the room, otherwise False + """ + if self.spam_checker is None: + return True + + return self.spam_checker.user_may_publish_room(userid, room_id) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index ed18bb20bb..a0464ae5c0 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -334,6 +334,14 @@ class DirectoryHandler(BaseHandler): room_id (str) visibility (str): "public" or "private" """ + if not self.spam_checker.user_may_publish_room( + requester.user.to_string(), room_id + ): + raise AuthError( + 403, + "This user is not permitted to publish rooms to the room list" + ) + if requester.is_guest: raise AuthError(403, "Guests cannot edit the published room list") -- cgit 1.4.1 From f878e6f8af9e80cfa4be717c03cc4f9853a93794 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 5 Oct 2017 14:02:28 +0100 Subject: Spam checking: add the invitee to user_may_invite --- synapse/events/spamcheck.py | 4 ++-- synapse/handlers/federation.py | 12 +++++++----- synapse/handlers/room_member.py | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 595b1760f8..dccc579eac 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -46,7 +46,7 @@ class SpamChecker(object): return self.spam_checker.check_event_for_spam(event) - def user_may_invite(self, userid, room_id): + def user_may_invite(self, inviter_userid, invitee_userid, room_id): """Checks if a given user may send an invite If this method returns false, the invite will be rejected. @@ -60,7 +60,7 @@ class SpamChecker(object): if self.spam_checker is None: return True - return self.spam_checker.user_may_invite(userid, room_id) + return self.spam_checker.user_may_invite(inviter_userid, invitee_userid, room_id) def user_may_create_room(self, userid): """Checks if a given user may create a room diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 737fe518ef..8fccf8bab3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1071,6 +1071,9 @@ class FederationHandler(BaseHandler): """ event = pdu + if event.state_key is None: + raise SynapseError(400, "The invite event did not have a state key") + is_blocked = yield self.store.is_room_blocked(event.room_id) if is_blocked: raise SynapseError(403, "This room has been blocked on this server") @@ -1078,9 +1081,11 @@ class FederationHandler(BaseHandler): if self.hs.config.block_non_admin_invites: raise SynapseError(403, "This server does not accept room invites") - if not self.spam_checker.user_may_invite(event.sender, event.room_id): + if not self.spam_checker.user_may_invite( + event.sender, event.state_key, event.room_id, + ): raise SynapseError( - 403, "This user is not permitted to send invites to this server" + 403, "This user is not permitted to send invites to this server/user" ) membership = event.content.get("membership") @@ -1091,9 +1096,6 @@ class FederationHandler(BaseHandler): if sender_domain != origin: raise SynapseError(400, "The invite event was not from the server sending it") - if event.state_key is None: - raise SynapseError(400, "The invite event did not have a state key") - if not self.is_mine_id(event.state_key): raise SynapseError(400, "The invite event must be for this server") diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 37985fa1f9..36a8ef8ce0 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -225,7 +225,7 @@ class RoomMemberHandler(BaseHandler): block_invite = True if not self.spam_checker.user_may_invite( - requester.user.to_string(), room_id, + requester.user.to_string(), target.to_string(), room_id, ): logger.info("Blocking invite due to spam checker") block_invite = True -- cgit 1.4.1 From c8f568ddf9e8827d8971e14137663fa8df5b57d2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 22:14:24 +0100 Subject: Fix up deferred handling in federation.py * Avoid preserve_context_over_deferred, which is broken * set consumeErrors=True on defer.gatherResults, to avoid spurious "unhandled failure" erros --- synapse/handlers/federation.py | 45 ++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 24 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8fccf8bab3..63c56a4a32 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -14,7 +14,6 @@ # limitations under the License. """Contains handlers for federation events.""" -import synapse.util.logcontext from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 @@ -26,10 +25,7 @@ from synapse.api.errors import ( ) from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator -from synapse.util import unwrapFirstError -from synapse.util.logcontext import ( - preserve_fn, preserve_context_over_deferred -) +from synapse.util import unwrapFirstError, logcontext from synapse.util.metrics import measure_func from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor, Linearizer @@ -592,9 +588,9 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch ) - results = yield preserve_context_over_deferred(defer.gatherResults( + results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.replication_layer.get_pdu)( + logcontext.preserve_fn(self.replication_layer.get_pdu)( [dest], event_id, outlier=True, @@ -786,10 +782,14 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") - states = yield preserve_context_over_deferred(defer.gatherResults([ - preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) - for e in event_ids - ])) + states = yield logcontext.make_deferred_yieldable(defer.gatherResults( + [ + logcontext.preserve_fn(self.state_handler.resolve_state_groups)( + room_id, [e] + ) + for e in event_ids + ], consumeErrors=True, + )) states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( @@ -942,9 +942,7 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)( - room_queue - ) + logcontext.preserve_fn(self._handle_queued_pdus)(room_queue) defer.returnValue(True) @@ -1438,7 +1436,7 @@ class FederationHandler(BaseHandler): if not backfilled: # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - preserve_fn(self.pusher_pool.on_new_notifications)( + logcontext.preserve_fn(self.pusher_pool.on_new_notifications)( event_stream_id, max_stream_id ) @@ -1451,16 +1449,16 @@ class FederationHandler(BaseHandler): a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. """ - contexts = yield preserve_context_over_deferred(defer.gatherResults( + contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self._prep_event)( + logcontext.preserve_fn(self._prep_event)( origin, ev_info["event"], state=ev_info.get("state"), auth_events=ev_info.get("auth_events"), ) for ev_info in event_infos - ] + ], consumeErrors=True, )) yield self.store.persist_events( @@ -1768,18 +1766,17 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.info("Different auth: %s", different_auth) - different_events = yield preserve_context_over_deferred(defer.gatherResults( - [ - preserve_fn(self.store.get_event)( + different_events = yield logcontext.make_deferred_yieldable( + defer.gatherResults([ + logcontext.preserve_fn(self.store.get_event)( d, allow_none=True, allow_rejected=False, ) for d in different_auth if d in have_events and not have_events[d] - ], - consumeErrors=True - )).addErrback(unwrapFirstError) + ], consumeErrors=True) + ).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) -- cgit 1.4.1 From 4ce43792350f0df432df25006c1bdd78c08647e0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Oct 2017 14:11:43 +0100 Subject: Fix attestations to check correct server name --- synapse/handlers/groups_local.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index a2bacbfc38..50e40548c2 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -102,6 +102,8 @@ class GroupsLocalHandler(object): get_domain_from_id(group_id), group_id, requester_user_id, ) + group_server_name = get_domain_from_id(group_id) + # Loop through the users and validate the attestations. chunk = res["users_section"]["users"] valid_users = [] @@ -109,11 +111,13 @@ class GroupsLocalHandler(object): g_user_id = entry["user_id"] attestation = entry.pop("attestation") try: - yield self.attestations.verify_attestation( - attestation, - group_id=group_id, - user_id=g_user_id, - ) + if get_domain_from_id(g_user_id) != group_server_name: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + server_name=get_domain_from_id(g_user_id), + ) valid_users.append(entry) except Exception as e: logger.info("Failed to verify user is in group: %s", e) @@ -160,6 +164,7 @@ class GroupsLocalHandler(object): remote_attestation, group_id=group_id, user_id=user_id, + server_name=get_domain_from_id(group_id), ) is_publicised = content.get("publicise", False) @@ -187,6 +192,8 @@ class GroupsLocalHandler(object): ) defer.returnValue(res) + group_server_name = get_domain_from_id(group_id) + res = yield self.transport_client.get_users_in_group( get_domain_from_id(group_id), group_id, requester_user_id, ) @@ -197,11 +204,13 @@ class GroupsLocalHandler(object): g_user_id = entry["user_id"] attestation = entry.pop("attestation") try: - yield self.attestations.verify_attestation( - attestation, - group_id=group_id, - user_id=g_user_id, - ) + if get_domain_from_id(g_user_id) != group_server_name: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + server_name=get_domain_from_id(g_user_id), + ) valid_entries.append(entry) except Exception as e: logger.info("Failed to verify user is in group: %s", e) @@ -240,6 +249,7 @@ class GroupsLocalHandler(object): remote_attestation, group_id=group_id, user_id=user_id, + server_name=get_domain_from_id(group_id), ) # TODO: Check that the group is public and we're being added publically -- cgit 1.4.1 From c3e190ce671ed37a31a1543f42e26277fea582ce Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 11 Oct 2017 14:37:20 +0100 Subject: fix a logcontext leak in read receipt handling --- synapse/handlers/receipts.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index e1cd3a48e9..0525765272 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from synapse.util import logcontext from ._base import BaseHandler @@ -59,6 +60,8 @@ class ReceiptsHandler(BaseHandler): is_new = yield self._handle_new_receipts([receipt]) if is_new: + # fire off a process in the background to send the receipt to + # remote servers self._push_remotes([receipt]) @defer.inlineCallbacks @@ -126,6 +129,7 @@ class ReceiptsHandler(BaseHandler): defer.returnValue(True) + @logcontext.preserve_fn # caller should not yield on this @defer.inlineCallbacks def _push_remotes(self, receipts): """Given a list of receipts, works out which remote servers should be -- cgit 1.4.1 From c3b7a45e84f0aaf45e671eef295993e3d09d6908 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 11 Oct 2017 14:39:22 +0100 Subject: Allow error strings from spam checker --- synapse/handlers/message.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fbf88b46ef..06672df3bc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -325,9 +326,12 @@ class MessageHandler(BaseHandler): txn_id=txn_id ) - if self.spam_checker.check_event_for_spam(event): + spam_error = self.spam_checker.check_event_for_spam(event) + if spam_error: + if not isinstance(spam_error, (str, basestring)): + spam_error = "Spam is not permitted here" raise SynapseError( - 403, "Spam is not permitted here", Codes.FORBIDDEN + 403, spam_error, Codes.FORBIDDEN ) yield self.send_nonmember_event( -- cgit 1.4.1 From b78bae2d51bac7e1f75365d85cd67402adb5f408 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 11 Oct 2017 14:49:09 +0100 Subject: fix isinstance --- synapse/handlers/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 06672df3bc..28792788d9 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -328,7 +328,7 @@ class MessageHandler(BaseHandler): spam_error = self.spam_checker.check_event_for_spam(event) if spam_error: - if not isinstance(spam_error, (str, basestring)): + if not isinstance(spam_error, basestring): spam_error = "Spam is not permitted here" raise SynapseError( 403, spam_error, Codes.FORBIDDEN -- cgit 1.4.1 From b752507b488d223a0ab01ec3dbc7b30fee64c203 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Oct 2017 16:54:36 +0100 Subject: Fix fetching remote summaries --- synapse/handlers/groups_local.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 50e40548c2..3b676d46bd 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -109,7 +109,7 @@ class GroupsLocalHandler(object): valid_users = [] for entry in chunk: g_user_id = entry["user_id"] - attestation = entry.pop("attestation") + attestation = entry.pop("attestation", {}) try: if get_domain_from_id(g_user_id) != group_server_name: yield self.attestations.verify_attestation( @@ -202,7 +202,7 @@ class GroupsLocalHandler(object): valid_entries = [] for entry in chunk: g_user_id = entry["user_id"] - attestation = entry.pop("attestation") + attestation = entry.pop("attestation", {}) try: if get_domain_from_id(g_user_id) != group_server_name: yield self.attestations.verify_attestation( -- cgit 1.4.1 From 6079d0027aa79a6e2e41254ceda42206e307add7 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 16 Oct 2017 14:20:45 +0100 Subject: Log a warning when no profile for invited member And return empty profile --- synapse/handlers/groups_local.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 3b676d46bd..97a20f2b04 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -313,8 +313,11 @@ class GroupsLocalHandler(object): self.notifier.on_new_event( "groups_key", token, users=[user_id], ) - - user_profile = yield self.profile_handler.get_profile(user_id) + try: + user_profile = yield self.profile_handler.get_profile(user_id) + except Exception as e: + logger.warn("No profile for user %s: %s", user_id, e) + user_profile = {} defer.returnValue({"state": "invite", "user_profile": user_profile}) -- cgit 1.4.1 From 2c5972f87f0541aaeff43846f7050ab91d11cf0e Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 16 Oct 2017 15:31:11 +0100 Subject: Implement GET /groups/$groupId/invited_users --- synapse/federation/transport/client.py | 13 +++++++++++++ synapse/federation/transport/server.py | 18 ++++++++++++++++- synapse/groups/groups_server.py | 35 ++++++++++++++++++++++++++++++++++ synapse/handlers/groups_local.py | 17 +++++++++++++++++ synapse/rest/client/v2_alpha/groups.py | 21 ++++++++++++++++++++ synapse/storage/group_server.py | 12 ++++++++++++ 6 files changed, 115 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index f96561c1fe..125d8f3598 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -550,6 +550,19 @@ class TransportLayerClient(object): ignore_backoff=True, ) + @log_function + def get_invited_users_in_group(self, destination, group_id, requester_user_id): + """Get users that have been invited to a group + """ + path = PREFIX + "/groups/%s/invited_users" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + @log_function def accept_group_invite(self, destination, group_id, user_id, content): """Accept a group invite diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index c7565e0737..625a2fe27f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -720,6 +720,22 @@ class FederationGroupsUsersServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsInvitedUsersServlet(BaseFederationServlet): + """Get the users that have been invited to a group + """ + PATH = "/groups/(?P[^/]*)/invited_users$" + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.get_invited_users_in_group( + group_id, requester_user_id + ) + + defer.returnValue((200, new_content)) class FederationGroupsInviteServlet(BaseFederationServlet): """Ask a group server to invite someone to the group @@ -1109,12 +1125,12 @@ ROOM_LIST_CLASSES = ( PublicRoomList, ) - GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsProfileServlet, FederationGroupsSummaryServlet, FederationGroupsRoomsServlet, FederationGroupsUsersServlet, + FederationGroupsInvitedUsersServlet, FederationGroupsInviteServlet, FederationGroupsAcceptInviteServlet, FederationGroupsRemoveUserServlet, diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 1083bc2990..bfa46b7cb2 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -420,6 +420,41 @@ class GroupsServerHandler(object): "total_user_count_estimate": len(user_results), }) + @defer.inlineCallbacks + def get_invited_users_in_group(self, group_id, requester_user_id): + """Get the users that have been invited to a group as seen by requester_user_id. + + The ordering is arbitrary at the moment + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) + + if not is_user_in_group: + raise SynapseError(403, "User not in group") + + invited_users = yield self.store.get_invited_users_in_group(group_id) + + user_profiles = [] + + for user_id in invited_users: + user_profile = { + "user_id": user_id + } + try: + profile = yield self.profile_handler.get_profile_from_cache(user) + user_profile.update(profile) + except Exception as e: + pass + user_profiles.append(user_profile) + + defer.returnValue({ + "chunk": user_profiles, + "total_user_count_estimate": len(invited_users), + }) + + @defer.inlineCallbacks def get_rooms_in_group(self, group_id, requester_user_id): """Get the rooms in group as seen by requester_user_id diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 97a20f2b04..5263e769bb 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -219,6 +219,23 @@ class GroupsLocalHandler(object): defer.returnValue(res) + @defer.inlineCallbacks + def get_invited_users_in_group(self, group_id, requester_user_id): + """Get users invited to a group + """ + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_invited_users_in_group( + group_id, requester_user_id + ) + defer.returnValue(res) + + group_server_name = get_domain_from_id(group_id) + + res = yield self.transport_client.get_users_in_group( + get_domain_from_id(group_id), group_id, requester_user_id, + ) + defer.returnValue(res) + @defer.inlineCallbacks def join_group(self, group_id, user_id, content): """Request to join a group diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 8f3ce15b02..4532112cfc 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -370,6 +370,26 @@ class GroupUsersServlet(RestServlet): defer.returnValue((200, result)) +class GroupInvitedUsersServlet(RestServlet): + """Get users invited to a group + """ + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/invited_users$") + + def __init__(self, hs): + super(GroupInvitedUsersServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_invited_users_in_group(group_id, user_id) + + defer.returnValue((200, result)) + class GroupCreateServlet(RestServlet): """Create a group @@ -674,6 +694,7 @@ class GroupsForUserServlet(RestServlet): def register_servlets(hs, http_server): GroupServlet(hs).register(http_server) GroupSummaryServlet(hs).register(http_server) + GroupInvitedUsersServlet(hs).register(http_server) GroupUsersServlet(hs).register(http_server) GroupRoomServlet(hs).register(http_server) GroupCreateServlet(hs).register(http_server) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 3af372de59..9e63db5c6c 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -56,6 +56,18 @@ class GroupServerStore(SQLBaseStore): desc="get_users_in_group", ) + def get_invited_users_in_group(self, group_id): + # TODO: Pagination + + return self._simple_select_onecol( + table="group_invites", + keyvalues={ + "group_id": group_id, + }, + retcol="user_id", + desc="get_invited_users_in_group", + ) + def get_rooms_in_group(self, group_id, include_private=False): # TODO: Pagination -- cgit 1.4.1 From a3ac4f6b0ad9cce172678fa87c10a1100d16236f Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 16 Oct 2017 15:41:03 +0100 Subject: _create_rererouter for get_invited_users_in_group --- synapse/handlers/groups_local.py | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 5263e769bb..6699d0888f 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -68,6 +68,8 @@ class GroupsLocalHandler(object): update_group_profile = _create_rerouter("update_group_profile") get_rooms_in_group = _create_rerouter("get_rooms_in_group") + get_invited_users_in_group = _create_rerouter("get_invited_users_in_group") + add_room_to_group = _create_rerouter("add_room_to_group") remove_room_from_group = _create_rerouter("remove_room_from_group") @@ -219,23 +221,6 @@ class GroupsLocalHandler(object): defer.returnValue(res) - @defer.inlineCallbacks - def get_invited_users_in_group(self, group_id, requester_user_id): - """Get users invited to a group - """ - if self.is_mine_id(group_id): - res = yield self.groups_server_handler.get_invited_users_in_group( - group_id, requester_user_id - ) - defer.returnValue(res) - - group_server_name = get_domain_from_id(group_id) - - res = yield self.transport_client.get_users_in_group( - get_domain_from_id(group_id), group_id, requester_user_id, - ) - defer.returnValue(res) - @defer.inlineCallbacks def join_group(self, group_id, user_id, content): """Request to join a group -- cgit 1.4.1