summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/events.py8
-rw-r--r--synapse/notifier.py40
-rw-r--r--synapse/util/__init__.py7
-rw-r--r--synapse/util/logcontext.py4
4 files changed, 37 insertions, 22 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 93dcd40324..4993c92b74 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -15,6 +15,7 @@
 
 from twisted.internet import defer
 
+from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
 
 from ._base import BaseHandler
@@ -66,9 +67,10 @@ class EventStreamHandler(BaseHandler):
             rm_handler = self.hs.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(auth_user)
 
-            events, tokens = yield self.notifier.get_events_for(
-                auth_user, room_ids, pagin_config, timeout
-            )
+            with PreserveLoggingContext():
+                events, tokens = yield self.notifier.get_events_for(
+                    auth_user, room_ids, pagin_config, timeout
+                )
 
             chunks = [self.hs.serialize_event(e) for e in events]
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f38c410e33..022d60a3b5 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer, reactor
 
 from synapse.util.logutils import log_function
+from synapse.util.logcontext import PreserveLoggingContext
 
 import logging
 
@@ -79,6 +80,8 @@ class Notifier(object):
 
         self.event_sources = hs.get_event_sources()
 
+        self.clock = hs.get_clock()
+
         hs.get_distributor().observe(
             "user_joined_room", self._user_joined_room
         )
@@ -127,9 +130,10 @@ class Notifier(object):
         def eb(failure):
             logger.exception("Failed to notify listener", failure)
 
-        yield defer.DeferredList(
-            [notify(l).addErrback(eb) for l in listeners]
-        )
+        with PreserveLoggingContext():
+            yield defer.DeferredList(
+                [notify(l).addErrback(eb) for l in listeners]
+            )
 
     @defer.inlineCallbacks
     @log_function
@@ -175,9 +179,10 @@ class Notifier(object):
                     failure.getTracebackObject())
             )
 
-        yield defer.DeferredList(
-            [notify(l).addErrback(eb) for l in listeners]
-        )
+        with PreserveLoggingContext():
+            yield defer.DeferredList(
+                [notify(l).addErrback(eb) for l in listeners]
+            )
 
     def get_events_for(self, user, rooms, pagination_config, timeout):
         """ For the given user and rooms, return any new events for them. If
@@ -206,29 +211,28 @@ class Notifier(object):
             timeout,
             deferred,
         )
+        def _timeout_listener():
+            # TODO (erikj): We should probably set to_token to the current
+            # max rather than reusing from_token.
+            listener.notify(
+                self,
+                [],
+                listener.from_token,
+                listener.from_token,
+            )
 
         if timeout:
-            reactor.callLater(timeout/1000.0, self._timeout_listener, listener)
+            self.clock.call_later(timeout/1000.0, _timeout_listener)
 
             self._register_with_keys(listener)
 
         yield self._check_for_updates(listener)
 
         if not timeout:
-            self._timeout_listener(listener)
+            _timeout_listener()
 
         return
 
-    def _timeout_listener(self, listener):
-        # TODO (erikj): We should probably set to_token to the current max
-        # rather than reusing from_token.
-        listener.notify(
-            self,
-            [],
-            listener.from_token,
-            listener.from_token,
-        )
-
     @log_function
     def _register_with_keys(self, listener):
         for room in listener.rooms:
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index c9a73b0413..9ad613b8f1 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.util.logcontext import LoggingContext
 
 from twisted.internet import reactor
 
@@ -35,7 +36,11 @@ class Clock(object):
         return self.time() * 1000
 
     def call_later(self, delay, callback):
-        return reactor.callLater(delay, callback)
+        current_context = LoggingContext.current_context()
+        def wrapped_callback():
+            current_context.thread_local.current_context = current_context
+            callback()
+        return reactor.callLater(delay, wrapped_callback)
 
     def cancel_call_later(self, timer):
         timer.cancel()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 13176b05ce..2f430a0f19 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -18,6 +18,9 @@ class LoggingContext(object):
 
         __slots__ = []
 
+        def __str__(self):
+            return "sentinel"
+
         def copy_to(self, record):
             pass
 
@@ -102,6 +105,7 @@ class PreserveLoggingContext(object):
     def __enter__(self):
         """Captures the current logging context"""
         self.current_context = LoggingContext.current_context()
+        LoggingContext.thread_local.current_context = LoggingContext.sentinel
 
     def __exit__(self, type, value, traceback):
         """Restores the current logging context"""