diff options
author | Erik Johnston <erik@matrix.org> | 2016-08-18 11:54:41 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-08-18 11:54:41 +0100 |
commit | 9da84a9a1ef0b88d2e6170d706425ab36431abda (patch) | |
tree | cf6dc7f8655a619490795377eee2d8e5a32861aa /synapse/handlers/appservice.py | |
parent | Missed a s/federation reader/media repository/ in a log message (diff) | |
download | synapse-9da84a9a1ef0b88d2e6170d706425ab36431abda.tar.xz |
Make AppserviceHandler stream events from database
This is for two reasons: 1. Suppresses duplicates correctly, as the notifier doesn't do any duplicate suppression. 2. Makes it easier to connect the AppserviceHandler to the replication stream.
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r-- | synapse/handlers/appservice.py | 65 |
1 files changed, 44 insertions, 21 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 79805cdc2e..84341b0d20 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure +from synapse.util.logcontext import preserve_fn import logging @@ -45,35 +46,57 @@ class ApplicationServicesHandler(object): self.clock = hs.get_clock() @defer.inlineCallbacks - def notify_interested_services(self, event): + def notify_interested_services(self, current_id): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any prolonged length of time. Args: - event(Event): The event to push out to interested services. + current_id(int): The current maximum ID. """ + services = yield self.store.get_app_services() + if not services: + return + with Measure(self.clock, "notify_interested_services"): - # Gather interested services - services = yield self._get_services_for_event(event) - if len(services) == 0: - return # no services need notifying - - # Do we know this user exists? If not, poke the user query API for - # all services which match that user regex. This needs to block as these - # user queries need to be made BEFORE pushing the event. - yield self._check_user_exists(event.sender) - if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) - - if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) - self.started_scheduler = True - - # Fork off pushes to these services - for service in services: - self.scheduler.submit_event_for_as(service, event) + upper_bound = current_id + limit = 100 + while True: + upper_bound, events = yield self.store.get_new_events_for_appservice( + upper_bound, limit + ) + + logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound) + + if not events: + break + + for event in events: + # Gather interested services + services = yield self._get_services_for_event(event) + if len(services) == 0: + continue # no services need notifying + + # Do we know this user exists? If not, poke the user query API for + # all services which match that user regex. This needs to block as + # these user queries need to be made BEFORE pushing the event. + yield self._check_user_exists(event.sender) + if event.type == EventTypes.Member: + yield self._check_user_exists(event.state_key) + + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services + for service in services: + preserve_fn(self.scheduler.submit_event_for_as)(service, event) + + yield self.store.set_appservice_last_pos(upper_bound) + + if len(events) < limit: + break @defer.inlineCallbacks def query_user_exists(self, user_id): |