diff options
author | Mark Haines <mjark@negativecurvature.net> | 2015-02-13 15:06:14 +0000 |
---|---|---|
committer | Mark Haines <mjark@negativecurvature.net> | 2015-02-13 15:06:14 +0000 |
commit | 0d872f5aa6a81a31a83205210414c6e6f362e226 (patch) | |
tree | 2d7c2eeab3df724753d2b2ee358551a780048e25 /synapse/storage | |
parent | Merge pull request #72 from matrix-org/in_memory_sqlite_for_testing (diff) | |
parent | Fix tests which broke when event caching was introduced. (diff) | |
download | synapse-0d872f5aa6a81a31a83205210414c6e6f362e226.tar.xz |
Merge pull request #50 from matrix-org/application-services
Application Services
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 3 | ||||
-rw-r--r-- | synapse/storage/appservice.py | 244 | ||||
-rw-r--r-- | synapse/storage/schema/application_services.sql | 34 | ||||
-rw-r--r-- | synapse/storage/schema/delta/v14.sql | 34 |
4 files changed, 315 insertions, 0 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 1c22e19ab0..c6e96b842f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.api.constants import EventTypes +from .appservice import ApplicationServiceStore from .directory import DirectoryStore from .feedback import FeedbackStore from .presence import PresenceStore @@ -65,6 +66,7 @@ SCHEMAS = [ "event_signatures", "pusher", "media_repository", + "application_services", "filtering", "rejections", ] @@ -86,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, + ApplicationServiceStore, EventFederationStore, MediaRepositoryStore, RejectionsStore, diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py new file mode 100644 index 0000000000..d941b1f387 --- /dev/null +++ b/synapse/storage/appservice.py @@ -0,0 +1,244 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from twisted.internet import defer + +from synapse.api.errors import StoreError +from synapse.appservice import ApplicationService +from ._base import SQLBaseStore + + +logger = logging.getLogger(__name__) + + +class ApplicationServiceCache(object): + """Caches ApplicationServices and provides utility functions on top. + + This class is designed to be invoked on incoming events in order to avoid + hammering the database every time to extract a list of application service + regexes. + """ + + def __init__(self): + self.services = [] + + +class ApplicationServiceStore(SQLBaseStore): + + def __init__(self, hs): + super(ApplicationServiceStore, self).__init__(hs) + self.cache = ApplicationServiceCache() + self.cache_defer = self._populate_cache() + + @defer.inlineCallbacks + def unregister_app_service(self, token): + """Unregisters this service. + + This removes all AS specific regex and the base URL. The token is the + only thing preserved for future registration attempts. + """ + yield self.cache_defer # make sure the cache is ready + yield self.runInteraction( + "unregister_app_service", + self._unregister_app_service_txn, + token, + ) + # update cache TODO: Should this be in the txn? + for service in self.cache.services: + if service.token == token: + service.url = None + service.namespaces = None + service.hs_token = None + + def _unregister_app_service_txn(self, txn, token): + # kill the url to prevent pushes + txn.execute( + "UPDATE application_services SET url=NULL WHERE token=?", + (token,) + ) + + # cleanup regex + as_id = self._get_as_id_txn(txn, token) + if not as_id: + logger.warning( + "unregister_app_service_txn: Failed to find as_id for token=", + token + ) + return False + + txn.execute( + "DELETE FROM application_services_regex WHERE as_id=?", + (as_id,) + ) + return True + + @defer.inlineCallbacks + def update_app_service(self, service): + """Update an application service, clobbering what was previously there. + + Args: + service(ApplicationService): The updated service. + """ + yield self.cache_defer # make sure the cache is ready + + # NB: There is no "insert" since we provide no public-facing API to + # allocate new ASes. It relies on the server admin inserting the AS + # token into the database manually. + + if not service.token or not service.url: + raise StoreError(400, "Token and url must be specified.") + + if not service.hs_token: + raise StoreError(500, "No HS token") + + yield self.runInteraction( + "update_app_service", + self._update_app_service_txn, + service + ) + + # update cache TODO: Should this be in the txn? + for (index, cache_service) in enumerate(self.cache.services): + if service.token == cache_service.token: + self.cache.services[index] = service + logger.info("Updated: %s", service) + return + # new entry + self.cache.services.append(service) + logger.info("Updated(new): %s", service) + + def _update_app_service_txn(self, txn, service): + as_id = self._get_as_id_txn(txn, service.token) + if not as_id: + logger.warning( + "update_app_service_txn: Failed to find as_id for token=", + service.token + ) + return False + + txn.execute( + "UPDATE application_services SET url=?, hs_token=?, sender=? " + "WHERE id=?", + (service.url, service.hs_token, service.sender, as_id,) + ) + # cleanup regex + txn.execute( + "DELETE FROM application_services_regex WHERE as_id=?", + (as_id,) + ) + for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST): + if ns_str in service.namespaces: + for regex in service.namespaces[ns_str]: + txn.execute( + "INSERT INTO application_services_regex(" + "as_id, namespace, regex) values(?,?,?)", + (as_id, ns_int, regex) + ) + return True + + def _get_as_id_txn(self, txn, token): + cursor = txn.execute( + "SELECT id FROM application_services WHERE token=?", + (token,) + ) + res = cursor.fetchone() + if res: + return res[0] + + @defer.inlineCallbacks + def get_app_services(self): + yield self.cache_defer # make sure the cache is ready + defer.returnValue(self.cache.services) + + @defer.inlineCallbacks + def get_app_service_by_token(self, token, from_cache=True): + """Get the application service with the given token. + + Args: + token (str): The application service token. + from_cache (bool): True to get this service from the cache, False to + check the database. + Raises: + StoreError if there was a problem retrieving this service. + """ + yield self.cache_defer # make sure the cache is ready + + if from_cache: + for service in self.cache.services: + if service.token == token: + defer.returnValue(service) + return + defer.returnValue(None) + + # TODO: The from_cache=False impl + # TODO: This should be JOINed with the application_services_regex table. + + @defer.inlineCallbacks + def _populate_cache(self): + """Populates the ApplicationServiceCache from the database.""" + sql = ("SELECT * FROM application_services LEFT JOIN " + "application_services_regex ON application_services.id = " + "application_services_regex.as_id") + # SQL results in the form: + # [ + # { + # 'regex': "something", + # 'url': "something", + # 'namespace': enum, + # 'as_id': 0, + # 'token': "something", + # 'hs_token': "otherthing", + # 'id': 0 + # } + # ] + services = {} + results = yield self._execute_and_decode(sql) + for res in results: + as_token = res["token"] + if as_token not in services: + # add the service + services[as_token] = { + "url": res["url"], + "token": as_token, + "hs_token": res["hs_token"], + "sender": res["sender"], + "namespaces": { + ApplicationService.NS_USERS: [], + ApplicationService.NS_ALIASES: [], + ApplicationService.NS_ROOMS: [] + } + } + # add the namespace regex if one exists + ns_int = res["namespace"] + if ns_int is None: + continue + try: + services[as_token]["namespaces"][ + ApplicationService.NS_LIST[ns_int]].append( + res["regex"] + ) + except IndexError: + logger.error("Bad namespace enum '%s'. %s", ns_int, res) + + # TODO get last successful txn id f.e. service + for service in services.values(): + logger.info("Found application service: %s", service) + self.cache.services.append(ApplicationService( + token=service["token"], + url=service["url"], + namespaces=service["namespaces"], + hs_token=service["hs_token"], + sender=service["sender"] + )) diff --git a/synapse/storage/schema/application_services.sql b/synapse/storage/schema/application_services.sql new file mode 100644 index 0000000000..e491ad5aec --- /dev/null +++ b/synapse/storage/schema/application_services.sql @@ -0,0 +1,34 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS application_services( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT, + token TEXT, + hs_token TEXT, + sender TEXT, + UNIQUE(token) ON CONFLICT ROLLBACK +); + +CREATE TABLE IF NOT EXISTS application_services_regex( + id INTEGER PRIMARY KEY AUTOINCREMENT, + as_id INTEGER NOT NULL, + namespace INTEGER, /* enum[room_id|room_alias|user_id] */ + regex TEXT, + FOREIGN KEY(as_id) REFERENCES application_services(id) +); + + + diff --git a/synapse/storage/schema/delta/v14.sql b/synapse/storage/schema/delta/v14.sql new file mode 100644 index 0000000000..e491ad5aec --- /dev/null +++ b/synapse/storage/schema/delta/v14.sql @@ -0,0 +1,34 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS application_services( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT, + token TEXT, + hs_token TEXT, + sender TEXT, + UNIQUE(token) ON CONFLICT ROLLBACK +); + +CREATE TABLE IF NOT EXISTS application_services_regex( + id INTEGER PRIMARY KEY AUTOINCREMENT, + as_id INTEGER NOT NULL, + namespace INTEGER, /* enum[room_id|room_alias|user_id] */ + regex TEXT, + FOREIGN KEY(as_id) REFERENCES application_services(id) +); + + + |