summary refs log tree commit diff
path: root/synapse/handlers/appservice.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-08-18 14:59:55 +0100
committerErik Johnston <erik@matrix.org>2016-08-18 14:59:55 +0100
commit07229bbdae6081f0c91a60e76de8fa848903b5bd (patch)
treefa5536a62b57a3d833e16108ac64c2f2d5beca5e /synapse/handlers/appservice.py
parentMake AppserviceHandler stream events from database (diff)
downloadsynapse-07229bbdae6081f0c91a60e76de8fa848903b5bd.tar.xz
Add appservice worker
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r--synapse/handlers/appservice.py89
1 files changed, 51 insertions, 38 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 84341b0d20..6556dd1ae8 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -44,6 +44,10 @@ class ApplicationServicesHandler(object):
         self.scheduler = hs.get_application_service_scheduler()
         self.started_scheduler = False
         self.clock = hs.get_clock()
+        self.notify_appservices = hs.config.notify_appservices
+
+        self.current_max = 0
+        self.is_processing = False
 
     @defer.inlineCallbacks
     def notify_interested_services(self, current_id):
@@ -56,47 +60,56 @@ class ApplicationServicesHandler(object):
             current_id(int): The current maximum ID.
         """
         services = yield self.store.get_app_services()
-        if not services:
+        if not services or not self.notify_appservices:
             return
 
-        with Measure(self.clock, "notify_interested_services"):
-            upper_bound = current_id
-            limit = 100
-            while True:
-                upper_bound, events = yield self.store.get_new_events_for_appservice(
-                    upper_bound, limit
-                )
-
-                logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound)
-
-                if not events:
-                    break
-
-                for event in events:
-                    # Gather interested services
-                    services = yield self._get_services_for_event(event)
-                    if len(services) == 0:
-                        continue  # no services need notifying
-
-                    # Do we know this user exists? If not, poke the user query API for
-                    # all services which match that user regex. This needs to block as
-                    # these user queries need to be made BEFORE pushing the event.
-                    yield self._check_user_exists(event.sender)
-                    if event.type == EventTypes.Member:
-                        yield self._check_user_exists(event.state_key)
-
-                    if not self.started_scheduler:
-                        self.scheduler.start().addErrback(log_failure)
-                        self.started_scheduler = True
-
-                    # Fork off pushes to these services
-                    for service in services:
-                        preserve_fn(self.scheduler.submit_event_for_as)(service, event)
-
-                yield self.store.set_appservice_last_pos(upper_bound)
+        self.current_max = max(self.current_max, current_id)
+        if self.is_processing:
+            return
 
-                if len(events) < limit:
-                    break
+        with Measure(self.clock, "notify_interested_services"):
+            self.is_processing = True
+            try:
+                upper_bound = self.current_max
+                limit = 100
+                while True:
+                    upper_bound, events = yield self.store.get_new_events_for_appservice(
+                        upper_bound, limit
+                    )
+
+                    if not events:
+                        break
+
+                    for event in events:
+                        # Gather interested services
+                        services = yield self._get_services_for_event(event)
+                        if len(services) == 0:
+                            continue  # no services need notifying
+
+                        # Do we know this user exists? If not, poke the user
+                        # query API for all services which match that user regex.
+                        # This needs to block as these user queries need to be
+                        # made BEFORE pushing the event.
+                        yield self._check_user_exists(event.sender)
+                        if event.type == EventTypes.Member:
+                            yield self._check_user_exists(event.state_key)
+
+                        if not self.started_scheduler:
+                            self.scheduler.start().addErrback(log_failure)
+                            self.started_scheduler = True
+
+                        # Fork off pushes to these services
+                        for service in services:
+                            preserve_fn(self.scheduler.submit_event_for_as)(
+                                service, event
+                            )
+
+                    yield self.store.set_appservice_last_pos(upper_bound)
+
+                    if len(events) < limit:
+                        break
+            finally:
+                self.is_processing = False
 
     @defer.inlineCallbacks
     def query_user_exists(self, user_id):