From 3e8b02c9393684f2c96e221ced804a0d20de2da0 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 22 Oct 2018 16:12:11 +0100
Subject: Rename _refresh_pusher

This is public (or at least, called from outside the class), so ought to
have a better name.
---
 synapse/app/pusher.py      | 2 +-
 synapse/push/pusherpool.py | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

(limited to 'synapse')

diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 0f9f8e19f6..e06b70894e 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -183,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
     def start_pusher(self, user_id, app_id, pushkey):
         key = "%s:%s" % (app_id, pushkey)
         logger.info("Starting pusher %r / %r", user_id, key)
-        return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+        return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
 
 
 def start(config_options):
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 9f7d5ef217..bdfe27d8c1 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -86,7 +86,7 @@ class PusherPool:
             last_stream_ordering=last_stream_ordering,
             profile_tag=profile_tag,
         )
-        yield self._refresh_pusher(app_id, pushkey, user_id)
+        yield self.start_pusher_by_id(app_id, pushkey, user_id)
 
     @defer.inlineCallbacks
     def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
@@ -190,7 +190,8 @@ class PusherPool:
             logger.exception("Exception in pusher on_new_receipts")
 
     @defer.inlineCallbacks
-    def _refresh_pusher(self, app_id, pushkey, user_id):
+    def start_pusher_by_id(self, app_id, pushkey, user_id):
+        """Look up the details for the given pusher, and start it"""
         resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
             app_id, pushkey
         )
-- cgit 1.4.1

From 04277d0ed8ba456aa6c2d5dca978013c295e7ace Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 22 Oct 2018 16:12:11 +0100
Subject: Factor PusherPool._start_pusher out of _start_pushers

... and use it from start_pusher_by_id. This mostly simplifies
start_pusher_by_id.
---
 synapse/push/pusherpool.py | 51 ++++++++++++++++++++++++++++++----------------
 1 file changed, 33 insertions(+), 18 deletions(-)

(limited to 'synapse')

diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index bdfe27d8c1..a5c133aa33 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -192,6 +192,9 @@ class PusherPool:
     @defer.inlineCallbacks
     def start_pusher_by_id(self, app_id, pushkey, user_id):
         """Look up the details for the given pusher, and start it"""
+        if not self._start_pushers:
+            return
+
         resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
             app_id, pushkey
         )
@@ -202,8 +205,7 @@
                 p = r
 
         if p:
-
-            self._start_pushers([p])
+            self._start_pusher(p)
 
     def _start_pushers(self, pushers):
         if not self.start_pushers:
@@ -211,24 +213,37 @@
             return
         logger.info("Starting %d pushers", len(pushers))
         for pusherdict in pushers:
-            try:
-                p = self.pusher_factory.create_pusher(pusherdict)
-            except Exception:
-                logger.exception("Couldn't start a pusher: caught Exception")
-                continue
-            if p:
-                appid_pushkey = "%s:%s" % (
-                    pusherdict['app_id'],
-                    pusherdict['pushkey'],
-                )
-                byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+            self._start_pusher(pusherdict)
+        logger.info("Started pushers")
 
-                if appid_pushkey in byuser:
-                    byuser[appid_pushkey].on_stop()
-                byuser[appid_pushkey] = p
-                run_in_background(p.on_started)
+    def _start_pusher(self, pusherdict):
+        """Start the given pusher
 
-        logger.info("Started pushers")
+        Args:
+            pusherdict (dict):
+
+        Returns:
+            None
+        """
+        try:
+            p = self.pusher_factory.create_pusher(pusherdict)
+        except Exception:
+            logger.exception("Couldn't start a pusher: caught Exception")
+            return
+
+        if not p:
+            return
+
+        appid_pushkey = "%s:%s" % (
+            pusherdict['app_id'],
+            pusherdict['pushkey'],
+        )
+        byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+
+        if appid_pushkey in byuser:
+            byuser[appid_pushkey].on_stop()
+        byuser[appid_pushkey] = p
+        run_in_background(p.on_started)
 
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_id):
-- cgit 1.4.1

From 5110f4e4258564615edfca6112ab6a36eb551672 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 22 Oct 2018 16:12:11 +0100
Subject: move get_all_pushers call down

simplifies the interface to _start_pushers
---
 synapse/push/pusherpool.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

(limited to 'synapse')

diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index a5c133aa33..b9b68ec829 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -36,8 +36,7 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def start(self):
-        pushers = yield self.store.get_all_pushers()
-        self._start_pushers(pushers)
+        yield self._start_pushers()
 
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
@@ -207,10 +206,17 @@
         if p:
             self._start_pusher(p)
 
-    def _start_pushers(self, pushers):
+    @defer.inlineCallbacks
+    def _start_pushers(self):
+        """Start all the pushers
+
+        Returns:
+            Deferred
+        """
         if not self.start_pushers:
             logger.info("Not starting pushers because they are disabled in the config")
             return
+        pushers = yield self.store.get_all_pushers()
         logger.info("Starting %d pushers", len(pushers))
         for pusherdict in pushers:
             self._start_pusher(pusherdict)
-- cgit 1.4.1

From c7273c11bc73630e3f2ab1a82d20a40d8b17f9a3 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 22 Oct 2018 16:12:11 +0100
Subject: Give pushers their own background logcontext

Each pusher has its own loop which runs for as long as it has work to do. This
should run in its own background thread with its own logcontext, as other
similar loops elsewhere in the system do - which means that CPU usage is
consistently attributed to that loop, rather than to whatever request happened
to start the loop.
---
 synapse/push/emailpusher.py | 48 +++++++++++++++++-----------------
 synapse/push/httppusher.py  | 64 +++++++++++++++++++++------------------------
 2 files changed, 54 insertions(+), 58 deletions(-)

(limited to 'synapse')

diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d746371420..0c9c0201e8 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,8 +18,7 @@ import logging
 from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.metrics.background_process_metrics import run_as_background_process
 
 logger = logging.getLogger(__name__)
 
@@ -80,7 +79,7 @@ class EmailPusher(object):
                 self.throttle_params = yield self.store.get_throttle_params_by_room(
                     self.pusher_id
                 )
-                yield self._process()
+                self._start_processing()
             except Exception:
                 logger.exception("Error starting email pusher")
 
@@ -92,10 +91,10 @@ class EmailPusher(object):
                 pass
             self.timed_call = None
 
-    @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
-        yield self._process()
+        self._start_processing()
+        return defer.succeed(None)
 
     def on_new_receipts(self, min_stream_id, max_stream_id):
         # We could wake up and cancel the timer but there tend to be quite a
@@ -103,32 +102,33 @@ class EmailPusher(object):
         # timer fire
         return defer.succeed(None)
 
-    @defer.inlineCallbacks
     def on_timer(self):
         self.timed_call = None
-        yield self._process()
+        self._start_processing()
 
-    @defer.inlineCallbacks
-    def _process(self):
+    def _start_processing(self):
         if self.processing:
             return
 
-        with LoggingContext("emailpush._process"):
-            with Measure(self.clock, "emailpush._process"):
+        run_as_background_process("emailpush.process", self._process)
+
+    @defer.inlineCallbacks
+    def _process(self):
+        try:
+            self.processing = True
+
+            # if the max ordering changes while we're running _unsafe_process,
+            # call it again, and so on until we've caught up.
+            while True:
+                starting_max_ordering = self.max_stream_ordering
                 try:
-                    self.processing = True
-                    # if the max ordering changes while we're running _unsafe_process,
-                    # call it again, and so on until we've caught up.
-                    while True:
-                        starting_max_ordering = self.max_stream_ordering
-                        try:
-                            yield self._unsafe_process()
-                        except Exception:
-                            logger.exception("Exception processing notifs")
-                        if self.max_stream_ordering == starting_max_ordering:
-                            break
-                finally:
-                    self.processing = False
+                    yield self._unsafe_process()
+                except Exception:
+                    logger.exception("Exception processing notifs")
+                if self.max_stream_ordering == starting_max_ordering:
+                    break
+        finally:
+            self.processing = False
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 48abd5e4d6..5f6b21bc67 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,9 +22,8 @@ from prometheus_client import Counter
 from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
 
 from . import push_rule_evaluator, push_tools
 
@@ -92,34 +91,30 @@ class HttpPusher(object):
         self.data_minus_url.update(self.data)
         del self.data_minus_url['url']
 
-    @defer.inlineCallbacks
     def on_started(self):
-        try:
-            yield self._process()
-        except Exception:
-            logger.exception("Error starting http pusher")
+        self._start_processing()
+        return defer.succeed(None)
 
-    @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
-        yield self._process()
+        self._start_processing()
+        return defer.suceed(None)
 
-    @defer.inlineCallbacks
     def on_new_receipts(self, min_stream_id, max_stream_id):
         # Note that the min here shouldn't be relied upon to be accurate.
 
         # We could check the receipts are actually m.read receipts here,
         # but currently that's the only type of receipt anyway...
-        with LoggingContext("push.on_new_receipts"):
-            with Measure(self.clock, "push.on_new_receipts"):
-                badge = yield push_tools.get_badge_count(
-                    self.hs.get_datastore(), self.user_id
-                )
-                yield self._send_badge(badge)
+        run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
+        return defer.succeed(None)
 
     @defer.inlineCallbacks
+    def _update_badge(self):
+        badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+        yield self._send_badge(badge)
+
     def on_timer(self):
-        yield self._process()
+        self._start_processing()
 
     def on_stop(self):
         if self.timed_call:
@@ -129,27 +124,28 @@ class HttpPusher(object):
                 pass
             self.timed_call = None
 
-    @defer.inlineCallbacks
-    def _process(self):
+    def _start_processing(self):
         if self.processing:
             return
 
-        with LoggingContext("push._process"):
-            with Measure(self.clock, "push._process"):
+        run_as_background_process("httppush.process", self._process)
+
+    @defer.inlineCallbacks
+    def _process(self):
+        try:
+            self.processing = True
+            # if the max ordering changes while we're running _unsafe_process,
+            # call it again, and so on until we've caught up.
+            while True:
+                starting_max_ordering = self.max_stream_ordering
                 try:
-                    self.processing = True
-                    # if the max ordering changes while we're running _unsafe_process,
-                    # call it again, and so on until we've caught up.
-                    while True:
-                        starting_max_ordering = self.max_stream_ordering
-                        try:
-                            yield self._unsafe_process()
-                        except Exception:
-                            logger.exception("Exception processing notifs")
-                        if self.max_stream_ordering == starting_max_ordering:
-                            break
-                finally:
-                    self.processing = False
+                    yield self._unsafe_process()
+                except Exception:
+                    logger.exception("Exception processing notifs")
+                if self.max_stream_ordering == starting_max_ordering:
+                    break
+        finally:
+            self.processing = False
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
-- cgit 1.4.1

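Both pushers now share the same processing skeleton, which is worth seeing in
one piece. The following is a minimal, self-contained sketch of that pattern,
not Synapse's actual code: run_as_background_process is reduced to a stub here
(the real helper in synapse.metrics.background_process_metrics also sets up a
fresh logcontext and records per-process metrics), and the class name and
method bodies are invented for illustration.

    from twisted.internet import defer

    def run_as_background_process(desc, func):
        # stub for illustration: the real helper runs `func` in its own
        # logcontext, so CPU and db usage are attributed to `desc` rather
        # than to whichever request happened to call it
        return defer.maybeDeferred(func)

    class SketchPusher(object):
        def __init__(self):
            self.max_stream_ordering = 0
            self.processing = False

        def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
            # synchronous: record the new high-water mark and poke the loop
            self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
            self._start_processing()

        def _start_processing(self):
            # if the loop is already running, it will see the updated
            # max_stream_ordering on its next pass, so do nothing
            if self.processing:
                return
            run_as_background_process("sketchpush.process", self._process)

        @defer.inlineCallbacks
        def _process(self):
            try:
                self.processing = True
                # loop until no new work arrived while we were busy
                while True:
                    starting_max_ordering = self.max_stream_ordering
                    yield self._unsafe_process()
                    if self.max_stream_ordering == starting_max_ordering:
                        break
            finally:
                self.processing = False

        @defer.inlineCallbacks
        def _unsafe_process(self):
            # a real pusher would send HTTP or email here
            yield defer.succeed(None)

The guard flag makes concurrent wake-ups cheap: every caller returns
immediately, and at most one instance of the loop runs at a time.
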
From e7a16c6210191e9556fb1c11cbff2d98f0a5206c Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 22 Oct 2018 16:12:11 +0100
Subject: Remove redundant run_as_background_process() from pusherpool

`on_new_notifications` and `on_new_receipts` in `HttpPusher` and `EmailPusher`
now always return synchronously, so we can remove the `defer.gatherResults` on
their results, and the `run_as_background_process` wrappers can be removed too
because the PusherPool methods will now complete quickly enough.
---
 synapse/app/pusher.py          |  4 ++--
 synapse/handlers/federation.py |  4 ++--
 synapse/handlers/message.py    |  2 +-
 synapse/handlers/receipts.py   |  2 +-
 synapse/push/emailpusher.py    |  3 +--
 synapse/push/httppusher.py     |  2 --
 synapse/push/pusherpool.py     | 47 +++++++-----------------------------------
 7 files changed, 14 insertions(+), 50 deletions(-)

(limited to 'synapse')

diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index e06b70894e..83b0863f00 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -161,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
                     else:
                         yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
             elif stream_name == "events":
-                self.pusher_pool.on_new_notifications(
+                yield self.pusher_pool.on_new_notifications(
                     token, token,
                 )
             elif stream_name == "receipts":
-                self.pusher_pool.on_new_receipts(
+                yield self.pusher_pool.on_new_receipts(
                     token, token, set(row.room_id for row in rows)
                 )
         except Exception:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index cab57a8849..63e495e3f8 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2520,7 +2520,7 @@ class FederationHandler(BaseHandler):
 
         if not backfilled:  # Never notify for backfilled events
             for event, _ in event_and_contexts:
-                self._notify_persisted_event(event, max_stream_id)
+                yield self._notify_persisted_event(event, max_stream_id)
 
     def _notify_persisted_event(self, event, max_stream_id):
         """Checks to see if notifier/pushers should be notified about the
@@ -2553,7 +2553,7 @@ class FederationHandler(BaseHandler):
             extra_users=extra_users
         )
 
-        self.pusher_pool.on_new_notifications(
+        return self.pusher_pool.on_new_notifications(
             event_stream_id, max_stream_id,
         )
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4954b23a0d..6c4fcfb10a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -779,7 +779,7 @@ class EventCreationHandler(object):
             event, context=context
         )
 
-        self.pusher_pool.on_new_notifications(
+        yield self.pusher_pool.on_new_notifications(
             event_stream_id, max_stream_id,
         )
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a6f3181f09..4c2690ba26 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
             "receipt_key", max_batch_id, rooms=affected_room_ids
         )
         # Note that the min here shouldn't be relied upon to be accurate.
-        self.hs.get_pusherpool().on_new_receipts(
+        yield self.hs.get_pusherpool().on_new_receipts(
             min_batch_id, max_batch_id, affected_room_ids,
         )
 
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 0c9c0201e8..d5a99b838c 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -94,13 +94,12 @@ class EmailPusher(object):
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
         self._start_processing()
-        return defer.succeed(None)
 
     def on_new_receipts(self, min_stream_id, max_stream_id):
         # We could wake up and cancel the timer but there tend to be quite a
         # lot of read receipts so it's probably less work to just let the
         # timer fire
-        return defer.succeed(None)
+        pass
 
     def on_timer(self):
         self.timed_call = None
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 5f6b21bc67..770f55feae 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -98,7 +98,6 @@ class HttpPusher(object):
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
         self._start_processing()
-        return defer.suceed(None)
 
     def on_new_receipts(self, min_stream_id, max_stream_id):
         # Note that the min here shouldn't be relied upon to be accurate.
@@ -106,7 +105,6 @@ class HttpPusher(object):
         # We could check the receipts are actually m.read receipts here,
         # but currently that's the only type of receipt anyway...
         run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
-        return defer.succeed(None)
 
     @defer.inlineCallbacks
     def _update_badge(self):
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index b9b68ec829..a4d1ce3aad 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -18,9 +18,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push.pusher import PusherFactory
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.util.logcontext import run_in_background
 
 logger = logging.getLogger(__name__)
 
@@ -122,45 +121,23 @@ class PusherPool:
                 p['app_id'], p['pushkey'], p['user_name'],
             )
 
-    def on_new_notifications(self, min_stream_id, max_stream_id):
-        run_as_background_process(
-            "on_new_notifications",
-            self._on_new_notifications, min_stream_id, max_stream_id,
-        )
-
     @defer.inlineCallbacks
-    def _on_new_notifications(self, min_stream_id, max_stream_id):
+    def on_new_notifications(self, min_stream_id, max_stream_id):
         try:
             users_affected = yield self.store.get_push_action_users_in_range(
                 min_stream_id, max_stream_id
             )
 
-            deferreds = []
-
             for u in users_affected:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
-                        deferreds.append(
-                            run_in_background(
-                                p.on_new_notifications,
-                                min_stream_id, max_stream_id,
-                            )
-                        )
-
-            yield make_deferred_yieldable(
-                defer.gatherResults(deferreds, consumeErrors=True),
-            )
+                        p.on_new_notifications(min_stream_id, max_stream_id)
+
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
 
-    def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
-        run_as_background_process(
-            "on_new_receipts",
-            self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
-        )
-
     @defer.inlineCallbacks
-    def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+    def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
         try:
             # Need to subtract 1 from the minimum because the lower bound here
             # is not inclusive
@@ -170,21 +147,11 @@ class PusherPool:
             # This returns a tuple, user_id is at index 3
             users_affected = set([r[3] for r in updated_receipts])
 
-            deferreds = []
-
             for u in users_affected:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
-                        deferreds.append(
-                            run_in_background(
-                                p.on_new_receipts,
-                                min_stream_id, max_stream_id,
-                            )
-                        )
-
-            yield make_deferred_yieldable(
-                defer.gatherResults(deferreds, consumeErrors=True),
-            )
+                        p.on_new_receipts(min_stream_id, max_stream_id)
+
         except Exception:
             logger.exception("Exception in pusher on_new_receipts")
 
-- cgit 1.4.1

From f749607c911a82e64685f1cfaeee892c49a1b606 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 22 Oct 2018 16:12:11 +0100
Subject: Make on_started synchronous too

This brings it into line with on_new_notifications and on_new_receipts. It
requires a little bit of hoop-jumping in EmailPusher to load the throttle
params before the first loop.
---
 synapse/push/emailpusher.py | 15 +++++++--------
 synapse/push/httppusher.py  |  1 -
 synapse/push/pusherpool.py  | 16 ++++++++++++++--
 3 files changed, 21 insertions(+), 11 deletions(-)

(limited to 'synapse')

diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d5a99b838c..0bc548ae7a 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -72,16 +72,9 @@ class EmailPusher(object):
 
         self.processing = False
 
-    @defer.inlineCallbacks
     def on_started(self):
         if self.mailer is not None:
-            try:
-                self.throttle_params = yield self.store.get_throttle_params_by_room(
-                    self.pusher_id
-                )
-                self._start_processing()
-            except Exception:
-                logger.exception("Error starting email pusher")
+            self._start_processing()
 
     def on_stop(self):
         if self.timed_call:
@@ -116,6 +109,12 @@ class EmailPusher(object):
         try:
             self.processing = True
 
+            if self.throttle_params is None:
+                # this is our first loop: load up the throttle params
+                self.throttle_params = yield self.store.get_throttle_params_by_room(
+                    self.pusher_id
+                )
+
             # if the max ordering changes while we're running _unsafe_process,
             # call it again, and so on until we've caught up.
             while True:
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 770f55feae..33034d44da 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -93,7 +93,6 @@ class HttpPusher(object):
 
     def on_started(self):
         self._start_processing()
-        return defer.succeed(None)
 
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index a4d1ce3aad..695e582dce 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,12 +19,24 @@ import logging
 from twisted.internet import defer
 
 from synapse.push.pusher import PusherFactory
-from synapse.util.logcontext import run_in_background
 
 logger = logging.getLogger(__name__)
 
 
 class PusherPool:
+    """
+    The pusher pool. This is responsible for dispatching notifications of new events to
+    the http and email pushers.
+
+    It provides three methods which are designed to be called by the rest of the
+    application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
+    delegates to each of the relevant pushers.
+
+    Note that it is expected that each pusher will have its own 'processing' loop which
+    will send out the notifications in the background, rather than blocking until the
+    notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
+    Pusher.on_new_receipts are not expected to return deferreds.
+    """
     def __init__(self, _hs):
         self.hs = _hs
         self.pusher_factory = PusherFactory(_hs)
@@ -216,7 +228,7 @@ class PusherPool:
         if appid_pushkey in byuser:
             byuser[appid_pushkey].on_stop()
         byuser[appid_pushkey] = p
-        run_in_background(p.on_started)
+        p.on_started()
 
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_id):
-- cgit 1.4.1

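The "hoop-jumping" mentioned in this commit message is the lazy load at the
top of _process: state that the old asynchronous on_started fetched up front
is now fetched on the loop's first pass. As a generic, hedged sketch of that
idiom (the store, pusher_id and get_throttle_params_by_room names follow the
diff above; the surrounding classes are hypothetical stand-ins):

    from twisted.internet import defer

    class StubStore(object):
        def get_throttle_params_by_room(self, pusher_id):
            # stand-in for the real database query
            return defer.succeed({})

    class LazyInitSketch(object):
        def __init__(self, store, pusher_id):
            self.store = store
            self.pusher_id = pusher_id
            self.throttle_params = None  # not loaded until the loop first runs

        @defer.inlineCallbacks
        def _process(self):
            if self.throttle_params is None:
                # first pass: on_started() is synchronous now, so the
                # asynchronous load has to happen here instead
                self.throttle_params = yield self.store.get_throttle_params_by_room(
                    self.pusher_id
                )
            # ... then enter the usual catch-up loop ...

The None check makes the load idempotent: later passes, and later restarts of
the loop, skip straight to the work.
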
From 026cd91ac878bc723f653f393fd1f44c438ba559 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 22 Oct 2018 16:12:11 +0100
Subject: Run PusherPool.start as a background process

We don't do anything with the result, so this is needed to give this code a
logcontext.
---
 synapse/push/pusherpool.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

(limited to 'synapse')

diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 695e582dce..d25877cc66 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -18,6 +18,7 @@ import logging
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push.pusher import PusherFactory
 
 logger = logging.getLogger(__name__)
@@ -45,9 +46,13 @@ class PusherPool:
         self.clock = self.hs.get_clock()
         self.pushers = {}
 
-    @defer.inlineCallbacks
     def start(self):
-        yield self._start_pushers()
+        """Starts the pushers off in a background process.
+        """
+        if not self.start_pushers:
+            logger.info("Not starting pushers because they are disabled in the config")
+            return
+        run_as_background_process("start_pushers", self._start_pushers)
 
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
@@ -192,9 +197,6 @@ class PusherPool:
         Returns:
             Deferred
         """
-        if not self.start_pushers:
-            logger.info("Not starting pushers because they are disabled in the config")
-            return
         pushers = yield self.store.get_all_pushers()
         logger.info("Starting %d pushers", len(pushers))
         for pusherdict in pushers:
-- cgit 1.4.1

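The consideration here generalizes: when a method kicks off work whose result
nobody consumes, returning a bare Deferred leaves errors and CPU time
attributed to whichever caller happened to create it. A hedged sketch of the
shape start() now has (the helper is stubbed as before, and the class is an
invented stand-in, not PusherPool itself):

    from twisted.internet import defer

    def run_as_background_process(desc, func):
        # stub: the real helper gives `func` its own logcontext and logs
        # any failure against the name `desc`
        return defer.maybeDeferred(func)

    class PoolStartSketch(object):
        def __init__(self, should_start):
            self.start_pushers = should_start  # config flag, as in the diff

        def start(self):
            # fire-and-forget: nobody yields on the result, so hand the work
            # to a named background process instead of returning a Deferred
            if not self.start_pushers:
                return
            run_as_background_process("start_pushers", self._start_pushers)

        @defer.inlineCallbacks
        def _start_pushers(self):
            # a real pool would load pusher configs from the database here
            pushers = yield defer.succeed([])
            for pusherdict in pushers:
                pass  # ...create and start each pusher

Note the near-collision between the config flag self.start_pushers and the
method self._start_pushers: exactly the confusion the final commit in this
series renames away.
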
From e564306e315fc3dfd37e5fed495ae300fbb58c8a Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Wed, 24 Oct 2018 09:23:33 +0100
Subject: sanity-check the is_processing flag

... and rename it, for even more sanity
---
 synapse/push/emailpusher.py | 11 +++++++----
 synapse/push/httppusher.py  | 11 +++++++----
 2 files changed, 14 insertions(+), 8 deletions(-)

(limited to 'synapse')

diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 0bc548ae7a..f369124258 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -70,7 +70,7 @@ class EmailPusher(object):
         # See httppusher
         self.max_stream_ordering = None
 
-        self.processing = False
+        self._is_processing = False
 
     def on_started(self):
         if self.mailer is not None:
@@ -99,15 +99,18 @@ class EmailPusher(object):
         self._start_processing()
 
     def _start_processing(self):
-        if self.processing:
+        if self._is_processing:
            return
 
         run_as_background_process("emailpush.process", self._process)
 
     @defer.inlineCallbacks
     def _process(self):
+        # we should never get here if we are already processing
+        assert not self._is_processing
+
         try:
-            self.processing = True
+            self._is_processing = True
 
             if self.throttle_params is None:
                 # this is our first loop: load up the throttle params
@@ -126,7 +129,7 @@ class EmailPusher(object):
                 if self.max_stream_ordering == starting_max_ordering:
                     break
         finally:
-            self.processing = False
+            self._is_processing = False
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 33034d44da..6bd703632d 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -60,7 +60,7 @@ class HttpPusher(object):
         self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
         self.failing_since = pusherdict['failing_since']
         self.timed_call = None
-        self.processing = False
+        self._is_processing = False
 
         # This is the highest stream ordering we know it's safe to process.
         # When new events arrive, we'll be given a window of new events: we
@@ -122,15 +122,18 @@ class HttpPusher(object):
         self.timed_call = None
 
     def _start_processing(self):
-        if self.processing:
+        if self._is_processing:
            return
 
         run_as_background_process("httppush.process", self._process)
 
     @defer.inlineCallbacks
     def _process(self):
+        # we should never get here if we are already processing
+        assert not self._is_processing
+
         try:
-            self.processing = True
+            self._is_processing = True
             # if the max ordering changes while we're running _unsafe_process,
             # call it again, and so on until we've caught up.
             while True:
@@ -142,7 +145,7 @@ class HttpPusher(object):
                 if self.max_stream_ordering == starting_max_ordering:
                     break
         finally:
-            self.processing = False
+            self._is_processing = False
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
-- cgit 1.4.1

From c573794b22a47a0f03c80f2732922c3375e7b74c Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Wed, 24 Oct 2018 09:24:55 +0100
Subject: Fix start_pushers vs _start_pushers confusion
---
 synapse/push/pusherpool.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

(limited to 'synapse')

diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index d25877cc66..5a4e73ccd6 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -41,7 +41,7 @@ class PusherPool:
     def __init__(self, _hs):
         self.hs = _hs
         self.pusher_factory = PusherFactory(_hs)
-        self.start_pushers = _hs.config.start_pushers
+        self._should_start_pushers = _hs.config.start_pushers
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
         self.pushers = {}
@@ -49,7 +49,7 @@ class PusherPool:
     def start(self):
         """Starts the pushers off in a background process.
         """
-        if not self.start_pushers:
+        if not self._should_start_pushers:
             logger.info("Not starting pushers because they are disabled in the config")
             return
         run_as_background_process("start_pushers", self._start_pushers)
@@ -175,7 +175,7 @@ class PusherPool:
     @defer.inlineCallbacks
     def start_pusher_by_id(self, app_id, pushkey, user_id):
         """Look up the details for the given pusher, and start it"""
-        if not self._start_pushers:
+        if not self._should_start_pushers:
             return
 
         resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
-- cgit 1.4.1
""" - if not self.start_pushers: + if not self._should_start_pushers: logger.info("Not starting pushers because they are disabled in the config") return run_as_background_process("start_pushers", self._start_pushers) @@ -175,7 +175,7 @@ class PusherPool: @defer.inlineCallbacks def start_pusher_by_id(self, app_id, pushkey, user_id): """Look up the details for the given pusher, and start it""" - if not self._start_pushers: + if not self._should_start_pushers: return resultlist = yield self.store.get_pushers_by_app_id_and_pushkey( -- cgit 1.4.1