diff options
author | Paul "LeoNerd" Evans <paul@matrix.org> | 2016-08-18 14:21:01 +0100 |
---|---|---|
committer | Paul "LeoNerd" Evans <paul@matrix.org> | 2016-08-18 14:21:01 +0100 |
commit | d5bf7a4a991b0bfe2134bb5e5c7e194f33f037aa (patch) | |
tree | 2187b123096728d0ff329d354368ef48daed1fdb /synapse/handlers/appservice.py | |
parent | Since empty lookups now return 200/empty list not 404, we can safely log fail... (diff) | |
parent | Merge pull request #1025 from matrix-org/erikj/appservice_stream (diff) | |
download | synapse-d5bf7a4a991b0bfe2134bb5e5c7e194f33f037aa.tar.xz |
Merge remote-tracking branch 'origin/develop' into paul/thirdpartylookup
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/appservice.py | 99 |
1 files changed, 55 insertions, 44 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index f124590e4a..52e897d8d9 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -16,7 +16,8 @@ from twisted.internet import defer from synapse.api.constants import EventTypes -from synapse.appservice import ApplicationService +from synapse.util.metrics import Measure +from synapse.util.logcontext import preserve_fn import logging @@ -42,36 +43,60 @@ class ApplicationServicesHandler(object): self.appservice_api = hs.get_application_service_api() self.scheduler = hs.get_application_service_scheduler() self.started_scheduler = False + self.clock = hs.get_clock() @defer.inlineCallbacks - def notify_interested_services(self, event): + def notify_interested_services(self, current_id): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any prolonged length of time. Args: - event(Event): The event to push out to interested services. + current_id(int): The current maximum ID. """ - # Gather interested services - services = yield self._get_services_for_event(event) - if len(services) == 0: - return # no services need notifying - - # Do we know this user exists? If not, poke the user query API for - # all services which match that user regex. This needs to block as these - # user queries need to be made BEFORE pushing the event. - yield self._check_user_exists(event.sender) - if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) - - if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) - self.started_scheduler = True - - # Fork off pushes to these services - for service in services: - self.scheduler.submit_event_for_as(service, event) + services = yield self.store.get_app_services() + if not services: + return + + with Measure(self.clock, "notify_interested_services"): + upper_bound = current_id + limit = 100 + while True: + upper_bound, events = yield self.store.get_new_events_for_appservice( + upper_bound, limit + ) + + logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound) + + if not events: + break + + for event in events: + # Gather interested services + services = yield self._get_services_for_event(event) + if len(services) == 0: + continue # no services need notifying + + # Do we know this user exists? If not, poke the user query API for + # all services which match that user regex. This needs to block as + # these user queries need to be made BEFORE pushing the event. + yield self._check_user_exists(event.sender) + if event.type == EventTypes.Member: + yield self._check_user_exists(event.state_key) + + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services + for service in services: + preserve_fn(self.scheduler.submit_event_for_as)(service, event) + + yield self.store.set_appservice_last_pos(upper_bound) + + if len(events) < limit: + break @defer.inlineCallbacks def query_user_exists(self, user_id): @@ -104,11 +129,12 @@ class ApplicationServicesHandler(object): association can be found. """ room_alias_str = room_alias.to_string() - alias_query_services = yield self._get_services_for_event( - event=None, - restrict_to=ApplicationService.NS_ALIASES, - alias_list=[room_alias_str] - ) + services = yield self.store.get_app_services() + alias_query_services = [ + s for s in services if ( + s.is_interested_in_alias(room_alias_str) + ) + ] for alias_service in alias_query_services: is_known_alias = yield self.appservice_api.query_alias( alias_service, room_alias_str @@ -136,34 +162,19 @@ class ApplicationServicesHandler(object): defer.returnValue(results) @defer.inlineCallbacks - def _get_services_for_event(self, event, restrict_to="", alias_list=None): + def _get_services_for_event(self, event): """Retrieve a list of application services interested in this event. Args: event(Event): The event to check. Can be None if alias_list is not. - restrict_to(str): The namespace to restrict regex tests to. - alias_list: A list of aliases to get services for. If None, this - list is obtained from the database. Returns: list<ApplicationService>: A list of services interested in this event based on the service regex. """ - member_list = None - if hasattr(event, "room_id"): - # We need to know the aliases associated with this event.room_id, - # if any. - if not alias_list: - alias_list = yield self.store.get_aliases_for_room( - event.room_id - ) - # We need to know the members associated with this event.room_id, - # if any. - member_list = yield self.store.get_users_in_room(event.room_id) - services = yield self.store.get_app_services() interested_list = [ s for s in services if ( - s.is_interested(event, restrict_to, alias_list, member_list) + yield s.is_interested(event, self.store) ) ] defer.returnValue(interested_list) |