From 1d71f484d4ec00fd41e3ef195622d0d5dba6d372 Mon Sep 17 00:00:00 2001 From: Krombel Date: Fri, 6 Apr 2018 12:54:09 +0200 Subject: use PUT instead of POST for federating groups/m.join_policy --- synapse/http/matrixfederationclient.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'synapse/http/matrixfederationclient.py') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 9145405cb0..60a29081e8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -286,7 +286,8 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers @defer.inlineCallbacks - def put_json(self, destination, path, data={}, json_data_callback=None, + def put_json(self, destination, path, args={}, data={}, + json_data_callback=None, long_retries=False, timeout=None, ignore_backoff=False, backoff_on_404=False): @@ -296,6 +297,7 @@ class MatrixFederationHttpClient(object): destination (str): The remote server to send the HTTP request to. path (str): The HTTP path. + args (dict): query params data (dict): A dict containing the data that will be used as the request body. This will be encoded as JSON. json_data_callback (callable): A callable returning the dict to @@ -342,6 +344,7 @@ class MatrixFederationHttpClient(object): path, body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, + query_bytes=encode_query_args(args), long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, @@ -373,6 +376,7 @@ class MatrixFederationHttpClient(object): giving up. None indicates no timeout. ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. + args (dict): query params Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. -- cgit 1.5.1 From 2a3c33ff03aa88317c30da43cd3773c2789f0fcf Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sun, 15 Apr 2018 17:15:16 +0200 Subject: Use six.moves.urlparse The imports were shuffled around a bunch in py3 Signed-off-by: Adrian Tschira --- synapse/config/appservice.py | 4 ++-- synapse/http/matrixfederationclient.py | 3 +-- synapse/rest/client/v1/login.py | 2 +- synapse/rest/client/v1/room.py | 9 +++++---- synapse/rest/media/v1/_base.py | 2 +- synapse/rest/media/v1/media_repository.py | 2 +- tests/rest/client/v1/test_rooms.py | 14 +++++++------- tests/utils.py | 5 ++--- 8 files changed, 20 insertions(+), 21 deletions(-) (limited to 'synapse/http/matrixfederationclient.py') diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 9a2359b6fd..277305e184 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -17,11 +17,11 @@ from ._base import Config, ConfigError from synapse.appservice import ApplicationService from synapse.types import UserID -import urllib import yaml import logging from six import string_types +from six.moves.urllib import parse as urlparse logger = logging.getLogger(__name__) @@ -105,7 +105,7 @@ def _load_appservice(hostname, as_info, config_filename): ) localpart = as_info["sender_localpart"] - if urllib.quote(localpart) != localpart: + if urlparse.quote(localpart) != localpart: raise ValueError( "sender_localpart needs characters which are not URL encoded." ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 60a29081e8..c2e5610f50 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -38,8 +38,7 @@ import logging import random import sys import urllib -import urlparse - +from six.moves.urllib import parse as urlparse logger = logging.getLogger(__name__) outbound_logger = logging.getLogger("synapse.http.outbound") diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 45844aa2d2..34df5be4e9 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -25,7 +25,7 @@ from .base import ClientV1RestServlet, client_path_patterns import simplejson as json import urllib -import urlparse +from six.moves.urllib import parse as urlparse import logging from saml2 import BINDING_HTTP_POST diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 2ad0e5943b..fcf9c9ab44 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -28,8 +28,9 @@ from synapse.http.servlet import ( parse_json_object_from_request, parse_string, parse_integer ) +from six.moves.urllib import parse as urlparse + import logging -import urllib import simplejson as json logger = logging.getLogger(__name__) @@ -433,7 +434,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet): as_client_event = "raw" not in request.args filter_bytes = request.args.get("filter", None) if filter_bytes: - filter_json = urllib.unquote(filter_bytes[-1]).decode("UTF-8") + filter_json = urlparse.unquote(filter_bytes[-1]).decode("UTF-8") event_filter = Filter(json.loads(filter_json)) else: event_filter = None @@ -718,8 +719,8 @@ class RoomTypingRestServlet(ClientV1RestServlet): def on_PUT(self, request, room_id, user_id): requester = yield self.auth.get_user_by_req(request) - room_id = urllib.unquote(room_id) - target_user = UserID.from_string(urllib.unquote(user_id)) + room_id = urlparse.unquote(room_id) + target_user = UserID.from_string(urlparse.unquote(user_id)) content = parse_json_object_from_request(request) diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index e7ac01da01..d9c4af9389 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -28,7 +28,7 @@ import os import logging import urllib -import urlparse +from six.moves.urllib import parse as urlparse logger = logging.getLogger(__name__) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index bb79599379..9800ce7581 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -47,7 +47,7 @@ import shutil import cgi import logging -import urlparse +from six.moves.urllib import parse as urlparse logger = logging.getLogger(__name__) diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 7e8966a1a8..d763400eaf 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -24,7 +24,7 @@ from synapse.api.constants import Membership from synapse.types import UserID import json -import urllib +from six.moves.urllib import parse as urlparse from ....utils import MockHttpResource, setup_test_homeserver from .utils import RestTestCase @@ -766,7 +766,7 @@ class RoomMemberStateTestCase(RestTestCase): @defer.inlineCallbacks def test_rooms_members_self(self): path = "/rooms/%s/state/m.room.member/%s" % ( - urllib.quote(self.room_id), self.user_id + urlparse.quote(self.room_id), self.user_id ) # valid join message (NOOP since we made the room) @@ -786,7 +786,7 @@ class RoomMemberStateTestCase(RestTestCase): def test_rooms_members_other(self): self.other_id = "@zzsid1:red" path = "/rooms/%s/state/m.room.member/%s" % ( - urllib.quote(self.room_id), self.other_id + urlparse.quote(self.room_id), self.other_id ) # valid invite message @@ -802,7 +802,7 @@ class RoomMemberStateTestCase(RestTestCase): def test_rooms_members_other_custom_keys(self): self.other_id = "@zzsid1:red" path = "/rooms/%s/state/m.room.member/%s" % ( - urllib.quote(self.room_id), self.other_id + urlparse.quote(self.room_id), self.other_id ) # valid invite message with custom key @@ -859,7 +859,7 @@ class RoomMessagesTestCase(RestTestCase): @defer.inlineCallbacks def test_invalid_puts(self): path = "/rooms/%s/send/m.room.message/mid1" % ( - urllib.quote(self.room_id)) + urlparse.quote(self.room_id)) # missing keys or invalid json (code, response) = yield self.mock_resource.trigger( "PUT", path, '{}' @@ -894,7 +894,7 @@ class RoomMessagesTestCase(RestTestCase): @defer.inlineCallbacks def test_rooms_messages_sent(self): path = "/rooms/%s/send/m.room.message/mid1" % ( - urllib.quote(self.room_id)) + urlparse.quote(self.room_id)) content = '{"body":"test","msgtype":{"type":"a"}}' (code, response) = yield self.mock_resource.trigger("PUT", path, content) @@ -911,7 +911,7 @@ class RoomMessagesTestCase(RestTestCase): # m.text message type path = "/rooms/%s/send/m.room.message/mid2" % ( - urllib.quote(self.room_id)) + urlparse.quote(self.room_id)) content = '{"body":"test2","msgtype":"m.text"}' (code, response) = yield self.mock_resource.trigger("PUT", path, content) self.assertEquals(200, code, msg=str(response)) diff --git a/tests/utils.py b/tests/utils.py index f15317d27b..9e815d8643 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -15,8 +15,7 @@ import hashlib from inspect import getcallargs -import urllib -import urlparse +from six.moves.urllib import parse as urlparse from mock import Mock, patch from twisted.internet import defer, reactor @@ -234,7 +233,7 @@ class MockHttpResource(HttpServer): if matcher: try: args = [ - urllib.unquote(u).decode("UTF-8") + urlparse.unquote(u).decode("UTF-8") for u in matcher.groups() ] -- cgit 1.5.1 From 1ea904b9f09ea6a9df7792f61029d0250602d46b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Apr 2018 00:53:18 +0100 Subject: Use deferred.addTimeout instead of time_bound_deferred This doesn't feel like a wheel we need to reinvent. --- synapse/http/__init__.py | 22 +++++++++++++ synapse/http/client.py | 20 +++++------- synapse/http/matrixfederationclient.py | 35 ++++++++++----------- synapse/notifier.py | 23 +++++++------- synapse/util/__init__.py | 56 ---------------------------------- tests/util/test_clock.py | 33 -------------------- 6 files changed, 59 insertions(+), 130 deletions(-) delete mode 100644 tests/util/test_clock.py (limited to 'synapse/http/matrixfederationclient.py') diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index bfebb0f644..20e568bc43 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,3 +13,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet.defer import CancelledError +from twisted.python import failure + +from synapse.api.errors import SynapseError + + +class RequestTimedOutError(SynapseError): + """Exception representing timeout of an outbound request""" + def __init__(self): + super(RequestTimedOutError, self).__init__(504, "Timed out") + + +def cancelled_to_request_timed_out_error(value): + """Turns CancelledErrors into RequestTimedOutErrors. + + For use with deferred.addTimeout() + """ + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise RequestTimedOutError() + return value diff --git a/synapse/http/client.py b/synapse/http/client.py index f3e4973c2e..35c8d51e71 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,9 +19,9 @@ from OpenSSL.SSL import VERIFY_NONE from synapse.api.errors import ( CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, ) +from synapse.http import cancelled_to_request_timed_out_error from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable -from synapse.util import logcontext import synapse.metrics from synapse.http.endpoint import SpiderEndpoint @@ -95,21 +96,16 @@ class SimpleHttpClient(object): # counters to it outgoing_requests_counter.inc(method) - def send_request(): + logger.info("Sending request %s %s", method, uri) + + try: request_deferred = self.agent.request( method, uri, *args, **kwargs ) - - return self.clock.time_bound_deferred( - request_deferred, - time_out=60, + request_deferred.addTimeout( + 60, reactor, cancelled_to_request_timed_out_error, ) - - logger.info("Sending request %s %s", method, uri) - - try: - with logcontext.PreserveLoggingContext(): - response = yield send_request() + response = yield make_deferred_yieldable(request_deferred) incoming_responses_counter.inc(method, response.code) logger.info( diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 60a29081e8..fe4b1636a6 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,17 +13,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import synapse.util.retryutils from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError from twisted.web.client import readBody, HTTPConnectionPool, Agent from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone +from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint +import synapse.metrics from synapse.util.async import sleep from synapse.util import logcontext -import synapse.metrics +from synapse.util.logcontext import make_deferred_yieldable +import synapse.util.retryutils from canonicaljson import encode_canonical_json @@ -184,21 +187,19 @@ class MatrixFederationHttpClient(object): producer = body_callback(method, http_url_bytes, headers_dict) try: - def send_request(): - request_deferred = self.agent.request( - method, - url_bytes, - Headers(headers_dict), - producer - ) - - return self.clock.time_bound_deferred( - request_deferred, - time_out=timeout / 1000. if timeout else 60, - ) - - with logcontext.PreserveLoggingContext(): - response = yield send_request() + request_deferred = self.agent.request( + method, + url_bytes, + Headers(headers_dict), + producer + ) + request_deferred.addTimeout( + timeout / 1000. if timeout else 60, + reactor, cancelled_to_request_timed_out_error, + ) + response = yield make_deferred_yieldable( + request_deferred, + ) log_result = "%d %s" % (response.code, response.phrase,) break diff --git a/synapse/notifier.py b/synapse/notifier.py index 0e40a4aad6..1e4f78b993 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -13,12 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor +from twisted.internet.defer import TimeoutError + from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state -from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred from synapse.util.logcontext import PreserveLoggingContext, preserve_fn @@ -331,11 +332,11 @@ class Notifier(object): # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) + listener.deferred.addTimeout( + (end_time - now) / 1000., reactor, + ) with PreserveLoggingContext(): - yield self.clock.time_bound_deferred( - listener.deferred, - time_out=(end_time - now) / 1000. - ) + yield listener.deferred current_token = user_stream.current_token @@ -346,7 +347,7 @@ class Notifier(object): # Update the prev_token to the current_token since nothing # has happened between the old prev_token and the current_token prev_token = current_token - except DeferredTimedOutError: + except TimeoutError: break except defer.CancelledError: break @@ -551,13 +552,11 @@ class Notifier(object): if end_time <= now: break + listener.deferred.addTimeout((end_time - now) / 1000., reactor) try: with PreserveLoggingContext(): - yield self.clock.time_bound_deferred( - listener.deferred, - time_out=(end_time - now) / 1000. - ) - except DeferredTimedOutError: + yield listener.deferred + except TimeoutError: break except defer.CancelledError: break diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 756d8ffa32..814a7bf71b 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.errors import SynapseError from synapse.util.logcontext import PreserveLoggingContext from twisted.internet import defer, reactor, task @@ -24,11 +23,6 @@ import logging logger = logging.getLogger(__name__) -class DeferredTimedOutError(SynapseError): - def __init__(self): - super(DeferredTimedOutError, self).__init__(504, "Timed out") - - def unwrapFirstError(failure): # defer.gatherResults and DeferredLists wrap failures. failure.trap(defer.FirstError) @@ -85,53 +79,3 @@ class Clock(object): except Exception: if not ignore_errs: raise - - def time_bound_deferred(self, given_deferred, time_out): - if given_deferred.called: - return given_deferred - - ret_deferred = defer.Deferred() - - def timed_out_fn(): - e = DeferredTimedOutError() - - try: - ret_deferred.errback(e) - except Exception: - pass - - try: - given_deferred.cancel() - except Exception: - pass - - timer = None - - def cancel(res): - try: - self.cancel_call_later(timer) - except Exception: - pass - return res - - ret_deferred.addBoth(cancel) - - def success(res): - try: - ret_deferred.callback(res) - except Exception: - pass - - return res - - def err(res): - try: - ret_deferred.errback(res) - except Exception: - pass - - given_deferred.addCallbacks(callback=success, errback=err) - - timer = self.call_later(time_out, timed_out_fn) - - return ret_deferred diff --git a/tests/util/test_clock.py b/tests/util/test_clock.py deleted file mode 100644 index 9672603579..0000000000 --- a/tests/util/test_clock.py +++ /dev/null @@ -1,33 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2017 Vector Creations Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from synapse import util -from twisted.internet import defer -from tests import unittest - - -class ClockTestCase(unittest.TestCase): - @defer.inlineCallbacks - def test_time_bound_deferred(self): - # just a deferred which never resolves - slow_deferred = defer.Deferred() - - clock = util.Clock() - time_bound = clock.time_bound_deferred(slow_deferred, 0.001) - - try: - yield time_bound - self.fail("Expected timedout error, but got nothing") - except util.DeferredTimedOutError: - pass -- cgit 1.5.1 From 9d2c1b8429996cb9766ac636034485e2a1d685cc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 12:52:30 +0100 Subject: Backport deferred.addTimeout Twisted 16.0 doesn't have addTimeout, so let's backport it. --- synapse/http/__init__.py | 2 +- synapse/http/client.py | 6 ++- synapse/http/matrixfederationclient.py | 7 ++-- synapse/notifier.py | 22 +++++++---- synapse/util/async.py | 67 ++++++++++++++++++++++++++++++++++ 5 files changed, 90 insertions(+), 14 deletions(-) (limited to 'synapse/http/matrixfederationclient.py') diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index 20e568bc43..0d47ccdb59 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -28,7 +28,7 @@ class RequestTimedOutError(SynapseError): def cancelled_to_request_timed_out_error(value): """Turns CancelledErrors into RequestTimedOutErrors. - For use with deferred.addTimeout() + For use with async.add_timeout_to_deferred """ if isinstance(value, failure.Failure): value.trap(CancelledError) diff --git a/synapse/http/client.py b/synapse/http/client.py index 35c8d51e71..62309c3365 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -20,6 +20,7 @@ from synapse.api.errors import ( CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, ) from synapse.http import cancelled_to_request_timed_out_error +from synapse.util.async import add_timeout_to_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable import synapse.metrics @@ -102,8 +103,9 @@ class SimpleHttpClient(object): request_deferred = self.agent.request( method, uri, *args, **kwargs ) - request_deferred.addTimeout( - 60, reactor, cancelled_to_request_timed_out_error, + add_timeout_to_deferred( + request_deferred, + 60, cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable(request_deferred) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index fe4b1636a6..30036fe81c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint import synapse.metrics -from synapse.util.async import sleep +from synapse.util.async import sleep, add_timeout_to_deferred from synapse.util import logcontext from synapse.util.logcontext import make_deferred_yieldable import synapse.util.retryutils @@ -193,9 +193,10 @@ class MatrixFederationHttpClient(object): Headers(headers_dict), producer ) - request_deferred.addTimeout( + add_timeout_to_deferred( + request_deferred, timeout / 1000. if timeout else 60, - reactor, cancelled_to_request_timed_out_error, + cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable( request_deferred, diff --git a/synapse/notifier.py b/synapse/notifier.py index 1e4f78b993..a1c06e8fca 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -13,15 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, reactor -from twisted.internet.defer import TimeoutError +from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state from synapse.util.logutils import log_function -from synapse.util.async import ObservableDeferred +from synapse.util.async import ( + ObservableDeferred, add_timeout_to_deferred, + DeferredTimeoutError, +) from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.metrics import Measure from synapse.types import StreamToken @@ -332,8 +334,9 @@ class Notifier(object): # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) - listener.deferred.addTimeout( - (end_time - now) / 1000., reactor, + add_timeout_to_deferred( + listener.deferred, + (end_time - now) / 1000., ) with PreserveLoggingContext(): yield listener.deferred @@ -347,7 +350,7 @@ class Notifier(object): # Update the prev_token to the current_token since nothing # has happened between the old prev_token and the current_token prev_token = current_token - except TimeoutError: + except DeferredTimeoutError: break except defer.CancelledError: break @@ -552,11 +555,14 @@ class Notifier(object): if end_time <= now: break - listener.deferred.addTimeout((end_time - now) / 1000., reactor) + add_timeout_to_deferred( + listener.deferred.addTimeout, + (end_time - now) / 1000., + ) try: with PreserveLoggingContext(): yield listener.deferred - except TimeoutError: + except DeferredTimeoutError: break except defer.CancelledError: break diff --git a/synapse/util/async.py b/synapse/util/async.py index 0729bb2863..1df5c5600c 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -15,6 +15,8 @@ from twisted.internet import defer, reactor +from twisted.internet.defer import CancelledError +from twisted.python import failure from .logcontext import ( PreserveLoggingContext, make_deferred_yieldable, preserve_fn @@ -392,3 +394,68 @@ class ReadWriteLock(object): self.key_to_current_writer.pop(key) defer.returnValue(_ctx_manager()) + + +class DeferredTimeoutError(Exception): + """ + This error is raised by default when a L{Deferred} times out. + """ + + +def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None): + """ + Add a timeout to a deferred by scheduling it to be cancelled after + timeout seconds. + + This is essentially a backport of deferred.addTimeout, which was introduced + in twisted 16.5. + + If the deferred gets timed out, it errbacks with a DeferredTimeoutError, + unless a cancelable function was passed to its initialization or unless + a different on_timeout_cancel callable is provided. + + Args: + deferred (defer.Deferred): deferred to be timed out + timeout (Number): seconds to time out after + + on_timeout_cancel (callable): A callable which is called immediately + after the deferred times out, and not if this deferred is + otherwise cancelled before the timeout. + + It takes an arbitrary value, which is the value of the deferred at + that exact point in time (probably a CancelledError Failure), and + the timeout. + + The default callable (if none is provided) will translate a + CancelledError Failure into a DeferredTimeoutError. + """ + timed_out = [False] + + def time_it_out(): + timed_out[0] = True + deferred.cancel() + + delayed_call = reactor.callLater(timeout, time_it_out) + + def convert_cancelled(value): + if timed_out[0]: + to_call = on_timeout_cancel or _cancelled_to_timed_out_error + return to_call(value, timeout) + return value + + deferred.addBoth(convert_cancelled) + + def cancel_timeout(result): + # stop the pending call to cancel the deferred if it's been fired + if delayed_call.active(): + delayed_call.cancel() + return result + + deferred.addBoth(cancel_timeout) + + +def _cancelled_to_timed_out_error(value, timeout): + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise DeferredTimeoutError(timeout, "Deferred") + return value -- cgit 1.5.1