summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/appservice/scheduler.py61
-rw-r--r--synapse/handlers/appservice.py41
-rw-r--r--synapse/notifier.py41
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py2
-rw-r--r--synapse/storage/_base.py3
-rw-r--r--tests/appservice/test_scheduler.py2
-rw-r--r--tests/handlers/test_appservice.py2
7 files changed, 61 insertions, 91 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 9afc8fd754..6450a12890 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -48,9 +48,12 @@ UP & quit           +---------- YES                       SUCCESS
 This is all tied together by the AppServiceScheduler which DIs the required
 components.
 """
+from twisted.internet import defer
 
 from synapse.appservice import ApplicationServiceState
-from twisted.internet import defer
+from synapse.util.logcontext import preserve_fn
+from synapse.util.metrics import Measure
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -73,7 +76,7 @@ class ApplicationServiceScheduler(object):
         self.txn_ctrl = _TransactionController(
             self.clock, self.store, self.as_api, create_recoverer
         )
-        self.queuer = _ServiceQueuer(self.txn_ctrl)
+        self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
 
     @defer.inlineCallbacks
     def start(self):
@@ -94,38 +97,36 @@ class _ServiceQueuer(object):
     this schedules any other events in the queue to run.
     """
 
-    def __init__(self, txn_ctrl):
+    def __init__(self, txn_ctrl, clock):
         self.queued_events = {}  # dict of {service_id: [events]}
-        self.pending_requests = {}  # dict of {service_id: Deferred}
+        self.requests_in_flight = set()
         self.txn_ctrl = txn_ctrl
+        self.clock = clock
 
     def enqueue(self, service, event):
         # if this service isn't being sent something
-        if not self.pending_requests.get(service.id):
-            self._send_request(service, [event])
-        else:
-            # add to queue for this service
-            if service.id not in self.queued_events:
-                self.queued_events[service.id] = []
-            self.queued_events[service.id].append(event)
-
-    def _send_request(self, service, events):
-        # send request and add callbacks
-        d = self.txn_ctrl.send(service, events)
-        d.addBoth(self._on_request_finish)
-        d.addErrback(self._on_request_fail)
-        self.pending_requests[service.id] = d
-
-    def _on_request_finish(self, service):
-        self.pending_requests[service.id] = None
-        # if there are queued events, then send them.
-        if (service.id in self.queued_events
-                and len(self.queued_events[service.id]) > 0):
-            self._send_request(service, self.queued_events[service.id])
-            self.queued_events[service.id] = []
-
-    def _on_request_fail(self, err):
-        logger.error("AS request failed: %s", err)
+        self.queued_events.setdefault(service.id, []).append(event)
+        preserve_fn(self._send_request)(service)
+
+    @defer.inlineCallbacks
+    def _send_request(self, service):
+        if service.id in self.requests_in_flight:
+            return
+
+        self.requests_in_flight.add(service.id)
+        try:
+            while True:
+                events = self.queued_events.pop(service.id, [])
+                if not events:
+                    return
+
+                with Measure(self.clock, "servicequeuer.send"):
+                    try:
+                        yield self.txn_ctrl.send(service, events)
+                    except:
+                        logger.exception("AS request failed")
+        finally:
+            self.requests_in_flight.discard(service.id)
 
 
 class _TransactionController(object):
@@ -155,8 +156,6 @@ class _TransactionController(object):
         except Exception as e:
             logger.exception(e)
             self._start_recoverer(service)
