summary refs log tree commit diff
path: root/synapse/storage/appservice.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-02-17 15:15:07 +0000
committerErik Johnston <erik@matrix.org>2015-02-17 15:15:07 +0000
commitc02da58a9da5c617c80fbeb7205f163e7b410c6f (patch)
tree8b166570eb2cbbecb7640d92d072f31add922535 /synapse/storage/appservice.py
parentConsume errors in time_bound_deferred (diff)
parentMerge pull request #74 from matrix-org/federation_min_depth_fix (diff)
downloadsynapse-c02da58a9da5c617c80fbeb7205f163e7b410c6f.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into failures
Diffstat (limited to 'synapse/storage/appservice.py')
-rw-r--r--synapse/storage/appservice.py244
1 files changed, 244 insertions, 0 deletions
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
new file mode 100644
index 0000000000..d941b1f387
--- /dev/null
+++ b/synapse/storage/appservice.py
@@ -0,0 +1,244 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+from synapse.appservice import ApplicationService
+from ._base import SQLBaseStore
+
+
+logger = logging.getLogger(__name__)
+
+
+class ApplicationServiceCache(object):
+    """Caches ApplicationServices and provides utility functions on top.
+
+    This class is designed to be invoked on incoming events in order to avoid
+    hammering the database every time to extract a list of application service
+    regexes.
+    """
+
+    def __init__(self):
+        self.services = []
+
+
+class ApplicationServiceStore(SQLBaseStore):
+
+    def __init__(self, hs):
+        super(ApplicationServiceStore, self).__init__(hs)
+        self.cache = ApplicationServiceCache()
+        self.cache_defer = self._populate_cache()
+
+    @defer.inlineCallbacks
+    def unregister_app_service(self, token):
+        """Unregisters this service.
+
+        This removes all AS specific regex and the base URL. The token is the
+        only thing preserved for future registration attempts.
+        """
+        yield self.cache_defer  # make sure the cache is ready
+        yield self.runInteraction(
+            "unregister_app_service",
+            self._unregister_app_service_txn,
+            token,
+        )
+        # update cache TODO: Should this be in the txn?
+        for service in self.cache.services:
+            if service.token == token:
+                service.url = None
+                service.namespaces = None
+                service.hs_token = None
+
+    def _unregister_app_service_txn(self, txn, token):
+        # kill the url to prevent pushes
+        txn.execute(
+            "UPDATE application_services SET url=NULL WHERE token=?",
+            (token,)
+        )
+
+        # cleanup regex
+        as_id = self._get_as_id_txn(txn, token)
+        if not as_id:
+            logger.warning(
+                "unregister_app_service_txn: Failed to find as_id for token=",
+                token
+            )
+            return False
+
+        txn.execute(
+            "DELETE FROM application_services_regex WHERE as_id=?",
+            (as_id,)
+        )
+        return True
+
+    @defer.inlineCallbacks
+    def update_app_service(self, service):
+        """Update an application service, clobbering what was previously there.
+
+        Args:
+            service(ApplicationService): The updated service.
+        """
+        yield self.cache_defer  # make sure the cache is ready
+
+        # NB: There is no "insert" since we provide no public-facing API to
+        # allocate new ASes. It relies on the server admin inserting the AS
+        # token into the database manually.
+
+        if not service.token or not service.url:
+            raise StoreError(400, "Token and url must be specified.")
+
+        if not service.hs_token:
+            raise StoreError(500, "No HS token")
+
+        yield self.runInteraction(
+            "update_app_service",
+            self._update_app_service_txn,
+            service
+        )
+
+        # update cache TODO: Should this be in the txn?
+        for (index, cache_service) in enumerate(self.cache.services):
+            if service.token == cache_service.token:
+                self.cache.services[index] = service
+                logger.info("Updated: %s", service)
+                return
+        # new entry
+        self.cache.services.append(service)
+        logger.info("Updated(new): %s", service)
+
+    def _update_app_service_txn(self, txn, service):
+        as_id = self._get_as_id_txn(txn, service.token)
+        if not as_id:
+            logger.warning(
+                "update_app_service_txn: Failed to find as_id for token=",
+                service.token
+            )
+            return False
+
+        txn.execute(
+            "UPDATE application_services SET url=?, hs_token=?, sender=? "
+            "WHERE id=?",
+            (service.url, service.hs_token, service.sender, as_id,)
+        )
+        # cleanup regex
+        txn.execute(
+            "DELETE FROM application_services_regex WHERE as_id=?",
+            (as_id,)
+        )
+        for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST):
+            if ns_str in service.namespaces:
+                for regex in service.namespaces[ns_str]:
+                    txn.execute(
+                        "INSERT INTO application_services_regex("
+                        "as_id, namespace, regex) values(?,?,?)",
+                        (as_id, ns_int, regex)
+                    )
+        return True
+
+    def _get_as_id_txn(self, txn, token):
+        cursor = txn.execute(
+            "SELECT id FROM application_services WHERE token=?",
+            (token,)
+        )
+        res = cursor.fetchone()
+        if res:
+            return res[0]
+
+    @defer.inlineCallbacks
+    def get_app_services(self):
+        yield self.cache_defer  # make sure the cache is ready
+        defer.returnValue(self.cache.services)
+
+    @defer.inlineCallbacks
+    def get_app_service_by_token(self, token, from_cache=True):
+        """Get the application service with the given token.
+
+        Args:
+            token (str): The application service token.
+            from_cache (bool): True to get this service from the cache, False to
+                               check the database.
+        Raises:
+            StoreError if there was a problem retrieving this service.
+        """
+        yield self.cache_defer  # make sure the cache is ready
+
+        if from_cache:
+            for service in self.cache.services:
+                if service.token == token:
+                    defer.returnValue(service)
+                    return
+            defer.returnValue(None)
+
+        # TODO: The from_cache=False impl
+        # TODO: This should be JOINed with the application_services_regex table.
+
+    @defer.inlineCallbacks
+    def _populate_cache(self):
+        """Populates the ApplicationServiceCache from the database."""
+        sql = ("SELECT * FROM application_services LEFT JOIN "
+               "application_services_regex ON application_services.id = "
+               "application_services_regex.as_id")
+        # SQL results in the form:
+        # [
+        #   {
+        #     'regex': "something",
+        #     'url': "something",
+        #     'namespace': enum,
+        #     'as_id': 0,
+        #     'token': "something",
+        #     'hs_token': "otherthing",
+        #     'id': 0
+        #   }
+        # ]
+        services = {}
+        results = yield self._execute_and_decode(sql)
+        for res in results:
+            as_token = res["token"]
+            if as_token not in services:
+                # add the service
+                services[as_token] = {
+                    "url": res["url"],
+                    "token": as_token,
+                    "hs_token": res["hs_token"],
+                    "sender": res["sender"],
+                    "namespaces": {
+                        ApplicationService.NS_USERS: [],
+                        ApplicationService.NS_ALIASES: [],
+                        ApplicationService.NS_ROOMS: []
+                    }
+                }
+            # add the namespace regex if one exists
+            ns_int = res["namespace"]
+            if ns_int is None:
+                continue
+            try:
+                services[as_token]["namespaces"][
+                    ApplicationService.NS_LIST[ns_int]].append(
+                    res["regex"]
+                )
+            except IndexError:
+                logger.error("Bad namespace enum '%s'. %s", ns_int, res)
+
+        # TODO get last successful txn id f.e. service
+        for service in services.values():
+            logger.info("Found application service: %s", service)
+            self.cache.services.append(ApplicationService(
+                token=service["token"],
+                url=service["url"],
+                namespaces=service["namespaces"],
+                hs_token=service["hs_token"],
+                sender=service["sender"]
+            ))