summary refs log tree commit diff
path: root/synapse/handlers/appservice.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-08-24 14:39:35 +0100
committerErik Johnston <erik@matrix.org>2016-08-24 14:39:35 +0100
commit37638c06c59bbf7327d5c1edc4b9e346716e7374 (patch)
tree1c843a49d3d5168ff998a54f50d30cdc3814f104 /synapse/handlers/appservice.py
parentMerge branch 'release-v0.17.0' of github.com:matrix-org/synapse (diff)
parentBump changelog and version (diff)
downloadsynapse-37638c06c59bbf7327d5c1edc4b9e346716e7374.tar.xz
Merge branch 'release-v0.17.1' of github.com:matrix-org/synapse v0.17.1
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r--synapse/handlers/appservice.py136
1 files changed, 92 insertions, 44 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 051ccdb380..306686a384 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -16,7 +16,8 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
-from synapse.appservice import ApplicationService
+from synapse.util.metrics import Measure
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 
 import logging
 
@@ -42,36 +43,73 @@ class ApplicationServicesHandler(object):
         self.appservice_api = hs.get_application_service_api()
         self.scheduler = hs.get_application_service_scheduler()
         self.started_scheduler = False
+        self.clock = hs.get_clock()
+        self.notify_appservices = hs.config.notify_appservices
+
+        self.current_max = 0
+        self.is_processing = False
 
     @defer.inlineCallbacks
-    def notify_interested_services(self, event):
+    def notify_interested_services(self, current_id):
         """Notifies (pushes) all application services interested in this event.
 
         Pushing is done asynchronously, so this method won't block for any
         prolonged length of time.
 
         Args:
-            event(Event): The event to push out to interested services.
+            current_id(int): The current maximum ID.
         """
-        # Gather interested services
-        services = yield self._get_services_for_event(event)
-        if len(services) == 0:
-            return  # no services need notifying
-
-        # Do we know this user exists? If not, poke the user query API for
-        # all services which match that user regex. This needs to block as these
-        # user queries need to be made BEFORE pushing the event.
-        yield self._check_user_exists(event.sender)
-        if event.type == EventTypes.Member:
-            yield self._check_user_exists(event.state_key)
-
-        if not self.started_scheduler:
-            self.scheduler.start().addErrback(log_failure)
-            self.started_scheduler = True
-
-        # Fork off pushes to these services
-        for service in services:
-            self.scheduler.submit_event_for_as(service, event)
+        services = yield self.store.get_app_services()
+        if not services or not self.notify_appservices:
+            return
+
+        self.current_max = max(self.current_max, current_id)
+        if self.is_processing:
+            return
+
+        with Measure(self.clock, "notify_interested_services"):
+            self.is_processing = True
+            try:
+                upper_bound = self.current_max
+                limit = 100
+                while True:
+                    upper_bound, events = yield self.store.get_new_events_for_appservice(
+                        upper_bound, limit
+                    )
+
+                    if not events:
+                        break
+
+                    for event in events:
+                        # Gather interested services
+                        services = yield self._get_services_for_event(event)
+                        if len(services) == 0:
+                            continue  # no services need notifying
+
+                        # Do we know this user exists? If not, poke the user
+                        # query API for all services which match that user regex.
+                        # This needs to block as these user queries need to be
+                        # made BEFORE pushing the event.
+                        yield self._check_user_exists(event.sender)
+                        if event.type == EventTypes.Member:
+                            yield self._check_user_exists(event.state_key)
+
+                        if not self.started_scheduler:
+                            self.scheduler.start().addErrback(log_failure)
+                            self.started_scheduler = True
+
+                        # Fork off pushes to these services
+                        for service in services:
+                            preserve_fn(self.scheduler.submit_event_for_as)(
+                                service, event
+                            )
+
+                    yield self.store.set_appservice_last_pos(upper_bound)
+
+                    if len(events) < limit:
+                        break
+            finally:
+                self.is_processing = False
 
     @defer.inlineCallbacks
     def query_user_exists(self, user_id):
@@ -104,11 +142,12 @@ class ApplicationServicesHandler(object):
             association can be found.
         """
         room_alias_str = room_alias.to_string()
-        alias_query_services = yield self._get_services_for_event(
-            event=None,
-            restrict_to=ApplicationService.NS_ALIASES,
-            alias_list=[room_alias_str]
-        )
+        services = yield self.store.get_app_services()
+        alias_query_services = [
+            s for s in services if (
+                s.is_interested_in_alias(room_alias_str)
+            )
+        ]
         for alias_service in alias_query_services:
             is_known_alias = yield self.appservice_api.query_alias(
                 alias_service, room_alias_str
@@ -121,34 +160,35 @@ class ApplicationServicesHandler(object):
                 defer.returnValue(result)
 
     @defer.inlineCallbacks
-    def _get_services_for_event(self, event, restrict_to="", alias_list=None):
+    def query_3pe(self, kind, protocol, fields):
+        services = yield self._get_services_for_3pn(protocol)
+
+        results = yield preserve_context_over_deferred(defer.DeferredList([
+            preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
+            for service in services
+        ], consumeErrors=True))
+
+        ret = []
+        for (success, result) in results:
+            if success:
+                ret.extend(result)
+
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def _get_services_for_event(self, event):
         """Retrieve a list of application services interested in this event.
 
         Args:
             event(Event): The event to check. Can be None if alias_list is not.
-            restrict_to(str): The namespace to restrict regex tests to.
-            alias_list: A list of aliases to get services for. If None, this
-            list is obtained from the database.
         Returns:
             list<ApplicationService>: A list of services interested in this
             event based on the service regex.
         """
-        member_list = None
-        if hasattr(event, "room_id"):
-            # We need to know the aliases associated with this event.room_id,
-            # if any.
-            if not alias_list:
-                alias_list = yield self.store.get_aliases_for_room(
-                    event.room_id
-                )
-            # We need to know the members associated with this event.room_id,
-            # if any.
-            member_list = yield self.store.get_users_in_room(event.room_id)
-
         services = yield self.store.get_app_services()
         interested_list = [
             s for s in services if (
-                s.is_interested(event, restrict_to, alias_list, member_list)
+                yield s.is_interested(event, self.store)
             )
         ]
         defer.returnValue(interested_list)
@@ -164,6 +204,14 @@ class ApplicationServicesHandler(object):
         defer.returnValue(interested_list)
 
     @defer.inlineCallbacks
+    def _get_services_for_3pn(self, protocol):
+        services = yield self.store.get_app_services()
+        interested_list = [
+            s for s in services if s.is_interested_in_protocol(protocol)
+        ]
+        defer.returnValue(interested_list)
+
+    @defer.inlineCallbacks
     def _is_unknown_user(self, user_id):
         if not self.is_mine_id(user_id):
             # we don't know if they are unknown or not since it isn't one of our