From 141ec04d194c57f29756d6ccbda3f396cc3aa9e7 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 14:53:35 +0000 Subject: Add stub ApplicationServiceTransactionStore. Bootstrap Recoverers. Fill in stub Transaction functions. --- synapse/storage/appservice.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index e30265750a..c1762692b9 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -336,3 +336,31 @@ class ApplicationServiceStore(SQLBaseStore): hs_token=service["hs_token"], sender=service["sender"] )) + + +class ApplicationServiceTransactionStore(SQLBaseStore): + + def __init__(self, hs): + super(ApplicationServiceTransactionStore, self).__init__(hs) + + def get_failing_appservices(self): + """Get a list of application services which are down. + + Returns: + A Deferred which resolves to a list of ApplicationServices, which + may be empty. + """ + pass + + def complete_appservice_txn(self, txn_id, service): + """Completes an application service transaction. + + Args: + txn_id(str): The transaction ID being completed. + service(ApplicationService): The application service which was sent + this transaction. + Returns: + A Deferred which resolves to True if this transaction was completed + successfully. + """ + pass -- cgit 1.5.1 From f260cb72cd3435d540411962a92ca2a9fd333eb1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 15:12:24 +0000 Subject: Flesh out more stub functions. --- synapse/appservice/__init__.py | 5 +++++ synapse/appservice/scheduler.py | 37 +++++++++++++++++++++++++++++-------- synapse/storage/appservice.py | 17 +++++++++++++++-- tests/appservice/test_scheduler.py | 5 +++-- 4 files changed, 52 insertions(+), 12 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index a268a6bcc4..cc6c381566 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -20,6 +20,11 @@ import re logger = logging.getLogger(__name__) +class ApplicationServiceState(object): + DOWN = "down" + UP = "up" + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 645d7bf6b2..99e83747a8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -49,7 +49,11 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ +from synapse.appservice import ApplicationServiceState from twisted.internet import defer +import logging + +logger = logging.getLogger(__name__) class AppServiceScheduler(object): @@ -162,21 +166,36 @@ class _TransactionController(object): if txn.send(self.as_api): txn.complete(self.store) else: - # TODO mark AS as down self._start_recoverer(service) self.clock.call_later(1000, self.start_polling) - def on_recovered(self, service): - # TODO mark AS as UP - pass + @defer.inlineCallbacks + def on_recovered(self, recoverer): + applied_state = yield self.store.set_appservice_state( + recoverer.service, + ApplicationServiceState.UP + ) + if not applied_state: + logger.error("Failed to apply appservice state UP to service %s", + recoverer.service) def add_recoverers(self, recoverers): for r in recoverers: self.recoverers.append(r) + @defer.inlineCallbacks def _start_recoverer(self, service): - recoverer = self.recoverer_fn(service, self.on_recovered) - recoverer.recover() + applied_state = yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + if applied_state: + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() + else: + logger.error("Failed to apply appservice state DOWN to service %s", + service) def _is_service_up(self, service): pass @@ -193,7 +212,9 @@ class _Recoverer(object): @staticmethod @defer.inlineCallbacks def start(clock, store, as_api, callback): - services = yield store.get_failing_appservices() + services = yield store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) recoverers = [ _Recoverer(clock, store, as_api, s, callback) for s in services ] @@ -228,7 +249,7 @@ class _Recoverer(object): self._set_service_recovered() def _set_service_recovered(self): - self.callback(self.service) + self.callback(self) @defer.inlineCallbacks def _get_oldest_txn(self): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c1762692b9..214f6d99c5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -343,15 +343,28 @@ class ApplicationServiceTransactionStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceTransactionStore, self).__init__(hs) - def get_failing_appservices(self): - """Get a list of application services which are down. + def get_appservices_by_state(self, state): + """Get a list of application services based on their state. + Args: + state(ApplicationServiceState): The state to filter on. Returns: A Deferred which resolves to a list of ApplicationServices, which may be empty. """ pass + def set_appservice_state(self, service, state): + """Set the application service state. + + Args: + service(ApplicationService): The service whose state to set. + state(ApplicationServiceState): The connectivity state to apply. + Returns: + A Deferred which resolves to True if the state was set successfully. + """ + pass + def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 1e3eb9e1cc..ec8f77c54b 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -57,7 +57,8 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(1, txn.complete.call_count) # 2 because it needs to get None to know there are no more txns self.assertEquals(2, self.store.get_oldest_txn.call_count) - self.assertEquals(1, self.callback.call_count) + self.callback.assert_called_once_with(self.recoverer) + self.assertEquals(self.recoverer.service, self.service) def test_recover_retry_txn(self): txn = Mock() @@ -91,7 +92,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.clock.advance_time(16000) self.assertEquals(1, txn.send.call_count) # new mock reset call count self.assertEquals(1, txn.complete.call_count) - self.assertEquals(1, self.callback.call_count) + self.callback.assert_called_once_with(self.recoverer) class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): -- cgit 1.5.1 From 0354659f9d8b60b9edc78b0b597bceb52b8c7b2b Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:09:05 +0000 Subject: Finish synapse.appservice.scheduler implementation. With tests to assert behaviour. Not hooked up yet. Stub datastore methods not implemented yet. --- synapse/appservice/__init__.py | 39 +++++++++++++ synapse/appservice/scheduler.py | 63 ++++---------------- synapse/storage/appservice.py | 22 +++++++ tests/appservice/test_scheduler.py | 115 ++++++++++++++++++++++++++++++++++++- 4 files changed, 186 insertions(+), 53 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index cc6c381566..743a8278ad 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -25,6 +25,45 @@ class ApplicationServiceState(object): UP = "up" +class AppServiceTransaction(object): + """Represents an application service transaction.""" + + def __init__(self, service, id, events): + self.service = service + self.id = id + self.events = events + + def send(self, as_api): + """Sends this transaction using the provided AS API interface. + + Args: + as_api(ApplicationServiceApi): The API to use to send. + Returns: + A Deferred which resolves to True if the transaction was sent. + """ + return as_api.push_bulk( + service=self.service, + events=self.events, + txn_id=self.id + ) + + def complete(self, store): + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + Returns: + A Deferred which resolves to True if the transaction was completed. + """ + return store.complete_appservice_txn( + service=self.service, + txn_id=self.id + ) + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 2b3aa3b0ea..50ad3b8e83 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -88,45 +88,6 @@ class AppServiceScheduler(object): self.event_grouper.on_receive(service, event) -class AppServiceTransaction(object): - """Represents an application service transaction.""" - - def __init__(self, service, id, events): - self.service = service - self.id = id - self.events = events - - def send(self, as_api): - """Sends this transaction using the provided AS API interface. - - Args: - as_api(ApplicationServiceApi): The API to use to send. - Returns: - A Deferred which resolves to True if the transaction was sent. - """ - return as_api.push_bulk( - service=self.service, - events=self.events, - txn_id=self.id - ) - - def complete(self, store): - """Completes this transaction as successful. - - Marks this transaction ID on the application service and removes the - transaction contents from the database. - - Args: - store: The database store to operate on. - Returns: - A Deferred which resolves to True if the transaction was completed. - """ - return store.complete_appservice_txn( - service=self.service, - txn_id=self.id - ) - - class _EventGrouper(object): """Groups events for the same application service together. """ @@ -156,14 +117,18 @@ class _TransactionController(object): # keep track of how many recoverers there are self.recoverers = [] + @defer.inlineCallbacks def start_polling(self): groups = self.event_grouper.drain_groups() for service in groups: - txn_id = self._get_next_txn_id(service) - txn = AppServiceTransaction(service, txn_id, groups[service]) - self._store_txn(txn) - if self._is_service_up(service): - if txn.send(self.as_api): + txn = yield self.store.create_appservice_txn( + service=service, + events=groups[service] + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: txn.complete(self.store) else: self._start_recoverer(service) @@ -207,14 +172,10 @@ class _TransactionController(object): logger.error("Failed to apply appservice state DOWN to service %s", service) + @defer.inlineCallbacks def _is_service_up(self, service): - pass - - def _get_next_txn_id(self, service): - pass # TODO work out the next txn_id for this service - - def _store_txn(self, txn): - pass + state = yield self.store.get_appservice_state(service) + defer.returnValue(state == ApplicationServiceState.UP) class _Recoverer(object): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 214f6d99c5..6fde7dcc66 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -354,6 +354,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): """ pass + def get_appservice_state(self, service): + """Get the application service state. + + Args: + service(ApplicationService): The service whose state to set. + Returns: + A Deferred which resolves to ApplicationServiceState. + """ + pass + def set_appservice_state(self, service, state): """Set the application service state. @@ -365,6 +375,18 @@ class ApplicationServiceTransactionStore(SQLBaseStore): """ pass + def create_appservice_txn(self, service, events): + """Atomically creates a new transaction for this application service + with the given list of events. + + Args: + service(ApplicationService): The service who the transaction is for. + events(list): A list of events to put in the transaction. + Returns: + ApplicationServiceTransaction: A new transaction. + """ + pass + def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index ec8f77c54b..a31755da67 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -12,9 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - AppServiceScheduler, AppServiceTransaction, _EventGrouper, - _TransactionController, _Recoverer + AppServiceScheduler, _EventGrouper, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock @@ -22,6 +22,116 @@ from mock import Mock from tests import unittest +class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): + + def setUp(self): + self.clock = MockClock() + self.store = Mock() + self.as_api = Mock() + self.event_grouper = Mock() + self.recoverer = Mock() + self.recoverer_fn = Mock(return_value=self.recoverer) + self.txnctrl = _TransactionController( + clock=self.clock, store=self.store, as_api=self.as_api, + event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn + ) + + def test_poll_single_group_service_up(self): + # Test: The AS is up and the txn is successfully sent. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + txn_id = "foobar" + txn = Mock(id=txn_id, service=service, events=events) + + # mock methods + self.event_grouper.drain_groups = Mock(return_value=groups) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.UP) + ) + txn.send = Mock(return_value=defer.succeed(True)) + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events # txn made and saved + ) + self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made + txn.complete.assert_called_once_with(self.store) # txn completed + + def test_poll_single_group_service_down(self): + # Test: The AS is down so it shouldn't push; Recoverers will do it. + # It should still make a transaction though. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + + self.event_grouper.drain_groups = Mock(return_value=groups) + txn = Mock(id="idhere", service=service, events=events) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.DOWN) + ) + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events # txn made and saved + ) + self.assertEquals(0, txn.send.call_count) # txn not sent though + self.assertEquals(0, txn.complete.call_count) # or completed + + def test_poll_single_group_service_up(self): + # Test: The AS is up and the txn is not sent. A Recoverer is made and + # started. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + txn_id = "foobar" + txn = Mock(id=txn_id, service=service, events=events) + + # mock methods + self.event_grouper.drain_groups = Mock(return_value=groups) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.UP) + ) + self.store.set_appservice_state = Mock(return_value=defer.succeed(True)) + txn.send = Mock(return_value=defer.succeed(False)) # fails to send + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events + ) + self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made + self.assertEquals(1, self.recoverer.recover.call_count) # and invoked + self.assertEquals(1, len(self.txnctrl.recoverers)) # and stored + self.assertEquals(0, txn.complete.call_count) # txn not completed + self.store.set_appservice_state.assert_called_once_with( + service, ApplicationServiceState.DOWN # service marked as down + ) + + def test_poll_no_groups(self): + self.as_api.push_bulk = Mock() + self.event_grouper.drain_groups = Mock(return_value={}) + self.txnctrl.start_polling() + self.assertEquals(0, self.as_api.push_bulk.call_count) + + class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): def setUp(self): @@ -94,6 +204,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(1, txn.complete.call_count) self.callback.assert_called_once_with(self.recoverer) + class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): def setUp(self): -- cgit 1.5.1 From 2602ddc379f9bede21cafc8c8f7f57dec44cf69d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:16:14 +0000 Subject: Apply clarity and docstrings --- synapse/appservice/scheduler.py | 2 +- synapse/storage/appservice.py | 14 +++++++++++++- tests/appservice/test_scheduler.py | 2 +- 3 files changed, 15 insertions(+), 3 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 50ad3b8e83..514148c947 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -224,5 +224,5 @@ class _Recoverer(object): @defer.inlineCallbacks def _get_oldest_txn(self): - txn = yield self.store.get_oldest_txn(self.service) + txn = yield self.store.get_oldest_unsent_txn(self.service) defer.returnValue(txn) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 6fde7dcc66..4447c8a2e1 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -383,7 +383,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service(ApplicationService): The service who the transaction is for. events(list): A list of events to put in the transaction. Returns: - ApplicationServiceTransaction: A new transaction. + AppServiceTransaction: A new transaction. """ pass @@ -399,3 +399,15 @@ class ApplicationServiceTransactionStore(SQLBaseStore): successfully. """ pass + + def get_oldest_unsent_txn(self, service): + """Get the oldest transaction which has not been sent for this + service. + + Args: + service(ApplicationService): The app service to get the oldest txn. + Returns: + A Deferred which resolves to an AppServiceTransaction or + None. + """ + pass diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index a31755da67..f75a6f5d95 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - AppServiceScheduler, _EventGrouper, _TransactionController, _Recoverer + _EventGrouper, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock -- cgit 1.5.1 From 01c099d9ef2b3891643845031c917fd0dc41d954 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 17:16:47 +0000 Subject: Add appservice txns sql schema --- synapse/storage/__init__.py | 2 +- synapse/storage/appservice.py | 6 +++++ .../storage/schema/delta/15/appservice_txns.sql | 31 ++++++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/15/appservice_txns.sql (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a3ff995695..dfce5224a9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -57,7 +57,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 14 +SCHEMA_VERSION = 15 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 4447c8a2e1..eec8fbd592 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -385,6 +385,8 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: AppServiceTransaction: A new transaction. """ + # TODO: work out txn id (highest txn id for this service += 1) + # TODO: Within same db transaction, Insert new txn into txn table pass def complete_appservice_txn(self, txn_id, service): @@ -398,6 +400,8 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to True if this transaction was completed successfully. """ + # TODO: Set current txn_id for AS to 'txn_id' + # TODO: Delete txn contents pass def get_oldest_unsent_txn(self, service): @@ -410,4 +414,6 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ + # TODO: Monotonically increasing txn ids, so just select the smallest + # one in the txns table (we delete them when they are sent) pass diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql new file mode 100644 index 0000000000..11f0c799aa --- /dev/null +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -0,0 +1,31 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS application_services_state( + as_id INTEGER PRIMARY KEY, + state TEXT NOT NULL, + last_txn TEXT, + FOREIGN KEY(as_id) REFERENCES application_services(id) +); + +CREATE TABLE IF NOT EXISTS application_services_txns( + as_id INTEGER NOT NULL, + txn_id INTEGER NOT NULL, + content TEXT NOT NULL, + UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK +); + + + -- cgit 1.5.1 From 4a6afa6abf6c90c393bc3fa00e40d3927fc0c6c1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 17:27:55 +0000 Subject: Assign the AS ID from the database; replace old placeholder txn id. --- synapse/appservice/__init__.py | 4 ++-- synapse/storage/appservice.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 743a8278ad..c60db16b74 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -79,13 +79,13 @@ class ApplicationService(object): NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS] def __init__(self, token, url=None, namespaces=None, hs_token=None, - sender=None, txn_id=None): + sender=None, id=None): self.token = token self.url = url self.hs_token = hs_token self.sender = sender self.namespaces = self._check_namespaces(namespaces) - self.txn_id = txn_id + self.id = id def _check_namespaces(self, namespaces): # Sanity check that it is of the form: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index eec8fbd592..582269b8d5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -302,6 +302,7 @@ class ApplicationServiceStore(SQLBaseStore): if as_token not in services: # add the service services[as_token] = { + "id": res["as_id"], "url": res["url"], "token": as_token, "hs_token": res["hs_token"], @@ -326,7 +327,6 @@ class ApplicationServiceStore(SQLBaseStore): except JSONDecodeError: logger.error("Bad regex object '%s'", res["regex"]) - # TODO get last successful txn id f.e. service for service in services.values(): logger.info("Found application service: %s", service) self.services_cache.append(ApplicationService( @@ -334,7 +334,8 @@ class ApplicationServiceStore(SQLBaseStore): url=service["url"], namespaces=service["namespaces"], hs_token=service["hs_token"], - sender=service["sender"] + sender=service["sender"], + id=service["id"] )) -- cgit 1.5.1 From 406d32f8b514a572627eef1326d472e2825b2fe1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 17:35:14 +0000 Subject: Start implementing ApplicationServiceTransactionStore --- synapse/storage/appservice.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 582269b8d5..0b272e82dd 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -374,7 +374,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: A Deferred which resolves to True if the state was set successfully. """ - pass + return self._simple_upsert( + "application_services_state", + dict(as_id=service.id), + dict(state=state) + ) def create_appservice_txn(self, service, events): """Atomically creates a new transaction for this application service -- cgit 1.5.1 From 1c2dcf762a8fe28390e9a98a01577aaadca7f1c0 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 13:10:31 +0000 Subject: Partially implement txn store methods with tests. --- synapse/storage/appservice.py | 61 ++++++++++---- tests/storage/test_appservice.py | 171 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 213 insertions(+), 19 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 0b272e82dd..37078f9ef0 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import simplejson from simplejson import JSONDecodeError +import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership from synapse.api.errors import StoreError -from synapse.appservice import ApplicationService +from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -142,7 +142,7 @@ class ApplicationServiceStore(SQLBaseStore): txn.execute( "INSERT INTO application_services_regex(" "as_id, namespace, regex) values(?,?,?)", - (as_id, ns_int, simplejson.dumps(regex_obj)) + (as_id, ns_int, json.dumps(regex_obj)) ) return True @@ -277,12 +277,7 @@ class ApplicationServiceStore(SQLBaseStore): return rooms_for_user_matching_user_id - @defer.inlineCallbacks - def _populate_cache(self): - """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT * FROM application_services LEFT JOIN " - "application_services_regex ON application_services.id = " - "application_services_regex.as_id") + def _parse_services_dict(self, results): # SQL results in the form: # [ # { @@ -296,13 +291,12 @@ class ApplicationServiceStore(SQLBaseStore): # } # ] services = {} - results = yield self._execute_and_decode(sql) for res in results: as_token = res["token"] if as_token not in services: # add the service services[as_token] = { - "id": res["as_id"], + "id": res["id"], "url": res["url"], "token": as_token, "hs_token": res["hs_token"], @@ -320,16 +314,16 @@ class ApplicationServiceStore(SQLBaseStore): try: services[as_token]["namespaces"][ ApplicationService.NS_LIST[ns_int]].append( - simplejson.loads(res["regex"]) + json.loads(res["regex"]) ) except IndexError: logger.error("Bad namespace enum '%s'. %s", ns_int, res) except JSONDecodeError: logger.error("Bad regex object '%s'", res["regex"]) + service_list = [] for service in services.values(): - logger.info("Found application service: %s", service) - self.services_cache.append(ApplicationService( + service_list.append(ApplicationService( token=service["token"], url=service["url"], namespaces=service["namespaces"], @@ -337,6 +331,21 @@ class ApplicationServiceStore(SQLBaseStore): sender=service["sender"], id=service["id"] )) + return service_list + + @defer.inlineCallbacks + def _populate_cache(self): + """Populates the ApplicationServiceCache from the database.""" + sql = ("SELECT * FROM application_services LEFT JOIN " + "application_services_regex ON application_services.id = " + "application_services_regex.as_id") + + results = yield self._execute_and_decode(sql) + services = self._parse_services_dict(results) + + for service in services: + logger.info("Found application service: %s", service) + self.services_cache.append(service) class ApplicationServiceTransactionStore(SQLBaseStore): @@ -344,6 +353,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceTransactionStore, self).__init__(hs) + @defer.inlineCallbacks def get_appservices_by_state(self, state): """Get a list of application services based on their state. @@ -353,8 +363,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to a list of ApplicationServices, which may be empty. """ - pass + sql = ( + "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN " + "application_services AS a ON a.id=s.as_id LEFT JOIN " + "application_services_regex AS r ON r.as_id=a.id WHERE state = ?" + ) + results = yield self._execute_and_decode(sql, state) + # NB: This assumes this class is linked with ApplicationServiceStore + defer.returnValue(self._parse_services_dict(results)) + @defer.inlineCallbacks def get_appservice_state(self, service): """Get the application service state. @@ -363,7 +381,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: A Deferred which resolves to ApplicationServiceState. """ - pass + result = yield self._simple_select_one( + "application_services_state", + dict(as_id=service.id), + ["state"], + allow_none=True + ) + if result: + defer.returnValue(result.get("state")) + return + defer.returnValue(None) def set_appservice_state(self, service, state): """Set the application service state. @@ -372,7 +399,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service(ApplicationService): The service whose state to set. state(ApplicationServiceState): The connectivity state to apply. Returns: - A Deferred which resolves to True if the state was set successfully. + A Deferred which resolves when the state was set successfully. """ return self._simple_upsert( "application_services_state", diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index ca5b92ec85..30c0b43d96 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -15,9 +15,11 @@ from tests import unittest from twisted.internet import defer -from synapse.appservice import ApplicationService +from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.server import HomeServer -from synapse.storage.appservice import ApplicationServiceStore +from synapse.storage.appservice import ( + ApplicationServiceStore, ApplicationServiceTransactionStore +) from mock import Mock from tests.utils import SQLiteMemoryDbPool, MockClock @@ -114,3 +116,168 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): def test_retrieval_of_all_services(self): services = yield self.store.get_app_services() self.assertEquals(len(services), 3) + + +class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): + + @defer.inlineCallbacks + def setUp(self): + self.db_pool = SQLiteMemoryDbPool() + yield self.db_pool.prepare() + hs = HomeServer( + "test", db_pool=self.db_pool, clock=MockClock(), config=Mock() + ) + self.as_list = [ + { + "token": "token1", + "url": "https://matrix-as.org", + "id": 3 + }, + { + "token": "alpha_tok", + "url": "https://alpha.com", + "id": 5 + }, + { + "token": "beta_tok", + "url": "https://beta.com", + "id": 6 + }, + { + "token": "delta_tok", + "url": "https://delta.com", + "id": 7 + }, + ] + for s in self.as_list: + yield self._add_service(s["id"], s["url"], s["token"]) + self.store = TestTransactionStore(hs) + + def _add_service(self, as_id, url, token): + return self.db_pool.runQuery( + "INSERT INTO application_services(id, url, token) VALUES(?,?,?)", + (as_id, url, token) + ) + + def _set_state(self, id, state, txn=None): + return self.db_pool.runQuery( + "INSERT INTO application_services_state(as_id, state, last_txn) " + "VALUES(?,?,?)", + (id, state, txn) + ) + + @defer.inlineCallbacks + def test_get_appservice_state_none(self): + service = Mock(id=999) + state = yield self.store.get_appservice_state(service) + self.assertEquals(None, state) + + @defer.inlineCallbacks + def test_get_appservice_state_up(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.UP + ) + service = Mock(id=self.as_list[0]["id"]) + state = yield self.store.get_appservice_state(service) + self.assertEquals(ApplicationServiceState.UP, state) + + @defer.inlineCallbacks + def test_get_appservice_state_down(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.UP + ) + yield self._set_state( + self.as_list[1]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[2]["id"], ApplicationServiceState.DOWN + ) + service = Mock(id=self.as_list[1]["id"]) + state = yield self.store.get_appservice_state(service) + self.assertEquals(ApplicationServiceState.DOWN, state) + + @defer.inlineCallbacks + def test_get_appservices_by_state_none(self): + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) + self.assertEquals(0, len(services)) + + @defer.inlineCallbacks + def test_set_appservices_state_down(self): + service = Mock(id=self.as_list[1]["id"]) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + rows = yield self.db_pool.runQuery( + "SELECT as_id FROM application_services_state WHERE state=?", + (ApplicationServiceState.DOWN,) + ) + self.assertEquals(service.id, rows[0][0]) + + @defer.inlineCallbacks + def test_set_appservices_state_multiple_up(self): + service = Mock(id=self.as_list[1]["id"]) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.UP + ) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + yield self.store.set_appservice_state( + service, + ApplicationServiceState.UP + ) + rows = yield self.db_pool.runQuery( + "SELECT as_id FROM application_services_state WHERE state=?", + (ApplicationServiceState.UP,) + ) + self.assertEquals(service.id, rows[0][0]) + + @defer.inlineCallbacks + def test_get_appservices_by_state_single(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[1]["id"], ApplicationServiceState.UP + ) + + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) + self.assertEquals(1, len(services)) + self.assertEquals(self.as_list[0]["id"], services[0].id) + + @defer.inlineCallbacks + def test_get_appservices_by_state_multiple(self): + yield self._set_state( + self.as_list[0]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[1]["id"], ApplicationServiceState.UP + ) + yield self._set_state( + self.as_list[2]["id"], ApplicationServiceState.DOWN + ) + yield self._set_state( + self.as_list[3]["id"], ApplicationServiceState.UP + ) + + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) + self.assertEquals(2, len(services)) + self.assertEquals(self.as_list[2]["id"], services[0].id) + self.assertEquals(self.as_list[0]["id"], services[1].id) + + +# required for ApplicationServiceTransactionStoreTestCase tests +class TestTransactionStore(ApplicationServiceTransactionStore, + ApplicationServiceStore): + + def __init__(self, hs): + super(TestTransactionStore, self).__init__(hs) -- cgit 1.5.1 From 1ead1caa18bdbf708446f1faa3d6f3dd13e63c29 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 13:54:20 +0000 Subject: Implement create_appservice_txn with tests. --- synapse/storage/appservice.py | 46 ++++++++++++++++++++++++--- tests/storage/test_appservice.py | 67 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 4 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 37078f9ef0..1360a00eae 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -17,9 +17,10 @@ from simplejson import JSONDecodeError import simplejson as json from twisted.internet import defer +from syutil.jsonutil import encode_canonical_json from synapse.api.constants import Membership from synapse.api.errors import StoreError -from synapse.appservice import ApplicationService, ApplicationServiceState +from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -417,9 +418,46 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: AppServiceTransaction: A new transaction. """ - # TODO: work out txn id (highest txn id for this service += 1) - # TODO: Within same db transaction, Insert new txn into txn table - pass + return self.runInteraction( + "create_appservice_txn", + self._create_appservice_txn, + service, events + ) + + def _create_appservice_txn(self, txn, service, events): + # work out new txn id (highest txn id for this service += 1) + # The highest id may be the last one sent (in which case it is last_txn) + # or it may be the highest in the txns list (which are waiting to be/are + # being sent) + result = txn.execute( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service.id,) + ) + last_txn_id = result.fetchone() + if last_txn_id is None: # no row exists + last_txn_id = 0 + else: + last_txn_id = int(last_txn_id[0]) # select 'last_txn' col + + result = txn.execute( + "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + highest_txn_id = result.fetchone()[0] + if highest_txn_id is None: + highest_txn_id = 0 + + new_txn_id = max(highest_txn_id, last_txn_id) + 1 + + # Insert new txn into txn table + txn.execute( + "INSERT INTO application_services_txns(as_id, txn_id, content) " + "VALUES(?,?,?)", + (service.id, new_txn_id, encode_canonical_json(events)) + ) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events + ) def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 30c0b43d96..7a8cdb5593 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -21,6 +21,7 @@ from synapse.storage.appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) +import json from mock import Mock from tests.utils import SQLiteMemoryDbPool, MockClock @@ -166,6 +167,20 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): (id, state, txn) ) + def _insert_txn(self, as_id, txn_id, content): + return self.db_pool.runQuery( + "INSERT INTO application_services_txns(as_id, txn_id, content) " + "VALUES(?,?,?)", + (as_id, txn_id, json.dumps(content)) + ) + + def _set_last_txn(self, as_id, txn_id): + return self.db_pool.runQuery( + "INSERT INTO application_services_state(as_id, last_txn, state) " + "VALUES(?,?,?)", + (as_id, txn_id, ApplicationServiceState.UP) + ) + @defer.inlineCallbacks def test_get_appservice_state_none(self): service = Mock(id=999) @@ -237,6 +252,58 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ) self.assertEquals(service.id, rows[0][0]) + @defer.inlineCallbacks + def test_create_appservice_txn_first(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 1) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + + @defer.inlineCallbacks + def test_create_appservice_txn_older_last_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + yield self._set_last_txn(service.id, 9643) # AS is falling behind + yield self._insert_txn(service.id, 9644, events) + yield self._insert_txn(service.id, 9645, events) + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 9646) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + + @defer.inlineCallbacks + def test_create_appservice_txn_up_to_date_last_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + yield self._set_last_txn(service.id, 9643) + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 9644) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + + @defer.inlineCallbacks + def test_create_appservice_txn_up_fuzzing(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + yield self._set_last_txn(service.id, 9643) + + # dump in rows with higher IDs to make sure the queries aren't wrong. + yield self._set_last_txn(self.as_list[1]["id"], 119643) + yield self._set_last_txn(self.as_list[2]["id"], 9) + yield self._set_last_txn(self.as_list[3]["id"], 9643) + yield self._insert_txn(self.as_list[1]["id"], 119644, events) + yield self._insert_txn(self.as_list[1]["id"], 119645, events) + yield self._insert_txn(self.as_list[1]["id"], 119646, events) + yield self._insert_txn(self.as_list[2]["id"], 10, events) + yield self._insert_txn(self.as_list[3]["id"], 9643, events) + + txn = yield self.store.create_appservice_txn(service, events) + self.assertEquals(txn.id, 9644) + self.assertEquals(txn.events, events) + self.assertEquals(txn.service, service) + @defer.inlineCallbacks def test_get_appservices_by_state_single(self): yield self._set_state( -- cgit 1.5.1 From 0a60bbf4fac4262da3fee702ca46d2f019597ef1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 15:53:03 +0000 Subject: Finish appservice txn storage impl and tests. --- synapse/storage/appservice.py | 85 ++++++++++++++++++---- .../storage/schema/delta/15/appservice_txns.sql | 2 +- tests/storage/test_appservice.py | 68 +++++++++++++++++ 3 files changed, 139 insertions(+), 16 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 1360a00eae..d89b0cc8c9 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -429,15 +429,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # The highest id may be the last one sent (in which case it is last_txn) # or it may be the highest in the txns list (which are waiting to be/are # being sent) - result = txn.execute( - "SELECT last_txn FROM application_services_state WHERE as_id=?", - (service.id,) - ) - last_txn_id = result.fetchone() - if last_txn_id is None: # no row exists - last_txn_id = 0 - else: - last_txn_id = int(last_txn_id[0]) # select 'last_txn' col + last_txn_id = self._get_last_txn(txn, service.id) result = txn.execute( "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", @@ -467,12 +459,43 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service(ApplicationService): The application service which was sent this transaction. Returns: - A Deferred which resolves to True if this transaction was completed + A Deferred which resolves if this transaction was stored successfully. """ - # TODO: Set current txn_id for AS to 'txn_id' - # TODO: Delete txn contents - pass + return self.runInteraction( + "complete_appservice_txn", + self._complete_appservice_txn, + txn_id, service + ) + + def _complete_appservice_txn(self, txn, txn_id, service): + txn_id = int(txn_id) + + # Debugging query: Make sure the txn being completed is EXACTLY +1 from + # what was there before. If it isn't, we've got problems (e.g. the AS + # has probably missed some events), so whine loudly but still continue, + # since it shouldn't fail completion of the transaction. + last_txn_id = self._get_last_txn(txn, service.id) + if (last_txn_id + 1) != txn_id: + logger.error( + "appservice: Completing a transaction which has an ID > 1 from " + "the last ID sent to this AS. We've either dropped events or " + "sent it to the AS out of order. FIX ME. last_txn=%s " + "completing_txn=%s service_id=%s", last_txn_id, txn_id, + service.id + ) + + # Set current txn_id for AS to 'txn_id' + self._simple_upsert_txn( + txn, "application_services_state", dict(as_id=service.id), + dict(last_txn=txn_id) + ) + + # Delete txn contents + self._simple_delete_txn( + txn, "application_services_txns", + dict(txn_id=txn_id, as_id=service.id) + ) def get_oldest_unsent_txn(self, service): """Get the oldest transaction which has not been sent for this @@ -484,6 +507,38 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ - # TODO: Monotonically increasing txn ids, so just select the smallest + return self.runInteraction( + "get_oldest_unsent_appservice_txn", + self._get_oldest_unsent_txn, + service + ) + + def _get_oldest_unsent_txn(self, txn, service): + # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) - pass + result = txn.execute( + "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + entry = self.cursor_to_dict(result)[0] + + if not entry or entry["txn_id"] is None: + # the min(txn_id) part will force a row, so entry may not be None + return None + + return AppServiceTransaction( + service=service, id=entry["txn_id"], events=json.loads( + entry["content"] + ) + ) + + def _get_last_txn(self, txn, service_id): + result = txn.execute( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service_id,) + ) + last_txn_id = result.fetchone() + if last_txn_id is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) # select 'last_txn' col diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index 11f0c799aa..ff15aa019e 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -15,7 +15,7 @@ CREATE TABLE IF NOT EXISTS application_services_state( as_id INTEGER PRIMARY KEY, - state TEXT NOT NULL, + state TEXT, last_txn TEXT, FOREIGN KEY(as_id) REFERENCES application_services(id) ); diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 7a8cdb5593..d1809c7f3b 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -304,6 +304,74 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) + @defer.inlineCallbacks + def test_complete_appservice_txn_first_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"foo": "bar"}] + txn_id = 1 + + yield self._insert_txn(service.id, txn_id, events) + yield self.store.complete_appservice_txn(txn_id=txn_id, service=service) + + res = yield self.db_pool.runQuery( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service.id,) + ) + self.assertEquals(1, len(res)) + self.assertEquals(str(txn_id), res[0][0]) + + res = yield self.db_pool.runQuery( + "SELECT * FROM application_services_txns WHERE txn_id=?", + (txn_id,) + ) + self.assertEquals(0, len(res)) + + @defer.inlineCallbacks + def test_complete_appservice_txn_existing_in_state_table(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"foo": "bar"}] + txn_id = 5 + yield self._set_last_txn(service.id, 4) + yield self._insert_txn(service.id, txn_id, events) + yield self.store.complete_appservice_txn(txn_id=txn_id, service=service) + + res = yield self.db_pool.runQuery( + "SELECT last_txn, state FROM application_services_state WHERE " + "as_id=?", + (service.id,) + ) + self.assertEquals(1, len(res)) + self.assertEquals(str(txn_id), res[0][0]) + self.assertEquals(ApplicationServiceState.UP, res[0][1]) + + res = yield self.db_pool.runQuery( + "SELECT * FROM application_services_txns WHERE txn_id=?", + (txn_id,) + ) + self.assertEquals(0, len(res)) + + @defer.inlineCallbacks + def test_get_oldest_unsent_txn_none(self): + service = Mock(id=self.as_list[0]["id"]) + + txn = yield self.store.get_oldest_unsent_txn(service) + self.assertEquals(None, txn) + + @defer.inlineCallbacks + def test_get_oldest_unsent_txn(self): + service = Mock(id=self.as_list[0]["id"]) + events = [{"type": "nothing"}, {"type": "here"}] + + yield self._insert_txn(self.as_list[1]["id"], 9, {"badger": "mushroom"}) + yield self._insert_txn(service.id, 10, events) + yield self._insert_txn(service.id, 11, [{"foo":"bar"}]) + yield self._insert_txn(service.id, 12, [{"argh":"bargh"}]) + + txn = yield self.store.get_oldest_unsent_txn(service) + self.assertEquals(service, txn.service) + self.assertEquals(10, txn.id) + self.assertEquals(events, txn.events) + @defer.inlineCallbacks def test_get_appservices_by_state_single(self): yield self._set_state( -- cgit 1.5.1 From b98cd03193476dea5f8b47e79d4122bb18449ae2 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:25:20 +0000 Subject: Use event IDs instead of dumping event content in the txns table. --- synapse/storage/appservice.py | 14 +++++----- .../storage/schema/delta/15/appservice_txns.sql | 2 +- tests/storage/test_appservice.py | 30 ++++++++++++---------- 3 files changed, 26 insertions(+), 20 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d89b0cc8c9..c3c0a0bd43 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -442,10 +442,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): new_txn_id = max(highest_txn_id, last_txn_id) + 1 # Insert new txn into txn table + event_ids = [e.event_id for e in events] txn.execute( - "INSERT INTO application_services_txns(as_id, txn_id, content) " + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " "VALUES(?,?,?)", - (service.id, new_txn_id, encode_canonical_json(events)) + (service.id, new_txn_id, json.dumps(event_ids)) ) return AppServiceTransaction( service=service, id=new_txn_id, events=events @@ -491,7 +492,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): dict(last_txn=txn_id) ) - # Delete txn contents + # Delete txn self._simple_delete_txn( txn, "application_services_txns", dict(txn_id=txn_id, as_id=service.id) @@ -526,10 +527,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # the min(txn_id) part will force a row, so entry may not be None return None + event_ids = json.loads(entry["event_ids"]) + events = self._get_events_txn(event_ids) + return AppServiceTransaction( - service=service, id=entry["txn_id"], events=json.loads( - entry["content"] - ) + service=service, id=entry["txn_id"], events=events ) def _get_last_txn(self, txn, service_id): diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index ff15aa019e..13bbb2de2e 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -23,7 +23,7 @@ CREATE TABLE IF NOT EXISTS application_services_state( CREATE TABLE IF NOT EXISTS application_services_txns( as_id INTEGER NOT NULL, txn_id INTEGER NOT NULL, - content TEXT NOT NULL, + event_ids TEXT NOT NULL, UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK ); diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index d1809c7f3b..e79599f7fb 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -167,11 +167,11 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): (id, state, txn) ) - def _insert_txn(self, as_id, txn_id, content): + def _insert_txn(self, as_id, txn_id, events): return self.db_pool.runQuery( - "INSERT INTO application_services_txns(as_id, txn_id, content) " + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " "VALUES(?,?,?)", - (as_id, txn_id, json.dumps(content)) + (as_id, txn_id, json.dumps([e.event_id for e in events])) ) def _set_last_txn(self, as_id, txn_id): @@ -255,7 +255,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_first(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] txn = yield self.store.create_appservice_txn(service, events) self.assertEquals(txn.id, 1) self.assertEquals(txn.events, events) @@ -264,7 +264,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_older_last_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] yield self._set_last_txn(service.id, 9643) # AS is falling behind yield self._insert_txn(service.id, 9644, events) yield self._insert_txn(service.id, 9645, events) @@ -276,7 +276,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_up_to_date_last_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] yield self._set_last_txn(service.id, 9643) txn = yield self.store.create_appservice_txn(service, events) self.assertEquals(txn.id, 9644) @@ -286,7 +286,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_create_appservice_txn_up_fuzzing(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] yield self._set_last_txn(service.id, 9643) # dump in rows with higher IDs to make sure the queries aren't wrong. @@ -307,7 +307,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_complete_appservice_txn_first_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"foo": "bar"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 1 yield self._insert_txn(service.id, txn_id, events) @@ -329,7 +329,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_complete_appservice_txn_existing_in_state_table(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"foo": "bar"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 5 yield self._set_last_txn(service.id, 4) yield self._insert_txn(service.id, txn_id, events) @@ -360,12 +360,16 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_oldest_unsent_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [{"type": "nothing"}, {"type": "here"}] + events = [Mock(event_id="e1"), Mock(event_id="e2")] + other_events = [Mock(event_id="e5"), Mock(event_id="e6")] - yield self._insert_txn(self.as_list[1]["id"], 9, {"badger": "mushroom"}) + # we aren't testing store._base stuff here, so mock this out + self.store._get_events_txn = Mock(return_value=events) + + yield self._insert_txn(self.as_list[1]["id"], 9, other_events) yield self._insert_txn(service.id, 10, events) - yield self._insert_txn(service.id, 11, [{"foo":"bar"}]) - yield self._insert_txn(service.id, 12, [{"argh":"bargh"}]) + yield self._insert_txn(service.id, 11, other_events) + yield self._insert_txn(service.id, 12, other_events) txn = yield self.store.get_oldest_unsent_txn(service) self.assertEquals(service, txn.service) -- cgit 1.5.1 From 04c9751f24885b974d564b3e5749b7fc9ce01c73 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:45:41 +0000 Subject: Bug fixes whilst putting it all together --- synapse/appservice/api.py | 1 + synapse/appservice/scheduler.py | 4 +++- synapse/storage/appservice.py | 9 ++++----- 3 files changed, 8 insertions(+), 6 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index c17fb219c5..3acb8867a2 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -79,6 +79,7 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning("push_bulk: Missing txn ID sending events to %s", service.url) txn_id = str(0) + txn_id = str(txn_id) uri = service.url + ("/transactions/%s" % urllib.quote(txn_id)) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 068d4bd087..3ee2406463 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -175,7 +175,7 @@ class _TransactionController(object): @defer.inlineCallbacks def _is_service_up(self, service): state = yield self.store.get_appservice_state(service) - defer.returnValue(state == ApplicationServiceState.UP) + defer.returnValue(state == ApplicationServiceState.UP or state is None) class _Recoverer(object): @@ -208,6 +208,8 @@ class _Recoverer(object): def retry(self): txn = yield self._get_oldest_txn() if txn: + logger.info("Retrying transaction %s for service %s", + txn.id, txn.service) if txn.send(self.as_api): txn.complete(self.store) # reset the backoff counter and retry immediately diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c3c0a0bd43..ab03106513 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -337,9 +337,8 @@ class ApplicationServiceStore(SQLBaseStore): @defer.inlineCallbacks def _populate_cache(self): """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT * FROM application_services LEFT JOIN " - "application_services_regex ON application_services.id = " - "application_services_regex.as_id") + sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN " + "application_services_regex AS r ON a.id = r.as_id") results = yield self._execute_and_decode(sql) services = self._parse_services_dict(results) @@ -528,7 +527,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): return None event_ids = json.loads(entry["event_ids"]) - events = self._get_events_txn(event_ids) + events = self._get_events_txn(txn, event_ids) return AppServiceTransaction( service=service, id=entry["txn_id"], events=events @@ -540,7 +539,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): (service_id,) ) last_txn_id = result.fetchone() - if last_txn_id is None: # no row exists + if last_txn_id is None or last_txn_id[0] is None: # no row exists return 0 else: return int(last_txn_id[0]) # select 'last_txn' col -- cgit 1.5.1 From 7e0bba555c4abeb55cffc123270ceee858839496 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:48:37 +0000 Subject: Remove unused import --- synapse/storage/appservice.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index ab03106513..fe347dfd3c 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -17,7 +17,6 @@ from simplejson import JSONDecodeError import simplejson as json from twisted.internet import defer -from syutil.jsonutil import encode_canonical_json from synapse.api.constants import Membership from synapse.api.errors import StoreError from synapse.appservice import ApplicationService, AppServiceTransaction -- cgit 1.5.1 From db1fbc6c6fb23ab92dd712aa60f0ff46ea76b42c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 10 Mar 2015 10:04:20 +0000 Subject: Fix remaining scheduler bugs. Add more informative logging. --- synapse/appservice/api.py | 8 +++---- synapse/appservice/scheduler.py | 52 +++++++++++++++++------------------------ synapse/storage/appservice.py | 5 ++-- 3 files changed, 28 insertions(+), 37 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 3acb8867a2..2a9becccb3 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -83,9 +83,8 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.quote(txn_id)) - response = None try: - response = yield self.put_json( + yield self.put_json( uri=uri, json_body={ "events": events @@ -93,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient): args={ "access_token": service.hs_token }) - if response: # just an empty json object - # TODO: Mark txn as sent successfully - defer.returnValue(True) + defer.returnValue(True) + return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3ee2406463..add1e3879c 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -77,6 +77,7 @@ class AppServiceScheduler(object): @defer.inlineCallbacks def start(self): + logger.info("Starting appservice scheduler") # check for any DOWN ASes and start recoverers for them. recoverers = yield _Recoverer.start( self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered @@ -137,40 +138,33 @@ class _TransactionController(object): @defer.inlineCallbacks def on_recovered(self, recoverer): self.recoverers.remove(recoverer) - logger.info("Successfully recovered application service: %s", - recoverer.service) - logger.info("Active recoverers: %s", len(self.recoverers)) - applied_state = yield self.store.set_appservice_state( + logger.info("Successfully recovered application service AS ID %s", + recoverer.service.id) + logger.info("Remaining active recoverers: %s", len(self.recoverers)) + yield self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP ) - if not applied_state: - logger.error("Failed to apply appservice state UP to service %s", - recoverer.service) def add_recoverers(self, recoverers): for r in recoverers: self.recoverers.append(r) if len(recoverers) > 0: - logger.info("Active recoverers: %s", len(self.recoverers)) + logger.info("New active recoverers: %s", len(self.recoverers)) @defer.inlineCallbacks def _start_recoverer(self, service): - applied_state = yield self.store.set_appservice_state( + yield self.store.set_appservice_state( service, ApplicationServiceState.DOWN ) - if applied_state: - logger.info( - "Application service falling behind. Starting recoverer. %s", - service - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() - else: - logger.error("Failed to apply appservice state DOWN to service %s", - service) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() @defer.inlineCallbacks def _is_service_up(self, service): @@ -190,6 +184,8 @@ class _Recoverer(object): _Recoverer(clock, store, as_api, s, callback) for s in services ] for r in recoverers: + logger.info("Starting recoverer for AS ID %s which was marked as " + "DOWN", r.service.id) r.recover() defer.returnValue(recoverers) @@ -206,12 +202,13 @@ class _Recoverer(object): @defer.inlineCallbacks def retry(self): - txn = yield self._get_oldest_txn() + txn = yield self.store.get_oldest_unsent_txn(self.service) if txn: - logger.info("Retrying transaction %s for service %s", - txn.id, txn.service) - if txn.send(self.as_api): - txn.complete(self.store) + logger.info("Retrying transaction %s for AS ID %s", + txn.id, txn.service.id) + sent = yield txn.send(self.as_api) + if sent: + yield txn.complete(self.store) # reset the backoff counter and retry immediately self.backoff_counter = 1 yield self.retry() @@ -225,8 +222,3 @@ class _Recoverer(object): def _set_service_recovered(self): self.callback(self) - - @defer.inlineCallbacks - def _get_oldest_txn(self): - txn = yield self.store.get_oldest_unsent_txn(self.service) - defer.returnValue(txn) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index fe347dfd3c..c4b4f56c5d 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -293,6 +293,8 @@ class ApplicationServiceStore(SQLBaseStore): services = {} for res in results: as_token = res["token"] + if as_token is None: + continue if as_token not in services: # add the service services[as_token] = { @@ -516,11 +518,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) result = txn.execute( - "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?", + "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?", (service.id,) ) entry = self.cursor_to_dict(result)[0] - if not entry or entry["txn_id"] is None: # the min(txn_id) part will force a row, so entry may not be None return None -- cgit 1.5.1 From 835e01fc7047e34a813936544027596627a112df Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 10:16:59 +0000 Subject: Minor PR comment tweaks. --- synapse/appservice/scheduler.py | 4 ++-- synapse/handlers/appservice.py | 10 ++++++++-- synapse/storage/__init__.py | 2 +- synapse/storage/appservice.py | 6 +++--- tests/appservice/test_scheduler.py | 10 +++++----- 5 files changed, 19 insertions(+), 13 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index add1e3879c..8a3a6a880f 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -86,7 +86,7 @@ class AppServiceScheduler(object): self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): - self.event_grouper.on_receive(service, event) + self.event_grouper.enqueue(service, event) class _EventGrouper(object): @@ -96,7 +96,7 @@ class _EventGrouper(object): def __init__(self): self.groups = {} # dict of {service: [events]} - def on_receive(self, service, event): + def enqueue(self, service, event): if service not in self.groups: self.groups[service] = [] self.groups[service].append(event) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index f3cd458e6b..a24f7f5587 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -27,8 +27,14 @@ logger = logging.getLogger(__name__) def log_failure(failure): - logger.error("Application Services Failure: %s", failure.value) - logger.error(failure.getTraceback()) + logger.error( + "Application Services Failure", + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() + ) + ) # NB: Purposefully not inheriting BaseHandler since that contains way too much diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index efef859214..e752b035e6 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -82,7 +82,7 @@ class DataStore(RoomMemberStore, RoomStore, FilteringStore, PusherStore, PushRuleStore, - ApplicationServiceTransactionStore + ApplicationServiceTransactionStore, ): def __init__(self, hs): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 670e1d56af..e928812bc9 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -365,9 +365,9 @@ class ApplicationServiceTransactionStore(SQLBaseStore): may be empty. """ sql = ( - "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN " - "application_services AS a ON a.id=s.as_id LEFT JOIN " - "application_services_regex AS r ON r.as_id=a.id WHERE state = ?" + "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN" + " application_services AS a ON a.id=s.as_id LEFT JOIN" + " application_services_regex AS r ON r.as_id=a.id WHERE state = ?" ) results = yield self._execute_and_decode( "get_appservices_by_state", sql, state diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index e18e879319..4534d05b93 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -213,7 +213,7 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): def test_drain_single_event(self): service = Mock() event = Mock() - self.grouper.on_receive(service, event) + self.grouper.enqueue(service, event) groups = self.grouper.drain_groups() self.assertTrue(service in groups) self.assertEquals([event], groups[service]) @@ -225,7 +225,7 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): service = Mock() events = [Mock(), Mock(), Mock()] for e in events: - self.grouper.on_receive(service, e) + self.grouper.enqueue(service, e) groups = self.grouper.drain_groups() self.assertTrue(service in groups) self.assertEquals(events, groups[service]) @@ -243,11 +243,11 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): services[2]: events_c } for e in events_b: - self.grouper.on_receive(services[1], e) + self.grouper.enqueue(services[1], e) for e in events_c: - self.grouper.on_receive(services[2], e) + self.grouper.enqueue(services[2], e) for e in events_a: - self.grouper.on_receive(services[0], e) + self.grouper.enqueue(services[0], e) groups = self.grouper.drain_groups() for service in services: -- cgit 1.5.1 From f0d6f724a241a50d4a12b1c00af2a4cc6f9a43f1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 15:24:32 +0000 Subject: Set the service ID as soon as it is known. --- synapse/handlers/appservice.py | 2 +- synapse/storage/appservice.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index a24f7f5587..58b5b60bb7 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -59,13 +59,13 @@ class ApplicationServicesHandler(object): ) if not stored_service: raise StoreError(404, "Application service not found") + app_service.id = stored_service.id except StoreError: raise SynapseError( 403, "Unrecognised application services token. " "Consult the home server admin.", errcode=Codes.FORBIDDEN ) - app_service.hs_token = self._generate_hs_token() # create a sender for this application service which is used when diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index e928812bc9..06b3a04afc 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -101,11 +101,12 @@ class ApplicationServiceStore(SQLBaseStore): if not service.hs_token: raise StoreError(500, "No HS token") - yield self.runInteraction( + as_id = yield self.runInteraction( "update_app_service", self._update_app_service_txn, service ) + service.id = as_id # update cache TODO: Should this be in the txn? for (index, cache_service) in enumerate(self.services_cache): @@ -124,7 +125,7 @@ class ApplicationServiceStore(SQLBaseStore): "update_app_service_txn: Failed to find as_id for token=", service.token ) - return False + return txn.execute( "UPDATE application_services SET url=?, hs_token=?, sender=? " @@ -144,7 +145,7 @@ class ApplicationServiceStore(SQLBaseStore): "as_id, namespace, regex) values(?,?,?)", (as_id, ns_int, json.dumps(regex_obj)) ) - return True + return as_id def _get_as_id_txn(self, txn, token): cursor = txn.execute( -- cgit 1.5.1 From d7a0496f3ec534076121632352f44733253e1e16 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Mar 2015 15:59:48 +0000 Subject: Convert storage layer to be mysql compatible --- synapse/storage/__init__.py | 164 +++++++++++++++++++++++++----------- synapse/storage/_base.py | 30 ++++--- synapse/storage/appservice.py | 4 +- synapse/storage/directory.py | 4 +- synapse/storage/event_federation.py | 25 +++--- synapse/storage/presence.py | 1 - synapse/storage/push_rule.py | 4 +- synapse/storage/registration.py | 14 ++- synapse/storage/room.py | 4 +- synapse/storage/roommember.py | 2 +- synapse/storage/state.py | 10 ++- synapse/storage/stream.py | 4 +- synapse/storage/transactions.py | 6 +- 13 files changed, 171 insertions(+), 101 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4b16f445d6..30cba47717 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -51,6 +51,8 @@ import logging import os import re +import threading + logger = logging.getLogger(__name__) @@ -89,6 +91,9 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token_deferred = self._get_min_token() self.min_token = None + self._next_stream_id_lock = threading.Lock() + self._next_stream_id = int(hs.get_clock().time_msec()) * 1000 + @defer.inlineCallbacks @log_function def persist_event(self, event, context, backfilled=False, @@ -172,7 +177,6 @@ class DataStore(RoomMemberStore, RoomStore, "type": s.type, "state_key": s.state_key, }, - or_replace=True, ) if event.is_state() and is_new_state: @@ -186,7 +190,6 @@ class DataStore(RoomMemberStore, RoomStore, "type": event.type, "state_key": event.state_key, }, - or_replace=True, ) for prev_state_id, _ in event.prev_state: @@ -285,7 +288,6 @@ class DataStore(RoomMemberStore, RoomStore, "internal_metadata": metadata_json.decode("UTF-8"), "json": encode_canonical_json(event_dict).decode("UTF-8"), }, - or_replace=True, ) content = encode_canonical_json( @@ -303,8 +305,9 @@ class DataStore(RoomMemberStore, RoomStore, "depth": event.depth, } - if stream_ordering is not None: - vals["stream_ordering"] = stream_ordering + if stream_ordering is None: + stream_ordering = self.get_next_stream_id() + unrec = { k: v @@ -322,21 +325,18 @@ class DataStore(RoomMemberStore, RoomStore, unrec ).decode("UTF-8") - try: - self._simple_insert_txn( - txn, - "events", - vals, - or_replace=(not outlier), - or_ignore=bool(outlier), - ) - except: - logger.warn( - "Failed to persist, probably duplicate: %s", - event.event_id, - exc_info=True, - ) - raise _RollbackButIsFineException("_persist_event") + sql = ( + "INSERT INTO events" + " (stream_ordering, topological_ordering, event_id, type," + " room_id, content, processed, outlier, depth)" + " VALUES (%s,?,?,?,?,?,?,?,?)" + ) % (stream_ordering,) + + txn.execute( + sql, + (event.depth, event.event_id, event.type, event.room_id, + content, True, outlier, event.depth) + ) if context.rejected: self._store_rejections_txn(txn, event.event_id, context.rejected) @@ -357,7 +357,6 @@ class DataStore(RoomMemberStore, RoomStore, txn, "state_events", vals, - or_replace=True, ) if is_new_state and not context.rejected: @@ -370,7 +369,6 @@ class DataStore(RoomMemberStore, RoomStore, "type": event.type, "state_key": event.state_key, }, - or_replace=True, ) for e_id, h in event.prev_state: @@ -383,7 +381,6 @@ class DataStore(RoomMemberStore, RoomStore, "room_id": event.room_id, "is_state": 1, }, - or_ignore=True, ) for hash_alg, hash_base64 in event.hashes.items(): @@ -408,7 +405,6 @@ class DataStore(RoomMemberStore, RoomStore, "room_id": event.room_id, "auth_id": auth_id, }, - or_ignore=True, ) (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) @@ -420,8 +416,7 @@ class DataStore(RoomMemberStore, RoomStore, # invalidate the cache for the redacted event self._get_event_cache.pop(event.redacts) txn.execute( - "INSERT OR IGNORE INTO redactions " - "(event_id, redacts) VALUES (?,?)", + "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", (event.event_id, event.redacts) ) @@ -515,7 +510,8 @@ class DataStore(RoomMemberStore, RoomStore, "ip": ip, "user_agent": user_agent, "last_seen": int(self._clock.time_msec()), - } + }, + or_replace=True, ) def get_user_ip_and_agents(self, user): @@ -559,6 +555,12 @@ class DataStore(RoomMemberStore, RoomStore, "have_events", f, ) + def get_next_stream_id(self): + with self._next_stream_id_lock: + i = self._next_stream_id + self._next_stream_id += 1 + return i + def read_schema(path): """ Read the named database schema. @@ -594,7 +596,7 @@ def prepare_database(db_conn): else: _setup_new_database(cur) - cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) + # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) cur.close() db_conn.commit() @@ -657,19 +659,17 @@ def _setup_new_database(cur): directory_entries = os.listdir(sql_dir) - sql_script = "BEGIN TRANSACTION;\n" for filename in fnmatch.filter(directory_entries, "*.sql"): sql_loc = os.path.join(sql_dir, filename) logger.debug("Applying schema %s", sql_loc) - sql_script += read_schema(sql_loc) - sql_script += "\n" - sql_script += "COMMIT TRANSACTION;" - cur.executescript(sql_script) + executescript(cur, sql_loc) cur.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", - (max_current_ver, False) + _convert_param_style( + "REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)" + ), + (max_current_ver, False,) ) _upgrade_existing_database( @@ -737,6 +737,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, if not upgraded: start_ver += 1 + logger.debug("applied_delta_files: %s", applied_delta_files) + for v in range(start_ver, SCHEMA_VERSION + 1): logger.debug("Upgrading schema to v%d", v) @@ -753,6 +755,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, directory_entries.sort() for file_name in directory_entries: relative_path = os.path.join(str(v), file_name) + logger.debug("Found file: %s", relative_path) if relative_path in applied_delta_files: continue @@ -774,9 +777,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, module.run_upgrade(cur) elif ext == ".sql": # A plain old .sql file, just read and execute it - delta_schema = read_schema(absolute_path) logger.debug("Applying schema %s", relative_path) - cur.executescript(delta_schema) + executescript(cur, absolute_path) else: # Not a valid delta file. logger.warn( @@ -788,24 +790,85 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, # Mark as done. cur.execute( - "INSERT INTO applied_schema_deltas (version, file)" - " VALUES (?,?)", + _convert_param_style( + "INSERT INTO applied_schema_deltas (version, file)" + " VALUES (?,?)" + ), (v, relative_path) ) cur.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", + _convert_param_style( + "REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)" + ), (v, True) ) +def _convert_param_style(sql): + return sql.replace("?", "%s") + + +def get_statements(f): + statement_buffer = "" + in_comment = False # If we're in a /* ... */ style comment + + for line in f: + line = line.strip() + + if in_comment: + # Check if this line contains an end to the comment + comments = line.split("*/", 1) + if len(comments) == 1: + continue + line = comments[1] + in_comment = False + + # Remove inline block comments + line = re.sub(r"/\*.*\*/", " ", line) + + # Does this line start a comment? + comments = line.split("/*", 1) + if len(comments) > 1: + line = comments[0] + in_comment = True + + # Deal with line comments + line = line.split("--", 1)[0] + line = line.split("//", 1)[0] + + # Find *all* semicolons. We need to treat first and last entry + # specially. + statements = line.split(";") + + # We must prepend statement_buffer to the first statement + first_statement = "%s %s" % ( + statement_buffer.strip(), + statements[0].strip() + ) + statements[0] = first_statement + + # Every entry, except the last, is a full statement + for statement in statements[:-1]: + yield statement.strip() + + # The last entry did *not* end in a semicolon, so we store it for the + # next semicolon we find + statement_buffer = statements[-1].strip() + + +def executescript(txn, schema_path): + with open(schema_path, 'r') as f: + for statement in get_statements(f): + txn.execute(statement) + + def _get_or_create_schema_state(txn): schema_path = os.path.join( dir_path, "schema", "schema_version.sql", ) - create_schema = read_schema(schema_path) - txn.executescript(create_schema) + executescript(txn, schema_path) txn.execute("SELECT version, upgraded FROM schema_version") row = txn.fetchone() @@ -814,10 +877,13 @@ def _get_or_create_schema_state(txn): if current_version: txn.execute( - "SELECT file FROM applied_schema_deltas WHERE version >= ?", + _convert_param_style( + "SELECT file FROM applied_schema_deltas WHERE version >= ?" + ), (current_version,) ) - return current_version, txn.fetchall(), upgraded + applied_deltas = [d for d, in txn.fetchall()] + return current_version, applied_deltas, upgraded return None @@ -849,7 +915,9 @@ def prepare_sqlite3_database(db_conn): if row and row[0]: db_conn.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", + _convert_param_style( + "REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)" + ), (row[0], False) ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2979a83524..24ff872dad 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -102,6 +102,10 @@ def cached(max_entries=1000): return wrap +def _convert_param_style(sql): + return sql.replace("?", "%s") + + class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() @@ -122,6 +126,8 @@ class LoggingTransaction(object): # TODO(paul): Maybe use 'info' and 'debug' for values? sql_logger.debug("[SQL] {%s} %s", self.name, sql) + sql = _convert_param_style(sql) + try: if args and args[0]: values = args[0] @@ -305,11 +311,11 @@ class SQLBaseStore(object): The result of decoder(results) """ def interaction(txn): - cursor = txn.execute(query, args) + txn.execute(query, args) if decoder: - return decoder(cursor) + return decoder(txn) else: - return cursor.fetchall() + return txn.fetchall() return self.runInteraction(desc, interaction) @@ -337,8 +343,7 @@ class SQLBaseStore(object): def _simple_insert_txn(self, txn, table, values, or_replace=False, or_ignore=False): sql = "%s INTO %s (%s) VALUES(%s)" % ( - ("INSERT OR REPLACE" if or_replace else - "INSERT OR IGNORE" if or_ignore else "INSERT"), + ("REPLACE" if or_replace else "INSERT"), table, ", ".join(k for k in values), ", ".join("?" for k in values) @@ -448,8 +453,7 @@ class SQLBaseStore(object): def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol): sql = ( - "SELECT %(retcol)s FROM %(table)s WHERE %(where)s " - "ORDER BY rowid asc" + "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" ) % { "retcol": retcol, "table": table, @@ -505,14 +509,14 @@ class SQLBaseStore(object): retcols : list of strings giving the names of the columns to return """ if keyvalues: - sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) txn.execute(sql, keyvalues.values()) else: - sql = "SELECT %s FROM %s ORDER BY rowid asc" % ( + sql = "SELECT %s FROM %s" % ( ", ".join(retcols), table ) @@ -546,7 +550,7 @@ class SQLBaseStore(object): retcols=None, allow_none=False): """ Combined SELECT then UPDATE.""" if retcols: - select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + select_sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, " AND ".join("%s = ?" % (k) for k in keyvalues) @@ -580,8 +584,8 @@ class SQLBaseStore(object): updatevalues.values() + keyvalues.values() ) - if txn.rowcount == 0: - raise StoreError(404, "No row found") + # if txn.rowcount == 0: + # raise StoreError(404, "No row found") if txn.rowcount > 1: raise StoreError(500, "More than one row matched") @@ -802,7 +806,7 @@ class Table(object): _select_where_clause = "SELECT %s FROM %s WHERE %s" _select_clause = "SELECT %s FROM %s" - _insert_clause = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)" + _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)" @classmethod def select_statement(cls, where_clause=None): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 850676ce6c..375265d666 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -147,11 +147,11 @@ class ApplicationServiceStore(SQLBaseStore): return True def _get_as_id_txn(self, txn, token): - cursor = txn.execute( + txn.execute( "SELECT id FROM application_services WHERE token=?", (token,) ) - res = cursor.fetchone() + res = txn.fetchone() if res: return res[0] diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 68b7d59693..0c2adffbbe 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -111,12 +111,12 @@ class DirectoryStore(SQLBaseStore): ) def _delete_room_alias_txn(self, txn, room_alias): - cursor = txn.execute( + txn.execute( "SELECT room_id FROM room_aliases WHERE room_alias = ?", (room_alias.to_string(),) ) - res = cursor.fetchone() + res = txn.fetchone() if res: room_id = res[0] else: diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 2deda8ac50..5d66b2f24c 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -242,7 +242,6 @@ class EventFederationStore(SQLBaseStore): "room_id": room_id, "min_depth": depth, }, - or_replace=True, ) def _handle_prev_events(self, txn, outlier, event_id, prev_events, @@ -262,7 +261,6 @@ class EventFederationStore(SQLBaseStore): "room_id": room_id, "is_state": 0, }, - or_ignore=True, ) # Update the extremities table if this is not an outlier. @@ -281,19 +279,19 @@ class EventFederationStore(SQLBaseStore): # We only insert as a forward extremity the new event if there are # no other events that reference it as a prev event query = ( - "INSERT OR IGNORE INTO %(table)s (event_id, room_id) " - "SELECT ?, ? WHERE NOT EXISTS (" - "SELECT 1 FROM %(event_edges)s WHERE " - "prev_event_id = ? " - ")" - ) % { - "table": "event_forward_extremities", - "event_edges": "event_edges", - } + "SELECT 1 FROM event_edges WHERE prev_event_id = ?" + ) - logger.debug("query: %s", query) + txn.execute(query, (event_id,)) + + if not txn.fetchone(): + query = ( + "INSERT INTO event_forward_extremities" + " (event_id, room_id)" + " VALUES (?, ?)" + ) - txn.execute(query, (event_id, room_id, event_id)) + txn.execute(query, (event_id, room_id)) # Insert all the prev_events as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. @@ -306,7 +304,6 @@ class EventFederationStore(SQLBaseStore): "event_id": e_id, "room_id": room_id, }, - or_ignore=True, ) # Also delete from the backwards extremities table all ones that diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 1dcd34723b..0084d67e5b 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -45,7 +45,6 @@ class PresenceStore(SQLBaseStore): updatevalues={"state": new_state["state"], "status_msg": new_state["status_msg"], "mtime": self._clock.time_msec()}, - retcols=["state"], ) def allow_presence_visible(self, observed_localpart, observer_userid): diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index d769db2c78..27a0716323 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -153,7 +153,7 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_name, priority_class, new_rule_priority)) # now insert the new rule - sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql = "INSERT INTO "+PushRuleTable.table_name+" (" sql += ",".join(new_rule.keys())+") VALUES (" sql += ", ".join(["?" for _ in new_rule.keys()])+")" @@ -182,7 +182,7 @@ class PushRuleStore(SQLBaseStore): new_rule['priority_class'] = priority_class new_rule['priority'] = new_prio - sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql = "INSERT INTO "+PushRuleTable.table_name+" (" sql += ",".join(new_rule.keys())+") VALUES (" sql += ", ".join(["?" for _ in new_rule.keys()])+")" diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index adc8fc0794..344dd3aaac 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -39,14 +39,10 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if there was a problem adding this. """ - row = yield self._simple_select_one("users", {"name": user_id}, ["id"]) - if not row: - raise StoreError(400, "Bad user ID supplied.") - row_id = row["id"] yield self._simple_insert( "access_tokens", { - "user_id": row_id, + "user_id": user_id, "token": token } ) @@ -82,7 +78,7 @@ class RegistrationStore(SQLBaseStore): # it's possible for this to get a conflict, but only for a single user # since tokens are namespaced based on their user ID txn.execute("INSERT INTO access_tokens(user_id, token) " + - "VALUES (?,?)", [txn.lastrowid, token]) + "VALUES (?,?)", [user_id, token]) def get_user_by_id(self, user_id): query = ("SELECT users.name, users.password_hash FROM users" @@ -124,12 +120,12 @@ class RegistrationStore(SQLBaseStore): "SELECT users.name, users.admin," " access_tokens.device_id, access_tokens.id as token_id" " FROM users" - " INNER JOIN access_tokens on users.id = access_tokens.user_id" + " INNER JOIN access_tokens on users.name = access_tokens.user_id" " WHERE token = ?" ) - cursor = txn.execute(sql, (token,)) - rows = self.cursor_to_dict(cursor) + txn.execute(sql, (token,)) + rows = self.cursor_to_dict(txn) if rows: return rows[0] diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 549c9af393..3c23f29215 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -114,9 +114,9 @@ class RoomStore(SQLBaseStore): "name": name_subquery, } - c = txn.execute(sql, (is_public,)) + txn.execute(sql, (is_public,)) - return c.fetchall() + return txn.fetchall() rows = yield self.runInteraction( "get_rooms", f diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 65ffb4627f..e8ede14cd7 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -68,7 +68,7 @@ class RoomMemberStore(SQLBaseStore): # Update room hosts table if event.membership == Membership.JOIN: sql = ( - "INSERT OR IGNORE INTO room_hosts (room_id, host) " + "REPLACE INTO room_hosts (room_id, host) " "VALUES (?, ?)" ) txn.execute(sql, (event.room_id, domain)) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 456e4bd45d..888837cd1e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -15,6 +15,8 @@ from ._base import SQLBaseStore +from synapse.util.stringutils import random_string + import logging logger = logging.getLogger(__name__) @@ -89,14 +91,15 @@ class StateStore(SQLBaseStore): state_group = context.state_group if not state_group: + group = _make_group_id(self._clock) state_group = self._simple_insert_txn( txn, table="state_groups", values={ + "id": group, "room_id": event.room_id, "event_id": event.event_id, }, - or_ignore=True, ) for state in state_events.values(): @@ -110,7 +113,6 @@ class StateStore(SQLBaseStore): "state_key": state.state_key, "event_id": state.event_id, }, - or_ignore=True, ) self._simple_insert_txn( @@ -122,3 +124,7 @@ class StateStore(SQLBaseStore): }, or_replace=True, ) + + +def _make_group_id(clock): + return str(int(clock.time_msec())) + random_string(5) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 09bc522210..64adb0c7fa 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d < %s)" % (self.stream, "stream_ordering") else: - return "(%d < %s OR (%d == %s AND %d < %s))" % ( + return "(%d < %s OR (%d = %s AND %d < %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", @@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d >= %s)" % (self.stream, "stream_ordering") else: - return "(%d > %s OR (%d == %s AND %d >= %s))" % ( + return "(%d > %s OR (%d = %s AND %d >= %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 0b8a3b7a07..b5ed5453d8 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -121,8 +121,8 @@ class TransactionStore(SQLBaseStore): SentTransactions.select_statement("destination = ?"), ) - results = txn.execute(query, (destination,)) - results = SentTransactions.decode_results(results) + txn.execute(query, (destination,)) + results = SentTransactions.decode_results(txn) prev_txns = [r.transaction_id for r in results] @@ -266,7 +266,7 @@ class TransactionStore(SQLBaseStore): retry_last_ts, retry_interval): query = ( - "INSERT OR REPLACE INTO %s " + "REPLACE INTO %s " "(destination, retry_last_ts, retry_interval) " "VALUES (?, ?, ?) " ) % DestinationsTable.table_name -- cgit 1.5.1 From 32206dde3f8dd59412490cd6f590304438c900f4 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 26 Mar 2015 10:11:52 +0000 Subject: Fixes from PR comments --- synapse/appservice/scheduler.py | 3 ++- synapse/storage/appservice.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3cedd479a2..59b0b1f4ac 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -112,7 +112,7 @@ class _ServiceQueuer(object): def _send_request(self, service, events): # send request and add callbacks d = self.txn_ctrl.send(service, events) - d.addCallback(self._on_request_finish) + d.addBoth(self._on_request_finish) d.addErrback(self._on_request_fail) self.pending_requests[service.id] = d @@ -154,6 +154,7 @@ class _TransactionController(object): self._start_recoverer(service) except Exception as e: logger.exception(e) + self._start_recoverer(service) # request has finished defer.returnValue(service) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 06b3a04afc..93304a745f 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) self.services_cache = [] - self.cache_defer = self._populate_cache() + self.cache_defer = self._populate_appservice_cache() self.cache_defer.addErrback(log_failure) @defer.inlineCallbacks @@ -337,7 +337,7 @@ class ApplicationServiceStore(SQLBaseStore): return service_list @defer.inlineCallbacks - def _populate_cache(self): + def _populate_appservice_cache(self): """Populates the ApplicationServiceCache from the database.""" sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN " "application_services_regex AS r ON a.id = r.as_id") -- cgit 1.5.1 From d33ae65efc14a18a8a690d39d6e9c81aaafa1062 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 11:00:00 +0100 Subject: Remove more reg/unreg methods. Read config not database for cache. --- synapse/handlers/appservice.py | 37 ------- synapse/storage/appservice.py | 219 +++++++++++---------------------------- tests/storage/test_appservice.py | 39 ------- 3 files changed, 59 insertions(+), 236 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 58b5b60bb7..59cf15b037 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -16,10 +16,8 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import Codes, StoreError, SynapseError from synapse.appservice import ApplicationService from synapse.types import UserID -import synapse.util.stringutils as stringutils import logging @@ -49,38 +47,6 @@ class ApplicationServicesHandler(object): self.scheduler = appservice_scheduler self.started_scheduler = False - @defer.inlineCallbacks - def register(self, app_service): - logger.info("Register -> %s", app_service) - # check the token is recognised - try: - stored_service = yield self.store.get_app_service_by_token( - app_service.token - ) - if not stored_service: - raise StoreError(404, "Application service not found") - app_service.id = stored_service.id - except StoreError: - raise SynapseError( - 403, "Unrecognised application services token. " - "Consult the home server admin.", - errcode=Codes.FORBIDDEN - ) - app_service.hs_token = self._generate_hs_token() - - # create a sender for this application service which is used when - # creating rooms, etc.. - account = yield self.hs.get_handlers().registration_handler.register() - app_service.sender = account[0] - - yield self.store.update_app_service(app_service) - defer.returnValue(app_service) - - @defer.inlineCallbacks - def unregister(self, token): - logger.info("Unregister as_token=%s", token) - yield self.store.unregister_app_service(token) - @defer.inlineCallbacks def notify_interested_services(self, event): """Notifies (pushes) all application services interested in this event. @@ -223,6 +189,3 @@ class ApplicationServicesHandler(object): exists = yield self.query_user_exists(user_id) defer.returnValue(exists) defer.returnValue(True) - - def _generate_hs_token(self): - return stringutils.random_string(24) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 93304a745f..fe9372a7c6 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import yaml from simplejson import JSONDecodeError import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.errors import StoreError from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -27,141 +27,18 @@ from ._base import SQLBaseStore logger = logging.getLogger(__name__) -def log_failure(failure): - logger.error("Failed to detect application services: %s", failure.value) - logger.error(failure.getTraceback()) - - class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) self.services_cache = [] - self.cache_defer = self._populate_appservice_cache() - self.cache_defer.addErrback(log_failure) - - @defer.inlineCallbacks - def unregister_app_service(self, token): - """Unregisters this service. - - This removes all AS specific regex and the base URL. The token is the - only thing preserved for future registration attempts. - """ - yield self.cache_defer # make sure the cache is ready - yield self.runInteraction( - "unregister_app_service", - self._unregister_app_service_txn, - token, - ) - # update cache TODO: Should this be in the txn? - for service in self.services_cache: - if service.token == token: - service.url = None - service.namespaces = None - service.hs_token = None - - def _unregister_app_service_txn(self, txn, token): - # kill the url to prevent pushes - txn.execute( - "UPDATE application_services SET url=NULL WHERE token=?", - (token,) - ) - - # cleanup regex - as_id = self._get_as_id_txn(txn, token) - if not as_id: - logger.warning( - "unregister_app_service_txn: Failed to find as_id for token=", - token - ) - return False - - txn.execute( - "DELETE FROM application_services_regex WHERE as_id=?", - (as_id,) - ) - return True - - @defer.inlineCallbacks - def update_app_service(self, service): - """Update an application service, clobbering what was previously there. - - Args: - service(ApplicationService): The updated service. - """ - yield self.cache_defer # make sure the cache is ready - - # NB: There is no "insert" since we provide no public-facing API to - # allocate new ASes. It relies on the server admin inserting the AS - # token into the database manually. - - if not service.token or not service.url: - raise StoreError(400, "Token and url must be specified.") - - if not service.hs_token: - raise StoreError(500, "No HS token") - - as_id = yield self.runInteraction( - "update_app_service", - self._update_app_service_txn, - service + self._populate_appservice_cache( + hs.config.app_service_config_files ) - service.id = as_id - # update cache TODO: Should this be in the txn? - for (index, cache_service) in enumerate(self.services_cache): - if service.token == cache_service.token: - self.services_cache[index] = service - logger.info("Updated: %s", service) - return - # new entry - self.services_cache.append(service) - logger.info("Updated(new): %s", service) - - def _update_app_service_txn(self, txn, service): - as_id = self._get_as_id_txn(txn, service.token) - if not as_id: - logger.warning( - "update_app_service_txn: Failed to find as_id for token=", - service.token - ) - return - - txn.execute( - "UPDATE application_services SET url=?, hs_token=?, sender=? " - "WHERE id=?", - (service.url, service.hs_token, service.sender, as_id,) - ) - # cleanup regex - txn.execute( - "DELETE FROM application_services_regex WHERE as_id=?", - (as_id,) - ) - for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST): - if ns_str in service.namespaces: - for regex_obj in service.namespaces[ns_str]: - txn.execute( - "INSERT INTO application_services_regex(" - "as_id, namespace, regex) values(?,?,?)", - (as_id, ns_int, json.dumps(regex_obj)) - ) - return as_id - - def _get_as_id_txn(self, txn, token): - cursor = txn.execute( - "SELECT id FROM application_services WHERE token=?", - (token,) - ) - res = cursor.fetchone() - if res: - return res[0] - - @defer.inlineCallbacks def get_app_services(self): - yield self.cache_defer # make sure the cache is ready - defer.returnValue(self.services_cache) + defer.succeed(self.services_cache) - @defer.inlineCallbacks def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. @@ -175,37 +52,24 @@ class ApplicationServiceStore(SQLBaseStore): Returns: synapse.appservice.ApplicationService or None. """ - - yield self.cache_defer # make sure the cache is ready - for service in self.services_cache: if service.sender == user_id: - defer.returnValue(service) + defer.succeed(service) return - defer.returnValue(None) + defer.succeed(None) - @defer.inlineCallbacks - def get_app_service_by_token(self, token, from_cache=True): + def get_app_service_by_token(self, token): """Get the application service with the given appservice token. Args: token (str): The application service token. - from_cache (bool): True to get this service from the cache, False to - check the database. - Raises: - StoreError if there was a problem retrieving this service. + Returns: + synapse.appservice.ApplicationService or None. """ - yield self.cache_defer # make sure the cache is ready - - if from_cache: - for service in self.services_cache: - if service.token == token: - defer.returnValue(service) - return - defer.returnValue(None) - - # TODO: The from_cache=False impl - # TODO: This should be JOINed with the application_services_regex table. + for service in self.services_cache: + if service.token == token: + return defer.succeed(service) + defer.succeed(None) def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -336,18 +200,53 @@ class ApplicationServiceStore(SQLBaseStore): )) return service_list - @defer.inlineCallbacks - def _populate_appservice_cache(self): - """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN " - "application_services_regex AS r ON a.id = r.as_id") - - results = yield self._execute_and_decode("appservice_cache", sql) - services = self._parse_services_dict(results) + def _load_appservice(self, as_info): + required_string_fields = ["url", "as_token", "hs_token", "sender"] + for field in required_string_fields: + if not isinstance(as_info.get(field), basestring): + raise KeyError("Required string field: '%s'", field) + + # namespace checks + if not isinstance(as_info.get("namespaces"), dict): + raise KeyError("Requires 'namespaces' object.") + for ns in ApplicationService.NS_LIST: + # specific namespaces are optional + if ns in as_info["namespaces"]: + # expect a list of dicts with exclusive and regex keys + for regex_obj in as_info["namespaces"][ns]: + if not isinstance(regex_obj, dict): + raise ValueError( + "Expected namespace entry in %s to be an object," + " but got %s", ns, regex_obj + ) + if not isinstance(regex_obj.get("regex"), basestring): + raise ValueError( + "Missing/bad type 'regex' key in %s", regex_obj + ) + if not isinstance(regex_obj.get("exclusive"), bool): + raise ValueError( + "Missing/bad type 'exclusive' key in %s", regex_obj + ) + return ApplicationService( + token=as_info["as_token"], + url=as_info["url"], + namespaces=as_info["namespaces"], + hs_token=as_info["hs_token"], + sender=as_info["sender"] + ) - for service in services: - logger.info("Found application service: %s", service) - self.services_cache.append(service) + def _populate_appservice_cache(self, config_files): + """Populates a cache of Application Services from the config files.""" + for config_file in config_files: + try: + with open(config_file, 'r') as f: + as_info = yaml.load(f) + appservice = self._load_appservice(as_info) + logger.info("Loaded application service: %s", appservice) + self.services_cache.append(appservice) + except Exception as e: + logger.error("Failed to load appservice from '%s'", config_file) + logger.exception(e) class ApplicationServiceTransactionStore(SQLBaseStore): diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index e79599f7fb..82bfea15a6 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -49,45 +49,6 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): # must be done after inserts self.store = ApplicationServiceStore(hs) - @defer.inlineCallbacks - def test_update_and_retrieval_of_service(self): - url = "https://matrix.org/appservices/foobar" - hs_token = "hstok" - user_regex = [ - {"regex": "@foobar_.*:matrix.org", "exclusive": True} - ] - alias_regex = [ - {"regex": "#foobar_.*:matrix.org", "exclusive": False} - ] - room_regex = [ - - ] - service = ApplicationService( - url=url, hs_token=hs_token, token=self.as_token, namespaces={ - ApplicationService.NS_USERS: user_regex, - ApplicationService.NS_ALIASES: alias_regex, - ApplicationService.NS_ROOMS: room_regex - }) - yield self.store.update_app_service(service) - - stored_service = yield self.store.get_app_service_by_token( - self.as_token - ) - self.assertEquals(stored_service.token, self.as_token) - self.assertEquals(stored_service.url, url) - self.assertEquals( - stored_service.namespaces[ApplicationService.NS_ALIASES], - alias_regex - ) - self.assertEquals( - stored_service.namespaces[ApplicationService.NS_ROOMS], - room_regex - ) - self.assertEquals( - stored_service.namespaces[ApplicationService.NS_USERS], - user_regex - ) - @defer.inlineCallbacks def test_retrieve_unknown_service_token(self): service = yield self.store.get_app_service_by_token("invalid_token") -- cgit 1.5.1 From b59aa745560608c8503421bd9542c99fc1c571b5 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 11:35:45 +0100 Subject: Fix tests and missing returns on deferreds. --- synapse/appservice/__init__.py | 2 +- synapse/storage/appservice.py | 18 +++++++++++------- tests/storage/test_appservice.py | 41 ++++++++++++++++++++++++++++------------ 3 files changed, 41 insertions(+), 20 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index c60db16b74..a8108c1efb 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -95,7 +95,7 @@ class ApplicationService(object): # rooms: [ {regex: "[A-z]+.*", exclusive: true}, ...], # } if not namespaces: - return None + namespaces = {} for ns in ApplicationService.NS_LIST: if ns not in namespaces: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index fe9372a7c6..a520a859d3 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore): ) def get_app_services(self): - defer.succeed(self.services_cache) + return defer.succeed(self.services_cache) def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. @@ -54,9 +54,8 @@ class ApplicationServiceStore(SQLBaseStore): """ for service in self.services_cache: if service.sender == user_id: - defer.succeed(service) - return - defer.succeed(None) + return defer.succeed(service) + return defer.succeed(None) def get_app_service_by_token(self, token): """Get the application service with the given appservice token. @@ -69,7 +68,7 @@ class ApplicationServiceStore(SQLBaseStore): for service in self.services_cache: if service.token == token: return defer.succeed(service) - defer.succeed(None) + return defer.succeed(None) def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -237,11 +236,16 @@ class ApplicationServiceStore(SQLBaseStore): def _populate_appservice_cache(self, config_files): """Populates a cache of Application Services from the config files.""" + if not isinstance(config_files, list): + logger.warning( + "Expected %s to be a list of AS config files.", config_files + ) + return + for config_file in config_files: try: with open(config_file, 'r') as f: - as_info = yaml.load(f) - appservice = self._load_appservice(as_info) + appservice = self._load_appservice(yaml.load(f)) logger.info("Loaded application service: %s", appservice) self.services_cache.append(appservice) except Exception as e: diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 82bfea15a6..b856438fd2 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -22,6 +22,8 @@ from synapse.storage.appservice import ( ) import json +import os +import yaml from mock import Mock from tests.utils import SQLiteMemoryDbPool, MockClock @@ -30,25 +32,40 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): + self.as_yaml_files = [] db_pool = SQLiteMemoryDbPool() yield db_pool.prepare() hs = HomeServer( - "test", db_pool=db_pool, clock=MockClock(), config=Mock() + "test", db_pool=db_pool, clock=MockClock(), + config=Mock( + app_service_config_files=self.as_yaml_files + ) ) + self.as_token = "token1" - db_pool.runQuery( - "INSERT INTO application_services(token) VALUES(?)", - (self.as_token,) - ) - db_pool.runQuery( - "INSERT INTO application_services(token) VALUES(?)", ("token2",) - ) - db_pool.runQuery( - "INSERT INTO application_services(token) VALUES(?)", ("token3",) - ) + self.as_url = "some_url" + self._add_appservice(self.as_token, self.as_url, "some_hs_token", "bob") + self._add_appservice("token2", "some_url", "some_hs_token", "bob") + self._add_appservice("token3", "some_url", "some_hs_token", "bob") # must be done after inserts self.store = ApplicationServiceStore(hs) + def tearDown(self): + # TODO: suboptimal that we need to create files for tests! + for f in self.as_yaml_files: + try: + os.remove(f) + except: + pass + + def _add_appservice(self, as_token, url, hs_token, sender): + as_yaml = dict(url=url, as_token=as_token, hs_token=hs_token, + sender=sender, namespaces={}) + # use the token as the filename + with open(as_token, 'w') as outfile: + outfile.write(yaml.dump(as_yaml)) + self.as_yaml_files.append(as_token) + @defer.inlineCallbacks def test_retrieve_unknown_service_token(self): service = yield self.store.get_app_service_by_token("invalid_token") @@ -60,7 +77,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): self.as_token ) self.assertEquals(stored_service.token, self.as_token) - self.assertEquals(stored_service.url, None) + self.assertEquals(stored_service.url, self.as_url) self.assertEquals( stored_service.namespaces[ApplicationService.NS_ALIASES], [] -- cgit 1.5.1 From c217504949a90712f41a0422215f923b4d114a17 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 12:07:56 +0100 Subject: Edit SQL schema to use string IDs not ints. Use token as ID. Update tests. --- synapse/storage/appservice.py | 23 ++++++++----- .../storage/schema/delta/15/appservice_txns.sql | 7 ++-- tests/storage/test_appservice.py | 38 +++++++++++++--------- 3 files changed, 40 insertions(+), 28 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a520a859d3..a8780eca1e 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -231,7 +231,8 @@ class ApplicationServiceStore(SQLBaseStore): url=as_info["url"], namespaces=as_info["namespaces"], hs_token=as_info["hs_token"], - sender=as_info["sender"] + sender=as_info["sender"], + id=as_info["as_token"] # the token is the only unique thing here ) def _populate_appservice_cache(self, config_files): @@ -268,16 +269,20 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to a list of ApplicationServices, which may be empty. """ - sql = ( - "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN" - " application_services AS a ON a.id=s.as_id LEFT JOIN" - " application_services_regex AS r ON r.as_id=a.id WHERE state = ?" - ) - results = yield self._execute_and_decode( - "get_appservices_by_state", sql, state + results = yield self._simple_select_list( + "application_services_state", + dict(state=state), + ["as_id"] ) # NB: This assumes this class is linked with ApplicationServiceStore - defer.returnValue(self._parse_services_dict(results)) + as_list = yield self.get_app_services() + services = [] + + for res in results: + for service in as_list: + if service.id == res["as_id"]: + services.append(service) + defer.returnValue(services) @defer.inlineCallbacks def get_appservice_state(self, service): diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index 13bbb2de2e..2b27e2a429 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -14,14 +14,13 @@ */ CREATE TABLE IF NOT EXISTS application_services_state( - as_id INTEGER PRIMARY KEY, + as_id TEXT PRIMARY KEY, state TEXT, - last_txn TEXT, - FOREIGN KEY(as_id) REFERENCES application_services(id) + last_txn TEXT ); CREATE TABLE IF NOT EXISTS application_services_txns( - as_id INTEGER NOT NULL, + as_id TEXT NOT NULL, txn_id INTEGER NOT NULL, event_ids TEXT NOT NULL, UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index b856438fd2..58551e40b9 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -101,42 +101,48 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): + self.as_yaml_files = [] self.db_pool = SQLiteMemoryDbPool() yield self.db_pool.prepare() - hs = HomeServer( - "test", db_pool=self.db_pool, clock=MockClock(), config=Mock() - ) self.as_list = [ { "token": "token1", "url": "https://matrix-as.org", - "id": 3 + "id": "token1" }, { "token": "alpha_tok", "url": "https://alpha.com", - "id": 5 + "id": "alpha_tok" }, { "token": "beta_tok", "url": "https://beta.com", - "id": 6 + "id": "beta_tok" }, { "token": "delta_tok", "url": "https://delta.com", - "id": 7 + "id": "delta_tok" }, ] for s in self.as_list: - yield self._add_service(s["id"], s["url"], s["token"]) - self.store = TestTransactionStore(hs) + yield self._add_service(s["url"], s["token"]) - def _add_service(self, as_id, url, token): - return self.db_pool.runQuery( - "INSERT INTO application_services(id, url, token) VALUES(?,?,?)", - (as_id, url, token) + hs = HomeServer( + "test", db_pool=self.db_pool, clock=MockClock(), config=Mock( + app_service_config_files=self.as_yaml_files + ) ) + self.store = TestTransactionStore(hs) + + def _add_service(self, url, as_token): + as_yaml = dict(url=url, as_token=as_token, hs_token="something", + sender="a_sender", namespaces={}) + # use the token as the filename + with open(as_token, 'w') as outfile: + outfile.write(yaml.dump(as_yaml)) + self.as_yaml_files.append(as_token) def _set_state(self, id, state, txn=None): return self.db_pool.runQuery( @@ -388,8 +394,10 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ApplicationServiceState.DOWN ) self.assertEquals(2, len(services)) - self.assertEquals(self.as_list[2]["id"], services[0].id) - self.assertEquals(self.as_list[0]["id"], services[1].id) + self.assertEquals( + set([self.as_list[2]["id"], self.as_list[0]["id"]]), + set([services[0].id, services[1].id]) + ) # required for ApplicationServiceTransactionStoreTestCase tests -- cgit 1.5.1 From cf1fa59f4b72dbf5c9d735eaf051f1456721d91f Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 13:48:03 +0100 Subject: Use a sender localpart instead of a user ID. Form the user ID at runtime instead, This gives less room for error in AS config files since they cannot specify the domain of another HS. --- synapse/storage/appservice.py | 11 +++++++++-- tests/storage/test_appservice.py | 4 ++-- 2 files changed, 11 insertions(+), 4 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a8780eca1e..557e377ca5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -21,6 +21,7 @@ from twisted.internet import defer from synapse.api.constants import Membership from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser +from synapse.types import UserID from ._base import SQLBaseStore @@ -31,6 +32,7 @@ class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) + self.hostname = hs.hostname self.services_cache = [] self._populate_appservice_cache( hs.config.app_service_config_files @@ -200,11 +202,16 @@ class ApplicationServiceStore(SQLBaseStore): return service_list def _load_appservice(self, as_info): - required_string_fields = ["url", "as_token", "hs_token", "sender"] + required_string_fields = [ + "url", "as_token", "hs_token", "sender_localpart" + ] for field in required_string_fields: if not isinstance(as_info.get(field), basestring): raise KeyError("Required string field: '%s'", field) + user = UserID(as_info["sender_localpart"], self.hostname) + user_id = user.to_string() + # namespace checks if not isinstance(as_info.get("namespaces"), dict): raise KeyError("Requires 'namespaces' object.") @@ -231,7 +238,7 @@ class ApplicationServiceStore(SQLBaseStore): url=as_info["url"], namespaces=as_info["namespaces"], hs_token=as_info["hs_token"], - sender=as_info["sender"], + sender=user_id, id=as_info["as_token"] # the token is the only unique thing here ) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 58551e40b9..675959c56c 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -60,7 +60,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): def _add_appservice(self, as_token, url, hs_token, sender): as_yaml = dict(url=url, as_token=as_token, hs_token=hs_token, - sender=sender, namespaces={}) + sender_localpart=sender, namespaces={}) # use the token as the filename with open(as_token, 'w') as outfile: outfile.write(yaml.dump(as_yaml)) @@ -138,7 +138,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): def _add_service(self, url, as_token): as_yaml = dict(url=url, as_token=as_token, hs_token="something", - sender="a_sender", namespaces={}) + sender_localpart="a_sender", namespaces={}) # use the token as the filename with open(as_token, 'w') as outfile: outfile.write(yaml.dump(as_yaml)) -- cgit 1.5.1 From 5e88a09a424b8ce65bfe9a809cfd245286474de3 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 14:00:25 +0100 Subject: Add same user_id char checks as registration. --- synapse/storage/appservice.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 557e377ca5..f8cbb3f323 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import urllib import yaml from simplejson import JSONDecodeError import simplejson as json @@ -209,7 +210,12 @@ class ApplicationServiceStore(SQLBaseStore): if not isinstance(as_info.get(field), basestring): raise KeyError("Required string field: '%s'", field) - user = UserID(as_info["sender_localpart"], self.hostname) + localpart = as_info["sender_localpart"] + if urllib.quote(localpart) != localpart: + raise ValueError( + "sender_localpart needs characters which are not URL encoded." + ) + user = UserID(localpart, self.hostname) user_id = user.to_string() # namespace checks -- cgit 1.5.1 From b8092fbc82edb3c7d8aa09f0756fa853ad6a6ad8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Apr 2015 11:17:52 +0100 Subject: Go back to storing JSON in TEXT --- synapse/storage/_base.py | 3 --- synapse/storage/appservice.py | 4 +--- synapse/storage/engines/postgres.py | 3 --- synapse/storage/engines/sqlite3.py | 5 ----- synapse/storage/events.py | 16 ++++++++-------- synapse/storage/profile.py | 5 +---- synapse/storage/registration.py | 5 ----- synapse/storage/schema/delta/15/appservice_txns.sql | 2 +- synapse/storage/schema/full_schemas/11/im.sql | 8 ++++---- .../schema/full_schemas/16/application_services.sql | 2 +- synapse/storage/schema/full_schemas/16/im.sql | 8 ++++---- 11 files changed, 20 insertions(+), 41 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5ec1d2613e..f5952d1fc0 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -818,9 +818,6 @@ class SQLBaseStore(object): internal_metadata, js, redacted, rejected_reason = res - internal_metadata = self.database_engine.load_unicode(internal_metadata) - js = self.database_engine.load_unicode(js) - start_time = update_counter("select_event", start_time) result = self._get_event_from_row_txn( diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 40e05b3635..63d1af4e86 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -366,9 +366,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): new_txn_id = max(highest_txn_id, last_txn_id) + 1 # Insert new txn into txn table - event_ids = buffer( - json.dumps([e.event_id for e in events]).encode("utf8") - ) + event_ids = json.dumps([e.event_id for e in events]) txn.execute( "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " "VALUES(?,?,?)", diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 457c1f70a5..6f75245fa7 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -39,6 +39,3 @@ class PostgresEngine(object): if isinstance(error, self.module.DatabaseError): return error.pgcode in ["40001", "40P01"] return False - - def load_unicode(self, v): - return bytes(v).decode("UTF8") diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index 389df35eb5..dd0d8e0e0f 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -37,8 +37,3 @@ class Sqlite3Engine(object): def is_deadlock(self, error): return False - - def load_unicode(self, v): - if isinstance(v, types.UnicodeType): - return v - return bytes(v).decode("UTF8") diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0373d152b2..7dbf7a396a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -168,7 +168,7 @@ class EventsStore(SQLBaseStore): metadata_json = encode_canonical_json( event.internal_metadata.get_dict() - ) + ).decode("UTF-8") # If we have already persisted this event, we don't need to do any # more processing. @@ -184,7 +184,7 @@ class EventsStore(SQLBaseStore): ) txn.execute( sql, - (buffer(metadata_json), event.event_id,) + (metadata_json, event.event_id,) ) sql = ( @@ -229,14 +229,14 @@ class EventsStore(SQLBaseStore): values={ "event_id": event.event_id, "room_id": event.room_id, - "internal_metadata": buffer(metadata_json), - "json": buffer(encode_canonical_json(event_dict)), + "internal_metadata": metadata_json, + "json": encode_canonical_json(event_dict).decode("UTF-8"), }, ) - content = buffer(encode_canonical_json( + content = encode_canonical_json( event.content - )) + ).decode("UTF-8") vals = { "topological_ordering": event.depth, @@ -261,9 +261,9 @@ class EventsStore(SQLBaseStore): ] } - vals["unrecognized_keys"] = buffer(encode_canonical_json( + vals["unrecognized_keys"] = encode_canonical_json( unrec - )) + ).decode("UTF-8") sql = ( "INSERT INTO events" diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index e33963d0b4..047698aa13 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -35,16 +35,13 @@ class ProfileStore(SQLBaseStore): desc="get_profile_displayname", ) - if name: - name = self.database_engine.load_unicode(name) - defer.returnValue(name) def set_profile_displayname(self, user_localpart, new_displayname): return self._simple_update_one( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"displayname": new_displayname.encode("utf8")}, + updatevalues={"displayname": new_displayname}, desc="set_profile_displayname", ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 8a63fe4691..2a5c5080e4 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -98,11 +98,6 @@ class RegistrationStore(SQLBaseStore): allow_none=True, ) - if user_info: - user_info["password_hash"] = self.database_engine.load_unicode( - user_info["password_hash"] - ) - defer.returnValue(user_info) @cached() diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index ddea8fc693..1c3324f415 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -22,7 +22,7 @@ CREATE TABLE IF NOT EXISTS application_services_state( CREATE TABLE IF NOT EXISTS application_services_txns( as_id VARCHAR(150) NOT NULL, txn_id INTEGER NOT NULL, - event_ids LONGBLOB NOT NULL, + event_ids TEXT NOT NULL, UNIQUE(as_id, txn_id) ); diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql index e9e09214d7..addbec5885 100644 --- a/synapse/storage/schema/full_schemas/11/im.sql +++ b/synapse/storage/schema/full_schemas/11/im.sql @@ -19,8 +19,8 @@ CREATE TABLE IF NOT EXISTS events( event_id VARCHAR(150) NOT NULL, type VARCHAR(150) NOT NULL, room_id VARCHAR(150) NOT NULL, - content bytea NOT NULL, - unrecognized_keys bytea, + content TEXT NOT NULL, + unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, depth BIGINT DEFAULT 0 NOT NULL, @@ -35,8 +35,8 @@ CREATE INDEX events_room_id ON events (room_id); CREATE TABLE IF NOT EXISTS event_json( event_id VARCHAR(150) NOT NULL, room_id VARCHAR(150) NOT NULL, - internal_metadata bytea NOT NULL, - json bytea NOT NULL, + internal_metadata TEXT NOT NULL, + json TEXT NOT NULL, UNIQUE (event_id) ); diff --git a/synapse/storage/schema/full_schemas/16/application_services.sql b/synapse/storage/schema/full_schemas/16/application_services.sql index f08c5bcf76..5d63d57d59 100644 --- a/synapse/storage/schema/full_schemas/16/application_services.sql +++ b/synapse/storage/schema/full_schemas/16/application_services.sql @@ -39,7 +39,7 @@ CREATE TABLE IF NOT EXISTS application_services_state( CREATE TABLE IF NOT EXISTS application_services_txns( as_id VARCHAR(150) NOT NULL, txn_id INTEGER NOT NULL, - event_ids bytea NOT NULL, + event_ids TEXT NOT NULL, UNIQUE(as_id, txn_id) ); diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql index 17e4c949b9..5b4b494484 100644 --- a/synapse/storage/schema/full_schemas/16/im.sql +++ b/synapse/storage/schema/full_schemas/16/im.sql @@ -19,8 +19,8 @@ CREATE TABLE IF NOT EXISTS events( event_id VARCHAR(150) NOT NULL, type VARCHAR(150) NOT NULL, room_id VARCHAR(150) NOT NULL, - content bytea NOT NULL, - unrecognized_keys bytea, + content TEXT NOT NULL, + unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, depth BIGINT DEFAULT 0 NOT NULL, @@ -39,8 +39,8 @@ CREATE INDEX events_order_room ON events ( CREATE TABLE IF NOT EXISTS event_json( event_id VARCHAR(150) NOT NULL, room_id VARCHAR(150) NOT NULL, - internal_metadata bytea NOT NULL, - json bytea NOT NULL, + internal_metadata TEXT NOT NULL, + json TEXT NOT NULL, UNIQUE (event_id) ); -- cgit 1.5.1 From d76c058eea60ffc63ff7427e1de1e142a4b5b188 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Apr 2015 16:30:25 +0100 Subject: Fix invalid SQL to work in postgres land --- synapse/storage/appservice.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 63d1af4e86..bd285a6699 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -442,14 +442,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) result = txn.execute( - "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?", + "SELECT * FROM application_services_txns WHERE as_id=?" + " ORDER BY txn_id ASC LIMIT 1", (service.id,) ) - entry = self.cursor_to_dict(result)[0] - if not entry or entry["txn_id"] is None: - # the min(txn_id) part will force a row, so entry may not be None + rows = self.cursor_to_dict(result) + if not rows: return None + entry = rows[0] + event_ids = json.loads(entry["event_ids"]) events = self._get_events_txn(txn, event_ids) -- cgit 1.5.1 From 884fb88e286008a1a8fef902cec928fad4a9cac9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Apr 2015 16:35:20 +0100 Subject: txn.execute doesn't return cursors --- synapse/storage/appservice.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index bd285a6699..e133cf5550 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -441,12 +441,12 @@ class ApplicationServiceTransactionStore(SQLBaseStore): def _get_oldest_unsent_txn(self, txn, service): # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) - result = txn.execute( + txn.execute( "SELECT * FROM application_services_txns WHERE as_id=?" " ORDER BY txn_id ASC LIMIT 1", (service.id,) ) - rows = self.cursor_to_dict(result) + rows = self.cursor_to_dict(txn) if not rows: return None -- cgit 1.5.1 From 0337eaf321a8519264f25a7ad14ee2e162a535b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Apr 2015 16:43:39 +0100 Subject: txn.execute doesn't return cursors --- synapse/storage/appservice.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/storage/appservice.py') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index e133cf5550..39b7881c40 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -355,11 +355,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # being sent) last_txn_id = self._get_last_txn(txn, service.id) - result = txn.execute( + txn.execute( "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", (service.id,) ) - highest_txn_id = result.fetchone()[0] + highest_txn_id = txn.fetchone()[0] if highest_txn_id is None: highest_txn_id = 0 @@ -460,11 +460,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): ) def _get_last_txn(self, txn, service_id): - result = txn.execute( + txn.execute( "SELECT last_txn FROM application_services_state WHERE as_id=?", (service_id,) ) - last_txn_id = result.fetchone() + last_txn_id = txn.fetchone() if last_txn_id is None or last_txn_id[0] is None: # no row exists return 0 else: -- cgit 1.5.1