summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-02-17 15:15:07 +0000
committerErik Johnston <erik@matrix.org>2015-02-17 15:15:07 +0000
commitc02da58a9da5c617c80fbeb7205f163e7b410c6f (patch)
tree8b166570eb2cbbecb7640d92d072f31add922535 /synapse/storage
parentConsume errors in time_bound_deferred (diff)
parentMerge pull request #74 from matrix-org/federation_min_depth_fix (diff)
downloadsynapse-c02da58a9da5c617c80fbeb7205f163e7b410c6f.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into failures
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py6
-rw-r--r--synapse/storage/appservice.py244
-rw-r--r--synapse/storage/schema/application_services.sql34
-rw-r--r--synapse/storage/schema/delta/v14.sql34
4 files changed, 318 insertions, 0 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 02b1f06854..c6e96b842f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
+from .appservice import ApplicationServiceStore
 from .directory import DirectoryStore
 from .feedback import FeedbackStore
 from .presence import PresenceStore
@@ -65,6 +66,7 @@ SCHEMAS = [
     "event_signatures",
     "pusher",
     "media_repository",
+    "application_services",
     "filtering",
     "rejections",
 ]
@@ -86,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
                 PresenceStore, TransactionStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
+                ApplicationServiceStore,
                 EventFederationStore,
                 MediaRepositoryStore,
                 RejectionsStore,
@@ -637,10 +640,13 @@ def prepare_database(db_conn):
                 c.executescript(sql_script)
 
             db_conn.commit()
+        else:
+            logger.info("Database is at version %r", user_version)
 
     else:
         sql_script = "BEGIN TRANSACTION;\n"
         for sql_loc in SCHEMAS:
