summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-05-08 19:53:34 +0100
committerErik Johnston <erik@matrix.org>2015-05-08 19:53:34 +0100
commit2236ef6c92b7964665f5c43b941754d70aa506d8 (patch)
tree5befd0effe712d4c4af6264346769f57c9486060
parentAdd some docs (diff)
downloadsynapse-2236ef6c92b7964665f5c43b941754d70aa506d8.tar.xz
Fix up leak. Add warnings.
-rw-r--r--synapse/handlers/_base.py11
-rw-r--r--synapse/handlers/federation.py29
-rw-r--r--synapse/handlers/presence.py10
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/http/server.py6
-rw-r--r--synapse/util/__init__.py3
-rw-r--r--synapse/util/distributor.py45
-rw-r--r--synapse/util/logcontext.py11
8 files changed, 70 insertions, 49 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 4b3f4eadab..ddc5c21e7d 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -20,6 +20,8 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.api.constants import Membership, EventTypes
 from synapse.types import UserID
 
+from synapse.util.logcontext import PreserveLoggingContext
+
 import logging
 
 
@@ -137,10 +139,11 @@ class BaseHandler(object):
                     "Failed to get destination from event %s", s.event_id
                 )
 
-        # Don't block waiting on waking up all the listeners.
-        notify_d = self.notifier.on_new_room_event(
-            event, extra_users=extra_users
-        )
+        with PreserveLoggingContext():
+            # Don't block waiting on waking up all the listeners.
+            notify_d = self.notifier.on_new_room_event(
+                event, extra_users=extra_users
+            )
 
         def log_failure(f):
             logger.warn(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 85e2757227..77c315c47c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,6 +21,7 @@ from synapse.api.errors import (
     AuthError, FederationError, StoreError,
 )
 from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.util.frozenutils import unfreeze
@@ -197,9 +198,10 @@ class FederationHandler(BaseHandler):
                 target_user = UserID.from_string(target_user_id)
                 extra_users.append(target_user)
 
-            d = self.notifier.on_new_room_event(
-                event, extra_users=extra_users
-            )
+            with PreserveLoggingContext():
+                d = self.notifier.on_new_room_event(
+                    event, extra_users=extra_users
+                )
 
             def log_failure(f):
                 logger.warn(
@@ -431,9 +433,10 @@ class FederationHandler(BaseHandler):
                 auth_events=auth_events,
             )
 
-            d = self.notifier.on_new_room_event(
-                new_event, extra_users=[joinee]
-            )
+            with PreserveLoggingContext():
+                d = self.notifier.on_new_room_event(
+                    new_event, extra_users=[joinee]
+                )
 
             def log_failure(f):
                 logger.warn(
@@ -512,9 +515,10 @@ class FederationHandler(BaseHandler):
             target_user = UserID.from_string(target_user_id)
             extra_users.append(target_user)
 
-        d = self.notifier.on_new_room_event(
-            event, extra_users=extra_users
-        )
+        with PreserveLoggingContext():
+            d = self.notifier.on_new_room_event(
+                event, extra_users=extra_users
+            )
 
         def log_failure(f):
             logger.warn(
@@ -594,9 +598,10 @@ class FederationHandler(BaseHandler):
         )
 
         target_user = UserID.from_string(event.state_key)
-        d = self.notifier.on_new_room_event(
-            event, extra_users=[target_user],
-        )
+        with PreserveLoggingContext():
+            d = self.notifier.on_new_room_event(
+                event, extra_users=[target_user],
+            )
 
         def log_failure(f):
             logger.warn(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 6ae39a1d37..1edab05492 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, AuthError
 from synapse.api.constants import PresenceState
 
+from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.types import UserID
 import synapse.metrics
@@ -808,10 +809,11 @@ class PresenceHandler(BaseHandler):
 
     def push_update_to_clients(self, observed_user, users_to_push=[],
                                room_ids=[], statuscache=None):
-        self.notifier.on_new_user_event(
-            users_to_push,
-            room_ids,
-        )
+        with PreserveLoggingContext():
+            self.notifier.on_new_user_event(
+                users_to_push,
+                room_ids,
+            )
 
 
 class PresenceEventSource(object):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c0b2bd7db0..64fe51aa3e 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from ._base import BaseHandler
 
 from synapse.api.errors import SynapseError, AuthError
+from synapse.util.logcontext import PreserveLoggingContext
 from synapse.types import UserID
 
 import logging
@@ -216,7 +217,8 @@ class TypingNotificationHandler(BaseHandler):
         self._latest_room_serial += 1
         self._room_serials[room_id] = self._latest_room_serial
 
-        self.notifier.on_new_user_event(rooms=[room_id])
+        with PreserveLoggingContext():
+            self.notifier.on_new_user_event(rooms=[room_id])
 
 
 class TypingNotificationEventSource(object):
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 93ecbd7589..73efbff4f2 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -17,7 +17,7 @@
 from synapse.api.errors import (
     cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError
 )
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 import synapse.metrics
 
 from syutil.jsonutil import (
@@ -85,7 +85,9 @@ def request_handler(request_handler):
                     "Received request: %s %s",
                     request.method, request.path
                 )
-                yield request_handler(self, request)
+                d = request_handler(self, request)
+                with PreserveLoggingContext():
+                    yield d
                 code = request.code
             except CodeMessageException as e:
                 code = e.code
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 364b927851..fd3eb1f574 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -54,7 +54,8 @@ class Clock(object):
                 LoggingContext.thread_local.current_context = current_context
                 callback()
 
-        return reactor.callLater(delay, wrapped_callback)
+        with PreserveLoggingContext():
+            return reactor.callLater(delay, wrapped_callback)
 
     def cancel_call_later(self, timer):
         timer.cancel()
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 9d9c350397..5b150cb0e5 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.logcontext import PreserveLoggingContext
-
 from twisted.internet import defer
 
 import logging
@@ -93,7 +91,6 @@ class Signal(object):
         Each observer callable may return a Deferred."""
         self.observers.append(observer)
 
-    @defer.inlineCallbacks
     def fire(self, *args, **kwargs):
         """Invokes every callable in the observer list, passing in the args and
         kwargs. Exceptions thrown by observers are logged but ignored. It is
@@ -101,24 +98,24 @@ class Signal(object):
 
         Returns a Deferred that will complete when all the observers have
         completed."""
-        with PreserveLoggingContext():
-            deferreds = []
-            for observer in self.observers:
-                d = defer.maybeDeferred(observer, *args, **kwargs)
-
-                def eb(failure):
-                    logger.warning(
-                        "%s signal observer %s failed: %r",
-                        self.name, observer, failure,
-                        exc_info=(
-                            failure.type,
-                            failure.value,
-                            failure.getTracebackObject()))
-                    if not self.suppress_failures:
-                        failure.raiseException()
-                deferreds.append(d.addErrback(eb))
-            results = []
-            for deferred in deferreds:
-                result = yield deferred
-                results.append(result)
-        defer.returnValue(results)
+
+        def eb(failure):
+            logger.warning(
+                "%s signal observer %s failed: %r",
+                self.name, observer, failure,
+                exc_info=(
+                    failure.type,
+                    failure.value,
+                    failure.getTracebackObject()))
+            if not self.suppress_failures:
+                failure.raiseException()
+
+        deferreds = [
+            defer.maybeDeferred(observer, *args, **kwargs)
+            for observer in self.observers
+        ]
+
+        d = defer.gatherResults(deferreds, consumeErrors=True)
+        d.addErrback(eb)
+
+        return d
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 3dce8d2bf3..a92d518b43 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -132,6 +132,13 @@ class PreserveLoggingContext(object):
         """Restores the current logging context"""
         LoggingContext.thread_local.current_context = self.current_context
 
+        if self.current_context is not LoggingContext.sentinel:
+            if self.current_context.parent_context is None:
+                logger.warn(
+                    "Restoring dead context: %s",
+                    self.current_context,
+                )
+
 
 def preserve_context_over_fn(fn, *args, **kwargs):
     """Takes a function and invokes it with the given arguments, but removes
@@ -169,6 +176,8 @@ def preserve_context_over_deferred(deferred):
             res = d.errback(failure)
         return res
 
-    deferred.addCallbacks(cb, eb)
+    if deferred.called:
+        return deferred
 
+    deferred.addCallbacks(cb, eb)
     return d