diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 9afc8fd754..6450a12890 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -48,9 +48,12 @@ UP & quit +---------- YES SUCCESS
This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
+from twisted.internet import defer
from synapse.appservice import ApplicationServiceState
-from twisted.internet import defer
+from synapse.util.logcontext import preserve_fn
+from synapse.util.metrics import Measure
+
import logging
logger = logging.getLogger(__name__)
@@ -73,7 +76,7 @@ class ApplicationServiceScheduler(object):
self.txn_ctrl = _TransactionController(
self.clock, self.store, self.as_api, create_recoverer
)
- self.queuer = _ServiceQueuer(self.txn_ctrl)
+ self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
@defer.inlineCallbacks
def start(self):
@@ -94,38 +97,36 @@ class _ServiceQueuer(object):
this schedules any other events in the queue to run.
"""
- def __init__(self, txn_ctrl):
+ def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
- self.pending_requests = {} # dict of {service_id: Deferred}
+ self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
+ self.clock = clock
def enqueue(self, service, event):
# if this service isn't being sent something
- if not self.pending_requests.get(service.id):
- self._send_request(service, [event])
- else:
- # add to queue for this service
- if service.id not in self.queued_events:
- self.queued_events[service.id] = []
- self.queued_events[service.id].append(event)
-
- def _send_request(self, service, events):
- # send request and add callbacks
- d = self.txn_ctrl.send(service, events)
- d.addBoth(self._on_request_finish)
- d.addErrback(self._on_request_fail)
- self.pending_requests[service.id] = d
-
- def _on_request_finish(self, service):
- self.pending_requests[service.id] = None
- # if there are queued events, then send them.
- if (service.id in self.queued_events
- and len(self.queued_events[service.id]) > 0):
- self._send_request(service, self.queued_events[service.id])
- self.queued_events[service.id] = []
-
- def _on_request_fail(self, err):
- logger.error("AS request failed: %s", err)
+ self.queued_events.setdefault(service.id, []).append(event)
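+        # start a sender for this service if one isn't already running;
+        # _send_request is a no-op while a request is in flight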
+ preserve_fn(self._send_request)(service)
+
+ @defer.inlineCallbacks
+ def _send_request(self, service):
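+        # only allow one outstanding request per service; the loop below
+        # keeps draining the queue until it is empty, picking up any events
+        # enqueued while the previous transaction was being sent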
+ if service.id in self.requests_in_flight:
+ return
+
+ self.requests_in_flight.add(service.id)
+ try:
+ while True:
+ events = self.queued_events.pop(service.id, [])
+ if not events:
+ return
+
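+                # record how long each push to this AS takes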
+ with Measure(self.clock, "servicequeuer.send"):
+ try:
+ yield self.txn_ctrl.send(service, events)
+                    except Exception:
+ logger.exception("AS request failed")
+        finally:
+            self.requests_in_flight.discard(service.id)
class _TransactionController(object):
@@ -155,8 +156,6 @@ class _TransactionController(object):
except Exception as e:
logger.exception(e)
self._start_recoverer(service)
- # request has finished
- defer.returnValue(service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 051ccdb380..48feae07b5 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.appservice import ApplicationService
+from synapse.util.metrics import Measure
import logging
@@ -42,6 +43,7 @@ class ApplicationServicesHandler(object):
self.appservice_api = hs.get_application_service_api()
self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
+ self.clock = hs.get_clock()
@defer.inlineCallbacks
def notify_interested_services(self, event):
@@ -53,25 +55,26 @@ class ApplicationServicesHandler(object):
Args:
event(Event): The event to push out to interested services.
"""
- # Gather interested services
- services = yield self._get_services_for_event(event)
- if len(services) == 0:
- return # no services need notifying
-
- # Do we know this user exists? If not, poke the user query API for
- # all services which match that user regex. This needs to block as these
- # user queries need to be made BEFORE pushing the event.
- yield self._check_user_exists(event.sender)
- if event.type == EventTypes.Member:
- yield self._check_user_exists(event.state_key)
-
- if not self.started_scheduler:
- self.scheduler.start().addErrback(log_failure)
- self.started_scheduler = True
-
- # Fork off pushes to these services
- for service in services:
- self.scheduler.submit_event_for_as(service, event)
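+        # Measure the whole notification path so the time spent handling
+        # each event shows up in the server metrics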
+ with Measure(self.clock, "notify_interested_services"):
+ # Gather interested services
+ services = yield self._get_services_for_event(event)
+ if len(services) == 0:
+ return # no services need notifying
+
+ # Do we know this user exists? If not, poke the user query API for
+ # all services which match that user regex. This needs to block as these
+ # user queries need to be made BEFORE pushing the event.
+ yield self._check_user_exists(event.sender)
+ if event.type == EventTypes.Member:
+ yield self._check_user_exists(event.state_key)
+
+ if not self.started_scheduler:
+ self.scheduler.start().addErrback(log_failure)
+ self.started_scheduler = True
+
+ # Fork off pushes to these services
+ for service in services:
+ self.scheduler.submit_event_for_as(service, event)
@defer.inlineCallbacks
def query_user_exists(self, user_id):
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 30883a0696..e4a25f2411 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -67,10 +67,8 @@ class _NotifierUserStream(object):
so that it can remove itself from the indexes in the Notifier class.
"""
- def __init__(self, user_id, rooms, current_token, time_now_ms,
- appservice=None):
+ def __init__(self, user_id, rooms, current_token, time_now_ms):
self.user_id = user_id
- self.appservice = appservice
self.rooms = set(rooms)
self.current_token = current_token
self.last_notified_ms = time_now_ms
@@ -107,11 +105,6 @@ class _NotifierUserStream(object):
notifier.user_to_user_stream.pop(self.user_id)
- if self.appservice:
- notifier.appservice_to_user_streams.get(
- self.appservice, set()
- ).discard(self)
-
def count_listeners(self):
return len(self.notify_deferred.observers())
@@ -142,7 +135,6 @@ class Notifier(object):
def __init__(self, hs):
self.user_to_user_stream = {}
self.room_to_user_streams = {}
- self.appservice_to_user_streams = {}
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
@@ -168,8 +160,6 @@ class Notifier(object):
all_user_streams |= x
for x in self.user_to_user_stream.values():
all_user_streams.add(x)
- for x in self.appservice_to_user_streams.values():
- all_user_streams |= x
return sum(stream.count_listeners() for stream in all_user_streams)
metrics.register_callback("listeners", count_listeners)
@@ -182,10 +172,6 @@ class Notifier(object):
"users",
lambda: len(self.user_to_user_stream),
)
- metrics.register_callback(
- "appservices",
- lambda: count(bool, self.appservice_to_user_streams.values()),
- )
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
extra_users=[]):
@@ -230,20 +216,6 @@ class Notifier(object):
# poke any interested application service.
self.appservice_handler.notify_interested_services(event)
- app_streams = set()
-
- for appservice in self.appservice_to_user_streams:
- # TODO (kegan): Redundant appservice listener checks?
- # App services will already be in the room_to_user_streams set, but
- # that isn't enough. They need to be checked here in order to
- # receive *invites* for users they are interested in. Does this
- # make the room_to_user_streams check somewhat obselete?
- if appservice.is_interested(event):
- app_user_streams = self.appservice_to_user_streams.get(
- appservice, set()
- )
- app_streams |= app_user_streams
-
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
self._user_joined_room(event.state_key, event.room_id)
@@ -251,11 +223,9 @@ class Notifier(object):
"room_key", room_stream_id,
users=extra_users,
rooms=[event.room_id],
- extra_streams=app_streams,
)
- def on_new_event(self, stream_key, new_token, users=[], rooms=[],
- extra_streams=set()):
+ def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend event wise.
Will wake up all listeners for the given users and rooms.
@@ -294,7 +264,6 @@ class Notifier(object):
"""
user_stream = self.user_to_user_stream.get(user_id)
if user_stream is None:
- appservice = yield self.store.get_app_service_by_user_id(user_id)
current_token = yield self.event_sources.get_current_token()
if room_ids is None:
rooms = yield self.store.get_rooms_for_user(user_id)
@@ -302,7 +271,6 @@ class Notifier(object):
user_stream = _NotifierUserStream(
user_id=user_id,
rooms=room_ids,
- appservice=appservice,
current_token=current_token,
time_now_ms=self.clock.time_msec(),
)
@@ -477,11 +445,6 @@ class Notifier(object):
s = self.room_to_user_streams.setdefault(room, set())
s.add(user_stream)
- if user_stream.appservice:
- self.appservice_to_user_stream.setdefault(
- user_stream.appservice, set()
- ).add(user_stream)
-
def _user_joined_room(self, user_id, room_id):
new_user_stream = self.user_to_user_stream.get(user_id)
if new_user_stream is not None:
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 4e4100bdec..33f35fb44e 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -449,7 +449,7 @@ def _iterate_over_text(tree, *tags_to_ignore):
el = elements.next()
if isinstance(el, basestring):
yield el
- elif el.tag not in tags_to_ignore:
+ elif el is not None and el.tag not in tags_to_ignore:
# el.text is the text before the first child, so we can immediately
# return it if the text exists.
if el.text:
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 0a2e78fd81..029f6612e6 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -894,6 +894,9 @@ class SQLBaseStore(object):
)
def get_all_updated_caches(self, last_id, current_id, limit):
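+        # no cache invalidations have happened since the caller's last poll,
+        # so skip the database query entirely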
+ if last_id == current_id:
+ return defer.succeed([])
+
def get_all_updated_caches_txn(txn):
# We purposefully don't bound by the current token, as we want to
# send across cache invalidations as quickly as possible. Cache
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 631a229332..e5a902f734 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -193,7 +193,7 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
def setUp(self):
self.txn_ctrl = Mock()
- self.queuer = _ServiceQueuer(self.txn_ctrl)
+ self.queuer = _ServiceQueuer(self.txn_ctrl, MockClock())
def test_send_single_event_no_queue(self):
# Expect the event to be sent immediately.
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index a884c95f8d..3116951472 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -15,6 +15,7 @@
from twisted.internet import defer
from .. import unittest
+from tests.utils import MockClock
from synapse.handlers.appservice import ApplicationServicesHandler
@@ -32,6 +33,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
hs.get_datastore = Mock(return_value=self.mock_store)
hs.get_application_service_api = Mock(return_value=self.mock_as_api)
hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
+ hs.get_clock.return_value = MockClock()
self.handler = ApplicationServicesHandler(hs)
@defer.inlineCallbacks
|