summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/http/__init__.py2
-rw-r--r--synapse/http/client.py6
-rw-r--r--synapse/http/matrixfederationclient.py7
-rw-r--r--synapse/notifier.py22
-rw-r--r--synapse/util/async.py67
5 files changed, 90 insertions, 14 deletions
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 20e568bc43..0d47ccdb59 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -28,7 +28,7 @@ class RequestTimedOutError(SynapseError):
 def cancelled_to_request_timed_out_error(value):
     """Turns CancelledErrors into RequestTimedOutErrors.
 
-    For use with deferred.addTimeout()
+    For use with async.add_timeout_to_deferred
     """
     if isinstance(value, failure.Failure):
         value.trap(CancelledError)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 35c8d51e71..62309c3365 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -20,6 +20,7 @@ from synapse.api.errors import (
     CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
 )
 from synapse.http import cancelled_to_request_timed_out_error
+from synapse.util.async import add_timeout_to_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
 import synapse.metrics
@@ -102,8 +103,9 @@ class SimpleHttpClient(object):
             request_deferred = self.agent.request(
                 method, uri, *args, **kwargs
             )
-            request_deferred.addTimeout(
-                60, reactor, cancelled_to_request_timed_out_error,
+            add_timeout_to_deferred(
+                request_deferred,
+                60, cancelled_to_request_timed_out_error,
             )
             response = yield make_deferred_yieldable(request_deferred)
 
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index fe4b1636a6..30036fe81c 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone
 from synapse.http import cancelled_to_request_timed_out_error
 from synapse.http.endpoint import matrix_federation_endpoint
 import synapse.metrics
-from synapse.util.async import sleep
+from synapse.util.async import sleep, add_timeout_to_deferred
 from synapse.util import logcontext
 from synapse.util.logcontext import make_deferred_yieldable
 import synapse.util.retryutils
@@ -193,9 +193,10 @@ class MatrixFederationHttpClient(object):
                             Headers(headers_dict),
                             producer
                         )
-                        request_deferred.addTimeout(
+                        add_timeout_to_deferred(
+                            request_deferred,
                             timeout / 1000. if timeout else 60,
-                            reactor, cancelled_to_request_timed_out_error,
+                            cancelled_to_request_timed_out_error,
                         )
                         response = yield make_deferred_yieldable(
                             request_deferred,
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1e4f78b993..a1c06e8fca 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -13,15 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer, reactor
-from twisted.internet.defer import TimeoutError
+from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 
 from synapse.util.logutils import log_function
-from synapse.util.async import ObservableDeferred
+from synapse.util.async import (
+    ObservableDeferred, add_timeout_to_deferred,
+    DeferredTimeoutError,
+)
 from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
 from synapse.util.metrics import Measure
 from synapse.types import StreamToken
@@ -332,8 +334,9 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
-                    listener.deferred.addTimeout(
-                        (end_time - now) / 1000., reactor,
+                    add_timeout_to_deferred(
+                        listener.deferred,
+                        (end_time - now) / 1000.,
                     )
                     with PreserveLoggingContext():
                         yield listener.deferred
@@ -347,7 +350,7 @@ class Notifier(object):
                     # Update the prev_token to the current_token since nothing
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
-                except TimeoutError:
+                except DeferredTimeoutError:
                     break
                 except defer.CancelledError:
                     break
@@ -552,11 +555,14 @@ class Notifier(object):
             if end_time <= now:
                 break
 
-            listener.deferred.addTimeout((end_time - now) / 1000., reactor)
+            add_timeout_to_deferred(
+                listener.deferred.addTimeout,
+                (end_time - now) / 1000.,
+            )
             try:
                 with PreserveLoggingContext():
                     yield listener.deferred
-            except TimeoutError:
+            except DeferredTimeoutError:
                 break
             except defer.CancelledError:
                 break
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 0729bb2863..1df5c5600c 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -15,6 +15,8 @@
 
 
 from twisted.internet import defer, reactor
+from twisted.internet.defer import CancelledError
+from twisted.python import failure
 
 from .logcontext import (
     PreserveLoggingContext, make_deferred_yieldable, preserve_fn
@@ -392,3 +394,68 @@ class ReadWriteLock(object):
                     self.key_to_current_writer.pop(key)
 
         defer.returnValue(_ctx_manager())
+
+
+class DeferredTimeoutError(Exception):
+    """
+    This error is raised by default when a L{Deferred} times out.
+    """
+
+
+def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
+    """
+    Add a timeout to a deferred by scheduling it to be cancelled after
+    timeout seconds.
+
+    This is essentially a backport of deferred.addTimeout, which was introduced
+    in twisted 16.5.
+
+    If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
+    unless a cancelable function was passed to its initialization or unless
+    a different on_timeout_cancel callable is provided.
+
+    Args:
+        deferred (defer.Deferred): deferred to be timed out
+        timeout (Number): seconds to time out after
+
+        on_timeout_cancel (callable): A callable which is called immediately
+            after the deferred times out, and not if this deferred is
+            otherwise cancelled before the timeout.
+
+            It takes an arbitrary value, which is the value of the deferred at
+            that exact point in time (probably a CancelledError Failure), and
+            the timeout.
+
+            The default callable (if none is provided) will translate a
+            CancelledError Failure into a DeferredTimeoutError.
+    """
+    timed_out = [False]
+
+    def time_it_out():
+        timed_out[0] = True
+        deferred.cancel()
+
+    delayed_call = reactor.callLater(timeout, time_it_out)
+
+    def convert_cancelled(value):
+        if timed_out[0]:
+            to_call = on_timeout_cancel or _cancelled_to_timed_out_error
+            return to_call(value, timeout)
+        return value
+
+    deferred.addBoth(convert_cancelled)
+
+    def cancel_timeout(result):
+        # stop the pending call to cancel the deferred if it's been fired
+        if delayed_call.active():
+            delayed_call.cancel()
+        return result
+
+    deferred.addBoth(cancel_timeout)
+
+
+def _cancelled_to_timed_out_error(value, timeout):
+    if isinstance(value, failure.Failure):
+        value.trap(CancelledError)
+        raise DeferredTimeoutError(timeout, "Deferred")
+    return value