From 01c099d9ef2b3891643845031c917fd0dc41d954 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 17:16:47 +0000 Subject: Add appservice txns sql schema --- synapse/storage/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a3ff995695..dfce5224a9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -57,7 +57,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 14 +SCHEMA_VERSION = 15 dir_path = os.path.abspath(os.path.dirname(__file__)) -- cgit 1.4.1 From 21fd84dcb8645a555cc35adb8b2a5a68536b8087 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:01:19 +0000 Subject: Use seconds; start gluing in the AS scheduler into the AS handler. --- synapse/appservice/scheduler.py | 4 ++-- synapse/handlers/__init__.py | 8 +++++++- synapse/handlers/appservice.py | 17 ++++++++++++++--- synapse/storage/__init__.py | 7 +++++-- tests/appservice/test_scheduler.py | 10 +++++----- tests/handlers/test_appservice.py | 7 +++++-- 6 files changed, 38 insertions(+), 15 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index ee5978da6e..068d4bd087 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -132,7 +132,7 @@ class _TransactionController(object): txn.complete(self.store) else: self._start_recoverer(service) - self.clock.call_later(1000, self.start_polling) + self.clock.call_later(1, self.start_polling) @defer.inlineCallbacks def on_recovered(self, recoverer): @@ -202,7 +202,7 @@ class _Recoverer(object): self.backoff_counter = 1 def recover(self): - self.clock.call_later(1000 * (2 ** self.backoff_counter), self.retry) + self.clock.call_later((2 ** self.backoff_counter), self.retry) @defer.inlineCallbacks def retry(self): diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 8d345bf936..0c51d615ec 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.appservice.scheduler import AppServiceScheduler from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( @@ -54,7 +55,12 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + asapi = ApplicationServiceApi(hs) self.appservice_handler = ApplicationServicesHandler( - hs, ApplicationServiceApi(hs) + hs, asapi, AppServiceScheduler( + clock=hs.get_clock(), + store=hs.get_datastore(), + as_api=asapi + ) ) self.sync_handler = SyncHandler(hs) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2c488a46f6..f3cd458e6b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -26,15 +26,22 @@ import logging logger = logging.getLogger(__name__) +def log_failure(failure): + logger.error("Application Services Failure: %s", failure.value) + logger.error(failure.getTraceback()) + + # NB: Purposefully not inheriting BaseHandler since that contains way too much # setup code which this handler does not need or use. This makes testing a lot # easier. class ApplicationServicesHandler(object): - def __init__(self, hs, appservice_api): + def __init__(self, hs, appservice_api, appservice_scheduler): self.store = hs.get_datastore() self.hs = hs self.appservice_api = appservice_api + self.scheduler = appservice_scheduler + self.started_scheduler = False @defer.inlineCallbacks def register(self, app_service): @@ -90,9 +97,13 @@ class ApplicationServicesHandler(object): if event.type == EventTypes.Member: yield self._check_user_exists(event.state_key) - # Fork off pushes to these services - XXX First cut, best effort + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services for service in services: - self.appservice_api.push(service, event) + self.scheduler.submit_event_for_as(service, event) @defer.inlineCallbacks def query_user_exists(self, user_id): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index dfce5224a9..6c159b52a0 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -18,7 +18,9 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.api.constants import EventTypes -from .appservice import ApplicationServiceStore +from .appservice import ( + ApplicationServiceStore, ApplicationServiceTransactionStore +) from .directory import DirectoryStore from .feedback import FeedbackStore from .presence import PresenceStore @@ -79,7 +81,8 @@ class DataStore(RoomMemberStore, RoomStore, RejectionsStore, FilteringStore, PusherStore, - PushRuleStore + PushRuleStore, + ApplicationServiceTransactionStore ): def __init__(self, hs): diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 9532bf66b8..e18e879319 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -162,7 +162,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) txn.send = Mock(return_value=True) # wait for exp backoff - self.clock.advance_time(2000) + self.clock.advance_time(2) self.assertEquals(1, txn.send.call_count) self.assertEquals(1, txn.complete.call_count) # 2 because it needs to get None to know there are no more txns @@ -185,21 +185,21 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.recoverer.recover() self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) txn.send = Mock(return_value=False) - self.clock.advance_time(2000) + self.clock.advance_time(2) self.assertEquals(1, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) - self.clock.advance_time(4000) + self.clock.advance_time(4) self.assertEquals(2, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) - self.clock.advance_time(8000) + self.clock.advance_time(8) self.assertEquals(3, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) txn.send = Mock(return_value=True) # successfully send the txn pop_txn = True # returns the txn the first time, then no more. - self.clock.advance_time(16000) + self.clock.advance_time(16) self.assertEquals(1, txn.send.call_count) # new mock reset call count self.assertEquals(1, txn.complete.call_count) self.callback.assert_called_once_with(self.recoverer) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a2c541317c..06cb1dd4cf 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -27,10 +27,11 @@ class AppServiceHandlerTestCase(unittest.TestCase): def setUp(self): self.mock_store = Mock() self.mock_as_api = Mock() + self.mock_scheduler = Mock() hs = Mock() hs.get_datastore = Mock(return_value=self.mock_store) self.handler = ApplicationServicesHandler( - hs, self.mock_as_api + hs, self.mock_as_api, self.mock_scheduler ) @defer.inlineCallbacks @@ -52,7 +53,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.mock_as_api.push = Mock() yield self.handler.notify_interested_services(event) - self.mock_as_api.push.assert_called_once_with(interested_service, event) + self.mock_scheduler.submit_event_for_as.assert_called_once_with( + interested_service, event + ) @defer.inlineCallbacks def test_query_room_alias_exists(self): -- cgit 1.4.1 From 835e01fc7047e34a813936544027596627a112df Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 10:16:59 +0000 Subject: Minor PR comment tweaks. --- synapse/appservice/scheduler.py | 4 ++-- synapse/handlers/appservice.py | 10 ++++++++-- synapse/storage/__init__.py | 2 +- synapse/storage/appservice.py | 6 +++--- tests/appservice/test_scheduler.py | 10 +++++----- 5 files changed, 19 insertions(+), 13 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index add1e3879c..8a3a6a880f 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -86,7 +86,7 @@ class AppServiceScheduler(object): self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): - self.event_grouper.on_receive(service, event) + self.event_grouper.enqueue(service, event) class _EventGrouper(object): @@ -96,7 +96,7 @@ class _EventGrouper(object): def __init__(self): self.groups = {} # dict of {service: [events]} - def on_receive(self, service, event): + def enqueue(self, service, event): if service not in self.groups: self.groups[service] = [] self.groups[service].append(event) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index f3cd458e6b..a24f7f5587 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -27,8 +27,14 @@ logger = logging.getLogger(__name__) def log_failure(failure): - logger.error("Application Services Failure: %s", failure.value) - logger.error(failure.getTraceback()) + logger.error( + "Application Services Failure", + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() + ) + ) # NB: Purposefully not inheriting BaseHandler since that contains way too much diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index efef859214..e752b035e6 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -82,7 +82,7 @@ class DataStore(RoomMemberStore, RoomStore, FilteringStore, PusherStore, PushRuleStore, - ApplicationServiceTransactionStore + ApplicationServiceTransactionStore, ): def __init__(self, hs): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 670e1d56af..e928812bc9 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -365,9 +365,9 @@ class ApplicationServiceTransactionStore(SQLBaseStore): may be empty. """ sql = ( - "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN " - "application_services AS a ON a.id=s.as_id LEFT JOIN " - "application_services_regex AS r ON r.as_id=a.id WHERE state = ?" + "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN" + " application_services AS a ON a.id=s.as_id LEFT JOIN" + " application_services_regex AS r ON r.as_id=a.id WHERE state = ?" ) results = yield self._execute_and_decode( "get_appservices_by_state", sql, state diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index e18e879319..4534d05b93 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -213,7 +213,7 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): def test_drain_single_event(self): service = Mock() event = Mock() - self.grouper.on_receive(service, event) + self.grouper.enqueue(service, event) groups = self.grouper.drain_groups() self.assertTrue(service in groups) self.assertEquals([event], groups[service]) @@ -225,7 +225,7 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): service = Mock() events = [Mock(), Mock(), Mock()] for e in events: - self.grouper.on_receive(service, e) + self.grouper.enqueue(service, e) groups = self.grouper.drain_groups() self.assertTrue(service in groups) self.assertEquals(events, groups[service]) @@ -243,11 +243,11 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): services[2]: events_c } for e in events_b: - self.grouper.on_receive(services[1], e) + self.grouper.enqueue(services[1], e) for e in events_c: - self.grouper.on_receive(services[2], e) + self.grouper.enqueue(services[2], e) for e in events_a: - self.grouper.on_receive(services[0], e) + self.grouper.enqueue(services[0], e) groups = self.grouper.drain_groups() for service in services: -- cgit 1.4.1