-        # request has finished
-        defer.returnValue(service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 051ccdb380..48feae07b5 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.appservice import ApplicationService
+from synapse.util.metrics import Measure
 
 import logging
 
@@ -42,6 +43,7 @@ class ApplicationServicesHandler(object):
         self.appservice_api = hs.get_application_service_api()
         self.scheduler = hs.get_application_service_scheduler()
         self.started_scheduler = False
+        self.clock = hs.get_clock()
 
     @defer.inlineCallbacks
     def notify_interested_services(self, event):
@@ -53,25 +55,26 @@ class ApplicationServicesHandler(object):
         Args:
             event(Event): The event to push out to interested services.
         """
-        # Gather interested services
-        services = yield self._get_services_for_event(event)
-        if len(services) == 0:
-            return  # no services need notifying
-
-        # Do we know this user exists? If not, poke the user query API for
-        # all services which match that user regex. This needs to block as these
-        # user queries need to be made BEFORE pushing the event.
-        yield self._check_user_exists(event.sender)
-        if event.type == EventTypes.Member:
-            yield self._check_user_exists(event.state_key)
-
-        if not self.started_scheduler:
-            self.scheduler.start().addErrback(log_failure)
-            self.started_scheduler = True
-
-        # Fork off pushes to these services
-        for service in services:
-            self.scheduler.submit_event_for_as(service, event)
+        with Measure(self.clock, "notify_interested_services"):
+            # Gather interested services
+            services = yield self._get_services_for_event(event)
+            if len(services) == 0:
+                return  # no services need notifying
+
+            # Do we know this user exists? If not, poke the user query API for
+            # all services which match that user regex. This needs to block as these
+            # user queries need to be made BEFORE pushing the event.
+            yield self._check_user_exists(event.sender)
+            if event.type == EventTypes.Member:
+                yield self._check_user_exists(event.state_key)
+
+            if not self.started_scheduler:
+                self.scheduler.start().addErrback(log_failure)
+                self.started_scheduler = True
+
+            # Fork off pushes to these services
+            for service in services:
+                self.scheduler.submit_event_for_as(service, event)
 
     @defer.inlineCallbacks
     def query_user_exists(self, user_id):
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 30883a0696..e4a25f2411 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -67,10 +67,8 @@ class _NotifierUserStream(object):
     so that it can remove itself from the indexes in the Notifier class.
     """
 
-    def __init__(self, user_id, rooms, current_token, time_now_ms,
-                 appservice=None):
+    def __init__(self, user_id, rooms, current_token, time_now_ms):
         self.user_id = user_id
-        self.appservice = appservice
         self.rooms = set(rooms)
         self.current_token = current_token
         self.last_notified_ms = time_now_ms
@@ -107,11 +105,6 @@ class _NotifierUserStream(object):
 
         notifier.user_to_user_stream.pop(self.user_id)
 
-        if self.appservice:
-            notifier.appservice_to_user_streams.get(
-                self.appservice, set()
-            ).discard(self)
-
     def count_listeners(self):
         return len(self.notify_deferred.observers())
 
@@ -142,7 +135,6 @@ class Notifier(object):
     def __init__(self, hs):
         self.user_to_user_stream = {}
         self.room_to_user_streams = {}
-        self.appservice_to_user_streams = {}
 
         self.event_sources = hs.get_event_sources()
         self.store = hs.get_datastore()
@@ -168,8 +160,6 @@ class Notifier(object):
                 all_user_streams |= x
             for x in self.user_to_user_stream.values():
                 all_user_streams.add(x)
-            for x in self.appservice_to_user_streams.values():
-                all_user_streams |= x
 
             return sum(stream.count_listeners() for stream in all_user_streams)
         metrics.register_callback("listeners", count_listeners)
@@ -182,10 +172,6 @@ class Notifier(object):
             "users",
             lambda: len(self.user_to_user_stream),
         )
-        metrics.register_callback(
-            "appservices",
-            lambda: count(bool, self.appservice_to_user_streams.values()),
-        )
 
     def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
                           extra_users=[]):
@@ -230,20 +216,6 @@ class Notifier(object):
         # poke any interested application service.
         self.appservice_handler.notify_interested_services(event)
 
