diff options
author | Erik Johnston <erikj@jki.re> | 2016-08-18 16:49:43 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-08-18 16:49:43 +0100 |
commit | be088b32d8e1f28199aa089f6d59fe4d94da510f (patch) | |
tree | 5c6ee37d31a792fd87deec3d54c030dc9a1f6a78 /synapse/handlers/appservice.py | |
parent | Jenkins: tox install setuptools (diff) | |
parent | Remove log lines (diff) | |
download | synapse-be088b32d8e1f28199aa089f6d59fe4d94da510f.tar.xz |
Merge pull request #1027 from matrix-org/erikj/appservice_stream
Add appservice worker
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r-- | synapse/handlers/appservice.py | 89 |
1 files changed, 51 insertions, 38 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 84341b0d20..6556dd1ae8 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -44,6 +44,10 @@ class ApplicationServicesHandler(object): self.scheduler = hs.get_application_service_scheduler() self.started_scheduler = False self.clock = hs.get_clock() + self.notify_appservices = hs.config.notify_appservices + + self.current_max = 0 + self.is_processing = False @defer.inlineCallbacks def notify_interested_services(self, current_id): @@ -56,47 +60,56 @@ class ApplicationServicesHandler(object): current_id(int): The current maximum ID. """ services = yield self.store.get_app_services() - if not services: + if not services or not self.notify_appservices: return - with Measure(self.clock, "notify_interested_services"): - upper_bound = current_id - limit = 100 - while True: - upper_bound, events = yield self.store.get_new_events_for_appservice( - upper_bound, limit - ) - - logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound) - - if not events: - break - - for event in events: - # Gather interested services - services = yield self._get_services_for_event(event) - if len(services) == 0: - continue # no services need notifying - - # Do we know this user exists? If not, poke the user query API for - # all services which match that user regex. This needs to block as - # these user queries need to be made BEFORE pushing the event. - yield self._check_user_exists(event.sender) - if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) - - if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) - self.started_scheduler = True - - # Fork off pushes to these services - for service in services: - preserve_fn(self.scheduler.submit_event_for_as)(service, event) - - yield self.store.set_appservice_last_pos(upper_bound) + self.current_max = max(self.current_max, current_id) + if self.is_processing: + return - if len(events) < limit: - break + with Measure(self.clock, "notify_interested_services"): + self.is_processing = True + try: + upper_bound = self.current_max + limit = 100 + while True: + upper_bound, events = yield self.store.get_new_events_for_appservice( + upper_bound, limit + ) + + if not events: + break + + for event in events: + # Gather interested services + services = yield self._get_services_for_event(event) + if len(services) == 0: + continue # no services need notifying + + # Do we know this user exists? If not, poke the user + # query API for all services which match that user regex. + # This needs to block as these user queries need to be + # made BEFORE pushing the event. + yield self._check_user_exists(event.sender) + if event.type == EventTypes.Member: + yield self._check_user_exists(event.state_key) + + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services + for service in services: + preserve_fn(self.scheduler.submit_event_for_as)( + service, event + ) + + yield self.store.set_appservice_last_pos(upper_bound) + + if len(events) < limit: + break + finally: + self.is_processing = False @defer.inlineCallbacks def query_user_exists(self, user_id): |