From 79452edeee94a09a826ee2b41a08811b823a3ad6 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 28 Mar 2018 14:03:37 +0100 Subject: Add joinability for groups Adds API to set the 'joinable' flag, and corresponding flag in the table. --- synapse/federation/transport/server.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a66a6b0692..107deb4e1e 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -1124,6 +1125,24 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): defer.returnValue((200, resp)) +class FederationGroupsJoinableServlet(BaseFederationServlet): + """Sets whether a group is joinable without an invite or knock + """ + PATH = "/groups/(?P[^/]*)/joinable$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.set_group_joinable( + group_id, requester_user_id, content + ) + + defer.returnValue((200, new_content)) + + FEDERATION_SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -1172,6 +1191,7 @@ GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsSummaryUsersServlet, FederationGroupsAddRoomsServlet, FederationGroupsAddRoomsConfigServlet, + FederationGroupsJoinableServlet, ) -- cgit 1.4.1 From eb8d8d6f57c7f6017548aa95409bb8cc346a5ae0 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 3 Apr 2018 15:40:43 +0100 Subject: Use join_policy API instead of joinable The API is now under /groups/$group_id/setting/m.join_policy and expects a JSON blob of the shape ```json { "m.join_policy": { "type": "invite" } } ``` where "invite" could alternatively be "open". --- synapse/federation/transport/client.py | 4 +-- synapse/federation/transport/server.py | 8 ++--- synapse/groups/groups_server.py | 41 ++++++++++++++++++---- synapse/handlers/groups_local.py | 2 +- synapse/rest/client/v2_alpha/groups.py | 12 +++---- synapse/storage/group_server.py | 6 ++-- .../storage/schema/delta/48/groups_joinable.sql | 8 ++++- 7 files changed, 58 insertions(+), 23 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 5a6b63350b..0f7f656824 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -860,9 +860,9 @@ class TransportLayerClient(object): @log_function def set_group_joinable(self, destination, group_id, requester_user_id, content): - """Sets whether a group is joinable without an invite or knock + """Sets the join policy for a group """ - path = PREFIX + "/groups/%s/joinable" % (group_id,) + path = PREFIX + "/groups/%s/setting/m.join_policy" % (group_id,) return self.client.post_json( destination=destination, diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 107deb4e1e..a52d3948f4 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1125,10 +1125,10 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): defer.returnValue((200, resp)) -class FederationGroupsJoinableServlet(BaseFederationServlet): +class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): """Sets whether a group is joinable without an invite or knock """ - PATH = "/groups/(?P[^/]*)/joinable$" + PATH = "/groups/(?P[^/]*)/setting/m.join_policy$" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id): @@ -1136,7 +1136,7 @@ class FederationGroupsJoinableServlet(BaseFederationServlet): if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.set_group_joinable( + new_content = yield self.handler.set_group_join_policy( group_id, requester_user_id, content ) @@ -1191,7 +1191,7 @@ GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsSummaryUsersServlet, FederationGroupsAddRoomsServlet, FederationGroupsAddRoomsConfigServlet, - FederationGroupsJoinableServlet, + FederationGroupsSettingJoinPolicyServlet, ) diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 25cbfb1691..70781e1854 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -207,20 +207,24 @@ class GroupsServerHandler(object): defer.returnValue({}) @defer.inlineCallbacks - def set_group_joinable(self, group_id, requester_user_id, content): - """Sets whether a group is joinable without an invite or knock + def set_group_join_policy(self, group_id, requester_user_id, content): + """Sets the group join policy. + + Currently supported policies are: + - "invite": an invite must be received and accepted in order to join. + - "open": anyone can join. """ yield self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) - is_joinable = content.get('joinable') - if is_joinable is None: + join_policy = _parse_join_policy_from_contents(content) + if join_policy is None: raise SynapseError( - 400, "No value specified for 'joinable'" + 400, "No value specified for 'm.join_policy'" ) - yield self.store.set_group_joinable(group_id, is_joinable=is_joinable) + yield self.store.set_group_join_policy(group_id, join_policy=join_policy) defer.returnValue({}) @@ -854,6 +858,31 @@ class GroupsServerHandler(object): }) +def _parse_join_policy_from_contents(content): + """Given a content for a request, return the specified join policy or None + """ + + join_policy_dict = content.get("m.join_policy") + if join_policy_dict: + return _parse_join_policy_dict(join_policy_dict) + else: + return None + + +def _parse_join_policy_dict(join_policy_dict): + """Given a dict for the "m.join_policy" config return the join policy specified + """ + join_policy_type = join_policy_dict.get("type") + if not join_policy_type: + return True + + if join_policy_type not in ("invite", "open"): + raise SynapseError( + 400, "Synapse only supports 'invite'/'open' join rule" + ) + return join_policy_type + + def _parse_visibility_from_contents(content): """Given a content for a request parse out whether the entity should be public or not diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index c9671b9046..5f7b0ff305 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -91,7 +91,7 @@ class GroupsLocalHandler(object): get_group_role = _create_rerouter("get_group_role") get_group_roles = _create_rerouter("get_group_roles") - set_group_joinable = _create_rerouter("set_group_joinable") + set_group_join_policy = _create_rerouter("set_group_join_policy") @defer.inlineCallbacks def get_group_summary(self, group_id, requester_user_id): diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index aa94130e57..8faaa1d6a0 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -402,13 +402,13 @@ class GroupInvitedUsersServlet(RestServlet): defer.returnValue((200, result)) -class GroupJoinableServlet(RestServlet): - """Set whether a group is joinable without an invite +class GroupSettingJoinPolicyServlet(RestServlet): + """Set group join policy """ - PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/joinable$") + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/setting/m.join_policy$") def __init__(self, hs): - super(GroupJoinableServlet, self).__init__() + super(GroupSettingJoinPolicyServlet, self).__init__() self.auth = hs.get_auth() self.groups_handler = hs.get_groups_local_handler() @@ -419,7 +419,7 @@ class GroupJoinableServlet(RestServlet): content = parse_json_object_from_request(request) - result = yield self.groups_handler.set_group_joinable( + result = yield self.groups_handler.set_group_join_policy( group_id, requester_user_id, content, @@ -765,7 +765,7 @@ def register_servlets(hs, http_server): GroupInvitedUsersServlet(hs).register(http_server) GroupUsersServlet(hs).register(http_server) GroupRoomServlet(hs).register(http_server) - GroupJoinableServlet(hs).register(http_server) + GroupSettingJoinPolicyServlet(hs).register(http_server) GroupCreateServlet(hs).register(http_server) GroupAdminRoomsServlet(hs).register(http_server) GroupAdminRoomsConfigServlet(hs).register(http_server) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 96553d4fb1..db66ea1eb0 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -30,16 +30,16 @@ _DEFAULT_ROLE_ID = "" class GroupServerStore(SQLBaseStore): - def set_group_joinable(self, group_id, is_joinable): + def set_group_join_policy(self, group_id, join_policy): return self._simple_update_one( table="groups", keyvalues={ "group_id": group_id, }, updatevalues={ - "is_joinable": is_joinable, + "join_policy": join_policy, }, - desc="set_group_joinable", + desc="set_group_join_policy", ) def get_group(self, group_id): diff --git a/synapse/storage/schema/delta/48/groups_joinable.sql b/synapse/storage/schema/delta/48/groups_joinable.sql index ace7d0a723..ab3b00286d 100644 --- a/synapse/storage/schema/delta/48/groups_joinable.sql +++ b/synapse/storage/schema/delta/48/groups_joinable.sql @@ -13,4 +13,10 @@ * limitations under the License. */ -ALTER TABLE groups ADD COLUMN is_joinable SMALLINT DEFAULT 0 NOT NULL; +/* + * This isn't a real ENUM because sqlite doesn't support it + * and we use a default of NULL for inserted rows and interpret + * NULL at the python store level as necessary so that existing + * rows are given the correct default policy. + */ +ALTER TABLE groups ADD COLUMN join_policy TEXT DEFAULT NULL; -- cgit 1.4.1 From 104c0bc1d5d1f2a487c50d63b22caa477b091976 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Thu, 5 Apr 2018 14:07:16 +0100 Subject: Use "/settings/" (plural) --- synapse/federation/transport/client.py | 2 +- synapse/federation/transport/server.py | 2 +- synapse/rest/client/v2_alpha/groups.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 0f7f656824..1fe162d55b 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -862,7 +862,7 @@ class TransportLayerClient(object): content): """Sets the join policy for a group """ - path = PREFIX + "/groups/%s/setting/m.join_policy" % (group_id,) + path = PREFIX + "/groups/%s/settings/m.join_policy" % (group_id,) return self.client.post_json( destination=destination, diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a52d3948f4..3658ca75f3 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1128,7 +1128,7 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): """Sets whether a group is joinable without an invite or knock """ - PATH = "/groups/(?P[^/]*)/setting/m.join_policy$" + PATH = "/groups/(?P[^/]*)/settings/m.join_policy$" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id): diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 8faaa1d6a0..3bb1ec2af6 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -405,7 +405,7 @@ class GroupInvitedUsersServlet(RestServlet): class GroupSettingJoinPolicyServlet(RestServlet): """Set group join policy """ - PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/setting/m.join_policy$") + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/settings/m.join_policy$") def __init__(self, hs): super(GroupSettingJoinPolicyServlet, self).__init__() -- cgit 1.4.1 From 1d71f484d4ec00fd41e3ef195622d0d5dba6d372 Mon Sep 17 00:00:00 2001 From: Krombel Date: Fri, 6 Apr 2018 12:54:09 +0200 Subject: use PUT instead of POST for federating groups/m.join_policy --- synapse/federation/transport/client.py | 2 +- synapse/federation/transport/server.py | 2 +- synapse/http/matrixfederationclient.py | 6 +++++- 3 files changed, 7 insertions(+), 3 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 1fe162d55b..3beab47832 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -864,7 +864,7 @@ class TransportLayerClient(object): """ path = PREFIX + "/groups/%s/settings/m.join_policy" % (group_id,) - return self.client.post_json( + return self.client.put_json( destination=destination, path=path, args={"requester_user_id": requester_user_id}, diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 3658ca75f3..b98e30459c 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1131,7 +1131,7 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): PATH = "/groups/(?P[^/]*)/settings/m.join_policy$" @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id): + def on_PUT(self, origin, content, query, group_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 9145405cb0..60a29081e8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -286,7 +286,8 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers @defer.inlineCallbacks - def put_json(self, destination, path, data={}, json_data_callback=None, + def put_json(self, destination, path, args={}, data={}, + json_data_callback=None, long_retries=False, timeout=None, ignore_backoff=False, backoff_on_404=False): @@ -296,6 +297,7 @@ class MatrixFederationHttpClient(object): destination (str): The remote server to send the HTTP request to. path (str): The HTTP path. + args (dict): query params data (dict): A dict containing the data that will be used as the request body. This will be encoded as JSON. json_data_callback (callable): A callable returning the dict to @@ -342,6 +344,7 @@ class MatrixFederationHttpClient(object): path, body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, + query_bytes=encode_query_args(args), long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, @@ -373,6 +376,7 @@ class MatrixFederationHttpClient(object): giving up. None indicates no timeout. ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. + args (dict): query params Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. -- cgit 1.4.1 From b370fe61c0aeccac7745ad404dad925ec217ba6d Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 28 Mar 2018 17:18:02 +0100 Subject: Implement group join API --- synapse/federation/transport/client.py | 13 ++++++++++ synapse/federation/transport/server.py | 18 ++++++++++++++ synapse/groups/groups_server.py | 45 ++++++++++++++++++++++++++++++++++ synapse/handlers/groups_local.py | 40 +++++++++++++++++++++++++++++- synapse/storage/group_server.py | 12 ++++++--- 5 files changed, 124 insertions(+), 4 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 3beab47832..370f7ba78b 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -614,6 +614,19 @@ class TransportLayerClient(object): ignore_backoff=True, ) + @log_function + def join_group(self, destination, group_id, user_id, content): + """Attempts to join a group + """ + path = PREFIX + "/groups/%s/users/%s/join" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + @log_function def invite_to_group(self, destination, group_id, user_id, requester_user_id, content): """Invite a user to a group diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index b98e30459c..4c94d5a36c 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -803,6 +803,23 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsJoinServlet(BaseFederationServlet): + """Attempt to join a group + """ + PATH = "/groups/(?P[^/]*)/users/(?P[^/]*)/join$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(user_id) != origin: + raise SynapseError(403, "user_id doesn't match origin") + + new_content = yield self.handler.join_group( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + class FederationGroupsRemoveUserServlet(BaseFederationServlet): """Leave or kick a user from the group """ @@ -1182,6 +1199,7 @@ GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsInvitedUsersServlet, FederationGroupsInviteServlet, FederationGroupsAcceptInviteServlet, + FederationGroupsJoinServlet, FederationGroupsRemoveUserServlet, FederationGroupsSummaryRoomsServlet, FederationGroupsCategoriesServlet, diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 70781e1854..77b6273e39 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -723,6 +723,51 @@ class GroupsServerHandler(object): "attestation": local_attestation, }) + @defer.inlineCallbacks + def join_group(self, group_id, requester_user_id, content): + """User tries to join the group. + + This will error if the group requires an invite/knock to join + """ + + yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + group_info = yield self.store.get_group( + group_id, + ) + if not group_info['is_joinable']: + raise SynapseError(403, "Group is not publicly joinable") + + if not self.hs.is_mine_id(requester_user_id): + local_attestation = self.attestations.create_attestation( + group_id, requester_user_id, + ) + remote_attestation = content["attestation"] + + yield self.attestations.verify_attestation( + remote_attestation, + user_id=requester_user_id, + group_id=group_id, + ) + else: + local_attestation = None + remote_attestation = None + + is_public = _parse_visibility_from_contents(content) + + yield self.store.add_user_to_group( + group_id, requester_user_id, + is_admin=False, + is_public=is_public, + local_attestation=local_attestation, + remote_attestation=remote_attestation, + ) + + defer.returnValue({ + "state": "join", + "attestation": local_attestation, + }) + @defer.inlineCallbacks def knock(self, group_id, requester_user_id, content): """A user requests becoming a member of the group diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 5f7b0ff305..977993e7d4 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -229,7 +229,45 @@ class GroupsLocalHandler(object): def join_group(self, group_id, user_id, content): """Request to join a group """ - raise NotImplementedError() # TODO + if self.is_mine_id(group_id): + yield self.groups_server_handler.join_group( + group_id, user_id, content + ) + local_attestation = None + remote_attestation = None + else: + local_attestation = self.attestations.create_attestation(group_id, user_id) + content["attestation"] = local_attestation + + res = yield self.transport_client.join_group( + get_domain_from_id(group_id), group_id, user_id, content, + ) + + remote_attestation = res["attestation"] + + yield self.attestations.verify_attestation( + remote_attestation, + group_id=group_id, + user_id=user_id, + server_name=get_domain_from_id(group_id), + ) + + # TODO: Check that the group is public and we're being added publically + is_publicised = content.get("publicise", False) + + token = yield self.store.register_user_group_membership( + group_id, user_id, + membership="join", + is_admin=False, + local_attestation=local_attestation, + remote_attestation=remote_attestation, + is_publicised=is_publicised, + ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) + + defer.returnValue({}) @defer.inlineCallbacks def accept_invite(self, group_id, user_id, content): diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index db316a27ec..16c11a056f 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -48,19 +48,25 @@ class GroupServerStore(SQLBaseStore): desc="set_group_join_policy", ) + @defer.inlineCallbacks def get_group(self, group_id): - return self._simple_select_one( + ret = yield self._simple_select_one( table="groups", keyvalues={ "group_id": group_id, }, retcols=( - "name", "short_description", "long_description", "avatar_url", "is_public" + "name", "short_description", "long_description", "avatar_url", "is_public", "is_joinable", ), allow_none=True, - desc="is_user_in_group", + desc="get_group", ) + if ret and 'is_joinable' in ret: + ret['is_joinable'] = bool(ret['is_joinable']) + + defer.returnValue(ret) + def get_users_in_group(self, group_id, include_private=False): # TODO: Pagination -- cgit 1.4.1 From f8d46cad3c3b4318a6b63c55fe63f07d1ae91695 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 13 Apr 2018 15:41:52 +0100 Subject: correctly auth inbound federation_domain_whitelist reqs --- synapse/federation/transport/server.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 4c94d5a36c..d2a57d08d7 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -94,12 +94,6 @@ class Authenticator(object): "signatures": {}, } - if ( - self.federation_domain_whitelist is not None and - self.server_name not in self.federation_domain_whitelist - ): - raise FederationDeniedError(self.server_name) - if content is not None: json_request["content"] = content @@ -138,6 +132,12 @@ class Authenticator(object): json_request["origin"] = origin json_request["signatures"].setdefault(origin, {})[key] = sig + if ( + self.federation_domain_whitelist is not None and + origin not in self.federation_domain_whitelist + ): + raise FederationDeniedError(self.server_name) + if not json_request["signatures"]: raise NoAuthenticationError( 401, "Missing Authorization headers", Codes.UNAUTHORIZED, -- cgit 1.4.1 From 25b0ba30b1ffab9cb799bd8fc331581b7ff6f7aa Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 13 Apr 2018 15:46:37 +0100 Subject: revert last to PR properly --- synapse/federation/transport/server.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index d2a57d08d7..4c94d5a36c 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -94,6 +94,12 @@ class Authenticator(object): "signatures": {}, } + if ( + self.federation_domain_whitelist is not None and + self.server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(self.server_name) + if content is not None: json_request["content"] = content @@ -132,12 +138,6 @@ class Authenticator(object): json_request["origin"] = origin json_request["signatures"].setdefault(origin, {})[key] = sig - if ( - self.federation_domain_whitelist is not None and - origin not in self.federation_domain_whitelist - ): - raise FederationDeniedError(self.server_name) - if not json_request["signatures"]: raise NoAuthenticationError( 401, "Missing Authorization headers", Codes.UNAUTHORIZED, -- cgit 1.4.1 From 78a9698650b7a96e2a7814c3732c3ff8aa5a2f0f Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 13 Apr 2018 15:47:43 +0100 Subject: fix federation_domain_whitelist we were checking the wrong server_name on inbound requests --- synapse/federation/transport/server.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 4c94d5a36c..ff0656df3e 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -94,12 +94,6 @@ class Authenticator(object): "signatures": {}, } - if ( - self.federation_domain_whitelist is not None and - self.server_name not in self.federation_domain_whitelist - ): - raise FederationDeniedError(self.server_name) - if content is not None: json_request["content"] = content @@ -138,6 +132,12 @@ class Authenticator(object): json_request["origin"] = origin json_request["signatures"].setdefault(origin, {})[key] = sig + if ( + self.federation_domain_whitelist is not None and + origin not in self.federation_domain_whitelist + ): + raise FederationDeniedError(origin) + if not json_request["signatures"]: raise NoAuthenticationError( 401, "Missing Authorization headers", Codes.UNAUTHORIZED, -- cgit 1.4.1 From 9255a6cb17716c022ebae1dbe9c142b78ca86ea7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:07:40 +0100 Subject: Improve exception handling for background processes There were a bunch of places where we fire off a process to happen in the background, but don't have any exception handling on it - instead relying on the unhandled error being logged when the relevent deferred gets garbage-collected. This is unsatisfactory for a number of reasons: - logging on garbage collection is best-effort and may happen some time after the error, if at all - it can be hard to figure out where the error actually happened. - it is logged as a scary CRITICAL error which (a) I always forget to grep for and (b) it's not really CRITICAL if a background process we don't care about fails. So this is an attempt to add exception handling to everything we fire off into the background. --- synapse/app/appservice.py | 15 +++-- synapse/app/federation_sender.py | 27 +++++---- synapse/app/pusher.py | 31 +++++----- synapse/app/synchrotron.py | 95 ++++++++++++++++--------------- synapse/app/user_dir.py | 13 ++++- synapse/appservice/scheduler.py | 25 ++++---- synapse/crypto/keyring.py | 93 +++++++++++++++--------------- synapse/federation/transaction_queue.py | 2 + synapse/federation/transport/server.py | 13 ++++- synapse/groups/attestations.py | 44 +++++++------- synapse/handlers/message.py | 22 +++++-- synapse/handlers/presence.py | 19 +++++-- synapse/handlers/receipts.py | 61 ++++++++++---------- synapse/handlers/typing.py | 43 +++++++------- synapse/notifier.py | 13 +++-- synapse/push/emailpusher.py | 11 ++-- synapse/push/httppusher.py | 5 +- synapse/rest/media/v1/storage_provider.py | 9 ++- synapse/storage/event_push_actions.py | 24 +++++--- synapse/util/logcontext.py | 7 ++- 20 files changed, 335 insertions(+), 237 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index f2540023a7..58f2c9d68c 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -32,10 +32,10 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import NoResource logger = logging.getLogger("synapse.app.appservice") @@ -112,9 +112,14 @@ class ASReplicationHandler(ReplicationClientHandler): if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() - preserve_fn( - self.appservice_handler.notify_interested_services - )(max_stream_id) + run_in_background(self._notify_app_services, max_stream_id) + + @defer.inlineCallbacks + def _notify_app_services(self, room_stream_id): + try: + yield self.appservice_handler.notify_interested_services(room_stream_id) + except Exception: + logger.exception("Error notifying application services of event") def start(config_options): diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cc3331519..4f2a9ca21a 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -237,19 +237,22 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def update_token(self, token): - self.federation_position = token - - # We linearize here to ensure we don't have races updating the token - with (yield self._fed_position_linearizer.queue(None)): - if self._last_ack < self.federation_position: - yield self.store.update_federation_out_pos( - "federation", self.federation_position - ) + try: + self.federation_position = token + + # We linearize here to ensure we don't have races updating the token + with (yield self._fed_position_linearizer.queue(None)): + if self._last_ack < self.federation_position: + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) - # We ACK this token over replication so that the master can drop - # its in memory queues - self.replication_client.send_federation_ack(self.federation_position) - self._last_ack = self.federation_position + # We ACK this token over replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack(self.federation_position) + self._last_ack = self.federation_position + except Exception: + logger.exception("Error updating federation stream position") if __name__ == '__main__': diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..739d113ad5 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -144,20 +144,23 @@ class PusherReplicationHandler(ReplicationClientHandler): @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): - if stream_name == "pushers": - for row in rows: - if row.deleted: - yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) - else: - yield self.start_pusher(row.user_id, row.app_id, row.pushkey) - elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( - token, token, - ) - elif stream_name == "receipts": - yield self.pusher_pool.on_new_receipts( - token, token, set(row.room_id for row in rows) - ) + try: + if stream_name == "pushers": + for row in rows: + if row.deleted: + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == "events": + yield self.pusher_pool.on_new_notifications( + token, token, + ) + elif stream_name == "receipts": + yield self.pusher_pool.on_new_receipts( + token, token, set(row.room_id for row in rows) + ) + except Exception: + logger.exception("Error poking pushers") def stop_pusher(self, user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 2fddcd935a..777da564d7 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -340,55 +340,58 @@ class SyncReplicationHandler(ReplicationClientHandler): @defer.inlineCallbacks def process_and_notify(self, stream_name, token, rows): - if stream_name == "events": - # We shouldn't get multiple rows per token for events stream, so - # we don't need to optimise this for multiple rows. - for row in rows: - event = yield self.store.get_event(row.event_id) - extra_users = () - if event.type == EventTypes.Member: - extra_users = (event.state_key,) - max_token = self.store.get_room_max_stream_ordering() - self.notifier.on_new_room_event( - event, token, max_token, extra_users + try: + if stream_name == "events": + # We shouldn't get multiple rows per token for events stream, so + # we don't need to optimise this for multiple rows. + for row in rows: + event = yield self.store.get_event(row.event_id) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + max_token = self.store.get_room_max_stream_ordering() + self.notifier.on_new_room_event( + event, token, max_token, extra_users + ) + elif stream_name == "push_rules": + self.notifier.on_new_event( + "push_rules_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "push_rules": - self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows], - ) - elif stream_name in ("account_data", "tag_account_data",): - self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows], - ) - elif stream_name == "receipts": - self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "typing": - self.typing_handler.process_replication_rows(token, rows) - self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "to_device": - entities = [row.entity for row in rows if row.entity.startswith("@")] - if entities: + elif stream_name in ("account_data", "tag_account_data",): self.notifier.on_new_event( - "to_device_key", token, users=entities, + "account_data_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "device_lists": - all_room_ids = set() - for row in rows: - room_ids = yield self.store.get_rooms_for_user(row.user_id) - all_room_ids.update(room_ids) - self.notifier.on_new_event( - "device_list_key", token, rooms=all_room_ids, - ) - elif stream_name == "presence": - yield self.presence_handler.process_replication_rows(token, rows) - elif stream_name == "receipts": - self.notifier.on_new_event( - "groups_key", token, users=[row.user_id for row in rows], - ) + elif stream_name == "receipts": + self.notifier.on_new_event( + "receipt_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "typing": + self.typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "to_device": + entities = [row.entity for row in rows if row.entity.startswith("@")] + if entities: + self.notifier.on_new_event( + "to_device_key", token, users=entities, + ) + elif stream_name == "device_lists": + all_room_ids = set() + for row in rows: + room_ids = yield self.store.get_rooms_for_user(row.user_id) + all_room_ids.update(room_ids) + self.notifier.on_new_event( + "device_list_key", token, rooms=all_room_ids, + ) + elif stream_name == "presence": + yield self.presence_handler.process_replication_rows(token, rows) + elif stream_name == "receipts": + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows], + ) + except Exception: + logger.exception("Error processing replication") def start(config_options): diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 5f845e80d1..5ba7e9b416 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -39,10 +39,10 @@ from synapse.storage.engines import create_engine from synapse.storage.user_directory import UserDirectoryStore from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import NoResource logger = logging.getLogger("synapse.app.user_dir") @@ -164,7 +164,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): stream_name, token, rows ) if stream_name == "current_state_deltas": - preserve_fn(self.user_directory.notify_new_event)() + run_in_background(self._notify_directory) + + @defer.inlineCallbacks + def _notify_directory(self): + try: + yield self.user_directory.notify_new_event() + except Exception: + logger.exception("Error notifiying user directory of state update") def start(config_options): diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6da315473d..dfc8d1b42e 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -176,17 +176,20 @@ class _TransactionController(object): @defer.inlineCallbacks def _start_recoverer(self, service): - yield self.store.set_appservice_state( - service, - ApplicationServiceState.DOWN - ) - logger.info( - "Application service falling behind. Starting recoverer. AS ID %s", - service.id - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() + try: + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() + except Exception: + logger.exception("Error starting AS recoverer") @defer.inlineCallbacks def _is_service_up(self, service): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index fce83d445f..32cbddbc53 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -146,53 +146,56 @@ class Keyring(object): verify_requests (List[VerifyKeyRequest]): """ - # create a deferred for each server we're going to look up the keys - # for; we'll resolve them once we have completed our lookups. - # These will be passed into wait_for_previous_lookups to block - # any other lookups until we have finished. - # The deferreds are called with no logcontext. - server_to_deferred = { - rq.server_name: defer.Deferred() - for rq in verify_requests - } - - # We want to wait for any previous lookups to complete before - # proceeding. - yield self.wait_for_previous_lookups( - [rq.server_name for rq in verify_requests], - server_to_deferred, - ) - - # Actually start fetching keys. - self._get_server_verify_keys(verify_requests) - - # When we've finished fetching all the keys for a given server_name, - # resolve the deferred passed to `wait_for_previous_lookups` so that - # any lookups waiting will proceed. - # - # map from server name to a set of request ids - server_to_request_ids = {} - - for verify_request in verify_requests: - server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids.setdefault(server_name, set()).add(request_id) - - def remove_deferreds(res, verify_request): - server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids[server_name].discard(request_id) - if not server_to_request_ids[server_name]: - d = server_to_deferred.pop(server_name, None) - if d: - d.callback(None) - return res - - for verify_request in verify_requests: - verify_request.deferred.addBoth( - remove_deferreds, verify_request, + try: + # create a deferred for each server we're going to look up the keys + # for; we'll resolve them once we have completed our lookups. + # These will be passed into wait_for_previous_lookups to block + # any other lookups until we have finished. + # The deferreds are called with no logcontext. + server_to_deferred = { + rq.server_name: defer.Deferred() + for rq in verify_requests + } + + # We want to wait for any previous lookups to complete before + # proceeding. + yield self.wait_for_previous_lookups( + [rq.server_name for rq in verify_requests], + server_to_deferred, ) + # Actually start fetching keys. + self._get_server_verify_keys(verify_requests) + + # When we've finished fetching all the keys for a given server_name, + # resolve the deferred passed to `wait_for_previous_lookups` so that + # any lookups waiting will proceed. + # + # map from server name to a set of request ids + server_to_request_ids = {} + + for verify_request in verify_requests: + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids.setdefault(server_name, set()).add(request_id) + + def remove_deferreds(res, verify_request): + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids[server_name].discard(request_id) + if not server_to_request_ids[server_name]: + d = server_to_deferred.pop(server_name, None) + if d: + d.callback(None) + return res + + for verify_request in verify_requests: + verify_request.deferred.addBoth( + remove_deferreds, verify_request, + ) + except Exception: + logger.exception("Error starting key lookups") + @defer.inlineCallbacks def wait_for_previous_lookups(self, server_names, server_to_deferred): """Waits for any previous key lookups for the given servers to finish. diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 963d938edd..ded2b1871a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -323,6 +323,8 @@ class TransactionQueue(object): break yield self._process_presence_inner(states_map.values()) + except Exception: + logger.exception("Error sending presence states to servers") finally: self._processing_pending_presence = False diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index ff0656df3e..19d09f5422 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -25,7 +25,7 @@ from synapse.http.servlet import ( ) from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.types import ThirdPartyInstanceID, get_domain_from_id import functools @@ -152,11 +152,18 @@ class Authenticator(object): # alive retry_timings = yield self.store.get_destination_retry_timings(origin) if retry_timings and retry_timings["retry_last_ts"]: - logger.info("Marking origin %r as up", origin) - preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0) + run_in_background(self._reset_retry_timings, origin) defer.returnValue(origin) + @defer.inlineCallbacks + def _reset_retry_timings(self, origin): + try: + logger.info("Marking origin %r as up", origin) + yield self.store.set_destination_retry_timings(origin, 0, 0) + except Exception: + logger.exception("Error resetting retry timings on %s", origin) + class BaseFederationServlet(object): REQUIRE_AUTH = True diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 1fb709e6c3..7187df2508 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -165,28 +165,32 @@ class GroupAttestionRenewer(object): @defer.inlineCallbacks def _renew_attestation(group_id, user_id): - if not self.is_mine_id(group_id): - destination = get_domain_from_id(group_id) - elif not self.is_mine_id(user_id): - destination = get_domain_from_id(user_id) - else: - logger.warn( - "Incorrectly trying to do attestations for user: %r in %r", - user_id, group_id, + try: + if not self.is_mine_id(group_id): + destination = get_domain_from_id(group_id) + elif not self.is_mine_id(user_id): + destination = get_domain_from_id(user_id) + else: + logger.warn( + "Incorrectly trying to do attestations for user: %r in %r", + user_id, group_id, + ) + yield self.store.remove_attestation_renewal(group_id, user_id) + return + + attestation = self.attestations.create_attestation(group_id, user_id) + + yield self.transport_client.renew_group_attestation( + destination, group_id, user_id, + content={"attestation": attestation}, ) - yield self.store.remove_attestation_renewal(group_id, user_id) - return - - attestation = self.attestations.create_attestation(group_id, user_id) - yield self.transport_client.renew_group_attestation( - destination, group_id, user_id, - content={"attestation": attestation}, - ) - - yield self.store.update_attestation_renewal( - group_id, user_id, attestation - ) + yield self.store.update_attestation_renewal( + group_id, user_id, attestation + ) + except Exception: + logger.exception("Error renewing attestation of %r in %r", + user_id, group_id) for row in rows: group_id = row["group_id"] diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21628a8540..d168ff5b86 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -857,15 +857,25 @@ class EventCreationHandler(object): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) + try: + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + except Exception: + logger.exception("Error notifying about new room event") preserve_fn(_notify)() if event.type == EventTypes.Message: - presence = self.hs.get_presence_handler() # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. - preserve_fn(presence.bump_presence_active_time)(requester.user) + run_in_background(self._bump_active_time, requester.user) + + @defer.inlineCallbacks + def _bump_active_time(self, user): + try: + presence = self.hs.get_presence_handler() + yield presence.bump_presence_active_time(user) + except Exception: + logger.exception("Error bumping presence active time") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a5e501897c..585f3e4da2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.async import Linearizer -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -254,6 +254,14 @@ class PresenceHandler(object): logger.info("Finished _persist_unpersisted_changes") + @defer.inlineCallbacks + def _update_states_and_catch_exception(self, new_states): + try: + res = yield self._update_states(new_states) + defer.returnValue(res) + except Exception: + logger.exception("Error updating presence") + @defer.inlineCallbacks def _update_states(self, new_states): """Updates presence of users. Sets the appropriate timeouts. Pokes @@ -364,7 +372,7 @@ class PresenceHandler(object): now=now, ) - preserve_fn(self._update_states)(changes) + run_in_background(self._update_states_and_catch_exception, changes) except Exception: logger.exception("Exception in _handle_timeouts loop") @@ -422,20 +430,23 @@ class PresenceHandler(object): @defer.inlineCallbacks def _end(): - if affect_presence: + try: self.user_to_num_current_syncs[user_id] -= 1 prev_state = yield self.current_state_for_user(user_id) yield self._update_states([prev_state.copy_and_replace( last_user_sync_ts=self.clock.time_msec(), )]) + except Exception: + logger.exception("Error updating presence after sync") @contextmanager def _user_syncing(): try: yield finally: - preserve_fn(_end)() + if affect_presence: + run_in_background(_end) defer.returnValue(_user_syncing()) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 3f215c2b4e..2e0672161c 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -135,37 +135,40 @@ class ReceiptsHandler(BaseHandler): """Given a list of receipts, works out which remote servers should be poked and pokes them. """ - # TODO: Some of this stuff should be coallesced. - for receipt in receipts: - room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] - - users = yield self.state.get_current_user_in_room(room_id) - remotedomains = set(get_domain_from_id(u) for u in users) - remotedomains = remotedomains.copy() - remotedomains.discard(self.server_name) - - logger.debug("Sending receipt to: %r", remotedomains) - - for domain in remotedomains: - self.federation.send_edu( - destination=domain, - edu_type="m.receipt", - content={ - room_id: { - receipt_type: { - user_id: { - "event_ids": event_ids, - "data": data, + try: + # TODO: Some of this stuff should be coallesced. + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + users = yield self.state.get_current_user_in_room(room_id) + remotedomains = set(get_domain_from_id(u) for u in users) + remotedomains = remotedomains.copy() + remotedomains.discard(self.server_name) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, + } } - } + }, }, - }, - key=(room_id, receipt_type, user_id), - ) + key=(room_id, receipt_type, user_id), + ) + except Exception: + logger.exception("Error pushing receipts to remote servers") @defer.inlineCallbacks def get_receipts_for_room(self, room_id, to_key): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 77c0cf146f..823e2e27e1 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -205,28 +205,31 @@ class TypingHandler(object): @defer.inlineCallbacks def _push_remote(self, member, typing): - users = yield self.state.get_current_user_in_room(member.room_id) - self._member_last_federation_poke[member] = self.clock.time_msec() + try: + users = yield self.state.get_current_user_in_room(member.room_id) + self._member_last_federation_poke[member] = self.clock.time_msec() - now = self.clock.time_msec() - self.wheel_timer.insert( - now=now, - obj=member, - then=now + FEDERATION_PING_INTERVAL, - ) + now = self.clock.time_msec() + self.wheel_timer.insert( + now=now, + obj=member, + then=now + FEDERATION_PING_INTERVAL, + ) - for domain in set(get_domain_from_id(u) for u in users): - if domain != self.server_name: - self.federation.send_edu( - destination=domain, - edu_type="m.typing", - content={ - "room_id": member.room_id, - "user_id": member.user_id, - "typing": typing, - }, - key=member, - ) + for domain in set(get_domain_from_id(u) for u in users): + if domain != self.server_name: + self.federation.send_edu( + destination=domain, + edu_type="m.typing", + content={ + "room_id": member.room_id, + "user_id": member.user_id, + "typing": typing, + }, + key=member, + ) + except Exception: + logger.exception("Error pushing typing notif to remotes") @defer.inlineCallbacks def _recv_edu(self, origin, content): diff --git a/synapse/notifier.py b/synapse/notifier.py index 0e40a4aad6..939723a404 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -21,7 +21,7 @@ from synapse.handlers.presence import format_user_presence_state from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.metrics import Measure from synapse.types import StreamToken from synapse.visibility import filter_events_for_client @@ -251,9 +251,7 @@ class Notifier(object): def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - preserve_fn(self.appservice_handler.notify_interested_services)( - room_stream_id - ) + run_in_background(self._notify_app_services, room_stream_id) if self.federation_sender: self.federation_sender.notify_new_events(room_stream_id) @@ -267,6 +265,13 @@ class Notifier(object): rooms=[event.room_id], ) + @defer.inlineCallbacks + def _notify_app_services(self, room_stream_id): + try: + yield self.appservice_handler.notify_interested_services(room_stream_id) + except Exception: + logger.exception("Error notifying application services of event") + def on_new_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend event wise. diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 58df98a793..ba7286cb72 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -77,10 +77,13 @@ class EmailPusher(object): @defer.inlineCallbacks def on_started(self): if self.mailer is not None: - self.throttle_params = yield self.store.get_throttle_params_by_room( - self.pusher_id - ) - yield self._process() + try: + self.throttle_params = yield self.store.get_throttle_params_by_room( + self.pusher_id + ) + yield self._process() + except Exception: + logger.exception("Error starting email pusher") def on_stop(self): if self.timed_call: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 2cbac571b8..1420d378ef 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -94,7 +94,10 @@ class HttpPusher(object): @defer.inlineCallbacks def on_started(self): - yield self._process() + try: + yield self._process() + except Exception: + logger.exception("Error starting http pusher") @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index c188192f2b..0252afd9d3 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -18,7 +18,7 @@ from twisted.internet import defer, threads from .media_storage import FileResponder from synapse.config._base import Config -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background import logging import os @@ -87,7 +87,12 @@ class StorageProviderWrapper(StorageProvider): return self.backend.store_file(path, file_info) else: # TODO: Handle errors. - preserve_fn(self.backend.store_file)(path, file_info) + def store(): + try: + return self.backend.store_file(path, file_info) + except Exception: + logger.exception("Error storing file") + run_in_background(store) return defer.succeed(None) def fetch(self, path, file_info): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index e78f8d0114..c22762eb5c 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): "add_push_actions_to_staging", _add_push_actions_to_staging_txn ) + @defer.inlineCallbacks def remove_push_actions_from_staging(self, event_id): """Called if we failed to persist the event to ensure that stale push actions don't build up in the DB @@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore): event_id (str) """ - return self._simple_delete( - table="event_push_actions_staging", - keyvalues={ - "event_id": event_id, - }, - desc="remove_push_actions_from_staging", - ) + try: + res = yield self._simple_delete( + table="event_push_actions_staging", + keyvalues={ + "event_id": event_id, + }, + desc="remove_push_actions_from_staging", + ) + defer.returnValue(res) + except Exception: + # this method is called from an exception handler, so propagating + # another exception here really isn't helpful - there's nothing + # the caller can do about it. Just log the exception and move on. + logger.exception( + "Error removing push actions after event persistence failure", + ) @defer.inlineCallbacks def _find_stream_orderings_for_times(self): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d59adc236e..d6587e4409 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -305,7 +305,12 @@ def run_in_background(f, *args, **kwargs): deferred returned by the funtion completes. Useful for wrapping functions that return a deferred which you don't yield - on. + on (for instance because you want to pass it to deferred.gatherResults()). + + Note that if you completely discard the result, you should make sure that + `f` doesn't raise any deferred exceptions, otherwise a scary-looking + CRITICAL error about an unhandled error will be logged without much + indication about where it came from. """ current = LoggingContext.current_context() res = f(*args, **kwargs) -- cgit 1.4.1