From 7902bf1e1d6331e7964ac498988925cc26e18f79 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Wed, 11 Sep 2019 15:14:56 +0100
Subject: Clean up some code in the retry logic (#6017)

* remove some unused code

* make things which were constants into constants for efficiency and clarity
---
 synapse/util/retryutils.py | 29 +++++++++++++----------------
 1 file changed, 13 insertions(+), 16 deletions(-)

(limited to 'synapse/util')

diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 0862b5ca5a..5b16a81617 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -22,6 +22,15 @@ from synapse.api.errors import CodeMessageException
 
 logger = logging.getLogger(__name__)
 
+# the initial backoff, after the first transaction fails
+MIN_RETRY_INTERVAL = 10 * 60 * 1000
+
+# how much we multiply the backoff by after each subsequent fail
+RETRY_MULTIPLIER = 5
+
+# a cap on the backoff
+MAX_RETRY_INTERVAL = 24 * 60 * 60 * 1000
+
 
 class NotRetryingDestination(Exception):
     def __init__(self, retry_last_ts, retry_interval, destination):
@@ -112,9 +121,6 @@ class RetryDestinationLimiter(object):
         clock,
         store,
         retry_interval,
-        min_retry_interval=10 * 60 * 1000,
-        max_retry_interval=24 * 60 * 60 * 1000,
-        multiplier_retry_interval=5,
         backoff_on_404=False,
         backoff_on_failure=True,
     ):
@@ -130,12 +136,6 @@ class RetryDestinationLimiter(object):
             retry_interval (int): The next retry interval taken from the
                 database in milliseconds, or zero if the last request was
                 successful.
-            min_retry_interval (int): The minimum retry interval to use after
-                a failed request, in milliseconds.
-            max_retry_interval (int): The maximum retry interval to use after
-                a failed request, in milliseconds.
-            multiplier_retry_interval (int): The multiplier to use to increase
-                the retry interval after a failed request.
             backoff_on_404 (bool): Back off if we get a 404
 
             backoff_on_failure (bool): set to False if we should not increase the
@@ -146,9 +146,6 @@ class RetryDestinationLimiter(object):
         self.destination = destination
 
         self.retry_interval = retry_interval
-        self.min_retry_interval = min_retry_interval
-        self.max_retry_interval = max_retry_interval
-        self.multiplier_retry_interval = multiplier_retry_interval
         self.backoff_on_404 = backoff_on_404
         self.backoff_on_failure = backoff_on_failure
@@ -196,13 +193,13 @@ class RetryDestinationLimiter(object):
         else:
             # We couldn't connect.
             if self.retry_interval:
-                self.retry_interval *= self.multiplier_retry_interval
+                self.retry_interval *= RETRY_MULTIPLIER
                 self.retry_interval *= int(random.uniform(0.8, 1.4))
 
-                if self.retry_interval >= self.max_retry_interval:
-                    self.retry_interval = self.max_retry_interval
+                if self.retry_interval >= MAX_RETRY_INTERVAL:
+                    self.retry_interval = MAX_RETRY_INTERVAL
             else:
-                self.retry_interval = self.min_retry_interval
+                self.retry_interval = MIN_RETRY_INTERVAL
 
             logger.info(
                 "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
--
cgit 1.4.1


From 9fc71dc5eed7531454a34f8fec34bd451458c7c6 Mon Sep 17 00:00:00 2001
From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
Date: Wed, 11 Sep 2019 16:02:42 +0100
Subject: Use the v2 Identity Service API for lookups (MSC2134 + MSC2140)
 (#5976)

This is a redo of https://github.com/matrix-org/synapse/pull/5897 but with
`id_access_token` accepted.

Implements [MSC2134](https://github.com/matrix-org/matrix-doc/pull/2134) plus
Identity Service v2 authentication a la
[MSC2140](https://github.com/matrix-org/matrix-doc/pull/2140).
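As a rough sketch of the hashed (sha256) lookup flow this adds (the address
and pepper below are invented for illustration; a real pepper comes from the
identity server's /hash_details response):

    import hashlib

    import unpaddedbase64

    def sha256_and_url_safe_base64(input_text):
        # sha256 the input, then unpadded url-safe base64, as in the
        # synapse/util/hash.py helper added by this commit
        digest = hashlib.sha256(input_text.encode()).digest()
        return unpaddedbase64.encode_base64(digest, urlsafe=True)

    # _lookup_3pid_v2 hashes "<address> <medium> <pepper>"...
    lookup_value = sha256_and_url_safe_base64("alice@example.com email matrixrocks")

    # ...and then POSTs {"addresses": [lookup_value], "algorithm": "sha256",
    # "pepper": "matrixrocks"} to /_matrix/identity/v2/lookup.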
Identity lookup-related functions were also moved from `RoomMemberHandler` to `IdentityHandler`. --- changelog.d/5897.feature | 1 + synapse/handlers/identity.py | 56 ++++++++----- synapse/handlers/room.py | 4 +- synapse/handlers/room_member.py | 178 +++++++++++++++++++++++++++++++++++++--- synapse/rest/client/v1/room.py | 1 + synapse/util/hash.py | 33 ++++++++ 6 files changed, 238 insertions(+), 35 deletions(-) create mode 100644 changelog.d/5897.feature create mode 100644 synapse/util/hash.py (limited to 'synapse/util') diff --git a/changelog.d/5897.feature b/changelog.d/5897.feature new file mode 100644 index 0000000000..1557e559e8 --- /dev/null +++ b/changelog.d/5897.feature @@ -0,0 +1 @@ +Switch to using the v2 Identity Service `/lookup` API where available, with fallback to v1. (Implements [MSC2134](https://github.com/matrix-org/matrix-doc/pull/2134) plus id_access_token authentication for v2 Identity Service APIs from [MSC2140](https://github.com/matrix-org/matrix-doc/pull/2140)). diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index f690fd04a3..512f38e5a6 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -74,25 +74,6 @@ class IdentityHandler(BaseHandler): id_access_token = creds.get("id_access_token") return client_secret, id_server, id_access_token - def create_id_access_token_header(self, id_access_token): - """Create an Authorization header for passing to SimpleHttpClient as the header value - of an HTTP request. - - Args: - id_access_token (str): An identity server access token. - - Returns: - list[str]: The ascii-encoded bearer token encased in a list. - """ - # Prefix with Bearer - bearer_token = "Bearer %s" % id_access_token - - # Encode headers to standard ascii - bearer_token.encode("ascii") - - # Return as a list as that's how SimpleHttpClient takes header values - return [bearer_token] - @defer.inlineCallbacks def threepid_from_creds(self, id_server, creds): """ @@ -178,9 +159,7 @@ class IdentityHandler(BaseHandler): bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid} if use_v2: bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,) - headers["Authorization"] = self.create_id_access_token_header( - id_access_token - ) + headers["Authorization"] = create_id_access_token_header(id_access_token) else: bind_url = "https://%s/_matrix/identity/api/v1/3pid/bind" % (id_server,) @@ -478,3 +457,36 @@ class IdentityHandler(BaseHandler): except HttpResponseException as e: logger.info("Proxied requestToken failed: %r", e) raise e.to_synapse_error() + + +def create_id_access_token_header(id_access_token): + """Create an Authorization header for passing to SimpleHttpClient as the header value + of an HTTP request. + + Args: + id_access_token (str): An identity server access token. + + Returns: + list[str]: The ascii-encoded bearer token encased in a list. + """ + # Prefix with Bearer + bearer_token = "Bearer %s" % id_access_token + + # Encode headers to standard ascii + bearer_token.encode("ascii") + + # Return as a list as that's how SimpleHttpClient takes header values + return [bearer_token] + + +class LookupAlgorithm: + """ + Supported hashing algorithms when performing a 3PID lookup. + + SHA256 - Hashing an (address, medium, pepper) combo with sha256, then url-safe base64 + encoding + NONE - Not performing any hashing. 
Simply sending an (address, medium) combo in plaintext + """ + + SHA256 = "sha256" + NONE = "none" diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a509e11d69..970be3c846 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -579,8 +579,8 @@ class RoomCreationHandler(BaseHandler): room_id = yield self._generate_room_id(creator_id=user_id, is_public=is_public) + directory_handler = self.hs.get_handlers().directory_handler if room_alias: - directory_handler = self.hs.get_handlers().directory_handler yield directory_handler.create_association( requester=requester, room_id=room_id, @@ -665,6 +665,7 @@ class RoomCreationHandler(BaseHandler): for invite_3pid in invite_3pid_list: id_server = invite_3pid["id_server"] + id_access_token = invite_3pid.get("id_access_token") # optional address = invite_3pid["address"] medium = invite_3pid["medium"] yield self.hs.get_room_member_handler().do_3pid_invite( @@ -675,6 +676,7 @@ class RoomCreationHandler(BaseHandler): id_server, requester, txn_id=None, + id_access_token=id_access_token, ) result = {"room_id": room_id} diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index a3a3d4d143..43d10a5308 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -29,9 +29,11 @@ from twisted.internet import defer from synapse import types from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError +from synapse.handlers.identity import LookupAlgorithm, create_id_access_token_header from synapse.types import RoomID, UserID from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room, user_left_room +from synapse.util.hash import sha256_and_url_safe_base64 from ._base import BaseHandler @@ -626,7 +628,7 @@ class RoomMemberHandler(object): servers.remove(room_alias.domain) servers.insert(0, room_alias.domain) - return (RoomID.from_string(room_id), servers) + return RoomID.from_string(room_id), servers @defer.inlineCallbacks def _get_inviter(self, user_id, room_id): @@ -638,7 +640,15 @@ class RoomMemberHandler(object): @defer.inlineCallbacks def do_3pid_invite( - self, room_id, inviter, medium, address, id_server, requester, txn_id + self, + room_id, + inviter, + medium, + address, + id_server, + requester, + txn_id, + id_access_token=None, ): if self.config.block_non_admin_invites: is_requester_admin = yield self.auth.is_server_admin(requester.user) @@ -661,7 +671,12 @@ class RoomMemberHandler(object): Codes.FORBIDDEN, ) - invitee = yield self._lookup_3pid(id_server, medium, address) + if not self._enable_lookup: + raise SynapseError( + 403, "Looking up third-party identifiers is denied from this server" + ) + + invitee = yield self._lookup_3pid(id_server, medium, address, id_access_token) if invitee: yield self.update_membership( @@ -673,9 +688,47 @@ class RoomMemberHandler(object): ) @defer.inlineCallbacks - def _lookup_3pid(self, id_server, medium, address): + def _lookup_3pid(self, id_server, medium, address, id_access_token=None): """Looks up a 3pid in the passed identity server. + Args: + id_server (str): The server name (including port, if required) + of the identity server to use. + medium (str): The type of the third party identifier (e.g. "email"). + address (str): The third party identifier (e.g. "foo@example.com"). 
+            id_access_token (str|None): The access token to authenticate to the identity
+                server with
+
+        Returns:
+            str|None: the matrix ID of the 3pid, or None if it is not recognized.
+        """
+        if id_access_token is not None:
+            try:
+                results = yield self._lookup_3pid_v2(
+                    id_server, id_access_token, medium, address
+                )
+                return results
+
+            except Exception as e:
+                # Catch HttpResponseException for a non-200 response code
+                # Check if this identity server does not know about v2 lookups
+                if isinstance(e, HttpResponseException) and e.code == 404:
+                    # This is an old identity server that does not yet support v2 lookups
+                    logger.warning(
+                        "Attempted v2 lookup on v1 identity server %s. Falling "
+                        "back to v1",
+                        id_server,
+                    )
+                else:
+                    logger.warning("Error when looking up hashing details: %s", e)
+                    return None
+
+        return (yield self._lookup_3pid_v1(id_server, medium, address))
+
+    @defer.inlineCallbacks
+    def _lookup_3pid_v1(self, id_server, medium, address):
+        """Looks up a 3pid in the passed identity server using v1 lookup.
+
         Args:
             id_server (str): The server name (including port, if required)
                 of the identity server to use.
@@ -685,10 +738,6 @@ class RoomMemberHandler(object):
         Returns:
             str: the matrix ID of the 3pid, or None if it is not recognized.
         """
-        if not self._enable_lookup:
-            raise SynapseError(
-                403, "Looking up third-party identifiers is denied from this server"
-            )
         try:
             data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
@@ -702,9 +751,116 @@ class RoomMemberHandler(object):
             return data["mxid"]
 
         except IOError as e:
-            logger.warn("Error from identity server lookup: %s" % (e,))
+            logger.warning("Error from v1 identity server lookup: %s" % (e,))
+
+        return None
+
+    @defer.inlineCallbacks
+    def _lookup_3pid_v2(self, id_server, id_access_token, medium, address):
+        """Looks up a 3pid in the passed identity server using v2 lookup.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            id_access_token (str): The access token to authenticate to the identity server with
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
+
+        Returns:
+            Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
+        """
+        # Check what hashing details are supported by this identity server
+        hash_details = yield self.simple_http_client.get_json(
+            "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
+            {"access_token": id_access_token},
+        )
+
+        if not isinstance(hash_details, dict):
+            logger.warning(
+                "Got non-dict object when checking hash details of %s%s: %s",
+                id_server_scheme,
+                id_server,
+                hash_details,
+            )
+            raise SynapseError(
+                400,
+                "Non-dict object from %s%s during v2 hash_details request: %s"
+                % (id_server_scheme, id_server, hash_details),
+            )
+
+        # Extract information from hash_details
+        supported_lookup_algorithms = hash_details.get("algorithms")
+        lookup_pepper = hash_details.get("lookup_pepper")
+        if (
+            not supported_lookup_algorithms
+            or not isinstance(supported_lookup_algorithms, list)
+            or not lookup_pepper
+            or not isinstance(lookup_pepper, str)
+        ):
+            raise SynapseError(
+                400,
+                "Invalid hash details received from identity server %s%s: %s"
+                % (id_server_scheme, id_server, hash_details),
+            )
+
+        # Check if any of the supported lookup algorithms are present
+        if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
+            # Perform a hashed lookup
+            lookup_algorithm = LookupAlgorithm.SHA256
+
+            # Hash address, medium and the pepper with sha256
+            to_hash = "%s %s %s" % (address, medium, lookup_pepper)
+            lookup_value = sha256_and_url_safe_base64(to_hash)
+
+        elif LookupAlgorithm.NONE in supported_lookup_algorithms:
+            # Perform a non-hashed lookup
+            lookup_algorithm = LookupAlgorithm.NONE
+
+            # Combine together plaintext address and medium
+            lookup_value = "%s %s" % (address, medium)
+
+        else:
+            logger.warning(
+                "None of the provided lookup algorithms of %s are supported: %s",
+                id_server,
+                supported_lookup_algorithms,
+            )
+            raise SynapseError(
+                400,
+                "Provided identity server does not support any v2 lookup "
+                "algorithms that this homeserver supports.",
+            )
+
+        # Authenticate with identity server given the access token from the client
+        headers = {"Authorization": create_id_access_token_header(id_access_token)}
+
+        try:
+            lookup_results = yield self.simple_http_client.post_json_get_json(
+                "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
+                {
+                    "addresses": [lookup_value],
+                    "algorithm": lookup_algorithm,
+                    "pepper": lookup_pepper,
+                },
+                headers=headers,
+            )
+        except Exception as e:
+            logger.warning("Error when performing a v2 3pid lookup: %s", e)
+            raise SynapseError(
+                500, "Unknown error occurred during identity server lookup"
+            )
+
+        # Check for a mapping from what we looked up to an MXID
+        if "mappings" not in lookup_results or not isinstance(
+            lookup_results["mappings"], dict
+        ):
+            logger.warning("No results from 3pid lookup")
             return None
+
+        # Return the MXID if it's available, or None otherwise
+        mxid = lookup_results["mappings"].get(lookup_value)
+        return mxid
+
     @defer.inlineCallbacks
     def _verify_any_signature(self, data, server_hostname):
         if server_hostname not in data["signatures"]:
@@ -844,7 +1000,6 @@ class RoomMemberHandler(object):
             display_name (str): A user-friendly name to represent the invited user.
         """
-
         is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
             id_server_scheme,
             id_server,
        )
@@ -862,7 +1017,6 @@ class RoomMemberHandler(object):
             "sender_display_name": inviter_display_name,
             "sender_avatar_url": inviter_avatar_url,
         }
-
         try:
             data = yield self.simple_http_client.post_json_get_json(
                 is_url, invite_config
             )
@@ -1049,7 +1203,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
                 # The 'except' clause is very broad, but we need to
                 # capture everything from DNS failures upwards
                 #
-                logger.warn("Failed to reject invite: %s", e)
+                logger.warning("Failed to reject invite: %s", e)
 
                 yield self.store.locally_reject_invite(target.to_string(), room_id)
                 return {}
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 3582259026..a6a7b3b57e 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -701,6 +701,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
                 content["id_server"],
                 requester,
                 txn_id,
+                content.get("id_access_token"),
             )
 
         return 200, {}
diff --git a/synapse/util/hash.py b/synapse/util/hash.py
new file mode 100644
index 0000000000..359168704e
--- /dev/null
+++ b/synapse/util/hash.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+
+import unpaddedbase64
+
+
+def sha256_and_url_safe_base64(input_text):
+    """SHA256 hash an input string, encode the digest as url-safe base64, and
+    return
+
+    :param input_text: string to hash
+    :type input_text: str
+
+    :returns: a sha256 hashed and url-safe base64 encoded digest
+    :rtype: str
+    """
+    digest = hashlib.sha256(input_text.encode()).digest()
+    return unpaddedbase64.encode_base64(digest, urlsafe=True)
--
cgit 1.4.1


From 0388beafe48d1ae9c30565c37b8902b9aa0b8fe2 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Thu, 12 Sep 2019 12:59:43 +0100
Subject: Fix bug in calculating the federation retry backoff period (#6025)

This was intended to introduce an element of jitter; instead it gave you a
1 in 3 chance of resetting to zero.
---
 changelog.d/6025.bugfix    | 1 +
 synapse/util/retryutils.py | 5 +++--
 2 files changed, 4 insertions(+), 2 deletions(-)
 create mode 100644 changelog.d/6025.bugfix

(limited to 'synapse/util')

diff --git a/changelog.d/6025.bugfix b/changelog.d/6025.bugfix
new file mode 100644
index 0000000000..50d7f9aab5
--- /dev/null
+++ b/changelog.d/6025.bugfix
@@ -0,0 +1 @@
+Fix bug in calculating the federation retry backoff period.
\ No newline at end of file
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 5b16a81617..33263fe20f 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -193,8 +193,9 @@ class RetryDestinationLimiter(object):
         else:
             # We couldn't connect.
             if self.retry_interval:
-                self.retry_interval *= RETRY_MULTIPLIER
-                self.retry_interval *= int(random.uniform(0.8, 1.4))
+                self.retry_interval = int(
+                    self.retry_interval * RETRY_MULTIPLIER * random.uniform(0.8, 1.4)
+                )
 
                 if self.retry_interval >= MAX_RETRY_INTERVAL:
                     self.retry_interval = MAX_RETRY_INTERVAL
--
cgit 1.4.1


From 3d882a7ba52114f18ec6be61c51561db203a0534 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Thu, 12 Sep 2019 13:00:13 +0100
Subject: Remove the cap on federation retry interval. (#6026)

Essentially the intention here is to end up blacklisting servers which never
respond to federation requests.

Fixes https://github.com/matrix-org/synapse/issues/5113.
---
 changelog.d/6026.feature   | 1 +
 synapse/util/retryutils.py | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)
 create mode 100644 changelog.d/6026.feature

(limited to 'synapse/util')

diff --git a/changelog.d/6026.feature b/changelog.d/6026.feature
new file mode 100644
index 0000000000..2489ff09b5
--- /dev/null
+++ b/changelog.d/6026.feature
@@ -0,0 +1 @@
+Stop sending federation transactions to servers which have been down for a long time.
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 33263fe20f..b740913b58 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -28,8 +28,8 @@ MIN_RETRY_INTERVAL = 10 * 60 * 1000
 # how much we multiply the backoff by after each subsequent fail
 RETRY_MULTIPLIER = 5
 
-# a cap on the backoff
-MAX_RETRY_INTERVAL = 24 * 60 * 60 * 1000
+# a cap on the backoff. (Essentially none)
+MAX_RETRY_INTERVAL = 2 ** 63
--
cgit 1.4.1


From 1e19ce00bff8d67168d39201cdf9424f7b2f22f6 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Tue, 17 Sep 2019 11:41:54 +0100
Subject: Add 'failure_ts' column to 'destinations' table (#6016)

Track the time that a server started failing at, for general analysis
purposes.
---
 changelog.d/6016.misc                              |   1 +
 .../schema/delta/56/destinations_failure_ts.sql    |  25 ++++
 synapse/storage/transactions.py                    |  23 ++--
 synapse/util/retryutils.py                         |  16 ++-
 tests/handlers/test_typing.py                      |   7 +-
 tests/storage/test_transactions.py                 |   8 +-
 tests/util/test_retryutils.py                      | 127 +++++++++++++++++++++
 7 files changed, 195 insertions(+), 12 deletions(-)
 create mode 100644 changelog.d/6016.misc
 create mode 100644 synapse/storage/schema/delta/56/destinations_failure_ts.sql
 create mode 100644 tests/util/test_retryutils.py

(limited to 'synapse/util')

diff --git a/changelog.d/6016.misc b/changelog.d/6016.misc
new file mode 100644
index 0000000000..91cf164714
--- /dev/null
+++ b/changelog.d/6016.misc
@@ -0,0 +1 @@
+Add a 'failure_ts' column to the 'destinations' database table.
diff --git a/synapse/storage/schema/delta/56/destinations_failure_ts.sql b/synapse/storage/schema/delta/56/destinations_failure_ts.sql
new file mode 100644
index 0000000000..f00889290b
--- /dev/null
+++ b/synapse/storage/schema/delta/56/destinations_failure_ts.sql
@@ -0,0 +1,25 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Record the timestamp when a given server started failing
+ */
+ALTER TABLE destinations ADD failure_ts BIGINT;
+
+/* as a rough approximation, we assume that the server started failing at
+ * retry_interval before the last retry
+ */
+UPDATE destinations SET failure_ts = retry_last_ts - retry_interval
+    WHERE retry_last_ts > 0;
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index d81ace0ece..289c117396 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -165,7 +165,7 @@ class TransactionStore(SQLBaseStore):
             txn,
             table="destinations",
             keyvalues={"destination": destination},
-            retcols=("destination", "retry_last_ts", "retry_interval"),
+            retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
             allow_none=True,
         )
 
@@ -174,12 +174,15 @@ class TransactionStore(SQLBaseStore):
         else:
             return None
 
-    def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval):
+    def set_destination_retry_timings(
+        self, destination, failure_ts, retry_last_ts, retry_interval
+    ):
         """Sets the current retry timings for a given destination.
         Both timings should be zero if retrying is no longer occurring.
 
         Args:
             destination (str)
+            failure_ts (int|None) - when the server started failing (ms since epoch)
             retry_last_ts (int) - time of last retry attempt in unix epoch ms
             retry_interval (int) - how long until next retry in ms
         """
@@ -189,12 +192,13 @@ class TransactionStore(SQLBaseStore):
             "set_destination_retry_timings",
             self._set_destination_retry_timings,
             destination,
+            failure_ts,
             retry_last_ts,
             retry_interval,
         )
 
     def _set_destination_retry_timings(
-        self, txn, destination, retry_last_ts, retry_interval
+        self, txn, destination, failure_ts, retry_last_ts, retry_interval
     ):
 
         if self.database_engine.can_native_upsert:
@@ -202,9 +206,12 @@ class TransactionStore(SQLBaseStore):
            # resetting it) or greater than the existing retry interval.
 
            sql = """
-                INSERT INTO destinations (destination, retry_last_ts, retry_interval)
-                    VALUES (?, ?, ?)
+                INSERT INTO destinations (
+                    destination, failure_ts, retry_last_ts, retry_interval
+                )
+                VALUES (?, ?, ?, ?)
ON CONFLICT (destination) DO UPDATE SET + failure_ts = EXCLUDED.failure_ts, retry_last_ts = EXCLUDED.retry_last_ts, retry_interval = EXCLUDED.retry_interval WHERE @@ -212,7 +219,7 @@ class TransactionStore(SQLBaseStore): OR destinations.retry_interval < EXCLUDED.retry_interval """ - txn.execute(sql, (destination, retry_last_ts, retry_interval)) + txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval)) return @@ -225,7 +232,7 @@ class TransactionStore(SQLBaseStore): txn, table="destinations", keyvalues={"destination": destination}, - retcols=("retry_last_ts", "retry_interval"), + retcols=("failure_ts", "retry_last_ts", "retry_interval"), allow_none=True, ) @@ -235,6 +242,7 @@ class TransactionStore(SQLBaseStore): table="destinations", values={ "destination": destination, + "failure_ts": failure_ts, "retry_last_ts": retry_last_ts, "retry_interval": retry_interval, }, @@ -245,6 +253,7 @@ class TransactionStore(SQLBaseStore): "destinations", keyvalues={"destination": destination}, updatevalues={ + "failure_ts": failure_ts, "retry_last_ts": retry_last_ts, "retry_interval": retry_interval, }, diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index b740913b58..a5f2fbef5c 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -80,11 +80,13 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs) # We aren't ready to retry that destination. raise """ + failure_ts = None retry_last_ts, retry_interval = (0, 0) retry_timings = yield store.get_destination_retry_timings(destination) if retry_timings: + failure_ts = retry_timings["failure_ts"] retry_last_ts, retry_interval = ( retry_timings["retry_last_ts"], retry_timings["retry_interval"], @@ -108,6 +110,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs) destination, clock, store, + failure_ts, retry_interval, backoff_on_failure=backoff_on_failure, **kwargs @@ -120,6 +123,7 @@ class RetryDestinationLimiter(object): destination, clock, store, + failure_ts, retry_interval, backoff_on_404=False, backoff_on_failure=True, @@ -133,6 +137,8 @@ class RetryDestinationLimiter(object): destination (str) clock (Clock) store (DataStore) + failure_ts (int|None): when this destination started failing (in ms since + the epoch), or zero if the last request was successful retry_interval (int): The next retry interval taken from the database in milliseconds, or zero if the last request was successful. 
@@ -145,6 +151,7 @@ class RetryDestinationLimiter(object): self.store = store self.destination = destination + self.failure_ts = failure_ts self.retry_interval = retry_interval self.backoff_on_404 = backoff_on_404 self.backoff_on_failure = backoff_on_failure @@ -186,6 +193,7 @@ class RetryDestinationLimiter(object): logger.debug( "Connection to %s was successful; clearing backoff", self.destination ) + self.failure_ts = None retry_last_ts = 0 self.retry_interval = 0 elif not self.backoff_on_failure: @@ -211,11 +219,17 @@ class RetryDestinationLimiter(object): ) retry_last_ts = int(self.clock.time_msec()) + if self.failure_ts is None: + self.failure_ts = retry_last_ts + @defer.inlineCallbacks def store_retry_timings(): try: yield self.store.set_destination_retry_timings( - self.destination, retry_last_ts, self.retry_interval + self.destination, + self.failure_ts, + retry_last_ts, + self.retry_interval, ) except Exception: logger.exception("Failed to store destination_retry_timings") diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 5d5e324df2..1f2ef5d01f 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -99,7 +99,12 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.event_source = hs.get_event_sources().sources["typing"] self.datastore = hs.get_datastore() - retry_timings_res = {"destination": "", "retry_last_ts": 0, "retry_interval": 0} + retry_timings_res = { + "destination": "", + "retry_last_ts": 0, + "retry_interval": 0, + "failure_ts": None, + } self.datastore.get_destination_retry_timings.return_value = defer.succeed( retry_timings_res ) diff --git a/tests/storage/test_transactions.py b/tests/storage/test_transactions.py index 14169afa96..a771d5af29 100644 --- a/tests/storage/test_transactions.py +++ b/tests/storage/test_transactions.py @@ -29,17 +29,19 @@ class TransactionStoreTestCase(HomeserverTestCase): r = self.get_success(d) self.assertIsNone(r) - d = self.store.set_destination_retry_timings("example.com", 50, 100) + d = self.store.set_destination_retry_timings("example.com", 1000, 50, 100) self.get_success(d) d = self.store.get_destination_retry_timings("example.com") r = self.get_success(d) - self.assert_dict({"retry_last_ts": 50, "retry_interval": 100}, r) + self.assert_dict( + {"retry_last_ts": 50, "retry_interval": 100, "failure_ts": 1000}, r + ) def test_initial_set_transactions(self): """Tests that we can successfully set the destination retries (there was a bug around invalidating the cache that broke this) """ - d = self.store.set_destination_retry_timings("example.com", 50, 100) + d = self.store.set_destination_retry_timings("example.com", 1000, 50, 100) self.get_success(d) diff --git a/tests/util/test_retryutils.py b/tests/util/test_retryutils.py new file mode 100644 index 0000000000..9e348694ad --- /dev/null +++ b/tests/util/test_retryutils.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from synapse.util.retryutils import ( + MIN_RETRY_INTERVAL, + RETRY_MULTIPLIER, + NotRetryingDestination, + get_retry_limiter, +) + +from tests.unittest import HomeserverTestCase + + +class RetryLimiterTestCase(HomeserverTestCase): + def test_new_destination(self): + """A happy-path case with a new destination and a successful operation""" + store = self.hs.get_datastore() + d = get_retry_limiter("test_dest", self.clock, store) + self.pump() + limiter = self.successResultOf(d) + + # advance the clock a bit before making the request + self.pump(1) + + with limiter: + pass + + d = store.get_destination_retry_timings("test_dest") + self.pump() + new_timings = self.successResultOf(d) + self.assertIsNone(new_timings) + + def test_limiter(self): + """General test case which walks through the process of a failing request""" + store = self.hs.get_datastore() + + d = get_retry_limiter("test_dest", self.clock, store) + self.pump() + limiter = self.successResultOf(d) + + self.pump(1) + try: + with limiter: + self.pump(1) + failure_ts = self.clock.time_msec() + raise AssertionError("argh") + except AssertionError: + pass + + # wait for the update to land + self.pump() + + d = store.get_destination_retry_timings("test_dest") + self.pump() + new_timings = self.successResultOf(d) + self.assertEqual(new_timings["failure_ts"], failure_ts) + self.assertEqual(new_timings["retry_last_ts"], failure_ts) + self.assertEqual(new_timings["retry_interval"], MIN_RETRY_INTERVAL) + + # now if we try again we should get a failure + d = get_retry_limiter("test_dest", self.clock, store) + self.pump() + self.failureResultOf(d, NotRetryingDestination) + + # + # advance the clock and try again + # + + self.pump(MIN_RETRY_INTERVAL) + d = get_retry_limiter("test_dest", self.clock, store) + self.pump() + limiter = self.successResultOf(d) + + self.pump(1) + try: + with limiter: + self.pump(1) + retry_ts = self.clock.time_msec() + raise AssertionError("argh") + except AssertionError: + pass + + # wait for the update to land + self.pump() + + d = store.get_destination_retry_timings("test_dest") + self.pump() + new_timings = self.successResultOf(d) + self.assertEqual(new_timings["failure_ts"], failure_ts) + self.assertEqual(new_timings["retry_last_ts"], retry_ts) + self.assertGreaterEqual( + new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 0.5 + ) + self.assertLessEqual( + new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0 + ) + + # + # one more go, with success + # + self.pump(MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0) + d = get_retry_limiter("test_dest", self.clock, store) + self.pump() + limiter = self.successResultOf(d) + + self.pump(1) + with limiter: + self.pump(1) + + # wait for the update to land + self.pump() + + d = store.get_destination_retry_timings("test_dest") + self.pump() + new_timings = self.successResultOf(d) + self.assertIsNone(new_timings) -- cgit 1.4.1 From b74606ea2262a717193f08bb6876459c1ee2d97d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 19 Sep 2019 20:29:11 +0100 Subject: Fix a bug with saml attribute maps. Fixes a bug where the default attribute maps were prioritised over user-specified ones, resulting in incorrect mappings. The problem is that if you call SPConfig.load() multiple times, it adds new attribute mappers to a list. So by calling it with the default config first, and then the user-specified config, we would always get the default mappers before the user-specified mappers. 
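Concretely, we want a recursive merge in which user-specified values win but
unrelated defaults survive; a sketch of the intended behaviour (the config
keys here are invented for the example):

    defaults = {"service": {"sp": {"allow_unsolicited": True, "name": "default"}}}
    user_config = {"service": {"sp": {"allow_unsolicited": False}}}

    _dict_merge(merge_dict=user_config, into_dct=defaults)

    # defaults is now:
    #   {"service": {"sp": {"allow_unsolicited": False, "name": "default"}}}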
To solve this, let's merge the config dicts first, and then pass them to SPConfig. --- changelog.d/6069.bugfix | 1 + synapse/config/saml2_config.py | 34 ++++++++++++++++++++++++++++------ synapse/util/module_loader.py | 20 +++++++++++++++++++- 3 files changed, 48 insertions(+), 7 deletions(-) create mode 100644 changelog.d/6069.bugfix (limited to 'synapse/util') diff --git a/changelog.d/6069.bugfix b/changelog.d/6069.bugfix new file mode 100644 index 0000000000..a437ac41a9 --- /dev/null +++ b/changelog.d/6069.bugfix @@ -0,0 +1 @@ +Fix a bug which caused SAML attribute maps to be overridden by defaults. diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py index 6a8161547a..14539fdb2a 100644 --- a/synapse/config/saml2_config.py +++ b/synapse/config/saml2_config.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,11 +13,29 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from synapse.python_dependencies import DependencyException, check_requirements +from synapse.util.module_loader import load_python_module from ._base import Config, ConfigError +def _dict_merge(merge_dict, into_dct): + for k, v in merge_dict.items(): + if k not in into_dct: + into_dct[k] = v + continue + + current_val = into_dct[k] + + if isinstance(v, dict) and isinstance(current_val, dict): + _dict_merge(v, current_val) + continue + + # otherwise we just overwrite + into_dct[k] = v + + class SAML2Config(Config): def read_config(self, config, **kwargs): self.saml2_enabled = False @@ -33,15 +52,18 @@ class SAML2Config(Config): self.saml2_enabled = True - import saml2.config - - self.saml2_sp_config = saml2.config.SPConfig() - self.saml2_sp_config.load(self._default_saml_config_dict()) - self.saml2_sp_config.load(saml2_config.get("sp_config", {})) + saml2_config_dict = self._default_saml_config_dict() + _dict_merge(saml2_config.get("sp_config", {}), saml2_config_dict) config_path = saml2_config.get("config_path", None) if config_path is not None: - self.saml2_sp_config.load_file(config_path) + mod = load_python_module(config_path) + _dict_merge(mod.CONFIG, saml2_config_dict) + + import saml2.config + + self.saml2_sp_config = saml2.config.SPConfig() + self.saml2_sp_config.load(saml2_config_dict) # session lifetime: in milliseconds self.saml2_session_lifetime = self.parse_duration( diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py index 522acd5aa8..7ff7eb1e4d 100644 --- a/synapse/util/module_loader.py +++ b/synapse/util/module_loader.py @@ -14,12 +14,13 @@ # limitations under the License. import importlib +import importlib.util from synapse.config._base import ConfigError def load_module(provider): - """ Loads a module with its config + """ Loads a synapse module with its config Take a dict with keys 'module' (the module name) and 'config' (the config dict). 
@@ -38,3 +39,20 @@ def load_module(provider): raise ConfigError("Failed to parse config for %r: %r" % (provider["module"], e)) return provider_class, provider_config + + +def load_python_module(location: str): + """Load a python module, and return a reference to its global namespace + + Args: + location (str): path to the module + + Returns: + python module object + """ + spec = importlib.util.spec_from_file_location(location, location) + if spec is None: + raise Exception("Unable to load module at %s" % (location,)) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod -- cgit 1.4.1 From f44f1d2e8374b7250a8a68cf3a49e6d1ac63b0fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 10:36:27 +0100 Subject: Fix errors storing large retry intervals. We have set the max retry interval to a value larger than a postgres or sqlite int can hold, which caused exceptions when updating the destinations table. To fix postgres we need to change the column to a bigint, and for sqlite we lower the max interval to 2**62 (which is still incredibly long). --- .../56/destinations_retry_interval_type.sql.postgres | 18 ++++++++++++++++++ synapse/util/retryutils.py | 2 +- tests/storage/test_transactions.py | 11 +++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres (limited to 'synapse/util') diff --git a/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres b/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres new file mode 100644 index 0000000000..b9bbb18a91 --- /dev/null +++ b/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres @@ -0,0 +1,18 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- We want to store large retry intervals so we upgrade the column from INT +-- to BIGINT. We don't need to do this on SQLite. +ALTER TABLE destinations ALTER retry_interval SET DATA TYPE BIGINT; diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index a5f2fbef5c..af69587196 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -29,7 +29,7 @@ MIN_RETRY_INTERVAL = 10 * 60 * 1000 RETRY_MULTIPLIER = 5 # a cap on the backoff. (Essentially none) -MAX_RETRY_INTERVAL = 2 ** 63 +MAX_RETRY_INTERVAL = 2 ** 62 class NotRetryingDestination(Exception): diff --git a/tests/storage/test_transactions.py b/tests/storage/test_transactions.py index a771d5af29..8e817e2c7f 100644 --- a/tests/storage/test_transactions.py +++ b/tests/storage/test_transactions.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from synapse.util.retryutils import MAX_RETRY_INTERVAL + from tests.unittest import HomeserverTestCase @@ -45,3 +47,12 @@ class TransactionStoreTestCase(HomeserverTestCase): """ d = self.store.set_destination_retry_timings("example.com", 1000, 50, 100) self.get_success(d) + + def test_large_destination_retry(self): + d = self.store.set_destination_retry_timings( + "example.com", MAX_RETRY_INTERVAL, MAX_RETRY_INTERVAL, MAX_RETRY_INTERVAL + ) + self.get_success(d) + + d = self.store.get_destination_retry_timings("example.com") + self.get_success(d) -- cgit 1.4.1 From 864f14454322c6cba11476667ade8fc6cbea6f44 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 2 Oct 2019 05:29:01 -0700 Subject: Fix up some typechecking (#6150) * type checking fixes * changelog --- .gitignore | 1 + changelog.d/6150.misc | 1 + synapse/api/errors.py | 3 ++- synapse/api/room_versions.py | 5 ++++- synapse/app/_base.py | 4 +++- synapse/config/appservice.py | 5 +++-- synapse/config/consent_config.py | 4 ++-- synapse/config/password_auth_providers.py | 4 +++- synapse/config/repository.py | 5 +++-- synapse/config/server.py | 10 +++++++--- synapse/config/server_notices_config.py | 4 ++-- synapse/logging/opentracing.py | 9 +++++---- synapse/logging/utils.py | 20 ++++++++++++++++---- synapse/metrics/__init__.py | 4 ++-- synapse/metrics/_exposition.py | 4 ++-- synapse/python_dependencies.py | 17 +++++++++++++---- synapse/types.py | 3 ++- synapse/util/async_helpers.py | 10 +++++++--- synapse/util/caches/__init__.py | 3 ++- synapse/util/caches/descriptors.py | 22 ++++++++++++++++++++-- synapse/util/caches/treecache.py | 4 +++- synapse/util/module_loader.py | 2 +- 22 files changed, 104 insertions(+), 40 deletions(-) create mode 100644 changelog.d/6150.misc (limited to 'synapse/util') diff --git a/.gitignore b/.gitignore index e53d4908d5..747b8714d7 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ *.tac _trial_temp/ _trial_temp*/ +/out # stuff that is likely to exist when you run a server locally /*.db diff --git a/changelog.d/6150.misc b/changelog.d/6150.misc new file mode 100644 index 0000000000..a373c091ab --- /dev/null +++ b/changelog.d/6150.misc @@ -0,0 +1 @@ +Expand type-checking on modules imported by synapse.config. diff --git a/synapse/api/errors.py b/synapse/api/errors.py index cf1ebf1af2..1bb2e86789 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -17,6 +17,7 @@ """Contains exceptions and error codes.""" import logging +from typing import Dict from six import iteritems from six.moves import http_client @@ -111,7 +112,7 @@ class ProxiedRequestError(SynapseError): def __init__(self, code, msg, errcode=Codes.UNKNOWN, additional_fields=None): super(ProxiedRequestError, self).__init__(code, msg, errcode) if additional_fields is None: - self._additional_fields = {} + self._additional_fields = {} # type: Dict else: self._additional_fields = dict(additional_fields) diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index 95292b7dec..c6f50fd7b9 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -12,6 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+ +from typing import Dict + import attr @@ -102,4 +105,4 @@ KNOWN_ROOM_VERSIONS = { RoomVersions.V4, RoomVersions.V5, ) -} # type: dict[str, RoomVersion] +} # type: Dict[str, RoomVersion] diff --git a/synapse/app/_base.py b/synapse/app/_base.py index c30fdeee9a..2ac7d5c064 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -263,7 +263,9 @@ def start(hs, listeners=None): refresh_certificate(hs) # Start the tracer - synapse.logging.opentracing.init_tracer(hs.config) + synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa + hs.config + ) # It is now safe to start your Synapse. hs.start_listening(listeners) diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 8387ff6805..28d36b1bc3 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +from typing import Dict from six import string_types from six.moves.urllib import parse as urlparse @@ -56,8 +57,8 @@ def load_appservices(hostname, config_files): return [] # Dicts of value -> filename - seen_as_tokens = {} - seen_ids = {} + seen_as_tokens = {} # type: Dict[str, str] + seen_ids = {} # type: Dict[str, str] appservices = [] diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py index 94916f3a49..48976e17b1 100644 --- a/synapse/config/consent_config.py +++ b/synapse/config/consent_config.py @@ -73,8 +73,8 @@ DEFAULT_CONFIG = """\ class ConsentConfig(Config): - def __init__(self): - super(ConsentConfig, self).__init__() + def __init__(self, *args): + super(ConsentConfig, self).__init__(*args) self.user_consent_version = None self.user_consent_template_dir = None diff --git a/synapse/config/password_auth_providers.py b/synapse/config/password_auth_providers.py index 788c39c9fb..c50e244394 100644 --- a/synapse/config/password_auth_providers.py +++ b/synapse/config/password_auth_providers.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Any, List + from synapse.util.module_loader import load_module from ._base import Config @@ -22,7 +24,7 @@ LDAP_PROVIDER = "ldap_auth_provider.LdapAuthProvider" class PasswordAuthProviderConfig(Config): def read_config(self, config, **kwargs): - self.password_providers = [] + self.password_providers = [] # type: List[Any] providers = [] # We want to be backwards compatible with the old `ldap_config` diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 52e014608a..14740891f3 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -15,6 +15,7 @@ import os from collections import namedtuple +from typing import Dict, List from synapse.python_dependencies import DependencyException, check_requirements from synapse.util.module_loader import load_module @@ -61,7 +62,7 @@ def parse_thumbnail_requirements(thumbnail_sizes): Dictionary mapping from media type string to list of ThumbnailRequirement tuples. """ - requirements = {} + requirements = {} # type: Dict[str, List] for size in thumbnail_sizes: width = size["width"] height = size["height"] @@ -130,7 +131,7 @@ class ContentRepositoryConfig(Config): # # We don't create the storage providers here as not all workers need # them to be started. 
- self.media_storage_providers = [] + self.media_storage_providers = [] # type: List[tuple] for provider_config in storage_providers: # We special case the module "file_system" so as not to need to diff --git a/synapse/config/server.py b/synapse/config/server.py index 536ee7f29c..709bd387e5 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -19,6 +19,7 @@ import logging import os.path import re from textwrap import indent +from typing import List import attr import yaml @@ -243,7 +244,7 @@ class ServerConfig(Config): # events with profile information that differ from the target's global profile. self.allow_per_room_profiles = config.get("allow_per_room_profiles", True) - self.listeners = [] + self.listeners = [] # type: List[dict] for listener in config.get("listeners", []): if not isinstance(listener.get("port", None), int): raise ConfigError( @@ -287,7 +288,10 @@ class ServerConfig(Config): validator=attr.validators.instance_of(bool), default=False ) complexity = attr.ib( - validator=attr.validators.instance_of((int, float)), default=1.0 + validator=attr.validators.instance_of( + (float, int) # type: ignore[arg-type] # noqa + ), + default=1.0, ) complexity_error = attr.ib( validator=attr.validators.instance_of(str), @@ -366,7 +370,7 @@ class ServerConfig(Config): "cleanup_extremities_with_dummy_events", True ) - def has_tls_listener(self): + def has_tls_listener(self) -> bool: return any(l["tls"] for l in self.listeners) def generate_config_section( diff --git a/synapse/config/server_notices_config.py b/synapse/config/server_notices_config.py index eaac3d73bc..6d4285ef93 100644 --- a/synapse/config/server_notices_config.py +++ b/synapse/config/server_notices_config.py @@ -59,8 +59,8 @@ class ServerNoticesConfig(Config): None if server notices are not enabled. """ - def __init__(self): - super(ServerNoticesConfig, self).__init__() + def __init__(self, *args): + super(ServerNoticesConfig, self).__init__(*args) self.server_notices_mxid = None self.server_notices_mxid_display_name = None self.server_notices_mxid_avatar_url = None diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 308a27213b..cd1ff6a518 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -170,6 +170,7 @@ import inspect import logging import re from functools import wraps +from typing import Dict from canonicaljson import json @@ -547,7 +548,7 @@ def inject_active_span_twisted_headers(headers, destination, check_destination=T return span = opentracing.tracer.active_span - carrier = {} + carrier = {} # type: Dict[str, str] opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier) for key, value in carrier.items(): @@ -584,7 +585,7 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True): span = opentracing.tracer.active_span - carrier = {} + carrier = {} # type: Dict[str, str] opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier) for key, value in carrier.items(): @@ -639,7 +640,7 @@ def get_active_span_text_map(destination=None): if destination and not whitelisted_homeserver(destination): return {} - carrier = {} + carrier = {} # type: Dict[str, str] opentracing.tracer.inject( opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier ) @@ -653,7 +654,7 @@ def active_span_context_as_string(): Returns: The active span context encoded as a string. 
""" - carrier = {} + carrier = {} # type: Dict[str, str] if opentracing: opentracing.tracer.inject( opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier diff --git a/synapse/logging/utils.py b/synapse/logging/utils.py index 7df0fa6087..6073fc2725 100644 --- a/synapse/logging/utils.py +++ b/synapse/logging/utils.py @@ -119,7 +119,11 @@ def trace_function(f): logger = logging.getLogger(name) level = logging.DEBUG - s = inspect.currentframe().f_back + frame = inspect.currentframe() + if frame is None: + raise Exception("Can't get current frame!") + + s = frame.f_back to_print = [ "\t%s:%s %s. Args: args=%s, kwargs=%s" @@ -144,7 +148,7 @@ def trace_function(f): pathname=pathname, lineno=lineno, msg=msg, - args=None, + args=tuple(), exc_info=None, ) @@ -157,7 +161,12 @@ def trace_function(f): def get_previous_frames(): - s = inspect.currentframe().f_back.f_back + + frame = inspect.currentframe() + if frame is None: + raise Exception("Can't get current frame!") + + s = frame.f_back.f_back to_return = [] while s: if s.f_globals["__name__"].startswith("synapse"): @@ -174,7 +183,10 @@ def get_previous_frames(): def get_previous_frame(ignore=[]): - s = inspect.currentframe().f_back.f_back + frame = inspect.currentframe() + if frame is None: + raise Exception("Can't get current frame!") + s = frame.f_back.f_back while s: if s.f_globals["__name__"].startswith("synapse"): diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index bec3b13397..0b45e1f52a 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -125,7 +125,7 @@ class InFlightGauge(object): ) # Counts number of in flight blocks for a given set of label values - self._registrations = {} + self._registrations = {} # type: Dict # Protects access to _registrations self._lock = threading.Lock() @@ -226,7 +226,7 @@ class BucketCollector(object): # Fetch the data -- this must be synchronous! data = self.data_collector() - buckets = {} + buckets = {} # type: Dict[float, int] res = [] for x in data.keys(): diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index 74d9c3ecd3..a248103191 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -36,9 +36,9 @@ from twisted.web.resource import Resource try: from prometheus_client.samples import Sample except ImportError: - Sample = namedtuple( + Sample = namedtuple( # type: ignore[no-redef] # noqa "Sample", ["name", "labels", "value", "timestamp", "exemplar"] - ) # type: ignore + ) CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8") diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 0bd563edc7..aa7da1c543 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -15,7 +15,7 @@ # limitations under the License. 
import logging -from typing import Set +from typing import List, Set from pkg_resources import ( DistributionNotFound, @@ -73,6 +73,7 @@ REQUIREMENTS = [ "netaddr>=0.7.18", "Jinja2>=2.9", "bleach>=1.4.3", + "typing-extensions>=3.7.4", ] CONDITIONAL_REQUIREMENTS = { @@ -144,7 +145,11 @@ def check_requirements(for_feature=None): deps_needed.append(dependency) errors.append( "Needed %s, got %s==%s" - % (dependency, e.dist.project_name, e.dist.version) + % ( + dependency, + e.dist.project_name, # type: ignore[attr-defined] # noqa + e.dist.version, # type: ignore[attr-defined] # noqa + ) ) except DistributionNotFound: deps_needed.append(dependency) @@ -159,7 +164,7 @@ def check_requirements(for_feature=None): if not for_feature: # Check the optional dependencies are up to date. We allow them to not be # installed. - OPTS = sum(CONDITIONAL_REQUIREMENTS.values(), []) + OPTS = sum(CONDITIONAL_REQUIREMENTS.values(), []) # type: List[str] for dependency in OPTS: try: @@ -168,7 +173,11 @@ def check_requirements(for_feature=None): deps_needed.append(dependency) errors.append( "Needed optional %s, got %s==%s" - % (dependency, e.dist.project_name, e.dist.version) + % ( + dependency, + e.dist.project_name, # type: ignore[attr-defined] # noqa + e.dist.version, # type: ignore[attr-defined] # noqa + ) ) except DistributionNotFound: # If it's not found, we don't care diff --git a/synapse/types.py b/synapse/types.py index 51eadb6ad4..8f79797f17 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -318,6 +318,7 @@ class StreamToken( ) ): _SEPARATOR = "_" + START = None # type: StreamToken @classmethod def from_string(cls, string): @@ -402,7 +403,7 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): followed by the "stream_ordering" id of the event it comes after. """ - __slots__ = [] + __slots__ = [] # type: list @classmethod def parse(cls, string): diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index f1c46836b1..0d3bdd88ce 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -13,9 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import collections import logging from contextlib import contextmanager +from typing import Dict, Sequence, Set, Union from six.moves import range @@ -213,7 +215,9 @@ class Linearizer(object): # the first element is the number of things executing, and # the second element is an OrderedDict, where the keys are deferreds for the # things blocked from executing. - self.key_to_defer = {} + self.key_to_defer = ( + {} + ) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]] def queue(self, key): # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly. 
@@ -340,10 +344,10 @@ class ReadWriteLock(object): def __init__(self): # Latest readers queued - self.key_to_current_readers = {} + self.key_to_current_readers = {} # type: Dict[str, Set[defer.Deferred]] # Latest writer queued - self.key_to_current_writer = {} + self.key_to_current_writer = {} # type: Dict[str, defer.Deferred] @defer.inlineCallbacks def read(self, key): diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index b50e3503f0..43fd65d693 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -16,6 +16,7 @@ import logging import os +from typing import Dict import six from six.moves import intern @@ -37,7 +38,7 @@ def get_cache_factor_for(cache_name): caches_by_name = {} -collectors_by_name = {} +collectors_by_name = {} # type: Dict cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"]) cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"]) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 43f66ec4be..5ac2530a6a 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -18,10 +18,12 @@ import inspect import logging import threading from collections import namedtuple +from typing import Any, cast from six import itervalues from prometheus_client import Gauge +from typing_extensions import Protocol from twisted.internet import defer @@ -37,6 +39,18 @@ from . import register_cache logger = logging.getLogger(__name__) +class _CachedFunction(Protocol): + invalidate = None # type: Any + invalidate_all = None # type: Any + invalidate_many = None # type: Any + prefill = None # type: Any + cache = None # type: Any + num_args = None # type: Any + + def __name__(self): + ... + + cache_pending_metric = Gauge( "synapse_util_caches_cache_pending", "Number of lookups currently pending for this cache", @@ -245,7 +259,9 @@ class Cache(object): class _CacheDescriptorBase(object): - def __init__(self, orig, num_args, inlineCallbacks, cache_context=False): + def __init__( + self, orig: _CachedFunction, num_args, inlineCallbacks, cache_context=False + ): self.orig = orig if inlineCallbacks: @@ -404,7 +420,7 @@ class CacheDescriptor(_CacheDescriptorBase): return tuple(get_cache_key_gen(args, kwargs)) @functools.wraps(self.orig) - def wrapped(*args, **kwargs): + def _wrapped(*args, **kwargs): # If we're passed a cache_context then we'll want to call its invalidate() # whenever we are invalidated invalidate_callback = kwargs.pop("on_invalidate", None) @@ -440,6 +456,8 @@ class CacheDescriptor(_CacheDescriptorBase): return make_deferred_yieldable(observer) + wrapped = cast(_CachedFunction, _wrapped) + if self.num_args == 1: wrapped.invalidate = lambda key: cache.invalidate(key[0]) wrapped.prefill = lambda key, val: cache.prefill(key[0], val) diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 9a72218d85..2ea4e4e911 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -1,3 +1,5 @@ +from typing import Dict + from six import itervalues SENTINEL = object() @@ -12,7 +14,7 @@ class TreeCache(object): def __init__(self): self.size = 0 - self.root = {} + self.root = {} # type: Dict def __setitem__(self, key, value): return self.set(key, value) diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py index 7ff7eb1e4d..2705cbe5f8 100644 --- a/synapse/util/module_loader.py +++ b/synapse/util/module_loader.py @@ -54,5 +54,5 @@ def load_python_module(location: str): if spec is 
None: raise Exception("Unable to load module at %s" % (location,)) mod = importlib.util.module_from_spec(spec) - spec.loader.exec_module(mod) + spec.loader.exec_module(mod) # type: ignore return mod -- cgit 1.4.1 From 66537e10ce77e47fac52e3f27569ac1ef0f1aaa3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 3 Oct 2019 17:47:20 +0100 Subject: add some metrics on the federation sender (#6160) --- changelog.d/6160.misc | 1 + synapse/federation/sender/__init__.py | 11 ++++++----- synapse/state/__init__.py | 24 ++++++++++++++++++------ synapse/storage/roommember.py | 21 +++++++++++++++------ synapse/util/metrics.py | 6 ++++-- 5 files changed, 44 insertions(+), 19 deletions(-) create mode 100644 changelog.d/6160.misc (limited to 'synapse/util') diff --git a/changelog.d/6160.misc b/changelog.d/6160.misc new file mode 100644 index 0000000000..3d7cce00e1 --- /dev/null +++ b/changelog.d/6160.misc @@ -0,0 +1 @@ +Add some metrics on the federation sender. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index d46f4aaeb1..2b2ee8612a 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -38,7 +38,7 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util.metrics import measure_func +from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -183,8 +183,8 @@ class FederationSender(object): # Otherwise if the last member on a server in a room is # banned then it won't receive the event because it won't # be in the room after the ban. - destinations = yield self.state.get_current_hosts_in_room( - event.room_id, latest_event_ids=event.prev_event_ids() + destinations = yield self.state.get_hosts_in_room_at_events( + event.room_id, event_ids=event.prev_event_ids() ) except Exception: logger.exception( @@ -207,8 +207,9 @@ class FederationSender(object): @defer.inlineCallbacks def handle_room_events(events): - for event in events: - yield handle_event(event) + with Measure(self.clock, "handle_room_events"): + for event in events: + yield handle_event(event) events_by_room = {} for event in events: diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 2b0f4c79ee..dc9f5a9008 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -33,7 +33,7 @@ from synapse.state import v1, v2 from synapse.util.async_helpers import Linearizer from synapse.util.caches import get_cache_factor_for from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.metrics import Measure +from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -191,11 +191,22 @@ class StateHandler(object): return joined_users @defer.inlineCallbacks - def get_current_hosts_in_room(self, room_id, latest_event_ids=None): - if not latest_event_ids: - latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - logger.debug("calling resolve_state_groups from get_current_hosts_in_room") - entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids) + def get_current_hosts_in_room(self, room_id): + event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + return (yield self.get_hosts_in_room_at_events(room_id, event_ids)) + + @defer.inlineCallbacks + def get_hosts_in_room_at_events(self, room_id, event_ids): + """Get the hosts that were in a room at the 
given event ids + + Args: + room_id (str): + event_ids (list[str]): + + Returns: + Deferred[list[str]]: the hosts in the room at the given events + """ + entry = yield self.resolve_state_groups_for_events(room_id, event_ids) joined_hosts = yield self.store.get_joined_hosts(room_id, entry) return joined_hosts @@ -344,6 +355,7 @@ class StateHandler(object): return context + @measure_func() @defer.inlineCallbacks def resolve_state_groups_for_events(self, room_id, event_ids): """ Given a list of event_ids this method fetches the state at each diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4df8ebdacd..1550d827ba 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -33,6 +33,7 @@ from synapse.types import get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.metrics import Measure from synapse.util.stringutils import to_ascii logger = logging.getLogger(__name__) @@ -483,6 +484,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) return result + @defer.inlineCallbacks def get_joined_users_from_state(self, room_id, state_entry): state_group = state_entry.state_group if not state_group: @@ -492,9 +494,12 @@ class RoomMemberWorkerStore(EventsWorkerStore): # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._get_joined_users_from_context( - room_id, state_group, state_entry.state, context=state_entry - ) + with Measure(self._clock, "get_joined_users_from_state"): + return ( + yield self._get_joined_users_from_context( + room_id, state_group, state_entry.state, context=state_entry + ) + ) @cachedInlineCallbacks( num_args=2, cache_context=True, iterable=True, max_entries=100000 ) @@ -669,6 +674,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): return True + @defer.inlineCallbacks def get_joined_hosts(self, room_id, state_entry): state_group = state_entry.state_group if not state_group: @@ -678,9 +684,12 @@ class RoomMemberWorkerStore(EventsWorkerStore): # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._get_joined_hosts( - room_id, state_group, state_entry.state, state_entry=state_entry - ) + with Measure(self._clock, "get_joined_hosts"): + return ( + yield self._get_joined_hosts( + room_id, state_group, state_entry.state, state_entry=state_entry + ) + ) @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) # @defer.inlineCallbacks diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 0910930c21..4b1bcdf23c 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -60,12 +60,14 @@ in_flight = InFlightGauge( ) -def measure_func(name): +def measure_func(name=None): def wrapper(func): + block_name = func.__name__ if name is None else name + @wraps(func) @defer.inlineCallbacks def measured_func(self, *args, **kwargs): - with Measure(self.clock, name): + with Measure(self.clock, block_name): r = yield func(self, *args, **kwargs) return r -- cgit 1.4.1
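The measure_func change in the patch above makes the metric name optional, with the Measure block falling back to the wrapped function's own name. A minimal sketch of how the decorator is applied (the handler class and its store call are hypothetical, invented for illustration; the wrapper reads timings through the instance's clock, so the decorated method must live on a class exposing self.clock):

    from twisted.internet import defer

    from synapse.util.metrics import measure_func


    class ExampleHandler(object):
        """Hypothetical handler, purely to show the decorator in context."""

        def __init__(self, hs):
            # measure_func's wrapper enters Measure(self.clock, ...), so a
            # clock attribute is required on the class.
            self.clock = hs.get_clock()
            self.store = hs.get_datastore()

        @measure_func()  # metrics are reported under "count_state_events"
        @defer.inlineCallbacks
        def count_state_events(self, room_id):
            # Assumed store method, standing in for any deferred-returning call.
            state_ids = yield self.store.get_current_state_ids(room_id)
            return len(state_ids)

With no argument, block_name falls back to func.__name__, which is why the new call site above can be written as a bare @measure_func() on resolve_state_groups_for_events.

From 1d6dd1c2944c22147258dda8ccf2777c68b38fba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Oct 2019 10:53:06 +0100 Subject: Move patch_inline_callbacks into synapse/ --- synapse/__init__.py | 2 +- synapse/util/patch_inline_callbacks.py | 179 +++++++++++++++++++++++++++++++++ tests/__init__.py | 4 +- tests/patch_inline_callbacks.py | 179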
--------------------------------- 4 files changed, 182 insertions(+), 182 deletions(-) create mode 100644 synapse/util/patch_inline_callbacks.py delete mode 100644 tests/patch_inline_callbacks.py (limited to 'synapse/util') diff --git a/synapse/__init__.py b/synapse/__init__.py index 1055f54e00..bf102244a9 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -20,7 +20,7 @@ import os import sys -from tests.patch_inline_callbacks import do_patch +from synapse.util.patch_inline_callbacks import do_patch # Check that we're not running on an unsupported Python version. if sys.version_info < (3, 5): diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py new file mode 100644 index 0000000000..4fb49b0b2b --- /dev/null +++ b/synapse/util/patch_inline_callbacks.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import functools +import sys + +from twisted.internet import defer +from twisted.internet.defer import Deferred +from twisted.python.failure import Failure + + +def do_patch(): + """ + Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit + """ + + from synapse.logging.context import LoggingContext + + orig_inline_callbacks = defer.inlineCallbacks + if hasattr(orig_inline_callbacks, "patched_by_synapse"): + return + + def new_inline_callbacks(f): + @functools.wraps(f) + def wrapped(*args, **kwargs): + start_context = LoggingContext.current_context() + changes = [] + orig = orig_inline_callbacks(_check_yield_points(f, changes, start_context)) + + try: + res = orig(*args, **kwargs) + except Exception: + if LoggingContext.current_context() != start_context: + for err in changes: + print(err, file=sys.stderr) + + err = "%s changed context from %s to %s on exception" % ( + f, + start_context, + LoggingContext.current_context(), + ) + print(err, file=sys.stderr) + raise Exception(err) + raise + + if not isinstance(res, Deferred) or res.called: + if LoggingContext.current_context() != start_context: + for err in changes: + print(err, file=sys.stderr) + + err = "Completed %s changed context from %s to %s" % ( + f, + start_context, + LoggingContext.current_context(), + ) + # print the error to stderr because otherwise all we + # see in travis-ci is the 500 error + print(err, file=sys.stderr) + raise Exception(err) + return res + + if LoggingContext.current_context() != LoggingContext.sentinel: + err = ( + "%s returned incomplete deferred in non-sentinel context " + "%s (start was %s)" + ) % (f, LoggingContext.current_context(), start_context) + print(err, file=sys.stderr) + raise Exception(err) + + def check_ctx(r): + if LoggingContext.current_context() != start_context: + for err in changes: + print(err, file=sys.stderr) + err = "%s completion of %s changed context from %s to %s" % ( + "Failure" if isinstance(r, Failure) else "Success", + f, + start_context, + LoggingContext.current_context(), + ) + print(err, 
file=sys.stderr) + raise Exception(err) + return r + + res.addBoth(check_ctx) + return res + + return wrapped + + defer.inlineCallbacks = new_inline_callbacks + new_inline_callbacks.patched_by_synapse = True + + +def _check_yield_points(f, changes, start_context): + """Wraps a generator that is about to be passed to defer.inlineCallbacks + checking that after every yield the log contexts are correct. + """ + + from synapse.logging.context import LoggingContext + + @functools.wraps(f) + def check_yield_points_inner(*args, **kwargs): + expected_context = start_context + + gen = f(*args, **kwargs) + + last_yield_line_no = 1 + result = None + while True: + try: + isFailure = isinstance(result, Failure) + if isFailure: + d = result.throwExceptionIntoGenerator(gen) + else: + d = gen.send(result) + except (StopIteration, defer._DefGen_Return) as e: + if LoggingContext.current_context() != expected_context: + # This happens when the context is lost sometime *after* the + # final yield and returning. E.g. we forgot to yield on a + # function that returns a deferred. + err = ( + "Function %r returned and changed context from %s to %s," + " in %s between %d and end of func" + % ( + f.__qualname__, + start_context, + LoggingContext.current_context(), + f.__code__.co_filename, + last_yield_line_no, + ) + ) + changes.append(err) + # raise Exception(err) + return getattr(e, "value", None) + + try: + result = yield d + except Exception as e: + result = Failure(e) + + frame = gen.gi_frame + + if LoggingContext.current_context() != expected_context: + # This happens because the context is lost sometime *after* the + # previous yield and *after* the current yield. E.g. the + # deferred we waited on didn't follow the rules, or we forgot to + # yield on a function between the two yield points. + err = ( + "%s changed context from %s to %s, happened between lines %d and %d in %s" + % ( + frame.f_code.co_name, + start_context, + LoggingContext.current_context(), + last_yield_line_no, + frame.f_lineno, + frame.f_code.co_filename, + ) + ) + changes.append(err) + # raise Exception(err) + + expected_context = LoggingContext.current_context() + + last_yield_line_no = frame.f_lineno + + return check_yield_points_inner diff --git a/tests/__init__.py b/tests/__init__.py index f7fc502f01..ed805db1c2 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -16,9 +16,9 @@ from twisted.trial import util -import tests.patch_inline_callbacks +from synapse.util.patch_inline_callbacks import do_patch # attempt to do the patch before we load any synapse code -tests.patch_inline_callbacks.do_patch() +do_patch() util.DEFAULT_TIMEOUT_DURATION = 20 diff --git a/tests/patch_inline_callbacks.py b/tests/patch_inline_callbacks.py deleted file mode 100644 index a35a1d3305..0000000000 --- a/tests/patch_inline_callbacks.py +++ /dev/null @@ -1,179 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from __future__ import print_function - -import functools -import sys - -from twisted.internet import defer -from twisted.internet.defer import Deferred -from twisted.python.failure import Failure - - -def do_patch(): - """ - Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit - """ - - from synapse.logging.context import LoggingContext - - orig_inline_callbacks = defer.inlineCallbacks - if hasattr(orig_inline_callbacks, "patched_by_synapse"): - return - - def new_inline_callbacks(f): - @functools.wraps(f) - def wrapped(*args, **kwargs): - start_context = LoggingContext.current_context() - changes = [] - orig = orig_inline_callbacks(_check_yield_points(f, changes, start_context)) - - try: - res = orig(*args, **kwargs) - except Exception: - if LoggingContext.current_context() != start_context: - for err in changes: - print(err, file=sys.stderr) - - err = "%s changed context from %s to %s on exception" % ( - f, - start_context, - LoggingContext.current_context(), - ) - print(err, file=sys.stderr) - raise Exception(err) - raise - - if not isinstance(res, Deferred) or res.called: - if LoggingContext.current_context() != start_context: - for err in changes: - print(err, file=sys.stderr) - - err = "Completed %s changed context from %s to %s" % ( - f, - start_context, - LoggingContext.current_context(), - ) - # print the error to stderr because otherwise all we - # see in travis-ci is the 500 error - print(err, file=sys.stderr) - raise Exception(err) - return res - - if LoggingContext.current_context() != LoggingContext.sentinel: - err = ( - "%s returned incomplete deferred in non-sentinel context " - "%s (start was %s)" - ) % (f, LoggingContext.current_context(), start_context) - print(err, file=sys.stderr) - raise Exception(err) - - def check_ctx(r): - if LoggingContext.current_context() != start_context: - for err in changes: - print(err, file=sys.stderr) - err = "%s completion of %s changed context from %s to %s" % ( - "Failure" if isinstance(r, Failure) else "Success", - f, - start_context, - LoggingContext.current_context(), - ) - print(err, file=sys.stderr) - raise Exception(err) - return r - - res.addBoth(check_ctx) - return res - - return wrapped - - defer.inlineCallbacks = new_inline_callbacks - new_inline_callbacks.patched_by_synapse = True - - -def _check_yield_points(f, changes, start_context): - """Wraps a generator that is about to passed to defer.inlineCallbacks - checking that after every yield the log contexts are correct. - """ - - from synapse.logging.context import LoggingContext - - @functools.wraps(f) - def check_yield_points_inner(*args, **kwargs): - expected_context = start_context - - gen = f(*args, **kwargs) - - last_yield_line_no = 1 - result = None - while True: - try: - isFailure = isinstance(result, Failure) - if isFailure: - d = result.throwExceptionIntoGenerator(gen) - else: - d = gen.send(result) - except (StopIteration, defer._DefGen_Return) as e: - if LoggingContext.current_context() != expected_context: - # This happens when the context is lost sometime *after* the - # final yield and returning. E.g. we forgot to yield on a - # function that returns a deferred. 
- err = ( - "Function %r returned and changed context from %s to %s," - " in %s between %d and end of func" - % ( - f.__qualname__, - start_context, - LoggingContext.current_context(), - f.__code__.co_filename, - last_yield_line_no, - ) - ) - changes.append(err) - # raise Exception(err) - return getattr(e, "value", None) - - try: - result = yield d - except Exception as e: - result = Failure(e) - - frame = gen.gi_frame - - if LoggingContext.current_context() != expected_context: - # This happens because the context is lost sometime *after* the - # previous yield and *after* the current yield. E.g. the - # deferred we waited on didn't follow the rules, or we forgot to - # yield on a function between the two yield points. - err = ( - "%s changed context from %s to %s, happened between lines %d and %d in %s" - % ( - frame.f_code.co_name, - start_context, - LoggingContext.current_context(), - last_yield_line_no, - frame.f_lineno, - frame.f_code.co_filename, - ) - ) - changes.append(err) - # raise Exception(err) - - expected_context = LoggingContext.current_context() - - last_yield_line_no = frame.f_lineno - - return check_yield_points_inner -- cgit 1.4.1 From 3e4272961a4cb659513bccd981cbd42f4e506362 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Oct 2019 10:58:32 +0100 Subject: Test for sentinel commit --- synapse/util/patch_inline_callbacks.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index 4fb49b0b2b..4a45824f52 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -146,13 +146,30 @@ def _check_yield_points(f, changes, start_context): # raise Exception(err) return getattr(e, "value", None) + frame = gen.gi_frame + + if isinstance(d, defer.Deferred): + # This happens if we yield on a deferred that doesn't follow + # the log context rules without wrappin in a `make_deferred_yieldable` + if LoggingContext.current_context() != LoggingContext.Sentinel: + err = ( + "%s yielded with context %s rather than Sentinel," + " yielded on line %d in %s" + % ( + frame.f_code.co_name, + start_context, + LoggingContext.current_context(), + frame.f_lineno, + frame.f_code.co_filename, + ) + ) + changes.append(err) + try: result = yield d except Exception as e: result = Failure(e) - frame = gen.gi_frame - if LoggingContext.current_context() != expected_context: # This happens because the context is lost sometime *after* the # previous yield and *after* the current yield. E.g. 
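the
-- cgit 1.4.1

The check introduced in the patch above flags a yield on a raw Deferred while a non-sentinel logcontext is still active. As a rough illustration of the bug it hunts for (not part of the patch itself; it assumes Twisted's deferLater and Synapse's make_deferred_yieldable helper), the first function below would be flagged when called from inside a logging context, while the second follows the logcontext rules:

    from twisted.internet import defer, reactor
    from twisted.internet.task import deferLater

    from synapse.logging.context import make_deferred_yieldable


    @defer.inlineCallbacks
    def bad_sleep():
        # deferLater knows nothing about logcontexts, so this parks the
        # generator with our context still current; the check above
        # reports it as yielding with a non-sentinel context.
        yield deferLater(reactor, 10, lambda: None)


    @defer.inlineCallbacks
    def good_sleep():
        # make_deferred_yieldable switches to the sentinel context before
        # yielding and restores our context when the deferred fires.
        yield make_deferred_yieldable(deferLater(reactor, 10, lambda: None))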
From ec0596f2ab4502c9a6183813a7e5dc2a5bfedd48 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Oct 2019 10:59:07 +0100 Subject: Log correct context --- synapse/util/patch_inline_callbacks.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index 4a45824f52..5ef7190b14 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -117,7 +117,7 @@ def _check_yield_points(f, changes, start_context): gen = f(*args, **kwargs) - last_yield_line_no = 1 + last_yield_line_no = gen.gi_frame.f_lineno result = None while True: try: @@ -136,7 +136,7 @@ def _check_yield_points(f, changes, start_context): " in %s between %d and end of func" % ( f.__qualname__, - start_context, + expected_context, LoggingContext.current_context(), f.__code__.co_filename, last_yield_line_no, ) @@ -148,22 +148,22 @@ def _check_yield_points(f, changes, start_context): frame = gen.gi_frame - if isinstance(d, defer.Deferred): + if isinstance(d, defer.Deferred) and not d.called: # This happens if we yield on a deferred that doesn't follow # the log context rules without wrappin in a `make_deferred_yieldable` - if LoggingContext.current_context() != LoggingContext.Sentinel: + if LoggingContext.current_context() is not LoggingContext.sentinel: err = ( - "%s yielded with context %s rather than Sentinel," + "%s yielded with context %s rather than sentinel," " yielded on line %d in %s" % ( frame.f_code.co_name, - start_context, LoggingContext.current_context(), frame.f_lineno, frame.f_code.co_filename, ) ) changes.append(err) + # raise Exception(err) try: result = yield d -- cgit 1.4.1 From 791a8c559bf4ea984637c047fad7d6097e34ce99 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Oct 2019 11:53:57 +0100 Subject: Add comments --- synapse/util/patch_inline_callbacks.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index 5ef7190b14..b518dae256 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -107,6 +107,19 @@ def do_patch(): def _check_yield_points(f, changes, start_context): """Wraps a generator that is about to be passed to defer.inlineCallbacks checking that after every yield the log contexts are correct. + + Its perfectly valid for log contexts to change within a function, e.g. due + to new Measure blocks, so such changes are added to the given `changes` + list instead of triggering an exception. + + Args: + f: generator function to wrap + changes (list[str]): A list of strings detailing how the contexts + changed within a function. + start_context (LoggingContext): The initial context we're expecting + + Returns: + function """ from synapse.logging.context import LoggingContext @@ -131,6 +144,10 @@ def _check_yield_points(f, changes, start_context): # This happens when the context is lost sometime *after* the # final yield and returning. E.g. we forgot to yield on a # function that returns a deferred. + # + # We don't raise here as its perfectly valid for contexts to + # change in a function, as long as it sets the correct context + # on resolving (which is checked separately).
err = ( "Function %r returned and changed context from %s to %s," " in %s between %d and end of func" @@ -143,14 +160,14 @@ def _check_yield_points(f, changes, start_context): ) ) changes.append(err) - # raise Exception(err) return getattr(e, "value", None) frame = gen.gi_frame if isinstance(d, defer.Deferred) and not d.called: # This happens if we yield on a deferred that doesn't follow - # the log context rules without wrappin in a `make_deferred_yieldable` + # the log context rules without wrappin in a `make_deferred_yieldable`. + # We raise here as this should never happen. if LoggingContext.current_context() is not LoggingContext.sentinel: err = ( "%s yielded with context %s rather than sentinel," @@ -162,8 +179,7 @@ def _check_yield_points(f, changes, start_context): frame.f_code.co_filename, ) ) - changes.append(err) - # raise Exception(err) + raise Exception(err) try: result = yield d @@ -171,10 +187,15 @@ def _check_yield_points(f, changes, start_context): result = Failure(e) if LoggingContext.current_context() != expected_context: + # This happens because the context is lost sometime *after* the # previous yield and *after* the current yield. E.g. the # deferred we waited on didn't follow the rules, or we forgot to # yield on a function between the two yield points. + # + # We don't raise here as its perfectly valid for contexts to + # change in a function, as long as it sets the correct context + # on resolving (which is checked separately). err = ( "%s changed context from %s to %s, happened between lines %d and %d in %s" % ( @@ -187,7 +208,6 @@ def _check_yield_points(f, changes, start_context): ) ) changes.append(err) - # raise Exception(err) expected_context = LoggingContext.current_context() -- cgit 1.4.1 From 941edad58355a829b49b0a43d382bbb0bf9ba021 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Oct 2019 12:15:17 +0100 Subject: Appease mypy --- synapse/util/patch_inline_callbacks.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index b518dae256..64a2c891c3 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -18,11 +18,17 @@ from __future__ import print_function import functools import sys +from typing import List, Callable, Any + from twisted.internet import defer from twisted.internet.defer import Deferred from twisted.python.failure import Failure +# Tracks if we've already patched inlineCallbacks +_already_patched = False + + def do_patch(): """ Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit @@ -30,16 +36,18 @@ def do_patch(): from synapse.logging.context import LoggingContext + global _already_patched + orig_inline_callbacks = defer.inlineCallbacks - if hasattr(orig_inline_callbacks, "patched_by_synapse"): + if _already_patched: return def new_inline_callbacks(f): @functools.wraps(f) def wrapped(*args, **kwargs): start_context = LoggingContext.current_context() - changes = [] - orig = orig_inline_callbacks(_check_yield_points(f, changes, start_context)) + changes: List[str] = [] + orig = orig_inline_callbacks(_check_yield_points(f, changes)) try: res = orig(*args, **kwargs) @@ -101,10 +109,10 @@ def do_patch(): return wrapped defer.inlineCallbacks = new_inline_callbacks - new_inline_callbacks.patched_by_synapse = True + _already_patched = True -def _check_yield_points(f, changes, start_context): +def 
_check_yield_points(f: Callable, changes: List[str]): """Wraps a generator that is about to be passed to defer.inlineCallbacks checking that after every yield the log contexts are correct. @@ -114,9 +122,8 @@ def _check_yield_points(f, changes, start_context): Args: f: generator function to wrap - changes (list[str]): A list of strings detailing how the contexts + changes: A list of strings detailing how the contexts changed within a function. - start_context (LoggingContext): The initial context we're expecting Returns: function @@ -126,13 +133,13 @@ def _check_yield_points(f, changes, start_context): @functools.wraps(f) def check_yield_points_inner(*args, **kwargs): - expected_context = start_context - gen = f(*args, **kwargs) last_yield_line_no = gen.gi_frame.f_lineno - result = None + result: Any = None while True: + expected_context = LoggingContext.current_context() + try: isFailure = isinstance(result, Failure) if isFailure: @@ -200,7 +207,7 @@ def _check_yield_points(f, changes, start_context): "%s changed context from %s to %s, happened between lines %d and %d in %s" % ( frame.f_code.co_name, - start_context, + expected_context, LoggingContext.current_context(), last_yield_line_no, frame.f_lineno, @@ -209,8 +216,6 @@ def _check_yield_points(f, changes, start_context): ) changes.append(err) - expected_context = LoggingContext.current_context() - last_yield_line_no = frame.f_lineno return check_yield_points_inner -- cgit 1.4.1 From f735aeec65a5117c71cf0f1e5f61cb900683533a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Oct 2019 12:20:29 +0100 Subject: sort --- synapse/util/patch_inline_callbacks.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index 64a2c891c3..3b78451dc0 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -17,14 +17,12 @@ from __future__ import print_function import functools import sys - -from typing import List, Callable, Any +from typing import Any, Callable, List from twisted.internet import defer from twisted.internet.defer import Deferred from twisted.python.failure import Failure - # Tracks if we've already patched inlineCallbacks _already_patched = False -- cgit 1.4.1 From c349e3ebafbe044022b93ca5c04d8b2fcb640c0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Oct 2019 12:29:38 +0100 Subject: Fix py3.5 --- synapse/util/patch_inline_callbacks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index 3b78451dc0..812dc88835 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -44,7 +44,7 @@ def do_patch(): @functools.wraps(f) def wrapped(*args, **kwargs): start_context = LoggingContext.current_context() - changes: List[str] = [] + changes = [] # type: List[str] orig = orig_inline_callbacks(_check_yield_points(f, changes)) try: -- cgit 1.4.1 From 59e0ed83065874775be350e25bb9f87da67b87c2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Oct 2019 12:47:07 +0100 Subject: Fix py3.5 --- synapse/util/patch_inline_callbacks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index 812dc88835..66c3d47519 100644 --- a/synapse/util/patch_inline_callbacks.py +++ 
b/synapse/util/patch_inline_callbacks.py @@ -134,7 +134,7 @@ def _check_yield_points(f: Callable, changes: List[str]): gen = f(*args, **kwargs) last_yield_line_no = gen.gi_frame.f_lineno - result: Any = None + result = None # type: Any while True: expected_context = LoggingContext.current_context() -- cgit 1.4.1 From fe1c1e6c28e09f88b30e0587161f9b1dbd6e8acf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Oct 2019 13:17:19 +0100 Subject: Fixup comments Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/util/patch_inline_callbacks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index 66c3d47519..3925927f9f 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -114,7 +114,7 @@ def _check_yield_points(f: Callable, changes: List[str]): """Wraps a generator that is about to be passed to defer.inlineCallbacks checking that after every yield the log contexts are correct. - Its perfectly valid for log contexts to change within a function, e.g. due + It's perfectly valid for log contexts to change within a function, e.g. due to new Measure blocks, so such changes are added to the given `changes` list instead of triggering an exception. @@ -150,7 +150,7 @@ def _check_yield_points(f: Callable, changes: List[str]): # final yield and returning. E.g. we forgot to yield on a # function that returns a deferred. # - # We don't raise here as its perfectly valid for contexts to + # We don't raise here as it's perfectly valid for contexts to # change in a function, as long as it sets the correct context # on resolving (which is checked separately). err = ( @@ -171,7 +171,7 @@ def _check_yield_points(f: Callable, changes: List[str]): if isinstance(d, defer.Deferred) and not d.called: # This happens if we yield on a deferred that doesn't follow - # the log context rules without wrappin in a `make_deferred_yieldable`. + # the log context rules without wrapping in a `make_deferred_yieldable`. # We raise here as this should never happen. 
if LoggingContext.current_context() is not LoggingContext.sentinel: err = ( -- cgit 1.4.1 From 3c2d6c708cd93df7fc945e10014049e9f9b36f46 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Oct 2019 15:26:09 +0100 Subject: Add maybe_awaitable and fix __init__ bugs --- synapse/rest/admin/__init__.py | 7 +++++-- synapse/util/async_helpers.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) (limited to 'synapse/util') diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index f7b9483008..939418ee2b 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -44,6 +44,7 @@ from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet from synapse.rest.admin.users import UserAdminServlet from synapse.types import UserID, create_requester +from synapse.util.async_helpers import maybe_awaitable from synapse.util.versionstring import get_version_string logger = logging.getLogger(__name__) @@ -310,7 +311,7 @@ class PurgeHistoryRestServlet(RestServlet): errcode=Codes.BAD_JSON, ) - purge_id = await self.pagination_handler.start_purge_history( + purge_id = self.pagination_handler.start_purge_history( room_id, token, delete_local_events=delete_local_events ) @@ -480,7 +481,9 @@ class ShutdownRoomRestServlet(RestServlet): ratelimit=False, ) - aliases_for_room = await self.store.get_aliases_for_room(room_id) + aliases_for_room = await maybe_awaitable( + self.store.get_aliases_for_room(room_id) + ) await self.store.update_aliases_for_room( room_id, new_room_id, requester_user_id diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 0d3bdd88ce..804dbca443 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -21,6 +21,8 @@ from typing import Dict, Sequence, Set, Union from six.moves import range +import attr + from twisted.internet import defer from twisted.internet.defer import CancelledError from twisted.python import failure @@ -483,3 +485,30 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): deferred.addCallbacks(success_cb, failure_cb) return new_d + + +@attr.s(slots=True, frozen=True) +class DoneAwaitable(object): + """Simple awaitable that returns the provided value. + """ + + value = attr.ib() + + def __await__(self): + return self + + def __iter__(self): + return self + + def __next__(self): + raise StopIteration(self.value) + + +def maybe_awaitable(value): + """Convert a value to an awaitable if not already an awaitable. + """ + + if hasattr(value, "__await__"): + return value + + return DoneAwaitable(value) -- cgit 1.4.1
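maybe_awaitable, added above, smooths the incremental port from Deferred-returning code to async/await: a cached method such as get_aliases_for_room can hand back a plain value on a cache hit but a Deferred on a miss, and the awaiting caller should not have to care which it got. A self-contained sketch of the idea (illustrative only: the cache and lookup functions are invented, and asyncio is used here just to drive the coroutines, whereas Synapse itself runs on the Twisted reactor):

    import asyncio

    from synapse.util.async_helpers import maybe_awaitable

    CACHE = {"#hit:example.org": "!cached:example.org"}


    async def slow_lookup(alias):
        # Stand-in for a database round trip.
        await asyncio.sleep(0.01)
        return "!resolved:example.org"


    def lookup(alias):
        # Mixed return shape: a plain string on a cache hit, a coroutine
        # on a miss. This is the situation maybe_awaitable absorbs.
        return CACHE.get(alias) or slow_lookup(alias)


    async def main():
        # Both results can be awaited uniformly: the cache hit is wrapped
        # in a DoneAwaitable, the coroutine passes through untouched.
        print(await maybe_awaitable(lookup("#hit:example.org")))
        print(await maybe_awaitable(lookup("#miss:example.org")))


    asyncio.get_event_loop().run_until_complete(main())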