diff --git a/changelog.d/4075.misc b/changelog.d/4075.misc
new file mode 100644
index 0000000000..d08b8cc271
--- /dev/null
+++ b/changelog.d/4075.misc
@@ -0,0 +1 @@
+Clean up threading and logcontexts in pushers
\ No newline at end of file
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 0f9f8e19f6..83b0863f00 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -161,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
- self.pusher_pool.on_new_notifications(
+ yield self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
- self.pusher_pool.on_new_receipts(
+ yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:
@@ -183,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
def start_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
- return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+ return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
def start(config_options):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index cab57a8849..63e495e3f8 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2520,7 +2520,7 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
- self._notify_persisted_event(event, max_stream_id)
+ yield self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@@ -2553,7 +2553,7 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
- self.pusher_pool.on_new_notifications(
+ return self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4954b23a0d..6c4fcfb10a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -779,7 +779,7 @@ class EventCreationHandler(object):
event, context=context
)
- self.pusher_pool.on_new_notifications(
+ yield self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a6f3181f09..4c2690ba26 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
- self.hs.get_pusherpool().on_new_receipts(
+ yield self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids,
)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d746371420..f369124258 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,8 +18,7 @@ import logging
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.metrics.background_process_metrics import run_as_background_process
logger = logging.getLogger(__name__)
@@ -71,18 +70,11 @@ class EmailPusher(object):
# See httppusher
self.max_stream_ordering = None
- self.processing = False
+ self._is_processing = False
- @defer.inlineCallbacks
def on_started(self):
if self.mailer is not None:
- try:
- self.throttle_params = yield self.store.get_throttle_params_by_room(
- self.pusher_id
- )
- yield self._process()
- except Exception:
- logger.exception("Error starting email pusher")
+ self._start_processing()
def on_stop(self):
if self.timed_call:
@@ -92,43 +84,52 @@ class EmailPusher(object):
pass
self.timed_call = None
- @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
- yield self._process()
+ self._start_processing()
def on_new_receipts(self, min_stream_id, max_stream_id):
# We could wake up and cancel the timer but there tend to be quite a
# lot of read receipts so it's probably less work to just let the
# timer fire
- return defer.succeed(None)
+ pass
- @defer.inlineCallbacks
def on_timer(self):
self.timed_call = None
- yield self._process()
+ self._start_processing()
+
+ def _start_processing(self):
+ if self._is_processing:
+ return
+
+ run_as_background_process("emailpush.process", self._process)
@defer.inlineCallbacks
def _process(self):
- if self.processing:
- return
+ # we should never get here if we are already processing
+ assert not self._is_processing
+
+ try:
+ self._is_processing = True
+
+ if self.throttle_params is None:
+ # this is our first loop: load up the throttle params
+ self.throttle_params = yield self.store.get_throttle_params_by_room(
+ self.pusher_id
+ )
- with LoggingContext("emailpush._process"):
- with Measure(self.clock, "emailpush._process"):
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
try:
- self.processing = True
- # if the max ordering changes while we're running _unsafe_process,
- # call it again, and so on until we've caught up.
- while True:
- starting_max_ordering = self.max_stream_ordering
- try:
- yield self._unsafe_process()
- except Exception:
- logger.exception("Exception processing notifs")
- if self.max_stream_ordering == starting_max_ordering:
- break
- finally:
- self.processing = False
+ yield self._unsafe_process()
+ except Exception:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self._is_processing = False
@defer.inlineCallbacks
def _unsafe_process(self):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 48abd5e4d6..6bd703632d 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,9 +22,8 @@ from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
from . import push_rule_evaluator, push_tools
@@ -61,7 +60,7 @@ class HttpPusher(object):
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.failing_since = pusherdict['failing_since']
self.timed_call = None
- self.processing = False
+ self._is_processing = False
# This is the highest stream ordering we know it's safe to process.
# When new events arrive, we'll be given a window of new events: we
@@ -92,34 +91,27 @@ class HttpPusher(object):
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
- @defer.inlineCallbacks
def on_started(self):
- try:
- yield self._process()
- except Exception:
- logger.exception("Error starting http pusher")
+ self._start_processing()
- @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
- yield self._process()
+ self._start_processing()
- @defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id):
# Note that the min here shouldn't be relied upon to be accurate.
# We could check the receipts are actually m.read receipts here,
# but currently that's the only type of receipt anyway...
- with LoggingContext("push.on_new_receipts"):
- with Measure(self.clock, "push.on_new_receipts"):
- badge = yield push_tools.get_badge_count(
- self.hs.get_datastore(), self.user_id
- )
- yield self._send_badge(badge)
+ run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
@defer.inlineCallbacks
+ def _update_badge(self):
+ badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+ yield self._send_badge(badge)
+
def on_timer(self):
- yield self._process()
+ self._start_processing()
def on_stop(self):
if self.timed_call:
@@ -129,27 +121,31 @@ class HttpPusher(object):
pass
self.timed_call = None
+ def _start_processing(self):
+ if self._is_processing:
+ return
+
+ run_as_background_process("httppush.process", self._process)
+
@defer.inlineCallbacks
def _process(self):
- if self.processing:
- return
+ # we should never get here if we are already processing
+ assert not self._is_processing
- with LoggingContext("push._process"):
- with Measure(self.clock, "push._process"):
+ try:
+ self._is_processing = True
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
try:
- self.processing = True
- # if the max ordering changes while we're running _unsafe_process,
- # call it again, and so on until we've caught up.
- while True:
- starting_max_ordering = self.max_stream_ordering
- try:
- yield self._unsafe_process()
- except Exception:
- logger.exception("Exception processing notifs")
- if self.max_stream_ordering == starting_max_ordering:
- break
- finally:
- self.processing = False
+ yield self._unsafe_process()
+ except Exception:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self._is_processing = False
@defer.inlineCallbacks
def _unsafe_process(self):
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 9f7d5ef217..5a4e73ccd6 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -20,24 +20,39 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.pusher import PusherFactory
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
class PusherPool:
+ """
+ The pusher pool. This is responsible for dispatching notifications of new events to
+ the http and email pushers.
+
+ It provides three methods which are designed to be called by the rest of the
+ application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
+ delegates to each of the relevant pushers.
+
+ Note that it is expected that each pusher will have its own 'processing' loop which
+ will send out the notifications in the background, rather than blocking until the
+ notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
+ Pusher.on_new_receipts are not expected to return deferreds.
+ """
def __init__(self, _hs):
self.hs = _hs
self.pusher_factory = PusherFactory(_hs)
- self.start_pushers = _hs.config.start_pushers
+ self._should_start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.pushers = {}
- @defer.inlineCallbacks
def start(self):
- pushers = yield self.store.get_all_pushers()
- self._start_pushers(pushers)
+ """Starts the pushers off in a background process.
+ """
+ if not self._should_start_pushers:
+ logger.info("Not starting pushers because they are disabled in the config")
+ return
+ run_as_background_process("start_pushers", self._start_pushers)
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
@@ -86,7 +101,7 @@ class PusherPool:
last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
)
- yield self._refresh_pusher(app_id, pushkey, user_id)
+ yield self.start_pusher_by_id(app_id, pushkey, user_id)
@defer.inlineCallbacks
def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
@@ -123,45 +138,23 @@ class PusherPool:
p['app_id'], p['pushkey'], p['user_name'],
)
- def on_new_notifications(self, min_stream_id, max_stream_id):
- run_as_background_process(
- "on_new_notifications",
- self._on_new_notifications, min_stream_id, max_stream_id,
- )
-
@defer.inlineCallbacks
- def _on_new_notifications(self, min_stream_id, max_stream_id):
+ def on_new_notifications(self, min_stream_id, max_stream_id):
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
)
- deferreds = []
-
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
- deferreds.append(
- run_in_background(
- p.on_new_notifications,
- min_stream_id, max_stream_id,
- )
- )
-
- yield make_deferred_yieldable(
- defer.gatherResults(deferreds, consumeErrors=True),
- )
+ p.on_new_notifications(min_stream_id, max_stream_id)
+
except Exception:
logger.exception("Exception in pusher on_new_notifications")
- def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
- run_as_background_process(
- "on_new_receipts",
- self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
- )
-
@defer.inlineCallbacks
- def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+ def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
@@ -171,26 +164,20 @@ class PusherPool:
# This returns a tuple, user_id is at index 3
users_affected = set([r[3] for r in updated_receipts])
- deferreds = []
-
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
- deferreds.append(
- run_in_background(
- p.on_new_receipts,
- min_stream_id, max_stream_id,
- )
- )
-
- yield make_deferred_yieldable(
- defer.gatherResults(deferreds, consumeErrors=True),
- )
+ p.on_new_receipts(min_stream_id, max_stream_id)
+
except Exception:
logger.exception("Exception in pusher on_new_receipts")
@defer.inlineCallbacks
- def _refresh_pusher(self, app_id, pushkey, user_id):
+ def start_pusher_by_id(self, app_id, pushkey, user_id):
+ """Look up the details for the given pusher, and start it"""
+ if not self._should_start_pushers:
+ return
+
resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
app_id, pushkey
)
@@ -201,33 +188,49 @@ class PusherPool:
p = r
if p:
+ self._start_pusher(p)
- self._start_pushers([p])
+ @defer.inlineCallbacks
+ def _start_pushers(self):
+ """Start all the pushers
- def _start_pushers(self, pushers):
- if not self.start_pushers:
- logger.info("Not starting pushers because they are disabled in the config")
- return
+ Returns:
+ Deferred
+ """
+ pushers = yield self.store.get_all_pushers()
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
- try:
- p = self.pusher_factory.create_pusher(pusherdict)
- except Exception:
- logger.exception("Couldn't start a pusher: caught Exception")
- continue
- if p:
- appid_pushkey = "%s:%s" % (
- pusherdict['app_id'],
- pusherdict['pushkey'],
- )
- byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+ self._start_pusher(pusherdict)
+ logger.info("Started pushers")
- if appid_pushkey in byuser:
- byuser[appid_pushkey].on_stop()
- byuser[appid_pushkey] = p
- run_in_background(p.on_started)
+ def _start_pusher(self, pusherdict):
+ """Start the given pusher
- logger.info("Started pushers")
+ Args:
+ pusherdict (dict):
+
+ Returns:
+ None
+ """
+ try:
+ p = self.pusher_factory.create_pusher(pusherdict)
+ except Exception:
+ logger.exception("Couldn't start a pusher: caught Exception")
+ return
+
+ if not p:
+ return
+
+ appid_pushkey = "%s:%s" % (
+ pusherdict['app_id'],
+ pusherdict['pushkey'],
+ )
+ byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+
+ if appid_pushkey in byuser:
+ byuser[appid_pushkey].on_stop()
+ byuser[appid_pushkey] = p
+ p.on_started()
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):
|