diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 84341b0d20..6556dd1ae8 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -44,6 +44,10 @@ class ApplicationServicesHandler(object):
self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
self.clock = hs.get_clock()
+ self.notify_appservices = hs.config.notify_appservices
+
+ self.current_max = 0
+ self.is_processing = False
@defer.inlineCallbacks
def notify_interested_services(self, current_id):
@@ -56,47 +60,56 @@ class ApplicationServicesHandler(object):
current_id(int): The current maximum ID.
"""
services = yield self.store.get_app_services()
- if not services:
+ if not services or not self.notify_appservices:
return
- with Measure(self.clock, "notify_interested_services"):
- upper_bound = current_id
- limit = 100
- while True:
- upper_bound, events = yield self.store.get_new_events_for_appservice(
- upper_bound, limit
- )
-
- logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound)
-
- if not events:
- break
-
- for event in events:
- # Gather interested services
- services = yield self._get_services_for_event(event)
- if len(services) == 0:
- continue # no services need notifying
-
- # Do we know this user exists? If not, poke the user query API for
- # all services which match that user regex. This needs to block as
- # these user queries need to be made BEFORE pushing the event.
- yield self._check_user_exists(event.sender)
- if event.type == EventTypes.Member:
- yield self._check_user_exists(event.state_key)
-
- if not self.started_scheduler:
- self.scheduler.start().addErrback(log_failure)
- self.started_scheduler = True
-
- # Fork off pushes to these services
- for service in services:
- preserve_fn(self.scheduler.submit_event_for_as)(service, event)
-
- yield self.store.set_appservice_last_pos(upper_bound)
+ self.current_max = max(self.current_max, current_id)
+ if self.is_processing:
+ return
- if len(events) < limit:
- break
+ with Measure(self.clock, "notify_interested_services"):
+ self.is_processing = True
+ try:
+ upper_bound = self.current_max
+ limit = 100
+ while True:
+ upper_bound, events = yield self.store.get_new_events_for_appservice(
+ upper_bound, limit
+ )
+
+ if not events:
+ break
+
+ for event in events:
+ # Gather interested services
+ services = yield self._get_services_for_event(event)
+ if len(services) == 0:
+ continue # no services need notifying
+
+ # Do we know this user exists? If not, poke the user
+ # query API for all services which match that user regex.
+ # This needs to block as these user queries need to be
+ # made BEFORE pushing the event.
+ yield self._check_user_exists(event.sender)
+ if event.type == EventTypes.Member:
+ yield self._check_user_exists(event.state_key)
+
+ if not self.started_scheduler:
+ self.scheduler.start().addErrback(log_failure)
+ self.started_scheduler = True
+
+ # Fork off pushes to these services
+ for service in services:
+ preserve_fn(self.scheduler.submit_event_for_as)(
+ service, event
+ )
+
+ yield self.store.set_appservice_last_pos(upper_bound)
+
+ if len(events) < limit:
+ break
+ finally:
+ self.is_processing = False
@defer.inlineCallbacks
def query_user_exists(self, user_id):
|