summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/http/__init__.py22
-rw-r--r--synapse/http/client.py20
-rw-r--r--synapse/http/matrixfederationclient.py35
-rw-r--r--synapse/notifier.py23
-rw-r--r--synapse/util/__init__.py56
5 files changed, 59 insertions, 97 deletions
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index bfebb0f644..20e568bc43 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,3 +13,24 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from twisted.internet.defer import CancelledError
+from twisted.python import failure
+
+from synapse.api.errors import SynapseError
+
+
+class RequestTimedOutError(SynapseError):
+    """Exception representing timeout of an outbound request"""
+    def __init__(self):
+        super(RequestTimedOutError, self).__init__(504, "Timed out")
+
+
+def cancelled_to_request_timed_out_error(value):
+    """Turns CancelledErrors into RequestTimedOutErrors.
+
+    For use with deferred.addTimeout()
+    """
+    if isinstance(value, failure.Failure):
+        value.trap(CancelledError)
+        raise RequestTimedOutError()
+    return value
diff --git a/synapse/http/client.py b/synapse/http/client.py
index f3e4973c2e..35c8d51e71 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,9 +19,9 @@ from OpenSSL.SSL import VERIFY_NONE
 from synapse.api.errors import (
     CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
 )
+from synapse.http import cancelled_to_request_timed_out_error
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
-from synapse.util import logcontext
 import synapse.metrics
 from synapse.http.endpoint import SpiderEndpoint
 
@@ -95,21 +96,16 @@ class SimpleHttpClient(object):
         # counters to it
         outgoing_requests_counter.inc(method)
 
-        def send_request():
+        logger.info("Sending request %s %s", method, uri)
+
+        try:
             request_deferred = self.agent.request(
                 method, uri, *args, **kwargs
             )
-
-            return self.clock.time_bound_deferred(
-                request_deferred,
-                time_out=60,
+            request_deferred.addTimeout(
+                60, reactor, cancelled_to_request_timed_out_error,
             )
-
-        logger.info("Sending request %s %s", method, uri)
-
-        try:
-            with logcontext.PreserveLoggingContext():
-                response = yield send_request()
+            response = yield make_deferred_yieldable(request_deferred)
 
             incoming_responses_counter.inc(method, response.code)
             logger.info(
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 60a29081e8..fe4b1636a6 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,17 +13,19 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import synapse.util.retryutils
 from twisted.internet import defer, reactor, protocol
 from twisted.internet.error import DNSLookupError
 from twisted.web.client import readBody, HTTPConnectionPool, Agent
 from twisted.web.http_headers import Headers
 from twisted.web._newclient import ResponseDone
 
+from synapse.http import cancelled_to_request_timed_out_error
 from synapse.http.endpoint import matrix_federation_endpoint
+import synapse.metrics
 from synapse.util.async import sleep
 from synapse.util import logcontext
-import synapse.metrics
+from synapse.util.logcontext import make_deferred_yieldable
+import synapse.util.retryutils
 
 from canonicaljson import encode_canonical_json
 
@@ -184,21 +187,19 @@ class MatrixFederationHttpClient(object):
                         producer = body_callback(method, http_url_bytes, headers_dict)
 
                     try:
-                        def send_request():
-                            request_deferred = self.agent.request(
-                                method,
-                                url_bytes,
-                                Headers(headers_dict),
-                                producer
-                            )
-
-                            return self.clock.time_bound_deferred(
-                                request_deferred,
-                                time_out=timeout / 1000. if timeout else 60,
-                            )
-
-                        with logcontext.PreserveLoggingContext():
-                            response = yield send_request()
+                        request_deferred = self.agent.request(
+                            method,
+                            url_bytes,
+                            Headers(headers_dict),
+                            producer
+                        )
+                        request_deferred.addTimeout(
+                            timeout / 1000. if timeout else 60,
+                            reactor, cancelled_to_request_timed_out_error,
+                        )
+                        response = yield make_deferred_yieldable(
+                            request_deferred,
+                        )
 
                         log_result = "%d %s" % (response.code, response.phrase,)
                         break
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 0e40a4aad6..1e4f78b993 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -13,12 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.internet.defer import TimeoutError
+
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 
-from synapse.util import DeferredTimedOutError
 from synapse.util.logutils import log_function
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
@@ -331,11 +332,11 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
+                    listener.deferred.addTimeout(
+                        (end_time - now) / 1000., reactor,
+                    )
                     with PreserveLoggingContext():
-                        yield self.clock.time_bound_deferred(
-                            listener.deferred,
-                            time_out=(end_time - now) / 1000.
-                        )
+                        yield listener.deferred
 
                     current_token = user_stream.current_token
 
@@ -346,7 +347,7 @@ class Notifier(object):
                     # Update the prev_token to the current_token since nothing
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
-                except DeferredTimedOutError:
+                except TimeoutError:
                     break
                 except defer.CancelledError:
                     break
@@ -551,13 +552,11 @@ class Notifier(object):
             if end_time <= now:
                 break
 
+            listener.deferred.addTimeout((end_time - now) / 1000., reactor)
             try:
                 with PreserveLoggingContext():
-                    yield self.clock.time_bound_deferred(
-                        listener.deferred,
-                        time_out=(end_time - now) / 1000.
-                    )
-            except DeferredTimedOutError:
+                    yield listener.deferred
+            except TimeoutError:
                 break
             except defer.CancelledError:
                 break
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 756d8ffa32..814a7bf71b 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.api.errors import SynapseError
 from synapse.util.logcontext import PreserveLoggingContext
 
 from twisted.internet import defer, reactor, task
@@ -24,11 +23,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class DeferredTimedOutError(SynapseError):
-    def __init__(self):
-        super(DeferredTimedOutError, self).__init__(504, "Timed out")
-
-
 def unwrapFirstError(failure):
     # defer.gatherResults and DeferredLists wrap failures.
     failure.trap(defer.FirstError)
@@ -85,53 +79,3 @@ class Clock(object):
         except Exception:
             if not ignore_errs:
                 raise
-
-    def time_bound_deferred(self, given_deferred, time_out):
-        if given_deferred.called:
-            return given_deferred
-
-        ret_deferred = defer.Deferred()
-
-        def timed_out_fn():
-            e = DeferredTimedOutError()
-
-            try:
-                ret_deferred.errback(e)
-            except Exception:
-                pass
-
-            try:
-                given_deferred.cancel()
-            except Exception:
-                pass
-
-        timer = None
-
-        def cancel(res):
-            try:
-                self.cancel_call_later(timer)
-            except Exception:
-                pass
-            return res
-
-        ret_deferred.addBoth(cancel)
-
-        def success(res):
-            try:
-                ret_deferred.callback(res)
-            except Exception:
-                pass
-
-            return res
-
-        def err(res):
-            try:
-                ret_deferred.errback(res)
-            except Exception:
-                pass
-
-        given_deferred.addCallbacks(callback=success, errback=err)
-
-        timer = self.call_later(time_out, timed_out_fn)
-
-        return ret_deferred