diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 93dcd40324..4993c92b74 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -15,6 +15,7 @@
from twisted.internet import defer
+from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function
from ._base import BaseHandler
@@ -66,9 +67,10 @@ class EventStreamHandler(BaseHandler):
rm_handler = self.hs.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(auth_user)
- events, tokens = yield self.notifier.get_events_for(
- auth_user, room_ids, pagin_config, timeout
- )
+ with PreserveLoggingContext():
+ events, tokens = yield self.notifier.get_events_for(
+ auth_user, room_ids, pagin_config, timeout
+ )
chunks = [self.hs.serialize_event(e) for e in events]
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f38c410e33..022d60a3b5 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -16,6 +16,7 @@
from twisted.internet import defer, reactor
from synapse.util.logutils import log_function
+from synapse.util.logcontext import PreserveLoggingContext
import logging
@@ -79,6 +80,8 @@ class Notifier(object):
self.event_sources = hs.get_event_sources()
+ self.clock = hs.get_clock()
+
hs.get_distributor().observe(
"user_joined_room", self._user_joined_room
)
@@ -127,9 +130,10 @@ class Notifier(object):
def eb(failure):
logger.exception("Failed to notify listener", failure)
- yield defer.DeferredList(
- [notify(l).addErrback(eb) for l in listeners]
- )
+ with PreserveLoggingContext():
+ yield defer.DeferredList(
+ [notify(l).addErrback(eb) for l in listeners]
+ )
@defer.inlineCallbacks
@log_function
@@ -175,9 +179,10 @@ class Notifier(object):
failure.getTracebackObject())
)
- yield defer.DeferredList(
- [notify(l).addErrback(eb) for l in listeners]
- )
+ with PreserveLoggingContext():
+ yield defer.DeferredList(
+ [notify(l).addErrback(eb) for l in listeners]
+ )
def get_events_for(self, user, rooms, pagination_config, timeout):
""" For the given user and rooms, return any new events for them. If
@@ -206,29 +211,28 @@ class Notifier(object):
timeout,
deferred,
)
+ def _timeout_listener():
+ # TODO (erikj): We should probably set to_token to the current
+ # max rather than reusing from_token.
+ listener.notify(
+ self,
+ [],
+ listener.from_token,
+ listener.from_token,
+ )
if timeout:
- reactor.callLater(timeout/1000.0, self._timeout_listener, listener)
+ self.clock.call_later(timeout/1000.0, _timeout_listener)
self._register_with_keys(listener)
yield self._check_for_updates(listener)
if not timeout:
- self._timeout_listener(listener)
+ _timeout_listener()
return
- def _timeout_listener(self, listener):
- # TODO (erikj): We should probably set to_token to the current max
- # rather than reusing from_token.
- listener.notify(
- self,
- [],
- listener.from_token,
- listener.from_token,
- )
-
@log_function
def _register_with_keys(self, listener):
for room in listener.rooms:
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index c9a73b0413..9ad613b8f1 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from synapse.util.logcontext import LoggingContext
from twisted.internet import reactor
@@ -35,7 +36,11 @@ class Clock(object):
return self.time() * 1000
def call_later(self, delay, callback):
- return reactor.callLater(delay, callback)
+ current_context = LoggingContext.current_context()
+ def wrapped_callback():
+ current_context.thread_local.current_context = current_context
+ callback()
+ return reactor.callLater(delay, wrapped_callback)
def cancel_call_later(self, timer):
timer.cancel()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 13176b05ce..2f430a0f19 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -18,6 +18,9 @@ class LoggingContext(object):
__slots__ = []
+ def __str__(self):
+ return "sentinel"
+
def copy_to(self, record):
pass
@@ -102,6 +105,7 @@ class PreserveLoggingContext(object):
def __enter__(self):
"""Captures the current logging context"""
self.current_context = LoggingContext.current_context()
+ LoggingContext.thread_local.current_context = LoggingContext.sentinel
def __exit__(self, type, value, traceback):
"""Restores the current logging context"""
|