From 7321f45457daec80439423e6f4f44184a345c2b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Aug 2016 12:03:04 +0100 Subject: Clean up _ServiceQueuer --- synapse/appservice/scheduler.py | 61 ++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 31 deletions(-) (limited to 'synapse/appservice') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 9afc8fd754..f130d4367d 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -48,9 +48,12 @@ UP & quit +---------- YES SUCCESS This is all tied together by the AppServiceScheduler which DIs the required components. """ +from twisted.internet import defer from synapse.appservice import ApplicationServiceState -from twisted.internet import defer +from synapse.util.logcontext import preserve_fn +from synapse.util.metrics import Measure + import logging logger = logging.getLogger(__name__) @@ -73,7 +76,7 @@ class ApplicationServiceScheduler(object): self.txn_ctrl = _TransactionController( self.clock, self.store, self.as_api, create_recoverer ) - self.queuer = _ServiceQueuer(self.txn_ctrl) + self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock) @defer.inlineCallbacks def start(self): @@ -94,38 +97,36 @@ class _ServiceQueuer(object): this schedules any other events in the queue to run. """ - def __init__(self, txn_ctrl): + def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} - self.pending_requests = {} # dict of {service_id: Deferred} + self.requests_in_flight = set() self.txn_ctrl = txn_ctrl + self.clock = clock def enqueue(self, service, event): # if this service isn't being sent something - if not self.pending_requests.get(service.id): - self._send_request(service, [event]) - else: - # add to queue for this service - if service.id not in self.queued_events: - self.queued_events[service.id] = [] - self.queued_events[service.id].append(event) - - def _send_request(self, service, events): - # send request and add callbacks - d = self.txn_ctrl.send(service, events) - d.addBoth(self._on_request_finish) - d.addErrback(self._on_request_fail) - self.pending_requests[service.id] = d - - def _on_request_finish(self, service): - self.pending_requests[service.id] = None - # if there are queued events, then send them. - if (service.id in self.queued_events - and len(self.queued_events[service.id]) > 0): - self._send_request(service, self.queued_events[service.id]) - self.queued_events[service.id] = [] - - def _on_request_fail(self, err): - logger.error("AS request failed: %s", err) + self.queued_events.setdefault(service.id, []).append(event) + preserve_fn(self._send_request)(service) + + @defer.inlineCallbacks + def _send_request(self, service): + if service.id in self.requests_in_flight: + return + + with Measure(self.clock, "_ServiceQueuer._send_request"): + self.requests_in_flight.add(service.id) + try: + while True: + events = self.queued_events.pop(service.id, []) + if not events: + return + + try: + yield self.txn_ctrl.send(service, events) + except: + logger.exception("AS request failed") + finally: + self.requests_in_flight.discard(service.id) class _TransactionController(object): @@ -155,8 +156,6 @@ class _TransactionController(object): except Exception as e: logger.exception(e) self._start_recoverer(service) - # request has finished - defer.returnValue(service) @defer.inlineCallbacks def on_recovered(self, recoverer): -- cgit 1.5.1 From b9e888858ce6b0c3e4ec75c7b410b884a61fd4cf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Aug 2016 14:52:26 +0100 Subject: Move Measure block inside loop --- synapse/appservice/scheduler.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'synapse/appservice') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index f130d4367d..d46d2fead2 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -113,20 +113,20 @@ class _ServiceQueuer(object): if service.id in self.requests_in_flight: return - with Measure(self.clock, "_ServiceQueuer._send_request"): - self.requests_in_flight.add(service.id) - try: - while True: - events = self.queued_events.pop(service.id, []) - if not events: - return + self.requests_in_flight.add(service.id) + try: + while True: + events = self.queued_events.pop(service.id, []) + if not events: + return + with Measure(self.clock, "_ServiceQueuer._send_request"): try: yield self.txn_ctrl.send(service, events) except: logger.exception("AS request failed") - finally: - self.requests_in_flight.discard(service.id) + finally: + self.requests_in_flight.discard(service.id) class _TransactionController(object): -- cgit 1.5.1 From f7434713802aec24fde41e83f4c69fc4b8cdeb4f Mon Sep 17 00:00:00 2001 From: Matrix Date: Wed, 17 Aug 2016 15:05:50 +0100 Subject: Change name of metric --- synapse/appservice/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/appservice') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index d46d2fead2..6450a12890 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -120,7 +120,7 @@ class _ServiceQueuer(object): if not events: return - with Measure(self.clock, "_ServiceQueuer._send_request"): + with Measure(self.clock, "servicequeuer.send"): try: yield self.txn_ctrl.send(service, events) except: -- cgit 1.5.1 From 320dfe523c9496822f597f870dfae9f8ec77d54d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Aug 2016 17:20:50 +0100 Subject: Make notify_interested_services faster --- synapse/appservice/__init__.py | 81 ++++++++++++++------------- synapse/handlers/appservice.py | 31 +++------- tests/appservice/test_appservice.py | 109 +++++++++++++++--------------------- tests/handlers/test_appservice.py | 13 ++++- 4 files changed, 104 insertions(+), 130 deletions(-) (limited to 'synapse/appservice') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index f7178ea0d3..b1b91d0a55 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -14,6 +14,8 @@ # limitations under the License. from synapse.api.constants import EventTypes +from twisted.internet import defer + import logging import re @@ -138,65 +140,66 @@ class ApplicationService(object): return regex_obj["exclusive"] return False - def _matches_user(self, event, member_list): - if (hasattr(event, "sender") and - self.is_interested_in_user(event.sender)): - return True + @defer.inlineCallbacks + def _matches_user(self, event, store): + if not event: + defer.returnValue(False) + + if self.is_interested_in_user(event.sender): + defer.returnValue(True) # also check m.room.member state key - if (hasattr(event, "type") and event.type == EventTypes.Member - and hasattr(event, "state_key") - and self.is_interested_in_user(event.state_key)): - return True + if (event.type == EventTypes.Member and + self.is_interested_in_user(event.state_key)): + defer.returnValue(True) + + if not store: + defer.returnValue(False) + + member_list = yield store.get_users_in_room(event.room_id) + # check joined member events for user_id in member_list: if self.is_interested_in_user(user_id): - return True - return False + defer.returnValue(True) + defer.returnValue(False) def _matches_room_id(self, event): if hasattr(event, "room_id"): return self.is_interested_in_room(event.room_id) return False - def _matches_aliases(self, event, alias_list): + @defer.inlineCallbacks + def _matches_aliases(self, event, store): + if not store or not event: + defer.returnValue(False) + + alias_list = yield store.get_aliases_for_room(event.room_id) for alias in alias_list: if self.is_interested_in_alias(alias): - return True - return False + defer.returnValue(True) + defer.returnValue(False) - def is_interested(self, event, restrict_to=None, aliases_for_event=None, - member_list=None): + @defer.inlineCallbacks + def is_interested(self, event, store=None): """Check if this service is interested in this event. Args: event(Event): The event to check. - restrict_to(str): The namespace to restrict regex tests to. - aliases_for_event(list): A list of all the known room aliases for - this event. - member_list(list): A list of all joined user_ids in this room. + store(DataStore) Returns: bool: True if this service would like to know about this event. """ - if aliases_for_event is None: - aliases_for_event = [] - if member_list is None: - member_list = [] - - if restrict_to and restrict_to not in ApplicationService.NS_LIST: - # this is a programming error, so fail early and raise a general - # exception - raise Exception("Unexpected restrict_to value: %s". restrict_to) - - if not restrict_to: - return (self._matches_user(event, member_list) - or self._matches_aliases(event, aliases_for_event) - or self._matches_room_id(event)) - elif restrict_to == ApplicationService.NS_ALIASES: - return self._matches_aliases(event, aliases_for_event) - elif restrict_to == ApplicationService.NS_ROOMS: - return self._matches_room_id(event) - elif restrict_to == ApplicationService.NS_USERS: - return self._matches_user(event, member_list) + # Do cheap checks first + if self._matches_room_id(event): + defer.returnValue(True) + + if (yield self._matches_aliases(event, store)): + defer.returnValue(True) + + if (yield self._matches_user(event, store)): + defer.returnValue(True) + + defer.returnValue(False) def is_interested_in_user(self, user_id): return ( diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 48feae07b5..79805cdc2e 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -16,7 +16,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes -from synapse.appservice import ApplicationService from synapse.util.metrics import Measure import logging @@ -107,11 +106,12 @@ class ApplicationServicesHandler(object): association can be found. """ room_alias_str = room_alias.to_string() - alias_query_services = yield self._get_services_for_event( - event=None, - restrict_to=ApplicationService.NS_ALIASES, - alias_list=[room_alias_str] - ) + services = yield self.store.get_app_services() + alias_query_services = [ + s for s in services if ( + s.is_interested_in_alias(room_alias_str) + ) + ] for alias_service in alias_query_services: is_known_alias = yield self.appservice_api.query_alias( alias_service, room_alias_str @@ -124,34 +124,19 @@ class ApplicationServicesHandler(object): defer.returnValue(result) @defer.inlineCallbacks - def _get_services_for_event(self, event, restrict_to="", alias_list=None): + def _get_services_for_event(self, event): """Retrieve a list of application services interested in this event. Args: event(Event): The event to check. Can be None if alias_list is not. - restrict_to(str): The namespace to restrict regex tests to. - alias_list: A list of aliases to get services for. If None, this - list is obtained from the database. Returns: list: A list of services interested in this event based on the service regex. """ - member_list = None - if hasattr(event, "room_id"): - # We need to know the aliases associated with this event.room_id, - # if any. - if not alias_list: - alias_list = yield self.store.get_aliases_for_room( - event.room_id - ) - # We need to know the members associated with this event.room_id, - # if any. - member_list = yield self.store.get_users_in_room(event.room_id) - services = yield self.store.get_app_services() interested_list = [ s for s in services if ( - s.is_interested(event, restrict_to, alias_list, member_list) + yield s.is_interested(event, self.store) ) ] defer.returnValue(interested_list) diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index d6cc1881e9..aa8cc50550 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -14,6 +14,8 @@ # limitations under the License. from synapse.appservice import ApplicationService +from twisted.internet import defer + from mock import Mock from tests import unittest @@ -42,20 +44,25 @@ class ApplicationServiceTestCase(unittest.TestCase): type="m.something", room_id="!foo:bar", sender="@someone:somewhere" ) + self.store = Mock() + + @defer.inlineCallbacks def test_regex_user_id_prefix_match(self): self.service.namespaces[ApplicationService.NS_USERS].append( _regex("@irc_.*") ) self.event.sender = "@irc_foobar:matrix.org" - self.assertTrue(self.service.is_interested(self.event)) + self.assertTrue((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_regex_user_id_prefix_no_match(self): self.service.namespaces[ApplicationService.NS_USERS].append( _regex("@irc_.*") ) self.event.sender = "@someone_else:matrix.org" - self.assertFalse(self.service.is_interested(self.event)) + self.assertFalse((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_regex_room_member_is_checked(self): self.service.namespaces[ApplicationService.NS_USERS].append( _regex("@irc_.*") @@ -63,30 +70,36 @@ class ApplicationServiceTestCase(unittest.TestCase): self.event.sender = "@someone_else:matrix.org" self.event.type = "m.room.member" self.event.state_key = "@irc_foobar:matrix.org" - self.assertTrue(self.service.is_interested(self.event)) + self.assertTrue((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_regex_room_id_match(self): self.service.namespaces[ApplicationService.NS_ROOMS].append( _regex("!some_prefix.*some_suffix:matrix.org") ) self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org" - self.assertTrue(self.service.is_interested(self.event)) + self.assertTrue((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_regex_room_id_no_match(self): self.service.namespaces[ApplicationService.NS_ROOMS].append( _regex("!some_prefix.*some_suffix:matrix.org") ) self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org" - self.assertFalse(self.service.is_interested(self.event)) + self.assertFalse((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_regex_alias_match(self): self.service.namespaces[ApplicationService.NS_ALIASES].append( _regex("#irc_.*:matrix.org") ) - self.assertTrue(self.service.is_interested( - self.event, - aliases_for_event=["#irc_foobar:matrix.org", "#athing:matrix.org"] - )) + self.store.get_aliases_for_room.return_value = [ + "#irc_foobar:matrix.org", "#athing:matrix.org" + ] + self.store.get_users_in_room.return_value = [] + self.assertTrue((yield self.service.is_interested( + self.event, self.store + ))) def test_non_exclusive_alias(self): self.service.namespaces[ApplicationService.NS_ALIASES].append( @@ -136,15 +149,20 @@ class ApplicationServiceTestCase(unittest.TestCase): "!irc_foobar:matrix.org" )) + @defer.inlineCallbacks def test_regex_alias_no_match(self): self.service.namespaces[ApplicationService.NS_ALIASES].append( _regex("#irc_.*:matrix.org") ) - self.assertFalse(self.service.is_interested( - self.event, - aliases_for_event=["#xmpp_foobar:matrix.org", "#athing:matrix.org"] - )) + self.store.get_aliases_for_room.return_value = [ + "#xmpp_foobar:matrix.org", "#athing:matrix.org" + ] + self.store.get_users_in_room.return_value = [] + self.assertFalse((yield self.service.is_interested( + self.event, self.store + ))) + @defer.inlineCallbacks def test_regex_multiple_matches(self): self.service.namespaces[ApplicationService.NS_ALIASES].append( _regex("#irc_.*:matrix.org") @@ -153,53 +171,13 @@ class ApplicationServiceTestCase(unittest.TestCase): _regex("@irc_.*") ) self.event.sender = "@irc_foobar:matrix.org" - self.assertTrue(self.service.is_interested( - self.event, - aliases_for_event=["#irc_barfoo:matrix.org"] - )) - - def test_restrict_to_rooms(self): - self.service.namespaces[ApplicationService.NS_ROOMS].append( - _regex("!flibble_.*:matrix.org") - ) - self.service.namespaces[ApplicationService.NS_USERS].append( - _regex("@irc_.*") - ) - self.event.sender = "@irc_foobar:matrix.org" - self.event.room_id = "!wibblewoo:matrix.org" - self.assertFalse(self.service.is_interested( - self.event, - restrict_to=ApplicationService.NS_ROOMS - )) - - def test_restrict_to_aliases(self): - self.service.namespaces[ApplicationService.NS_ALIASES].append( - _regex("#xmpp_.*:matrix.org") - ) - self.service.namespaces[ApplicationService.NS_USERS].append( - _regex("@irc_.*") - ) - self.event.sender = "@irc_foobar:matrix.org" - self.assertFalse(self.service.is_interested( - self.event, - restrict_to=ApplicationService.NS_ALIASES, - aliases_for_event=["#irc_barfoo:matrix.org"] - )) - - def test_restrict_to_senders(self): - self.service.namespaces[ApplicationService.NS_ALIASES].append( - _regex("#xmpp_.*:matrix.org") - ) - self.service.namespaces[ApplicationService.NS_USERS].append( - _regex("@irc_.*") - ) - self.event.sender = "@xmpp_foobar:matrix.org" - self.assertFalse(self.service.is_interested( - self.event, - restrict_to=ApplicationService.NS_USERS, - aliases_for_event=["#xmpp_barfoo:matrix.org"] - )) + self.store.get_aliases_for_room.return_value = ["#irc_barfoo:matrix.org"] + self.store.get_users_in_room.return_value = [] + self.assertTrue((yield self.service.is_interested( + self.event, self.store + ))) + @defer.inlineCallbacks def test_interested_in_self(self): # make sure invites get through self.service.sender = "@appservice:name" @@ -211,20 +189,21 @@ class ApplicationServiceTestCase(unittest.TestCase): "membership": "invite" } self.event.state_key = self.service.sender - self.assertTrue(self.service.is_interested(self.event)) + self.assertTrue((yield self.service.is_interested(self.event))) + @defer.inlineCallbacks def test_member_list_match(self): self.service.namespaces[ApplicationService.NS_USERS].append( _regex("@irc_.*") ) - join_list = [ + self.store.get_users_in_room.return_value = [ "@alice:here", "@irc_fo:here", # AS user "@bob:here", ] + self.store.get_aliases_for_room.return_value = [] self.event.sender = "@xmpp_foobar:matrix.org" - self.assertTrue(self.service.is_interested( - event=self.event, - member_list=join_list - )) + self.assertTrue((yield self.service.is_interested( + event=self.event, store=self.store + ))) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 3116951472..9c1e5cc67c 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -110,11 +110,11 @@ class AppServiceHandlerTestCase(unittest.TestCase): room_id = "!alpha:bet" servers = ["aperture"] - interested_service = self._mkservice(is_interested=True) + interested_service = self._mkservice_alias(is_interested_in_alias=True) services = [ - self._mkservice(is_interested=False), + self._mkservice_alias(is_interested_in_alias=False), interested_service, - self._mkservice(is_interested=False) + self._mkservice_alias(is_interested_in_alias=False) ] self.mock_store.get_app_services = Mock(return_value=services) @@ -137,3 +137,10 @@ class AppServiceHandlerTestCase(unittest.TestCase): service.token = "mock_service_token" service.url = "mock_service_url" return service + + def _mkservice_alias(self, is_interested_in_alias): + service = Mock() + service.is_interested_in_alias = Mock(return_value=is_interested_in_alias) + service.token = "mock_service_token" + service.url = "mock_service_url" + return service -- cgit 1.5.1