diff --git a/changelog.d/3591.misc b/changelog.d/3591.misc
new file mode 100644
index 0000000000..f0137766a0
--- /dev/null
+++ b/changelog.d/3591.misc
@@ -0,0 +1 @@
+Fix some random logcontext leaks.
\ No newline at end of file
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ec9fe01a5a..ee41aed69e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@@ -106,7 +107,9 @@ class ApplicationServicesHandler(object):
yield self._check_user_exists(event.state_key)
if not self.started_scheduler:
- self.scheduler.start().addErrback(log_failure)
+ def start_scheduler():
+ return self.scheduler.start().addErrback(log_failure)
+ run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True
# Fork off pushes to these services
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fb11716eb8..40e7580a61 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
try:
if event.membership == Membership.JOIN:
room_end_token = now_token.room_key
- deferred_room_state = self.state_handler.get_current_state(
- event.room_id
+ deferred_room_state = run_in_background(
+ self.state_handler.get_current_state,
+ event.room_id,
)
elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,)
- deferred_room_state = self.store.get_state_for_events(
- [event.event_id], None
+ deferred_room_state = run_in_background(
+ self.store.get_state_for_events,
+ [event.event_id], None,
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
@@ -387,19 +389,21 @@ class InitialSyncHandler(BaseHandler):
receipts = []
defer.returnValue(receipts)
- presence, receipts, (messages, token) = yield defer.gatherResults(
- [
- run_in_background(get_presence),
- run_in_background(get_receipts),
- run_in_background(
- self.store.get_recent_events_for_room,
- room_id,
- limit=limit,
- end_token=now_token.room_key,
- )
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ presence, receipts, (messages, token) = yield make_deferred_yieldable(
+ defer.gatherResults(
+ [
+ run_in_background(get_presence),
+ run_in_background(get_receipts),
+ run_in_background(
+ self.store.get_recent_events_for_room,
+ room_id,
+ limit=limit,
+ end_token=now_token.room_key,
+ )
+ ],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError),
+ )
messages = yield filter_events_for_client(
self.store, user_id, messages, is_peeking=is_peeking,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 392935cacf..4d0706f23d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -39,7 +39,7 @@ from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -147,7 +147,8 @@ class _EventPeristenceQueue(object):
# callbacks on the deferred.
try:
ret = yield per_item_callback(item)
- item.deferred.callback(ret)
+ with PreserveLoggingContext():
+ item.deferred.callback(ret)
except Exception:
item.deferred.errback()
finally:
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index cc273a57b2..8443bd4c1b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
)
if newly_inserted:
- self.runInteraction(
+ yield self.runInteraction(
"add_pusher",
self._invalidate_cache_and_stream,
self.get_if_user_has_pusher, (user_id,)
|