+            logger.debug("Applying schema %r", sql_loc)
             sql_script += read_schema(sql_loc)
             sql_script += "\n"
         sql_script += "COMMIT TRANSACTION;"
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
new file mode 100644
index 0000000000..d941b1f387
--- /dev/null
+++ b/synapse/storage/appservice.py
@@ -0,0 +1,244 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+from synapse.appservice import ApplicationService
+from ._base import SQLBaseStore
+
+
+logger = logging.getLogger(__name__)
+
+
+class ApplicationServiceCache(object):
+    """Caches ApplicationServices and provides utility functions on top.
+
+    This class is designed to be invoked on incoming events in order to avoid
+    hammering the database every time to extract a list of application service
+    regexes.
+    """
+
+    def __init__(self):
+        self.services = []
+
+
+class ApplicationServiceStore(SQLBaseStore):
+
+    def __init__(self, hs):
+        super(ApplicationServiceStore, self).__init__(hs)
+        self.cache = ApplicationServiceCache()
+        self.cache_defer = self._populate_cache()
+
+    @defer.inlineCallbacks
+    def unregister_app_service(self, token):
+        """Unregisters this service.
+
+        This removes all AS specific regex and the base URL. The token is the
+        only thing preserved for future registration attempts.
+        """
+        yield self.cache_defer  # make sure the cache is ready
+        yield self.runInteraction(
+            "unregister_app_service",
+            self._unregister_app_service_txn,
+            token,
+        )
+        # update cache TODO: Should this be in the txn?
+        for service in self.cache.services:
+            if service.token == token:
+                service.url = None
+                service.namespaces = None
+                service.hs_token = None
+
+    def _unregister_app_service_txn(self, txn, token):
+        # kill the url to prevent pushes
+        txn.execute(
+            "UPDATE application_services SET url=NULL WHERE token=?",
+            (token,)
+        )
+
+        # cleanup regex
+        as_id = self._get_as_id_txn(txn, token)
+        if not as_id:
+            logger.warning(
+                "unregister_app_service_txn: Failed to find as_id for token=",
+                token
+            )
+            return False
+
+        txn.execute(
+            "DELETE FROM application_services_regex WHERE as_id=?",
+            (as_id,)
+        )
+        return True
+
+    @defer.inlineCallbacks
+    def update_app_service(self, service):
+        """Update an application service, clobbering what was previously there.
+
+        Args:
+            service(ApplicationService): The updated service.
+        """
+        yield self.cache_defer  # make sure the cache is ready
+
+        # NB: There is no "insert" since we provide no public-facing API to
+        # allocate new ASes. It relies on the server admin inserting the AS
+        # token into the database manually.
+
+        if not service.token or not service.url:
+            raise StoreError(400, "Token and url must be specified.")
+
+        if not service.hs_token:
+            raise StoreError(500, "No HS token")
+
+        yield self.runInteraction(
+            "update_app_service",
+            self._update_app_service_txn,
+            service
+        )
+
+        # update cache TODO: Should this be in the txn?
+        for (index, cache_service) in enumerate(self.cache.services):
+            if service.token == cache_service.token:
+                self.cache.services[index] = service
+                logger.info("Updated: %s", service)
+                return
+        # new entry
+        self.cache.services.append(service)
+        logger.info("Updated(new): %s", service)
+
+    def _update_app_service_txn(self, txn, service):
+        as_id = self._get_as_id_txn(txn, service.token)
+        if not as_id:
+            logger.warning(
+                "update_app_service_txn: Failed to find as_id for token=",
+                service.token
+            )
+            return False
+
+        txn.execute(
+            "UPDATE application_services SET url=?, hs_token=?, sender=? "
+            "WHERE id=?",
+            (service.url, service.hs_token, service.sender, as_id,)
+        )
+        # cleanup regex
+        txn.execute(
+            "DELETE FROM application_services_regex WHERE as_id=?",
+            (as_id,)
+        )
+        for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST):
+            if ns_str in service.namespaces:
+                for regex in service.namespaces[ns_str]:
+                    txn.execute(
+                        "INSERT INTO application_services_regex("
+                        "as_id, namespace, regex) values(?,?,?)",
+                        (as_id, ns_int, regex)
+                    )
+        return True
+
+    def _get_as_id_txn(self, txn, token):
+        cursor = txn.execute(
+            "SELECT id FROM application_services WHERE token=?",
+            (token,)
+        )
+        res = cursor.fetchone()
+        if res:
+            return res[0]
+
+    @defer.inlineCallbacks
+    def get_app_services(self):
+        yield self.cache_defer  # make sure the cache is ready
+        defer.returnValue(self.cache.services)
+
+    @defer.inlineCallbacks
+    def get_app_service_by_token(self, token, from_cache=True):
+        """Get the application service with the given token.
+
+        Args:
+            token (str): The application service token.
+            from_cache (bool): True to get this service from the cache, False to
+                               check the database.
+        Raises:
+            StoreError if there was a problem retrieving this service.
+        """
+        yield self.cache_defer  # make sure the cache is ready
+
+        if from_cache:
+            for service in self.cache.services:
+                if service.token == token:
+                    defer.returnValue(service)
+                    return
+            defer.returnValue(None)
+
+        # TODO: The from_cache=False impl
+        # TODO: This should be JOINed with the application_services_regex table.
+
+    @defer.inlineCallbacks
+    def _populate_cache(self):
+        """Populates the ApplicationServiceCache from the database."""
+        sql = ("SELECT * FROM application_services LEFT JOIN "
+               "application_services_regex ON application_services.id = "
+               "application_services_regex.as_id")
+        # SQL results in the form:
+        # [
+        #   {
+        #     'regex': "something",
+        #     'url': "something",
+        #     'namespace': enum,
+        #     'as_id': 0,
+        #     'token': "something",
+        #     'hs_token': "otherthing",
+        #     'id': 0
+        #   }
+        # ]
+        services = {}
+        results = yield self._execute_and_decode(sql)
+        for res in results:
+            as_token = res["token"]
+            if as_token not in services:
+                # add the service
+                services[as_token] = {
+                    "url": res["url"],
+                    "token": as_token,
+                    "hs_token": res["hs_token"],
+                    "sender": res["sender"],
+                    "namespaces": {
+                        ApplicationService.NS_USERS: [],
+                        ApplicationService.NS_ALIASES: [],
+                        ApplicationService.NS_ROOMS: []
+                    }
+                }
+            # add the namespace regex if one exists
+            ns_int = res["namespace"]
+            if ns_int is None:
+                continue
+            try:
+                services[as_token]["namespaces"][
+                    ApplicationService.NS_LIST[ns_int]].append(
+                    res["regex"]
+                )
+            except IndexError:
+                logger.error("Bad namespace enum '%s'. %s", ns_int, res)
+
+        # TODO get last successful txn id f.e. service
+        for service in services.values():
+            logger.info("Found application service: %s", service)
+            self.cache.services.append(ApplicationService(
+                token=service["token"],
+                url=service["url"],
+                namespaces=service["namespaces"],
+                hs_token=service["hs_token"],
+                sender=service["sender"]
+            ))
diff --git a/synapse/storage/schema/application_services.sql b/synapse/storage/schema/application_services.sql
new file mode 100644
index 0000000000..e491ad5aec
--- /dev/null
+++ b/synapse/storage/schema/application_services.sql
@@ -0,0 +1,34 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS application_services(
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    url TEXT,
+    token TEXT,
+    hs_token TEXT,
+    sender TEXT,
+    UNIQUE(token) ON CONFLICT ROLLBACK
+);
+
+CREATE TABLE IF NOT EXISTS application_services_regex(
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    as_id INTEGER NOT NULL,
+    namespace INTEGER,  /* enum[room_id|room_alias|user_id] */
+    regex TEXT,
+    FOREIGN KEY(as_id) REFERENCES application_services(id)
+);
+
+
+
diff --git a/synapse/storage/schema/delta/v14.sql b/synapse/storage/schema/delta/v14.sql
new file mode 100644
index 0000000000..e491ad5aec
--- /dev/null
+++ b/synapse/storage/schema/delta/v14.sql
@@ -0,0 +1,34 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS application_services(
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    url TEXT,
+    token TEXT,
+    hs_token TEXT,
+    sender TEXT,
+    UNIQUE(token) ON CONFLICT ROLLBACK
+);
+
+CREATE TABLE IF NOT EXISTS application_services_regex(
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    as_id INTEGER NOT NULL,
+    namespace INTEGER,  /* enum[room_id|room_alias|user_id] */
+    regex TEXT,
+    FOREIGN KEY(as_id) REFERENCES application_services(id)
+);
+
+
+