diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 75fc74c797..05af54d31b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -16,8 +16,8 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes
-from synapse.appservice import ApplicationService
-from synapse.types import UserID
+from synapse.util.metrics import Measure
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
import logging
@@ -35,47 +35,81 @@ def log_failure(failure):
)
-# NB: Purposefully not inheriting BaseHandler since that contains way too much
-# setup code which this handler does not need or use. This makes testing a lot
-# easier.
class ApplicationServicesHandler(object):
- def __init__(self, hs, appservice_api, appservice_scheduler):
+ def __init__(self, hs):
self.store = hs.get_datastore()
- self.hs = hs
- self.appservice_api = appservice_api
- self.scheduler = appservice_scheduler
+ self.is_mine_id = hs.is_mine_id
+ self.appservice_api = hs.get_application_service_api()
+ self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
+ self.clock = hs.get_clock()
+ self.notify_appservices = hs.config.notify_appservices
+
+ self.current_max = 0
+ self.is_processing = False
@defer.inlineCallbacks
- def notify_interested_services(self, event):
+ def notify_interested_services(self, current_id):
"""Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any
prolonged length of time.
Args:
- event(Event): The event to push out to interested services.
+ current_id(int): The current maximum ID.
"""
- # Gather interested services
- services = yield self._get_services_for_event(event)
- if len(services) == 0:
- return # no services need notifying
-
- # Do we know this user exists? If not, poke the user query API for
- # all services which match that user regex. This needs to block as these
- # user queries need to be made BEFORE pushing the event.
- yield self._check_user_exists(event.sender)
- if event.type == EventTypes.Member:
- yield self._check_user_exists(event.state_key)
-
- if not self.started_scheduler:
- self.scheduler.start().addErrback(log_failure)
- self.started_scheduler = True
-
- # Fork off pushes to these services
- for service in services:
- self.scheduler.submit_event_for_as(service, event)
+ services = self.store.get_app_services()
+ if not services or not self.notify_appservices:
+ return
+
+ self.current_max = max(self.current_max, current_id)
+ if self.is_processing:
+ return
+
+ with Measure(self.clock, "notify_interested_services"):
+ self.is_processing = True
+ try:
+ upper_bound = self.current_max
+ limit = 100
+ while True:
+ upper_bound, events = yield self.store.get_new_events_for_appservice(
+ upper_bound, limit
+ )
+
+ if not events:
+ break
+
+ for event in events:
+ # Gather interested services
+ services = yield self._get_services_for_event(event)
+ if len(services) == 0:
+ continue # no services need notifying
+
+ # Do we know this user exists? If not, poke the user
+ # query API for all services which match that user regex.
+ # This needs to block as these user queries need to be
+ # made BEFORE pushing the event.
+ yield self._check_user_exists(event.sender)
+ if event.type == EventTypes.Member:
+ yield self._check_user_exists(event.state_key)
+
+ if not self.started_scheduler:
+ self.scheduler.start().addErrback(log_failure)
+ self.started_scheduler = True
+
+ # Fork off pushes to these services
+ for service in services:
+ preserve_fn(self.scheduler.submit_event_for_as)(
+ service, event
+ )
+
+ yield self.store.set_appservice_last_pos(upper_bound)
+
+ if len(events) < limit:
+ break
+ finally:
+ self.is_processing = False
@defer.inlineCallbacks
def query_user_exists(self, user_id):
@@ -108,11 +142,12 @@ class ApplicationServicesHandler(object):
association can be found.
"""
room_alias_str = room_alias.to_string()
- alias_query_services = yield self._get_services_for_event(
- event=None,
- restrict_to=ApplicationService.NS_ALIASES,
- alias_list=[room_alias_str]
- )
+ services = self.store.get_app_services()
+ alias_query_services = [
+ s for s in services if (
+ s.is_interested_in_alias(room_alias_str)
+ )
+ ]
for alias_service in alias_query_services:
is_known_alias = yield self.appservice_api.query_alias(
alias_service, room_alias_str
@@ -125,52 +160,97 @@ class ApplicationServicesHandler(object):
defer.returnValue(result)
@defer.inlineCallbacks
- def _get_services_for_event(self, event, restrict_to="", alias_list=None):
+ def query_3pe(self, kind, protocol, fields):
+ services = yield self._get_services_for_3pn(protocol)
+
+ results = yield preserve_context_over_deferred(defer.DeferredList([
+ preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
+ for service in services
+ ], consumeErrors=True))
+
+ ret = []
+ for (success, result) in results:
+ if success:
+ ret.extend(result)
+
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def get_3pe_protocols(self, only_protocol=None):
+ services = self.store.get_app_services()
+ protocols = {}
+
+ # Collect up all the individual protocol responses out of the ASes
+ for s in services:
+ for p in s.protocols:
+ if only_protocol is not None and p != only_protocol:
+ continue
+
+ if p not in protocols:
+ protocols[p] = []
+
+ info = yield self.appservice_api.get_3pe_protocol(s, p)
+
+ if info is not None:
+ protocols[p].append(info)
+
+ def _merge_instances(infos):
+ if not infos:
+ return {}
+
+ # Merge the 'instances' lists of multiple results, but just take
+ # the other fields from the first as they ought to be identical
+ # copy the result so as not to corrupt the cached one
+ combined = dict(infos[0])
+ combined["instances"] = list(combined["instances"])
+
+ for info in infos[1:]:
+ combined["instances"].extend(info["instances"])
+
+ return combined
+
+ for p in protocols.keys():
+ protocols[p] = _merge_instances(protocols[p])
+
+ defer.returnValue(protocols)
+
+ @defer.inlineCallbacks
+ def _get_services_for_event(self, event):
"""Retrieve a list of application services interested in this event.
Args:
event(Event): The event to check. Can be None if alias_list is not.
- restrict_to(str): The namespace to restrict regex tests to.
- alias_list: A list of aliases to get services for. If None, this
- list is obtained from the database.
Returns:
list<ApplicationService>: A list of services interested in this
event based on the service regex.
"""
- member_list = None
- if hasattr(event, "room_id"):
- # We need to know the aliases associated with this event.room_id,
- # if any.
- if not alias_list:
- alias_list = yield self.store.get_aliases_for_room(
- event.room_id
- )
- # We need to know the members associated with this event.room_id,
- # if any.
- member_list = yield self.store.get_users_in_room(event.room_id)
-
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
interested_list = [
s for s in services if (
- s.is_interested(event, restrict_to, alias_list, member_list)
+ yield s.is_interested(event, self.store)
)
]
defer.returnValue(interested_list)
- @defer.inlineCallbacks
def _get_services_for_user(self, user_id):
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
interested_list = [
s for s in services if (
s.is_interested_in_user(user_id)
)
]
- defer.returnValue(interested_list)
+ return defer.succeed(interested_list)
+
+ def _get_services_for_3pn(self, protocol):
+ services = self.store.get_app_services()
+ interested_list = [
+ s for s in services if s.is_interested_in_protocol(protocol)
+ ]
+ return defer.succeed(interested_list)
@defer.inlineCallbacks
def _is_unknown_user(self, user_id):
- user = UserID.from_string(user_id)
- if not self.hs.is_mine(user):
+ if not self.is_mine_id(user_id):
# we don't know if they are unknown or not since it isn't one of our
# users. We can't poke ASes.
defer.returnValue(False)
@@ -182,7 +262,7 @@ class ApplicationServicesHandler(object):
return
# user not found; could be the AS though, so check.
- services = yield self.store.get_app_services()
+ services = self.store.get_app_services()
service_list = [s for s in services if s.sender == user_id]
defer.returnValue(len(service_list) == 0)
|