diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index b9b68ec829..a4d1ce3aad 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -18,9 +18,8 @@ import logging
from twisted.internet import defer
-from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.pusher import PusherFactory
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.util.logcontext import run_in_background
logger = logging.getLogger(__name__)
@@ -122,45 +121,23 @@ class PusherPool:
p['app_id'], p['pushkey'], p['user_name'],
)
- def on_new_notifications(self, min_stream_id, max_stream_id):
- run_as_background_process(
- "on_new_notifications",
- self._on_new_notifications, min_stream_id, max_stream_id,
- )
-
@defer.inlineCallbacks
- def _on_new_notifications(self, min_stream_id, max_stream_id):
+ def on_new_notifications(self, min_stream_id, max_stream_id):
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
)
- deferreds = []
-
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
- deferreds.append(
- run_in_background(
- p.on_new_notifications,
- min_stream_id, max_stream_id,
- )
- )
-
- yield make_deferred_yieldable(
- defer.gatherResults(deferreds, consumeErrors=True),
- )
+ p.on_new_notifications(min_stream_id, max_stream_id)
+
except Exception:
logger.exception("Exception in pusher on_new_notifications")
- def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
- run_as_background_process(
- "on_new_receipts",
- self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
- )
-
@defer.inlineCallbacks
- def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+ def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
@@ -170,21 +147,11 @@ class PusherPool:
# This returns a tuple, user_id is at index 3
users_affected = set([r[3] for r in updated_receipts])
- deferreds = []
-
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
- deferreds.append(
- run_in_background(
- p.on_new_receipts,
- min_stream_id, max_stream_id,
- )
- )
-
- yield make_deferred_yieldable(
- defer.gatherResults(deferreds, consumeErrors=True),
- )
+ p.on_new_receipts(min_stream_id, max_stream_id)
+
except Exception:
logger.exception("Exception in pusher on_new_receipts")
|