diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 9295a51d5b..0089a7c64f 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -162,11 +162,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
- yield self.pusher_pool.on_new_notifications(
+ self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
- yield self.pusher_pool.on_new_receipts(
+ self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f38b393e4a..3dd107a285 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2386,8 +2386,7 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
- logcontext.run_in_background(
- self.pusher_pool.on_new_notifications,
+ self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 893c9bcdc4..f21d740968 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -774,11 +774,8 @@ class EventCreationHandler(object):
event, context=context
)
- # this intentionally does not yield: we don't care about the result
- # and don't need to wait for it.
- run_in_background(
- self.pusher_pool.on_new_notifications,
- event_stream_id, max_stream_id
+ self.pusher_pool.on_new_notifications(
+ event_stream_id, max_stream_id,
)
def _notify():
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index cb905a3903..a6f3181f09 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -18,7 +18,6 @@ from twisted.internet import defer
from synapse.types import get_domain_from_id
from synapse.util import logcontext
-from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler
@@ -116,16 +115,15 @@ class ReceiptsHandler(BaseHandler):
affected_room_ids = list(set([r["room_id"] for r in receipts]))
- with PreserveLoggingContext():
- self.notifier.on_new_event(
- "receipt_key", max_batch_id, rooms=affected_room_ids
- )
- # Note that the min here shouldn't be relied upon to be accurate.
- self.hs.get_pusherpool().on_new_receipts(
- min_batch_id, max_batch_id, affected_room_ids
- )
+ self.notifier.on_new_event(
+ "receipt_key", max_batch_id, rooms=affected_room_ids
+ )
+ # Note that the min here shouldn't be relied upon to be accurate.
+ self.hs.get_pusherpool().on_new_receipts(
+ min_batch_id, max_batch_id, affected_room_ids,
+ )
- defer.returnValue(True)
+ defer.returnValue(True)
@logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 36bb5bbc65..9f7d5ef217 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -18,6 +18,7 @@ import logging
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.pusher import PusherFactory
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@@ -122,8 +123,14 @@ class PusherPool:
p['app_id'], p['pushkey'], p['user_name'],
)
- @defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
+ run_as_background_process(
+ "on_new_notifications",
+ self._on_new_notifications, min_stream_id, max_stream_id,
+ )
+
+ @defer.inlineCallbacks
+ def _on_new_notifications(self, min_stream_id, max_stream_id):
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
@@ -147,8 +154,14 @@ class PusherPool:
except Exception:
logger.exception("Exception in pusher on_new_notifications")
- @defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+ run_as_background_process(
+ "on_new_receipts",
+ self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
+ )
+
+ @defer.inlineCallbacks
+ def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
|