-        app_streams = set()
-
-        for appservice in self.appservice_to_user_streams:
-            # TODO (kegan): Redundant appservice listener checks?
-            # App services will already be in the room_to_user_streams set, but
-            # that isn't enough. They need to be checked here in order to
-            # receive *invites* for users they are interested in. Does this
-            # make the room_to_user_streams check somewhat obselete?
-            if appservice.is_interested(event):
-                app_user_streams = self.appservice_to_user_streams.get(
-                    appservice, set()
-                )
-                app_streams |= app_user_streams
-
         if event.type == EventTypes.Member and event.membership == Membership.JOIN:
             self._user_joined_room(event.state_key, event.room_id)
 
@@ -251,11 +223,9 @@ class Notifier(object):
             "room_key", room_stream_id,
             users=extra_users,
             rooms=[event.room_id],
-            extra_streams=app_streams,
         )
 
-    def on_new_event(self, stream_key, new_token, users=[], rooms=[],
-                     extra_streams=set()):
+    def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
         """ Used to inform listeners that something has happend event wise.
 
         Will wake up all listeners for the given users and rooms.
@@ -294,7 +264,6 @@ class Notifier(object):
         """
         user_stream = self.user_to_user_stream.get(user_id)
         if user_stream is None:
-            appservice = yield self.store.get_app_service_by_user_id(user_id)
             current_token = yield self.event_sources.get_current_token()
             if room_ids is None:
                 rooms = yield self.store.get_rooms_for_user(user_id)
@@ -302,7 +271,6 @@ class Notifier(object):
             user_stream = _NotifierUserStream(
                 user_id=user_id,
                 rooms=room_ids,
-                appservice=appservice,
                 current_token=current_token,
                 time_now_ms=self.clock.time_msec(),
             )
@@ -477,11 +445,6 @@ class Notifier(object):
             s = self.room_to_user_streams.setdefault(room, set())
             s.add(user_stream)
 
-        if user_stream.appservice:
-            self.appservice_to_user_stream.setdefault(
-                user_stream.appservice, set()
-            ).add(user_stream)
-
     def _user_joined_room(self, user_id, room_id):
         new_user_stream = self.user_to_user_stream.get(user_id)
         if new_user_stream is not None:
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 4e4100bdec..33f35fb44e 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -449,7 +449,7 @@ def _iterate_over_text(tree, *tags_to_ignore):
         el = elements.next()
         if isinstance(el, basestring):
             yield el
-        elif el.tag not in tags_to_ignore:
+        elif el is not None and el.tag not in tags_to_ignore:
             # el.text is the text before the first child, so we can immediately
             # return it if the text exists.
             if el.text:
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 0a2e78fd81..029f6612e6 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -894,6 +894,9 @@ class SQLBaseStore(object):
             )
 
     def get_all_updated_caches(self, last_id, current_id, limit):
+        if last_id == current_id:
+            return defer.succeed([])
+
         def get_all_updated_caches_txn(txn):
             # We purposefully don't bound by the current token, as we want to
             # send across cache invalidations as quickly as possible. Cache
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 631a229332..e5a902f734 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -193,7 +193,7 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
 
     def setUp(self):
         self.txn_ctrl = Mock()
-        self.queuer = _ServiceQueuer(self.txn_ctrl)
+        self.queuer = _ServiceQueuer(self.txn_ctrl, MockClock())
 
     def test_send_single_event_no_queue(self):
         # Expect the event to be sent immediately.
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index a884c95f8d..3116951472 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -15,6 +15,7 @@
 
 from twisted.internet import defer
 from .. import unittest
+from tests.utils import MockClock
 
 from synapse.handlers.appservice import ApplicationServicesHandler
 
@@ -32,6 +33,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         hs.get_datastore = Mock(return_value=self.mock_store)
         hs.get_application_service_api = Mock(return_value=self.mock_as_api)
         hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
+        hs.get_clock.return_value = MockClock()
         self.handler = ApplicationServicesHandler(hs)
 
     @defer.inlineCallbacks