diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 07ff25cef3..d69c7cb991 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -29,34 +29,6 @@ def unwrapFirstError(failure):
return failure.value.subFailure
-def unwrap_deferred(d):
- """Given a deferred that we know has completed, return its value or raise
- the failure as an exception
- """
- if not d.called:
- raise RuntimeError("deferred has not finished")
-
- res = []
-
- def f(r):
- res.append(r)
- return r
- d.addCallback(f)
-
- if res:
- return res[0]
-
- def f(r):
- res.append(r)
- return r
- d.addErrback(f)
-
- if res:
- res[0].raiseException()
- else:
- raise RuntimeError("deferred did not call callbacks")
-
-
class Clock(object):
"""A small utility that obtains current time-of-day so that time may be
mocked during unit-tests.
@@ -81,6 +53,14 @@ class Clock(object):
loop.stop()
def call_later(self, delay, callback, *args, **kwargs):
+ """Call something later
+
+ Args:
+ delay(float): How long to wait in seconds.
+ callback(function): Function to call
+ *args: Postional arguments to pass to function.
+ **kwargs: Key arguments to pass to function.
+ """
current_context = LoggingContext.current_context()
def wrapped_callback(*args, **kwargs):
diff --git a/synapse/util/debug.py b/synapse/util/debug.py
new file mode 100644
index 0000000000..f6a5a841a4
--- /dev/null
+++ b/synapse/util/debug.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer, reactor
+from functools import wraps
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+
+
+def debug_deferreds():
+ """Cause all deferreds to wait for a reactor tick before running their
+ callbacks. This increases the chance of getting a stack trace out of
+ a defer.inlineCallback since the code waiting on the deferred will get
+ a chance to add an errback before the deferred runs."""
+
+ # Helper method for retrieving and restoring the current logging context
+ # around a callback.
+ def with_logging_context(fn):
+ context = LoggingContext.current_context()
+
+ def restore_context_callback(x):
+ with PreserveLoggingContext():
+ LoggingContext.thread_local.current_context = context
+ return fn(x)
+
+ return restore_context_callback
+
+ # We are going to modify the __init__ method of defer.Deferred so we
+ # need to get a copy of the old method so we can still call it.
+ old__init__ = defer.Deferred.__init__
+
+ # We need to create a deferred to bounce the callbacks through the reactor
+ # but we don't want to add a callback when we create that deferred so we
+ # we create a new type of deferred that uses the old __init__ method.
+ # This is safe as long as the old __init__ method doesn't invoke an
+ # __init__ using super.
+ class Bouncer(defer.Deferred):
+ __init__ = old__init__
+
+ # We'll add this as a callback to all Deferreds. Twisted will wait until
+ # the bouncer deferred resolves before calling the callbacks of the
+ # original deferred.
+ def bounce_callback(x):
+ bouncer = Bouncer()
+ reactor.callLater(0, with_logging_context(bouncer.callback), x)
+ return bouncer
+
+ # We'll add this as an errback to all Deferreds. Twisted will wait until
+ # the bouncer deferred resolves before calling the errbacks of the
+ # original deferred.
+ def bounce_errback(x):
+ bouncer = Bouncer()
+ reactor.callLater(0, with_logging_context(bouncer.errback), x)
+ return bouncer
+
+ @wraps(old__init__)
+ def new__init__(self, *args, **kargs):
+ old__init__(self, *args, **kargs)
+ self.addCallbacks(bounce_callback, bounce_errback)
+
+ defer.Deferred.__init__ = new__init__
diff --git a/synapse/util/emailutils.py b/synapse/util/emailutils.py
deleted file mode 100644
index 7f9a77bf44..0000000000
--- a/synapse/util/emailutils.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-""" This module allows you to send out emails.
-"""
-import email.utils
-import smtplib
-import twisted.python.log
-from email.mime.text import MIMEText
-from email.mime.multipart import MIMEMultipart
-
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class EmailException(Exception):
- pass
-
-
-def send_email(smtp_server, from_addr, to_addr, subject, body):
- """Sends an email.
-
- Args:
- smtp_server(str): The SMTP server to use.
- from_addr(str): The address to send from.
- to_addr(str): The address to send to.
- subject(str): The subject of the email.
- body(str): The plain text body of the email.
- Raises:
- EmailException if there was a problem sending the mail.
- """
- if not smtp_server or not from_addr or not to_addr:
- raise EmailException("Need SMTP server, from and to addresses. Check"
- " the config to set these.")
-
- msg = MIMEMultipart('alternative')
- msg['Subject'] = subject
- msg['From'] = from_addr
- msg['To'] = to_addr
- plain_part = MIMEText(body)
- msg.attach(plain_part)
-
- raw_from = email.utils.parseaddr(from_addr)[1]
- raw_to = email.utils.parseaddr(to_addr)[1]
- if not raw_from or not raw_to:
- raise EmailException("Couldn't parse from/to address.")
-
- logger.info("Sending email to %s on server %s with subject %s",
- to_addr, smtp_server, subject)
-
- try:
- smtp = smtplib.SMTP(smtp_server)
- smtp.sendmail(raw_from, raw_to, msg.as_string())
- smtp.quit()
- except Exception as origException:
- twisted.python.log.err()
- ese = EmailException()
- ese.cause = origException
- raise ese
diff --git a/synapse/util/lockutils.py b/synapse/util/lockutils.py
deleted file mode 100644
index 33edc5c20e..0000000000
--- a/synapse/util/lockutils.py
+++ /dev/null
@@ -1,74 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from twisted.internet import defer
-
-import logging
-
-
-logger = logging.getLogger(__name__)
-
-
-class Lock(object):
-
- def __init__(self, deferred, key):
- self._deferred = deferred
- self.released = False
- self.key = key
-
- def release(self):
- self.released = True
- self._deferred.callback(None)
-
- def __del__(self):
- if not self.released:
- logger.critical("Lock was destructed but never released!")
- self.release()
-
- def __enter__(self):
- return self
-
- def __exit__(self, type, value, traceback):
- logger.debug("Releasing lock for key=%r", self.key)
- self.release()
-
-
-class LockManager(object):
- """ Utility class that allows us to lock based on a `key` """
-
- def __init__(self):
- self._lock_deferreds = {}
-
- @defer.inlineCallbacks
- def lock(self, key):
- """ Allows us to block until it is our turn.
- Args:
- key (str)
- Returns:
- Lock
- """
- new_deferred = defer.Deferred()
- old_deferred = self._lock_deferreds.get(key)
- self._lock_deferreds[key] = new_deferred
-
- if old_deferred:
- logger.debug("Queueing on lock for key=%r", key)
- yield old_deferred
- logger.debug("Obtained lock for key=%r", key)
- else:
- logger.debug("Entering uncontended lock for key=%r", key)
-
- defer.returnValue(Lock(new_deferred, key))
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index a42138f556..2fe6814807 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.api.errors import CodeMessageException
import logging
+import random
logger = logging.getLogger(__name__)
@@ -85,8 +86,9 @@ def get_retry_limiter(destination, clock, store, **kwargs):
class RetryDestinationLimiter(object):
def __init__(self, destination, clock, store, retry_interval,
- min_retry_interval=5000, max_retry_interval=60 * 60 * 1000,
- multiplier_retry_interval=2,):
+ min_retry_interval=10 * 60 * 1000,
+ max_retry_interval=24 * 60 * 60 * 1000,
+ multiplier_retry_interval=5,):
"""Marks the destination as "down" if an exception is thrown in the
context, except for CodeMessageException with code < 500.
@@ -140,6 +142,7 @@ class RetryDestinationLimiter(object):
# We couldn't connect.
if self.retry_interval:
self.retry_interval *= self.multiplier_retry_interval
+ self.retry_interval *= int(random.uniform(0.8, 1.4))
if self.retry_interval >= self.max_retry_interval:
self.retry_interval = self.max_retry_interval
|