diff options
-rw-r--r-- | synapse/appservice/scheduler.py | 61 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 41 | ||||
-rw-r--r-- | synapse/notifier.py | 41 | ||||
-rw-r--r-- | synapse/rest/media/v1/preview_url_resource.py | 2 | ||||
-rw-r--r-- | synapse/storage/_base.py | 3 | ||||
-rw-r--r-- | tests/appservice/test_scheduler.py | 2 | ||||
-rw-r--r-- | tests/handlers/test_appservice.py | 2 |
7 files changed, 61 insertions, 91 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 9afc8fd754..6450a12890 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -48,9 +48,12 @@ UP & quit +---------- YES SUCCESS This is all tied together by the AppServiceScheduler which DIs the required components. """ +from twisted.internet import defer from synapse.appservice import ApplicationServiceState -from twisted.internet import defer +from synapse.util.logcontext import preserve_fn +from synapse.util.metrics import Measure + import logging logger = logging.getLogger(__name__) @@ -73,7 +76,7 @@ class ApplicationServiceScheduler(object): self.txn_ctrl = _TransactionController( self.clock, self.store, self.as_api, create_recoverer ) - self.queuer = _ServiceQueuer(self.txn_ctrl) + self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock) @defer.inlineCallbacks def start(self): @@ -94,38 +97,36 @@ class _ServiceQueuer(object): this schedules any other events in the queue to run. """ - def __init__(self, txn_ctrl): + def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} - self.pending_requests = {} # dict of {service_id: Deferred} + self.requests_in_flight = set() self.txn_ctrl = txn_ctrl + self.clock = clock def enqueue(self, service, event): # if this service isn't being sent something - if not self.pending_requests.get(service.id): - self._send_request(service, [event]) - else: - # add to queue for this service - if service.id not in self.queued_events: - self.queued_events[service.id] = [] - self.queued_events[service.id].append(event) - - def _send_request(self, service, events): - # send request and add callbacks - d = self.txn_ctrl.send(service, events) - d.addBoth(self._on_request_finish) - d.addErrback(self._on_request_fail) - self.pending_requests[service.id] = d - - def _on_request_finish(self, service): - self.pending_requests[service.id] = None - # if there are queued events, then send them. - if (service.id in self.queued_events - and len(self.queued_events[service.id]) > 0): - self._send_request(service, self.queued_events[service.id]) - self.queued_events[service.id] = [] - - def _on_request_fail(self, err): - logger.error("AS request failed: %s", err) + self.queued_events.setdefault(service.id, []).append(event) + preserve_fn(self._send_request)(service) + + @defer.inlineCallbacks + def _send_request(self, service): + if service.id in self.requests_in_flight: + return + + self.requests_in_flight.add(service.id) + try: + while True: + events = self.queued_events.pop(service.id, []) + if not events: + return + + with Measure(self.clock, "servicequeuer.send"): + try: + yield self.txn_ctrl.send(service, events) + except: + logger.exception("AS request failed") + finally: + self.requests_in_flight.discard(service.id) class _TransactionController(object): @@ -155,8 +156,6 @@ class _TransactionController(object): except Exception as e: logger.exception(e) self._start_recoverer(service) - # request has finished - defer.returnValue(service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 051ccdb380..48feae07b5 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.appservice import ApplicationService +from synapse.util.metrics import Measure import logging @@ -42,6 +43,7 @@ class ApplicationServicesHandler(object): self.appservice_api = hs.get_application_service_api() self.scheduler = hs.get_application_service_scheduler() self.started_scheduler = False + self.clock = hs.get_clock() @defer.inlineCallbacks def notify_interested_services(self, event): @@ -53,25 +55,26 @@ class ApplicationServicesHandler(object): Args: event(Event): The event to push out to interested services. """ - # Gather interested services - services = yield self._get_services_for_event(event) - if len(services) == 0: - return # no services need notifying - - # Do we know this user exists? If not, poke the user query API for - # all services which match that user regex. This needs to block as these - # user queries need to be made BEFORE pushing the event. - yield self._check_user_exists(event.sender) - if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) - - if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) - self.started_scheduler = True - - # Fork off pushes to these services - for service in services: - self.scheduler.submit_event_for_as(service, event) + with Measure(self.clock, "notify_interested_services"): + # Gather interested services + services = yield self._get_services_for_event(event) + if len(services) == 0: + return # no services need notifying + + # Do we know this user exists? If not, poke the user query API for + # all services which match that user regex. This needs to block as these + # user queries need to be made BEFORE pushing the event. + yield self._check_user_exists(event.sender) + if event.type == EventTypes.Member: + yield self._check_user_exists(event.state_key) + + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services + for service in services: + self.scheduler.submit_event_for_as(service, event) @defer.inlineCallbacks def query_user_exists(self, user_id): diff --git a/synapse/notifier.py b/synapse/notifier.py index 30883a0696..e4a25f2411 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -67,10 +67,8 @@ class _NotifierUserStream(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user_id, rooms, current_token, time_now_ms, - appservice=None): + def __init__(self, user_id, rooms, current_token, time_now_ms): self.user_id = user_id - self.appservice = appservice self.rooms = set(rooms) self.current_token = current_token self.last_notified_ms = time_now_ms @@ -107,11 +105,6 @@ class _NotifierUserStream(object): notifier.user_to_user_stream.pop(self.user_id) - if self.appservice: - notifier.appservice_to_user_streams.get( - self.appservice, set() - ).discard(self) - def count_listeners(self): return len(self.notify_deferred.observers()) @@ -142,7 +135,6 @@ class Notifier(object): def __init__(self, hs): self.user_to_user_stream = {} self.room_to_user_streams = {} - self.appservice_to_user_streams = {} self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() @@ -168,8 +160,6 @@ class Notifier(object): all_user_streams |= x for x in self.user_to_user_stream.values(): all_user_streams.add(x) - for x in self.appservice_to_user_streams.values(): - all_user_streams |= x return sum(stream.count_listeners() for stream in all_user_streams) metrics.register_callback("listeners", count_listeners) @@ -182,10 +172,6 @@ class Notifier(object): "users", lambda: len(self.user_to_user_stream), ) - metrics.register_callback( - "appservices", - lambda: count(bool, self.appservice_to_user_streams.values()), - ) def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): @@ -230,20 +216,6 @@ class Notifier(object): # poke any interested application service. self.appservice_handler.notify_interested_services(event) - app_streams = set() - - for appservice in self.appservice_to_user_streams: - # TODO (kegan): Redundant appservice listener checks? - # App services will already be in the room_to_user_streams set, but - # that isn't enough. They need to be checked here in order to - # receive *invites* for users they are interested in. Does this - # make the room_to_user_streams check somewhat obselete? - if appservice.is_interested(event): - app_user_streams = self.appservice_to_user_streams.get( - appservice, set() - ) - app_streams |= app_user_streams - if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) @@ -251,11 +223,9 @@ class Notifier(object): "room_key", room_stream_id, users=extra_users, rooms=[event.room_id], - extra_streams=app_streams, ) - def on_new_event(self, stream_key, new_token, users=[], rooms=[], - extra_streams=set()): + def on_new_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend event wise. Will wake up all listeners for the given users and rooms. @@ -294,7 +264,6 @@ class Notifier(object): """ user_stream = self.user_to_user_stream.get(user_id) if user_stream is None: - appservice = yield self.store.get_app_service_by_user_id(user_id) current_token = yield self.event_sources.get_current_token() if room_ids is None: rooms = yield self.store.get_rooms_for_user(user_id) @@ -302,7 +271,6 @@ class Notifier(object): user_stream = _NotifierUserStream( user_id=user_id, rooms=room_ids, - appservice=appservice, current_token=current_token, time_now_ms=self.clock.time_msec(), ) @@ -477,11 +445,6 @@ class Notifier(object): s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) - if user_stream.appservice: - self.appservice_to_user_stream.setdefault( - user_stream.appservice, set() - ).add(user_stream) - def _user_joined_room(self, user_id, room_id): new_user_stream = self.user_to_user_stream.get(user_id) if new_user_stream is not None: diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 4e4100bdec..33f35fb44e 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -449,7 +449,7 @@ def _iterate_over_text(tree, *tags_to_ignore): el = elements.next() if isinstance(el, basestring): yield el - elif el.tag not in tags_to_ignore: + elif el is not None and el.tag not in tags_to_ignore: # el.text is the text before the first child, so we can immediately # return it if the text exists. if el.text: diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0a2e78fd81..029f6612e6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -894,6 +894,9 @@ class SQLBaseStore(object): ) def get_all_updated_caches(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + def get_all_updated_caches_txn(txn): # We purposefully don't bound by the current token, as we want to # send across cache invalidations as quickly as possible. Cache diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 631a229332..e5a902f734 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -193,7 +193,7 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): def setUp(self): self.txn_ctrl = Mock() - self.queuer = _ServiceQueuer(self.txn_ctrl) + self.queuer = _ServiceQueuer(self.txn_ctrl, MockClock()) def test_send_single_event_no_queue(self): # Expect the event to be sent immediately. diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a884c95f8d..3116951472 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -15,6 +15,7 @@ from twisted.internet import defer from .. import unittest +from tests.utils import MockClock from synapse.handlers.appservice import ApplicationServicesHandler @@ -32,6 +33,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): hs.get_datastore = Mock(return_value=self.mock_store) hs.get_application_service_api = Mock(return_value=self.mock_as_api) hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler) + hs.get_clock.return_value = MockClock() self.handler = ApplicationServicesHandler(hs) @defer.inlineCallbacks |