diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 64f605b962..18f3d117b3 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -486,7 +486,7 @@ class Auth(object):
send_level = send_level_event.content.get("events", {}).get(
event.type
)
- if not send_level:
+ if send_level is None:
if hasattr(event, "state_key"):
send_level = send_level_event.content.get(
"state_default", 50
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 500cae05fb..afb46d2e23 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -32,15 +32,13 @@ from twisted.web.resource import Resource
from twisted.web.static import File
from twisted.web.server import Site
from synapse.http.server import JsonResource, RootRedirect
-from synapse.rest.appservice.v1 import AppServiceRestResource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
from synapse.http.server_key_resource import LocalKey
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.api.urls import (
CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
- SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, APP_SERVICE_PREFIX,
- STATIC_PREFIX
+ SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, STATIC_PREFIX
)
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
@@ -78,9 +76,6 @@ class SynapseHomeServer(HomeServer):
def build_resource_for_federation(self):
return JsonResource(self)
- def build_resource_for_app_services(self):
- return AppServiceRestResource(self)
-
def build_resource_for_web_client(self):
import syweb
syweb_path = os.path.dirname(syweb.__file__)
@@ -141,7 +136,6 @@ class SynapseHomeServer(HomeServer):
(CONTENT_REPO_PREFIX, self.get_resource_for_content_repo()),
(SERVER_KEY_PREFIX, self.get_resource_for_server_key()),
(MEDIA_PREFIX, self.get_resource_for_media_repository()),
- (APP_SERVICE_PREFIX, self.get_resource_for_app_services()),
(STATIC_PREFIX, self.get_resource_for_static_content()),
]
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index a268a6bcc4..63a18b802b 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -20,6 +20,50 @@ import re
logger = logging.getLogger(__name__)
+class ApplicationServiceState(object):
+ DOWN = "down"
+ UP = "up"
+
+
+class AppServiceTransaction(object):
+ """Represents an application service transaction."""
+
+ def __init__(self, service, id, events):
+ self.service = service
+ self.id = id
+ self.events = events
+
+ def send(self, as_api):
+ """Sends this transaction using the provided AS API interface.
+
+ Args:
+ as_api(ApplicationServiceApi): The API to use to send.
+ Returns:
+ A Deferred which resolves to True if the transaction was sent.
+ """
+ return as_api.push_bulk(
+ service=self.service,
+ events=self.events,
+ txn_id=self.id
+ )
+
+ def complete(self, store):
+ """Completes this transaction as successful.
+
+ Marks this transaction ID on the application service and removes the
+ transaction contents from the database.
+
+ Args:
+ store: The database store to operate on.
+ Returns:
+ A Deferred which resolves to True if the transaction was completed.
+ """
+ return store.complete_appservice_txn(
+ service=self.service,
+ txn_id=self.id
+ )
+
+
class ApplicationService(object):
"""Defines an application service. This definition is mostly what is
provided to the /register AS API.
@@ -35,13 +79,13 @@ class ApplicationService(object):
NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
def __init__(self, token, url=None, namespaces=None, hs_token=None,
- sender=None, txn_id=None):
+ sender=None, id=None):
self.token = token
self.url = url
self.hs_token = hs_token
self.sender = sender
self.namespaces = self._check_namespaces(namespaces)
- self.txn_id = txn_id
+ self.id = id
def _check_namespaces(self, namespaces):
# Sanity check that it is of the form:
@@ -51,7 +95,7 @@ class ApplicationService(object):
# rooms: [ {regex: "[A-z]+.*", exclusive: true}, ...],
# }
if not namespaces:
- return None
+ namespaces = {}
for ns in ApplicationService.NS_LIST:
if ns not in namespaces:
@@ -155,7 +199,10 @@ class ApplicationService(object):
return self._matches_user(event, member_list)
def is_interested_in_user(self, user_id):
- return self._matches_regex(user_id, ApplicationService.NS_USERS)
+ return (
+ self._matches_regex(user_id, ApplicationService.NS_USERS)
+ or user_id == self.sender
+ )
def is_interested_in_alias(self, alias):
return self._matches_regex(alias, ApplicationService.NS_ALIASES)
@@ -164,7 +211,10 @@ class ApplicationService(object):
return self._matches_regex(room_id, ApplicationService.NS_ROOMS)
def is_exclusive_user(self, user_id):
- return self._is_exclusive(ApplicationService.NS_USERS, user_id)
+ return (
+ self._is_exclusive(ApplicationService.NS_USERS, user_id)
+ or user_id == self.sender
+ )
def is_exclusive_alias(self, alias):
return self._is_exclusive(ApplicationService.NS_ALIASES, alias)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index c2179f8d55..2a9becccb3 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -72,14 +72,19 @@ class ApplicationServiceApi(SimpleHttpClient):
defer.returnValue(False)
@defer.inlineCallbacks
- def push_bulk(self, service, events):
+ def push_bulk(self, service, events, txn_id=None):
events = self._serialize(events)
+ if txn_id is None:
+ logger.warning("push_bulk: Missing txn ID sending events to %s",
+ service.url)
+ txn_id = str(0)
+ txn_id = str(txn_id)
+
uri = service.url + ("/transactions/%s" %
- urllib.quote(str(0))) # TODO txn_ids
- response = None
+ urllib.quote(txn_id))
try:
- response = yield self.put_json(
+ yield self.put_json(
uri=uri,
json_body={
"events": events
@@ -87,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient):
args={
"access_token": service.hs_token
})
- if response: # just an empty json object
- # TODO: Mark txn as sent successfully
- defer.returnValue(True)
+ defer.returnValue(True)
+ return
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
except Exception as ex:
@@ -97,8 +101,8 @@ class ApplicationServiceApi(SimpleHttpClient):
defer.returnValue(False)
@defer.inlineCallbacks
- def push(self, service, event):
- response = yield self.push_bulk(service, [event])
+ def push(self, service, event, txn_id=None):
+ response = yield self.push_bulk(service, [event], txn_id)
defer.returnValue(response)
def _serialize(self, events):
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
new file mode 100644
index 0000000000..59b0b1f4ac
--- /dev/null
+++ b/synapse/appservice/scheduler.py
@@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+This module controls the reliability for application service transactions.
+
+The nominal flow through this module looks like:
+ __________
+1---ASa[e]-->| Service |--> Queue ASa[f]
+2----ASb[e]->| Queuer |
+3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e]
+ V
+ -````````- +------------+
+ |````````|<--StoreTxn-|Transaction |
+ |Database| | Controller |---> SEND TO AS
+ `--------` +------------+
+What happens on SEND TO AS depends on the state of the Application Service:
+ - If the AS is marked as DOWN, do nothing.
+ - If the AS is marked as UP, send the transaction.
+ * SUCCESS : Increment where the AS is up to txn-wise and nuke the txn
+ contents from the db.
+ * FAILURE : Marked AS as DOWN and start Recoverer.
+
+Recoverer attempts to recover ASes who have died. The flow for this looks like:
+ ,--------------------- backoff++ --------------.
+ V |
+ START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE
+ backoff DB and try to send it
+ ^ |___________
+Mark AS as | V
+UP & quit +---------- YES SUCCESS
+ | | |
+ NO <--- Have more txns? <------ Mark txn success & nuke <-+
+ from db; incr AS pos.
+ Reset backoff.
+
+This is all tied together by the AppServiceScheduler which DIs the required
+components.
+"""
+
+from synapse.appservice import ApplicationServiceState
+from twisted.internet import defer
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class AppServiceScheduler(object):
+ """ Public facing API for this module. Does the required DI to tie the
+ components together. This also serves as the "event_pool", which in this
+ case is a simple array.
+ """
+
+ def __init__(self, clock, store, as_api):
+ self.clock = clock
+ self.store = store
+ self.as_api = as_api
+
+ def create_recoverer(service, callback):
+ return _Recoverer(clock, store, as_api, service, callback)
+
+ self.txn_ctrl = _TransactionController(
+ clock, store, as_api, create_recoverer
+ )
+ self.queuer = _ServiceQueuer(self.txn_ctrl)
+
+ @defer.inlineCallbacks
+ def start(self):
+ logger.info("Starting appservice scheduler")
+ # check for any DOWN ASes and start recoverers for them.
+ recoverers = yield _Recoverer.start(
+ self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+ )
+ self.txn_ctrl.add_recoverers(recoverers)
+
+ def submit_event_for_as(self, service, event):
+ self.queuer.enqueue(service, event)
+
+
+class _ServiceQueuer(object):
+ """Queues events for the same application service together, sending
+ transactions as soon as possible. Once a transaction is sent successfully,
+ this schedules any other events in the queue to run.
+ """
+
+ def __init__(self, txn_ctrl):
+ self.queued_events = {} # dict of {service_id: [events]}
+ self.pending_requests = {} # dict of {service_id: Deferred}
+ self.txn_ctrl = txn_ctrl
+
+ def enqueue(self, service, event):
+ # if this service isn't being sent something
+ if not self.pending_requests.get(service.id):
+ self._send_request(service, [event])
+ else:
+ # add to queue for this service
+ if service.id not in self.queued_events:
+ self.queued_events[service.id] = []
+ self.queued_events[service.id].append(event)
+
+ def _send_request(self, service, events):
+ # send request and add callbacks
+ d = self.txn_ctrl.send(service, events)
+ d.addBoth(self._on_request_finish)
+ d.addErrback(self._on_request_fail)
+ self.pending_requests[service.id] = d
+
+ def _on_request_finish(self, service):
+ self.pending_requests[service.id] = None
+ # if there are queued events, then send them.
+ if (service.id in self.queued_events
+ and len(self.queued_events[service.id]) > 0):
+ self._send_request(service, self.queued_events[service.id])
+ self.queued_events[service.id] = []
+
+ def _on_request_fail(self, err):
+ logger.error("AS request failed: %s", err)
+
+
+class _TransactionController(object):
+
+ def __init__(self, clock, store, as_api, recoverer_fn):
+ self.clock = clock
+ self.store = store
+ self.as_api = as_api
+ self.recoverer_fn = recoverer_fn
+ # keep track of how many recoverers there are
+ self.recoverers = []
+
+ @defer.inlineCallbacks
+ def send(self, service, events):
+ try:
+ txn = yield self.store.create_appservice_txn(
+ service=service,
+ events=events
+ )
+ service_is_up = yield self._is_service_up(service)
+ if service_is_up:
+ sent = yield txn.send(self.as_api)
+ if sent:
+ txn.complete(self.store)
+ else:
+ self._start_recoverer(service)
+ except Exception as e:
+ logger.exception(e)
+ self._start_recoverer(service)
+ # request has finished
+ defer.returnValue(service)
+
+ @defer.inlineCallbacks
+ def on_recovered(self, recoverer):
+ self.recoverers.remove(recoverer)
+ logger.info("Successfully recovered application service AS ID %s",
+ recoverer.service.id)
+ logger.info("Remaining active recoverers: %s", len(self.recoverers))
+ yield self.store.set_appservice_state(
+ recoverer.service,
+ ApplicationServiceState.UP
+ )
+
+ def add_recoverers(self, recoverers):
+ for r in recoverers:
+ self.recoverers.append(r)
+ if len(recoverers) > 0:
+ logger.info("New active recoverers: %s", len(self.recoverers))
+
+ @defer.inlineCallbacks
+ def _start_recoverer(self, service):
+ yield self.store.set_appservice_state(
+ service,
+ ApplicationServiceState.DOWN
+ )
+ logger.info(
+ "Application service falling behind. Starting recoverer. AS ID %s",
+ service.id
+ )
+ recoverer = self.recoverer_fn(service, self.on_recovered)
+ self.add_recoverers([recoverer])
+ recoverer.recover()
+
+ @defer.inlineCallbacks
+ def _is_service_up(self, service):
+ state = yield self.store.get_appservice_state(service)
+ defer.returnValue(state == ApplicationServiceState.UP or state is None)
+
+
+class _Recoverer(object):
+
+ @staticmethod
+ @defer.inlineCallbacks
+ def start(clock, store, as_api, callback):
+ services = yield store.get_appservices_by_state(
+ ApplicationServiceState.DOWN
+ )
+ recoverers = [
+ _Recoverer(clock, store, as_api, s, callback) for s in services
+ ]
+ for r in recoverers:
+ logger.info("Starting recoverer for AS ID %s which was marked as "
+ "DOWN", r.service.id)
+ r.recover()
+ defer.returnValue(recoverers)
+
+ def __init__(self, clock, store, as_api, service, callback):
+ self.clock = clock
+ self.store = store
+ self.as_api = as_api
+ self.service = service
+ self.callback = callback
+ self.backoff_counter = 1
+
+ def recover(self):
+ self.clock.call_later((2 ** self.backoff_counter), self.retry)
+
+ def _backoff(self):
+ # cap the backoff to be around 18h => (2^16) = 65536 secs
+ if self.backoff_counter < 16:
+ self.backoff_counter += 1
+ self.recover()
+
+ @defer.inlineCallbacks
+ def retry(self):
+ try:
+ txn = yield self.store.get_oldest_unsent_txn(self.service)
+ if txn:
+ logger.info("Retrying transaction %s for AS ID %s",
+ txn.id, txn.service.id)
+ sent = yield txn.send(self.as_api)
+ if sent:
+ yield txn.complete(self.store)
+ # reset the backoff counter and retry immediately
+ self.backoff_counter = 1
+ yield self.retry()
+ else:
+ self._backoff()
+ else:
+ self._set_service_recovered()
+ except Exception as e:
+ logger.exception(e)
+ self._backoff()
+
+ def _set_service_recovered(self):
+ self.callback(self)
diff --git a/synapse/rest/appservice/v1/__init__.py b/synapse/config/appservice.py
index a7877609ad..399a716d80 100644
--- a/synapse/rest/appservice/v1/__init__.py
+++ b/synapse/config/appservice.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,18 +11,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from . import register
-from synapse.http.server import JsonResource
+from ._base import Config
-class AppServiceRestResource(JsonResource):
- """A resource for version 1 of the matrix application service API."""
+class AppServiceConfig(Config):
- def __init__(self, hs):
- JsonResource.__init__(self, hs)
- self.register_servlets(self, hs)
+ def __init__(self, args):
+ super(AppServiceConfig, self).__init__(args)
+ self.app_service_config_files = args.app_service_config_files
- @staticmethod
- def register_servlets(appservice_resource, hs):
- register.register_servlets(hs, appservice_resource)
+ @classmethod
+ def add_arguments(cls, parser):
+ super(AppServiceConfig, cls).add_arguments(parser)
+ group = parser.add_argument_group("appservice")
+ group.add_argument(
+ "--app-service-config-files", type=str, nargs='+',
+ help="A list of application service config files to use."
+ )
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 241afdf872..3edfadb98b 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -24,12 +24,13 @@ from .email import EmailConfig
from .voip import VoipConfig
from .registration import RegistrationConfig
from .metrics import MetricsConfig
+from .appservice import AppServiceConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
EmailConfig, VoipConfig, RegistrationConfig,
- MetricsConfig,):
+ MetricsConfig, AppServiceConfig,):
pass
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 4401e774d1..d5c8f4bf7b 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -25,11 +25,11 @@ class RegistrationConfig(Config):
def __init__(self, args):
super(RegistrationConfig, self).__init__(args)
- # `args.disable_registration` may either be a bool or a string depending
- # on if the option was given a value (e.g. --disable-registration=false
- # would set `args.disable_registration` to "false" not False.)
- self.disable_registration = bool(
- distutils.util.strtobool(str(args.disable_registration))
+ # `args.enable_registration` may either be a bool or a string depending
+ # on if the option was given a value (e.g. --enable-registration=true
+ # would set `args.enable_registration` to "true" not True.)
+ self.disable_registration = not bool(
+ distutils.util.strtobool(str(args.enable_registration))
)
self.registration_shared_secret = args.registration_shared_secret
@@ -39,11 +39,11 @@ class RegistrationConfig(Config):
reg_group = parser.add_argument_group("registration")
reg_group.add_argument(
- "--disable-registration",
+ "--enable-registration",
const=True,
- default=True,
+ default=False,
nargs='?',
- help="Disable registration of new users.",
+ help="Enable registration for new users.",
)
reg_group.add_argument(
"--registration-shared-secret", type=str,
@@ -53,8 +53,8 @@ class RegistrationConfig(Config):
@classmethod
def generate_config(cls, args, config_dir_path):
- if args.disable_registration is None:
- args.disable_registration = True
+ if args.enable_registration is None:
+ args.enable_registration = False
if args.registration_shared_secret is None:
args.registration_shared_secret = random_string_with_symbols(50)
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 8d345bf936..0c51d615ec 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from synapse.appservice.scheduler import AppServiceScheduler
from synapse.appservice.api import ApplicationServiceApi
from .register import RegistrationHandler
from .room import (
@@ -54,7 +55,12 @@ class Handlers(object):
self.directory_handler = DirectoryHandler(hs)
self.typing_notification_handler = TypingNotificationHandler(hs)
self.admin_handler = AdminHandler(hs)
+ asapi = ApplicationServiceApi(hs)
self.appservice_handler = ApplicationServicesHandler(
- hs, ApplicationServiceApi(hs)
+ hs, asapi, AppServiceScheduler(
+ clock=hs.get_clock(),
+ store=hs.get_datastore(),
+ as_api=asapi
+ )
)
self.sync_handler = SyncHandler(hs)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 2c488a46f6..492a630fdc 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -16,57 +16,36 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.appservice import ApplicationService
from synapse.types import UserID
-import synapse.util.stringutils as stringutils
import logging
logger = logging.getLogger(__name__)
+def log_failure(failure):
+ logger.error(
+ "Application Services Failure",
+ exc_info=(
+ failure.type,
+ failure.value,
+ failure.getTracebackObject()
+ )
+ )
+
+
# NB: Purposefully not inheriting BaseHandler since that contains way too much
# setup code which this handler does not need or use. This makes testing a lot
# easier.
class ApplicationServicesHandler(object):
- def __init__(self, hs, appservice_api):
+ def __init__(self, hs, appservice_api, appservice_scheduler):
self.store = hs.get_datastore()
self.hs = hs
self.appservice_api = appservice_api
-
- @defer.inlineCallbacks
- def register(self, app_service):
- logger.info("Register -> %s", app_service)
- # check the token is recognised
- try:
- stored_service = yield self.store.get_app_service_by_token(
- app_service.token
- )
- if not stored_service:
- raise StoreError(404, "Application service not found")
- except StoreError:
- raise SynapseError(
- 403, "Unrecognised application services token. "
- "Consult the home server admin.",
- errcode=Codes.FORBIDDEN
- )
-
- app_service.hs_token = self._generate_hs_token()
-
- # create a sender for this application service which is used when
- # creating rooms, etc..
- account = yield self.hs.get_handlers().registration_handler.register()
- app_service.sender = account[0]
-
- yield self.store.update_app_service(app_service)
- defer.returnValue(app_service)
-
- @defer.inlineCallbacks
- def unregister(self, token):
- logger.info("Unregister as_token=%s", token)
- yield self.store.unregister_app_service(token)
+ self.scheduler = appservice_scheduler
+ self.started_scheduler = False
@defer.inlineCallbacks
def notify_interested_services(self, event):
@@ -90,9 +69,13 @@ class ApplicationServicesHandler(object):
if event.type == EventTypes.Member:
yield self._check_user_exists(event.state_key)
- # Fork off pushes to these services - XXX First cut, best effort
+ if not self.started_scheduler:
+ self.scheduler.start().addErrback(log_failure)
+ self.started_scheduler = True
+
+ # Fork off pushes to these services
for service in services:
- self.appservice_api.push(service, event)
+ self.scheduler.submit_event_for_as(service, event)
@defer.inlineCallbacks
def query_user_exists(self, user_id):
@@ -197,7 +180,14 @@ class ApplicationServicesHandler(object):
return
user_info = yield self.store.get_user_by_id(user_id)
- defer.returnValue(len(user_info) == 0)
+ if len(user_info) > 0:
+ defer.returnValue(False)
+ return
+
+ # user not found; could be the AS though, so check.
+ services = yield self.store.get_app_services()
+ service_list = [s for s in services if s.sender == user_id]
+ defer.returnValue(len(service_list) == 0)
@defer.inlineCallbacks
def _check_user_exists(self, user_id):
@@ -206,6 +196,3 @@ class ApplicationServicesHandler(object):
exists = yield self.query_user_exists(user_id)
defer.returnValue(exists)
defer.returnValue(True)
-
- def _generate_hs_token(self):
- return stringutils.random_string(24)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 15ba417e06..8aceac28cf 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -201,10 +201,18 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- yield self.notifier.on_new_room_event(
+ d = self.notifier.on_new_room_event(
event, extra_users=extra_users
)
+ def log_failure(f):
+ logger.warn(
+ "Failed to notify about %s: %s",
+ event.event_id, f.value
+ )
+
+ d.addErrback(log_failure)
+
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
user = UserID.from_string(event.state_key)
@@ -427,10 +435,18 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
- yield self.notifier.on_new_room_event(
+ d = self.notifier.on_new_room_event(
new_event, extra_users=[joinee]
)
+ def log_failure(f):
+ logger.warn(
+ "Failed to notify about %s: %s",
+ new_event.event_id, f.value
+ )
+
+ d.addErrback(log_failure)
+
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
@@ -500,10 +516,18 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- yield self.notifier.on_new_room_event(
+ d = self.notifier.on_new_room_event(
event, extra_users=extra_users
)
+ def log_failure(f):
+ logger.warn(
+ "Failed to notify about %s: %s",
+ event.event_id, f.value
+ )
+
+ d.addErrback(log_failure)
+
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
@@ -574,10 +598,18 @@ class FederationHandler(BaseHandler):
)
target_user = UserID.from_string(event.state_key)
- yield self.notifier.on_new_room_event(
+ d = self.notifier.on_new_room_event(
event, extra_users=[target_user],
)
+ def log_failure(f):
+ logger.warn(
+ "Failed to notify about %s: %s",
+ event.event_id, f.value
+ )
+
+ d.addErrback(log_failure)
+
defer.returnValue(event)
@defer.inlineCallbacks
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 731df00648..bbc7a0f200 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,6 +33,10 @@ logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
+# Don't bother bumping "last active" time if it differs by less than 60 seconds
+LAST_ACTIVE_GRANULARITY = 60*1000
+
+
# TODO(paul): Maybe there's one of these I can steal from somewhere
def partition(l, func):
"""Partition the list by the result of func applied to each element."""
@@ -282,6 +286,10 @@ class PresenceHandler(BaseHandler):
if now is None:
now = self.clock.time_msec()
+ prev_state = self._get_or_make_usercache(user)
+ if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY:
+ return
+
self.changed_presencelike_data(user, {"last_active": now})
def changed_presencelike_data(self, user, state):
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index dffb8a4861..9233ea3da9 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -18,6 +18,8 @@ from __future__ import absolute_import
import logging
from resource import getrusage, getpagesize, RUSAGE_SELF
+import os
+import stat
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
@@ -109,3 +111,36 @@ resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000)
# pages
resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * PAGE_SIZE)
+
+TYPES = {
+ stat.S_IFSOCK: "SOCK",
+ stat.S_IFLNK: "LNK",
+ stat.S_IFREG: "REG",
+ stat.S_IFBLK: "BLK",
+ stat.S_IFDIR: "DIR",
+ stat.S_IFCHR: "CHR",
+ stat.S_IFIFO: "FIFO",
+}
+
+
+def _process_fds():
+ counts = {(k,): 0 for k in TYPES.values()}
+ counts[("other",)] = 0
+
+ for fd in os.listdir("/proc/self/fd"):
+ try:
+ s = os.stat("/proc/self/fd/%s" % (fd))
+ fmt = stat.S_IFMT(s.st_mode)
+ if fmt in TYPES:
+ t = TYPES[fmt]
+ else:
+ t = "other"
+
+ counts[(t,)] += 1
+ except OSError:
+ # the dirh itself used by listdir() is usually missing by now
+ pass
+
+ return counts
+
+get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"])
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 7121d659d0..12573f3f59 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -62,7 +62,8 @@ class _NotificationListener(object):
self.rooms = rooms
- self.pending_notifications = []
+ def notified(self):
+ return self.deferred.called
def notify(self, notifier, events, start_token, end_token):
""" Inform whoever is listening about the new events. This will
@@ -78,11 +79,15 @@ class _NotificationListener(object):
except defer.AlreadyCalledError:
pass
+ # Should the following be done be using intrusively linked lists?
+ # -- erikj
+
for room in self.rooms:
lst = notifier.room_to_listeners.get(room, set())
lst.discard(self)
notifier.user_to_listeners.get(self.user, set()).discard(self)
+
if self.appservice:
notifier.appservice_to_listeners.get(
self.appservice, set()
@@ -161,10 +166,18 @@ class Notifier(object):
room_source = self.event_sources.sources["room"]
- listeners = self.room_to_listeners.get(room_id, set()).copy()
+ room_listeners = self.room_to_listeners.get(room_id, set())
+
+ _discard_if_notified(room_listeners)
+
+ listeners = room_listeners.copy()
for user in extra_users:
- listeners |= self.user_to_listeners.get(user, set()).copy()
+ user_listeners = self.user_to_listeners.get(user, set())
+
+ _discard_if_notified(user_listeners)
+
+ listeners |= user_listeners
for appservice in self.appservice_to_listeners:
# TODO (kegan): Redundant appservice listener checks?
@@ -173,9 +186,13 @@ class Notifier(object):
# receive *invites* for users they are interested in. Does this
# make the room_to_listeners check somewhat obselete?
if appservice.is_interested(event):
- listeners |= self.appservice_to_listeners.get(
+ app_listeners = self.appservice_to_listeners.get(
appservice, set()
- ).copy()
+ )
+
+ _discard_if_notified(app_listeners)
+
+ listeners |= app_listeners
logger.debug("on_new_room_event listeners %s", listeners)
@@ -226,10 +243,18 @@ class Notifier(object):
listeners = set()
for user in users:
- listeners |= self.user_to_listeners.get(user, set()).copy()
+ user_listeners = self.user_to_listeners.get(user, set())
+
+ _discard_if_notified(user_listeners)
+
+ listeners |= user_listeners
for room in rooms:
- listeners |= self.room_to_listeners.get(room, set()).copy()
+ room_listeners = self.room_to_listeners.get(room, set())
+
+ _discard_if_notified(room_listeners)
+
+ listeners |= room_listeners
@defer.inlineCallbacks
def notify(listener):
@@ -427,3 +452,17 @@ class Notifier(object):
listeners = self.room_to_listeners.setdefault(room_id, set())
listeners |= new_listeners
+
+ for l in new_listeners:
+ l.rooms.add(room_id)
+
+
+def _discard_if_notified(listener_set):
+ """Remove any 'stale' listeners from the given set.
+ """
+ to_discard = set()
+ for l in listener_set:
+ if l.notified():
+ to_discard.add(l)
+
+ listener_set -= to_discard
diff --git a/synapse/rest/appservice/__init__.py b/synapse/rest/appservice/__init__.py
deleted file mode 100644
index 1a84d94cd9..0000000000
--- a/synapse/rest/appservice/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/synapse/rest/appservice/v1/base.py b/synapse/rest/appservice/v1/base.py
deleted file mode 100644
index 65d5bcf9be..0000000000
--- a/synapse/rest/appservice/v1/base.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""This module contains base REST classes for constructing client v1 servlets.
-"""
-
-from synapse.http.servlet import RestServlet
-from synapse.api.urls import APP_SERVICE_PREFIX
-import re
-
-import logging
-
-
-logger = logging.getLogger(__name__)
-
-
-def as_path_pattern(path_regex):
- """Creates a regex compiled appservice path with the correct path
- prefix.
-
- Args:
- path_regex (str): The regex string to match. This should NOT have a ^
- as this will be prefixed.
- Returns:
- SRE_Pattern
- """
- return re.compile("^" + APP_SERVICE_PREFIX + path_regex)
-
-
-class AppServiceRestServlet(RestServlet):
- """A base Synapse REST Servlet for the application services version 1 API.
- """
-
- def __init__(self, hs):
- self.hs = hs
- self.handler = hs.get_handlers().appservice_handler
diff --git a/synapse/rest/appservice/v1/register.py b/synapse/rest/appservice/v1/register.py
deleted file mode 100644
index ea24d88f79..0000000000
--- a/synapse/rest/appservice/v1/register.py
+++ /dev/null
@@ -1,99 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
-#
-# Licensensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""This module contains REST servlets to do with registration: /register"""
-from twisted.internet import defer
-
-from base import AppServiceRestServlet, as_path_pattern
-from synapse.api.errors import CodeMessageException, SynapseError
-from synapse.storage.appservice import ApplicationService
-
-import json
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class RegisterRestServlet(AppServiceRestServlet):
- """Handles AS registration with the home server.
- """
-
- PATTERN = as_path_pattern("/register$")
-
- @defer.inlineCallbacks
- def on_POST(self, request):
- params = _parse_json(request)
-
- # sanity check required params
- try:
- as_token = params["as_token"]
- as_url = params["url"]
- if (not isinstance(as_token, basestring) or
- not isinstance(as_url, basestring)):
- raise ValueError
- except (KeyError, ValueError):
- raise SynapseError(
- 400, "Missed required keys: as_token(str) / url(str)."
- )
-
- try:
- app_service = ApplicationService(
- as_token, as_url, params["namespaces"]
- )
- except ValueError as e:
- raise SynapseError(400, e.message)
-
- app_service = yield self.handler.register(app_service)
- hs_token = app_service.hs_token
-
- defer.returnValue((200, {
- "hs_token": hs_token
- }))
-
-
-class UnregisterRestServlet(AppServiceRestServlet):
- """Handles AS registration with the home server.
- """
-
- PATTERN = as_path_pattern("/unregister$")
-
- def on_POST(self, request):
- params = _parse_json(request)
- try:
- as_token = params["as_token"]
- if not isinstance(as_token, basestring):
- raise ValueError
- except (KeyError, ValueError):
- raise SynapseError(400, "Missing required key: as_token(str)")
-
- yield self.handler.unregister(as_token)
-
- raise CodeMessageException(500, "Not implemented")
-
-
-def _parse_json(request):
- try:
- content = json.loads(request.content.read())
- if type(content) != dict:
- raise SynapseError(400, "Content must be a JSON object.")
- return content
- except ValueError as e:
- logger.warn(e)
- raise SynapseError(400, "Content not JSON.")
-
-
-def register_servlets(hs, http_server):
- RegisterRestServlet(hs).register(http_server)
- UnregisterRestServlet(hs).register(http_server)
diff --git a/synapse/server.py b/synapse/server.py
index c7772244ba..0bd87bdd77 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -79,7 +79,6 @@ class BaseHomeServer(object):
'resource_for_content_repo',
'resource_for_server_key',
'resource_for_media_repository',
- 'resource_for_app_services',
'resource_for_metrics',
'event_sources',
'ratelimiter',
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4b16f445d6..f4dec70393 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -14,13 +14,12 @@
# limitations under the License.
from twisted.internet import defer
-
-from synapse.util.logutils import log_function
-from synapse.api.constants import EventTypes
-
-from .appservice import ApplicationServiceStore
+from .appservice import (
+ ApplicationServiceStore, ApplicationServiceTransactionStore
+)
+from ._base import Cache
from .directory import DirectoryStore
-from .feedback import FeedbackStore
+from .events import EventsStore
from .presence import PresenceStore
from .profile import ProfileStore
from .registration import RegistrationStore
@@ -39,11 +38,6 @@ from .state import StateStore
from .signatures import SignatureStore
from .filtering import FilteringStore
-from syutil.base64util import decode_base64
-from syutil.jsonutil import encode_canonical_json
-
-from synapse.crypto.event_signing import compute_event_reference_hash
-
import fnmatch
import imp
@@ -57,20 +51,18 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 14
+SCHEMA_VERSION = 15
dir_path = os.path.abspath(os.path.dirname(__file__))
-
-class _RollbackButIsFineException(Exception):
- """ This exception is used to rollback a transaction without implying
- something went wrong.
- """
- pass
+# Number of msec of granularity to store the user IP 'last seen' time. Smaller
+# times give more inserts into the database even for readonly API hits
+# 120 seconds == 2 minutes
+LAST_SEEN_GRANULARITY = 120*1000
class DataStore(RoomMemberStore, RoomStore,
- RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
+ RegistrationStore, StreamStore, ProfileStore,
PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
ApplicationServiceStore,
@@ -79,7 +71,9 @@ class DataStore(RoomMemberStore, RoomStore,
RejectionsStore,
FilteringStore,
PusherStore,
- PushRuleStore
+ PushRuleStore,
+ ApplicationServiceTransactionStore,
+ EventsStore,
):
def __init__(self, hs):
@@ -89,424 +83,28 @@ class DataStore(RoomMemberStore, RoomStore,
self.min_token_deferred = self._get_min_token()
self.min_token = None
- @defer.inlineCallbacks
- @log_function
- def persist_event(self, event, context, backfilled=False,
- is_new_state=True, current_state=None):
- stream_ordering = None
- if backfilled:
- if not self.min_token_deferred.called:
- yield self.min_token_deferred
- self.min_token -= 1
- stream_ordering = self.min_token
-
- try:
- yield self.runInteraction(
- "persist_event",
- self._persist_event_txn,
- event=event,
- context=context,
- backfilled=backfilled,
- stream_ordering=stream_ordering,
- is_new_state=is_new_state,
- current_state=current_state,
- )
- except _RollbackButIsFineException:
- pass
-
- @defer.inlineCallbacks
- def get_event(self, event_id, check_redacted=True,
- get_prev_content=False, allow_rejected=False,
- allow_none=False):
- """Get an event from the database by event_id.
-
- Args:
- event_id (str): The event_id of the event to fetch
- check_redacted (bool): If True, check if event has been redacted
- and redact it.
- get_prev_content (bool): If True and event is a state event,
- include the previous states content in the unsigned field.
- allow_rejected (bool): If True return rejected events.
- allow_none (bool): If True, return None if no event found, if
- False throw an exception.
-
- Returns:
- Deferred : A FrozenEvent.
- """
- event = yield self.runInteraction(
- "get_event", self._get_event_txn,
- event_id,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- if not event and not allow_none:
- raise RuntimeError("Could not find event %s" % (event_id,))
-
- defer.returnValue(event)
-
- @log_function
- def _persist_event_txn(self, txn, event, context, backfilled,
- stream_ordering=None, is_new_state=True,
- current_state=None):
-
- # Remove the any existing cache entries for the event_id
- self._get_event_cache.pop(event.event_id)
-
- # We purposefully do this first since if we include a `current_state`
- # key, we *want* to update the `current_state_events` table
- if current_state:
- txn.execute(
- "DELETE FROM current_state_events WHERE room_id = ?",
- (event.room_id,)
- )
-
- for s in current_state:
- self._simple_insert_txn(
- txn,
- "current_state_events",
- {
- "event_id": s.event_id,
- "room_id": s.room_id,
- "type": s.type,
- "state_key": s.state_key,
- },
- or_replace=True,
- )
-
- if event.is_state() and is_new_state:
- if not backfilled and not context.rejected:
- self._simple_insert_txn(
- txn,
- table="state_forward_extremities",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- or_replace=True,
- )
-
- for prev_state_id, _ in event.prev_state:
- self._simple_delete_txn(
- txn,
- table="state_forward_extremities",
- keyvalues={
- "event_id": prev_state_id,
- }
- )
-
- outlier = event.internal_metadata.is_outlier()
-
- if not outlier:
- self._store_state_groups_txn(txn, event, context)
-
- self._update_min_depth_for_room_txn(
- txn,
- event.room_id,
- event.depth
- )
-
- self._handle_prev_events(
- txn,
- outlier=outlier,
- event_id=event.event_id,
- prev_events=event.prev_events,
- room_id=event.room_id,
- )
-
- have_persisted = self._simple_select_one_onecol_txn(
- txn,
- table="event_json",
- keyvalues={"event_id": event.event_id},
- retcol="event_id",
- allow_none=True,
- )
-
- metadata_json = encode_canonical_json(
- event.internal_metadata.get_dict()
- )
-
- # If we have already persisted this event, we don't need to do any
- # more processing.
- # The processing above must be done on every call to persist event,
- # since they might not have happened on previous calls. For example,
- # if we are persisting an event that we had persisted as an outlier,
- # but is no longer one.
- if have_persisted:
- if not outlier:
- sql = (
- "UPDATE event_json SET internal_metadata = ?"
- " WHERE event_id = ?"
- )
- txn.execute(
- sql,
- (metadata_json.decode("UTF-8"), event.event_id,)
- )
-
- sql = (
- "UPDATE events SET outlier = 0"
- " WHERE event_id = ?"
- )
- txn.execute(
- sql,
- (event.event_id,)
- )
- return
-
- if event.type == EventTypes.Member:
- self._store_room_member_txn(txn, event)
- elif event.type == EventTypes.Feedback:
- self._store_feedback_txn(txn, event)
- elif event.type == EventTypes.Name:
- self._store_room_name_txn(txn, event)
- elif event.type == EventTypes.Topic:
- self._store_room_topic_txn(txn, event)
- elif event.type == EventTypes.Redaction:
- self._store_redaction(txn, event)
-
- event_dict = {
- k: v
- for k, v in event.get_dict().items()
- if k not in [
- "redacted",
- "redacted_because",
- ]
- }
-
- self._simple_insert_txn(
- txn,
- table="event_json",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "internal_metadata": metadata_json.decode("UTF-8"),
- "json": encode_canonical_json(event_dict).decode("UTF-8"),
- },
- or_replace=True,
- )
-
- content = encode_canonical_json(
- event.content
- ).decode("UTF-8")
-
- vals = {
- "topological_ordering": event.depth,
- "event_id": event.event_id,
- "type": event.type,
- "room_id": event.room_id,
- "content": content,
- "processed": True,
- "outlier": outlier,
- "depth": event.depth,
- }
-
- if stream_ordering is not None:
- vals["stream_ordering"] = stream_ordering
-
- unrec = {
- k: v
- for k, v in event.get_dict().items()
- if k not in vals.keys() and k not in [
- "redacted",
- "redacted_because",
- "signatures",
- "hashes",
- "prev_events",
- ]
- }
-
- vals["unrecognized_keys"] = encode_canonical_json(
- unrec
- ).decode("UTF-8")
-
- try:
- self._simple_insert_txn(
- txn,
- "events",
- vals,
- or_replace=(not outlier),
- or_ignore=bool(outlier),
- )
- except:
- logger.warn(
- "Failed to persist, probably duplicate: %s",
- event.event_id,
- exc_info=True,
- )
- raise _RollbackButIsFineException("_persist_event")
-
- if context.rejected:
- self._store_rejections_txn(txn, event.event_id, context.rejected)
-
- if event.is_state():
- vals = {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- }
-
- # TODO: How does this work with backfilling?
- if hasattr(event, "replaces_state"):
- vals["prev_state"] = event.replaces_state
-
- self._simple_insert_txn(
- txn,
- "state_events",
- vals,
- or_replace=True,
- )
-
- if is_new_state and not context.rejected:
- self._simple_insert_txn(
- txn,
- "current_state_events",
- {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- or_replace=True,
- )
-
- for e_id, h in event.prev_state:
- self._simple_insert_txn(
- txn,
- table="event_edges",
- values={
- "event_id": event.event_id,
- "prev_event_id": e_id,
- "room_id": event.room_id,
- "is_state": 1,
- },
- or_ignore=True,
- )
-
- for hash_alg, hash_base64 in event.hashes.items():
- hash_bytes = decode_base64(hash_base64)
- self._store_event_content_hash_txn(
- txn, event.event_id, hash_alg, hash_bytes,
- )
-
- for prev_event_id, prev_hashes in event.prev_events:
- for alg, hash_base64 in prev_hashes.items():
- hash_bytes = decode_base64(hash_base64)
- self._store_prev_event_hash_txn(
- txn, event.event_id, prev_event_id, alg, hash_bytes
- )
-
- for auth_id, _ in event.auth_events:
- self._simple_insert_txn(
- txn,
- table="event_auth",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "auth_id": auth_id,
- },
- or_ignore=True,
- )
-
- (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
- self._store_event_reference_hash_txn(
- txn, event.event_id, ref_alg, ref_hash_bytes
- )
-
- def _store_redaction(self, txn, event):
- # invalidate the cache for the redacted event
- self._get_event_cache.pop(event.redacts)
- txn.execute(
- "INSERT OR IGNORE INTO redactions "
- "(event_id, redacts) VALUES (?,?)",
- (event.event_id, event.redacts)
- )
-
- @defer.inlineCallbacks
- def get_current_state(self, room_id, event_type=None, state_key=""):
- del_sql = (
- "SELECT event_id FROM redactions WHERE redacts = e.event_id "
- "LIMIT 1"
- )
-
- sql = (
- "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
- "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
- "INNER JOIN state_events as s ON e.event_id = s.event_id "
- "WHERE c.room_id = ? "
- ) % {
- "redacted": del_sql,
- }
-
- if event_type and state_key is not None:
- sql += " AND s.type = ? AND s.state_key = ? "
- args = (room_id, event_type, state_key)
- elif event_type:
- sql += " AND s.type = ?"
- args = (room_id, event_type)
- else:
- args = (room_id, )
-
- results = yield self._execute_and_decode("get_current_state", sql, *args)
-
- events = yield self._parse_events(results)
- defer.returnValue(events)
-
- @defer.inlineCallbacks
- def get_room_name_and_aliases(self, room_id):
- del_sql = (
- "SELECT event_id FROM redactions WHERE redacts = e.event_id "
- "LIMIT 1"
+ self.client_ip_last_seen = Cache(
+ name="client_ip_last_seen",
+ keylen=4,
)
- sql = (
- "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
- "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
- "INNER JOIN state_events as s ON e.event_id = s.event_id "
- "WHERE c.room_id = ? "
- ) % {
- "redacted": del_sql,
- }
-
- sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
- sql += " OR s.type = 'm.room.aliases')"
- args = (room_id,)
-
- results = yield self._execute_and_decode("get_current_state", sql, *args)
-
- events = yield self._parse_events(results)
-
- name = None
- aliases = []
-
- for e in events:
- if e.type == 'm.room.name':
- if 'name' in e.content:
- name = e.content['name']
- elif e.type == 'm.room.aliases':
- if 'aliases' in e.content:
- aliases.extend(e.content['aliases'])
-
- defer.returnValue((name, aliases))
-
@defer.inlineCallbacks
- def _get_min_token(self):
- row = yield self._execute(
- "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
- )
+ def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
+ now = int(self._clock.time_msec())
+ key = (user.to_string(), access_token, device_id, ip)
- self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
- self.min_token = min(self.min_token, -1)
+ try:
+ last_seen = self.client_ip_last_seen.get(*key)
+ except KeyError:
+ last_seen = None
- logger.debug("min_token is: %s", self.min_token)
+ # Rate-limited inserts
+ if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
+ defer.returnValue(None)
- defer.returnValue(self.min_token)
+ self.client_ip_last_seen.prefill(*key + (now,))
- def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
- return self._simple_insert(
+ yield self._simple_insert(
"user_ips",
{
"user": user.to_string(),
@@ -514,8 +112,9 @@ class DataStore(RoomMemberStore, RoomStore,
"device_id": device_id,
"ip": ip,
"user_agent": user_agent,
- "last_seen": int(self._clock.time_msec()),
- }
+ "last_seen": now,
+ },
+ desc="insert_client_ip",
)
def get_user_ip_and_agents(self, user):
@@ -525,38 +124,7 @@ class DataStore(RoomMemberStore, RoomStore,
retcols=[
"device_id", "access_token", "ip", "user_agent", "last_seen"
],
- )
-
- def have_events(self, event_ids):
- """Given a list of event ids, check if we have already processed them.
-
- Returns:
- dict: Has an entry for each event id we already have seen. Maps to
- the rejected reason string if we rejected the event, else maps to
- None.
- """
- if not event_ids:
- return defer.succeed({})
-
- def f(txn):
- sql = (
- "SELECT e.event_id, reason FROM events as e "
- "LEFT JOIN rejections as r ON e.event_id = r.event_id "
- "WHERE e.event_id = ?"
- )
-
- res = {}
- for event_id in event_ids:
- txn.execute(sql, (event_id,))
- row = txn.fetchone()
- if row:
- _, rejected = row
- res[event_id] = rejected
-
- return res
-
- return self.runInteraction(
- "have_events", f,
+ desc="get_user_ip_and_agents",
)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 9125bb1198..53eee10d51 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -35,6 +35,7 @@ logger = logging.getLogger(__name__)
sql_logger = logging.getLogger("synapse.storage.SQL")
transaction_logger = logging.getLogger("synapse.storage.txn")
+perf_logger = logging.getLogger("synapse.storage.TIME")
metrics = synapse.metrics.get_metrics_for("synapse.storage")
@@ -53,14 +54,57 @@ cache_counter = metrics.register_cache(
)
-# TODO(paul):
-# * more generic key management
-# * consider other eviction strategies - LRU?
-def cached(max_entries=1000):
+class Cache(object):
+
+ def __init__(self, name, max_entries=1000, keylen=1, lru=False):
+ if lru:
+ self.cache = LruCache(max_size=max_entries)
+ self.max_entries = None
+ else:
+ self.cache = OrderedDict()
+ self.max_entries = max_entries
+
+ self.name = name
+ self.keylen = keylen
+
+ caches_by_name[name] = self.cache
+
+ def get(self, *keyargs):
+ if len(keyargs) != self.keylen:
+ raise ValueError("Expected a key to have %d items", self.keylen)
+
+ if keyargs in self.cache:
+ cache_counter.inc_hits(self.name)
+ return self.cache[keyargs]
+
+ cache_counter.inc_misses(self.name)
+ raise KeyError()
+
+ def prefill(self, *args): # because I can't *keyargs, value
+ keyargs = args[:-1]
+ value = args[-1]
+
+ if len(keyargs) != self.keylen:
+ raise ValueError("Expected a key to have %d items", self.keylen)
+
+ if self.max_entries is not None:
+ while len(self.cache) >= self.max_entries:
+ self.cache.popitem(last=False)
+
+ self.cache[keyargs] = value
+
+ def invalidate(self, *keyargs):
+ if len(keyargs) != self.keylen:
+ raise ValueError("Expected a key to have %d items", self.keylen)
+
+ self.cache.pop(keyargs, None)
+
+
+def cached(max_entries=1000, num_args=1, lru=False):
""" A method decorator that applies a memoizing cache around the function.
- The function is presumed to take one additional argument, which is used as
- the key for the cache. Cache hits are served directly from the cache;
+ The function is presumed to take zero or more arguments, which are used in
+ a tuple as the key for the cache. Hits are served directly from the cache;
misses use the function body to generate the value.
The wrapped function has an additional member, a callable called
@@ -71,34 +115,27 @@ def cached(max_entries=1000):
calling the calculation function.
"""
def wrap(orig):
- cache = OrderedDict()
- name = orig.__name__
-
- caches_by_name[name] = cache
-
- def prefill(key, value):
- while len(cache) > max_entries:
- cache.popitem(last=False)
-
- cache[key] = value
+ cache = Cache(
+ name=orig.__name__,
+ max_entries=max_entries,
+ keylen=num_args,
+ lru=lru,
+ )
@functools.wraps(orig)
@defer.inlineCallbacks
- def wrapped(self, key):
- if key in cache:
- cache_counter.inc_hits(name)
- defer.returnValue(cache[key])
+ def wrapped(self, *keyargs):
+ try:
+ defer.returnValue(cache.get(*keyargs))
+ except KeyError:
+ ret = yield orig(self, *keyargs)
- cache_counter.inc_misses(name)
- ret = yield orig(self, key)
- prefill(key, ret)
- defer.returnValue(ret)
+ cache.prefill(*keyargs + (ret,))
- def invalidate(key):
- cache.pop(key, None)
+ defer.returnValue(ret)
- wrapped.invalidate = invalidate
- wrapped.prefill = prefill
+ wrapped.invalidate = cache.invalidate
+ wrapped.prefill = cache.prefill
return wrapped
return wrap
@@ -232,7 +269,7 @@ class SQLBaseStore(object):
time_now - time_then, limit=3
)
- logger.info(
+ perf_logger.info(
"Total database time: %.3f%% {%s} {%s}",
ratio * 100, top_three_counters, top_3_event_counters
)
@@ -321,7 +358,8 @@ class SQLBaseStore(object):
# "Simple" SQL API methods that operate on a single table with no JOINs,
# no complex WHERE clauses, just a dict of values for columns.
- def _simple_insert(self, table, values, or_replace=False, or_ignore=False):
+ def _simple_insert(self, table, values, or_replace=False, or_ignore=False,
+ desc="_simple_insert"):
"""Executes an INSERT query on the named table.
Args:
@@ -330,7 +368,7 @@ class SQLBaseStore(object):
or_replace : bool; if True performs an INSERT OR REPLACE
"""
return self.runInteraction(
- "_simple_insert",
+ desc,
self._simple_insert_txn, table, values, or_replace=or_replace,
or_ignore=or_ignore,
)
@@ -354,7 +392,7 @@ class SQLBaseStore(object):
txn.execute(sql, values.values())
return txn.lastrowid
- def _simple_upsert(self, table, keyvalues, values):
+ def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"):
"""
Args:
table (str): The table to upsert into
@@ -363,7 +401,7 @@ class SQLBaseStore(object):
Returns: A deferred
"""
return self.runInteraction(
- "_simple_upsert",
+ desc,
self._simple_upsert_txn, table, keyvalues, values
)
@@ -399,7 +437,7 @@ class SQLBaseStore(object):
txn.execute(sql, allvalues.values())
def _simple_select_one(self, table, keyvalues, retcols,
- allow_none=False):
+ allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to
return a single row, returning a single column from it.
@@ -411,12 +449,15 @@ class SQLBaseStore(object):
allow_none : If true, return None instead of failing if the SELECT
statement returns no rows
"""
- return self._simple_selectupdate_one(
- table, keyvalues, retcols=retcols, allow_none=allow_none
+ return self.runInteraction(
+ desc,
+ self._simple_select_one_txn,
+ table, keyvalues, retcols, allow_none,
)
def _simple_select_one_onecol(self, table, keyvalues, retcol,
- allow_none=False):
+ allow_none=False,
+ desc="_simple_select_one_onecol"):
"""Executes a SELECT query on the named table, which is expected to
return a single row, returning a single column from it."
@@ -426,7 +467,7 @@ class SQLBaseStore(object):
retcol : string giving the name of the column to return
"""
return self.runInteraction(
- "_simple_select_one_onecol",
+ desc,
self._simple_select_one_onecol_txn,
table, keyvalues, retcol, allow_none=allow_none,
)
@@ -462,7 +503,8 @@ class SQLBaseStore(object):
return [r[0] for r in txn.fetchall()]
- def _simple_select_onecol(self, table, keyvalues, retcol):
+ def _simple_select_onecol(self, table, keyvalues, retcol,
+ desc="_simple_select_onecol"):
"""Executes a SELECT query on the named table, which returns a list
comprising of the values of the named column from the selected rows.
@@ -475,12 +517,13 @@ class SQLBaseStore(object):
Deferred: Results in a list
"""
return self.runInteraction(
- "_simple_select_onecol",
+ desc,
self._simple_select_onecol_txn,
table, keyvalues, retcol
)
- def _simple_select_list(self, table, keyvalues, retcols):
+ def _simple_select_list(self, table, keyvalues, retcols,
+ desc="_simple_select_list"):
"""Executes a SELECT query on the named table, which may return zero or
more rows, returning the result as a list of dicts.
@@ -491,7 +534,7 @@ class SQLBaseStore(object):
retcols : list of strings giving the names of the columns to return
"""
return self.runInteraction(
- "_simple_select_list",
+ desc,
self._simple_select_list_txn,
table, keyvalues, retcols
)
@@ -523,7 +566,7 @@ class SQLBaseStore(object):
return self.cursor_to_dict(txn)
def _simple_update_one(self, table, keyvalues, updatevalues,
- retcols=None):
+ desc="_simple_update_one"):
"""Executes an UPDATE query on the named table, setting new values for
columns in a row matching the key values.
@@ -541,56 +584,76 @@ class SQLBaseStore(object):
get-and-set. This can be used to implement compare-and-set by putting
the update column in the 'keyvalues' dict as well.
"""
- return self._simple_selectupdate_one(table, keyvalues, updatevalues,
- retcols=retcols)
+ return self.runInteraction(
+ desc,
+ self._simple_update_one_txn,
+ table, keyvalues, updatevalues,
+ )
- def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None,
- retcols=None, allow_none=False):
- """ Combined SELECT then UPDATE."""
- if retcols:
- select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
- ", ".join(retcols),
- table,
- " AND ".join("%s = ?" % (k) for k in keyvalues)
- )
+ def _simple_update_one_txn(self, txn, table, keyvalues, updatevalues):
+ update_sql = "UPDATE %s SET %s WHERE %s" % (
+ table,
+ ", ".join("%s = ?" % (k,) for k in updatevalues),
+ " AND ".join("%s = ?" % (k,) for k in keyvalues)
+ )
- if updatevalues:
- update_sql = "UPDATE %s SET %s WHERE %s" % (
- table,
- ", ".join("%s = ?" % (k,) for k in updatevalues),
- " AND ".join("%s = ?" % (k,) for k in keyvalues)
- )
+ txn.execute(
+ update_sql,
+ updatevalues.values() + keyvalues.values()
+ )
+ if txn.rowcount == 0:
+ raise StoreError(404, "No row found")
+ if txn.rowcount > 1:
+ raise StoreError(500, "More than one row matched")
+
+ def _simple_select_one_txn(self, txn, table, keyvalues, retcols,
+ allow_none=False):
+ select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+ ", ".join(retcols),
+ table,
+ " AND ".join("%s = ?" % (k) for k in keyvalues)
+ )
+
+ txn.execute(select_sql, keyvalues.values())
+
+ row = txn.fetchone()
+ if not row:
+ if allow_none:
+ return None
+ raise StoreError(404, "No row found")
+ if txn.rowcount > 1:
+ raise StoreError(500, "More than one row matched")
+
+ return dict(zip(retcols, row))
+
+ def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None,
+ retcols=None, allow_none=False,
+ desc="_simple_selectupdate_one"):
+ """ Combined SELECT then UPDATE."""
def func(txn):
ret = None
if retcols:
- txn.execute(select_sql, keyvalues.values())
-
- row = txn.fetchone()
- if not row:
- if allow_none:
- return None
- raise StoreError(404, "No row found")
- if txn.rowcount > 1:
- raise StoreError(500, "More than one row matched")
-
- ret = dict(zip(retcols, row))
+ ret = self._simple_select_one_txn(
+ txn,
+ table=table,
+ keyvalues=keyvalues,
+ retcols=retcols,
+ allow_none=allow_none,
+ )
if updatevalues:
- txn.execute(
- update_sql,
- updatevalues.values() + keyvalues.values()
+ self._simple_update_one_txn(
+ txn,
+ table=table,
+ keyvalues=keyvalues,
+ updatevalues=updatevalues,
)
- if txn.rowcount == 0:
- raise StoreError(404, "No row found")
- if txn.rowcount > 1:
- raise StoreError(500, "More than one row matched")
-
return ret
- return self.runInteraction("_simple_selectupdate_one", func)
+ return self.runInteraction(desc, func)
- def _simple_delete_one(self, table, keyvalues):
+ def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"):
"""Executes a DELETE query on the named table, expecting to delete a
single row.
@@ -609,9 +672,9 @@ class SQLBaseStore(object):
raise StoreError(404, "No row found")
if txn.rowcount > 1:
raise StoreError(500, "more than one row matched")
- return self.runInteraction("_simple_delete_one", func)
+ return self.runInteraction(desc, func)
- def _simple_delete(self, table, keyvalues):
+ def _simple_delete(self, table, keyvalues, desc="_simple_delete"):
"""Executes a DELETE query on the named table.
Args:
@@ -619,7 +682,7 @@ class SQLBaseStore(object):
keyvalues : dict of column names and values to select the row with
"""
- return self.runInteraction("_simple_delete", self._simple_delete_txn)
+ return self.runInteraction(desc, self._simple_delete_txn)
def _simple_delete_txn(self, txn, table, keyvalues):
sql = "DELETE FROM %s WHERE %s" % (
@@ -789,6 +852,13 @@ class SQLBaseStore(object):
return result[0] if result else None
+class _RollbackButIsFineException(Exception):
+ """ This exception is used to rollback a transaction without implying
+ something went wrong.
+ """
+ pass
+
+
class Table(object):
""" A base class used to store information about a particular table.
"""
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 850676ce6c..f8cbb3f323 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -13,154 +13,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import simplejson
+import urllib
+import yaml
from simplejson import JSONDecodeError
+import simplejson as json
from twisted.internet import defer
from synapse.api.constants import Membership
-from synapse.api.errors import StoreError
-from synapse.appservice import ApplicationService
+from synapse.appservice import ApplicationService, AppServiceTransaction
from synapse.storage.roommember import RoomsForUser
+from synapse.types import UserID
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
-def log_failure(failure):
- logger.error("Failed to detect application services: %s", failure.value)
- logger.error(failure.getTraceback())
-
-
class ApplicationServiceStore(SQLBaseStore):
def __init__(self, hs):
super(ApplicationServiceStore, self).__init__(hs)
+ self.hostname = hs.hostname
self.services_cache = []
- self.cache_defer = self._populate_cache()
- self.cache_defer.addErrback(log_failure)
-
- @defer.inlineCallbacks
- def unregister_app_service(self, token):
- """Unregisters this service.
-
- This removes all AS specific regex and the base URL. The token is the
- only thing preserved for future registration attempts.
- """
- yield self.cache_defer # make sure the cache is ready
- yield self.runInteraction(
- "unregister_app_service",
- self._unregister_app_service_txn,
- token,
- )
- # update cache TODO: Should this be in the txn?
- for service in self.services_cache:
- if service.token == token:
- service.url = None
- service.namespaces = None
- service.hs_token = None
-
- def _unregister_app_service_txn(self, txn, token):
- # kill the url to prevent pushes
- txn.execute(
- "UPDATE application_services SET url=NULL WHERE token=?",
- (token,)
- )
-
- # cleanup regex
- as_id = self._get_as_id_txn(txn, token)
- if not as_id:
- logger.warning(
- "unregister_app_service_txn: Failed to find as_id for token=",
- token
- )
- return False
-
- txn.execute(
- "DELETE FROM application_services_regex WHERE as_id=?",
- (as_id,)
+ self._populate_appservice_cache(
+ hs.config.app_service_config_files
)
- return True
- @defer.inlineCallbacks
- def update_app_service(self, service):
- """Update an application service, clobbering what was previously there.
-
- Args:
- service(ApplicationService): The updated service.
- """
- yield self.cache_defer # make sure the cache is ready
-
- # NB: There is no "insert" since we provide no public-facing API to
- # allocate new ASes. It relies on the server admin inserting the AS
- # token into the database manually.
-
- if not service.token or not service.url:
- raise StoreError(400, "Token and url must be specified.")
-
- if not service.hs_token:
- raise StoreError(500, "No HS token")
-
- yield self.runInteraction(
- "update_app_service",
- self._update_app_service_txn,
- service
- )
-
- # update cache TODO: Should this be in the txn?
- for (index, cache_service) in enumerate(self.services_cache):
- if service.token == cache_service.token:
- self.services_cache[index] = service
- logger.info("Updated: %s", service)
- return
- # new entry
- self.services_cache.append(service)
- logger.info("Updated(new): %s", service)
-
- def _update_app_service_txn(self, txn, service):
- as_id = self._get_as_id_txn(txn, service.token)
- if not as_id:
- logger.warning(
- "update_app_service_txn: Failed to find as_id for token=",
- service.token
- )
- return False
-
- txn.execute(
- "UPDATE application_services SET url=?, hs_token=?, sender=? "
- "WHERE id=?",
- (service.url, service.hs_token, service.sender, as_id,)
- )
- # cleanup regex
- txn.execute(
- "DELETE FROM application_services_regex WHERE as_id=?",
- (as_id,)
- )
- for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST):
- if ns_str in service.namespaces:
- for regex_obj in service.namespaces[ns_str]:
- txn.execute(
- "INSERT INTO application_services_regex("
- "as_id, namespace, regex) values(?,?,?)",
- (as_id, ns_int, simplejson.dumps(regex_obj))
- )
- return True
-
- def _get_as_id_txn(self, txn, token):
- cursor = txn.execute(
- "SELECT id FROM application_services WHERE token=?",
- (token,)
- )
- res = cursor.fetchone()
- if res:
- return res[0]
-
- @defer.inlineCallbacks
def get_app_services(self):
- yield self.cache_defer # make sure the cache is ready
- defer.returnValue(self.services_cache)
+ return defer.succeed(self.services_cache)
- @defer.inlineCallbacks
def get_app_service_by_user_id(self, user_id):
"""Retrieve an application service from their user ID.
@@ -174,37 +55,23 @@ class ApplicationServiceStore(SQLBaseStore):
Returns:
synapse.appservice.ApplicationService or None.
"""
-
- yield self.cache_defer # make sure the cache is ready
-
for service in self.services_cache:
if service.sender == user_id:
- defer.returnValue(service)
- return
- defer.returnValue(None)
+ return defer.succeed(service)
+ return defer.succeed(None)
- @defer.inlineCallbacks
- def get_app_service_by_token(self, token, from_cache=True):
+ def get_app_service_by_token(self, token):
"""Get the application service with the given appservice token.
Args:
token (str): The application service token.
- from_cache (bool): True to get this service from the cache, False to
- check the database.
- Raises:
- StoreError if there was a problem retrieving this service.
+ Returns:
+ synapse.appservice.ApplicationService or None.
"""
- yield self.cache_defer # make sure the cache is ready
-
- if from_cache:
- for service in self.services_cache:
- if service.token == token:
- defer.returnValue(service)
- return
- defer.returnValue(None)
-
- # TODO: The from_cache=False impl
- # TODO: This should be JOINed with the application_services_regex table.
+ for service in self.services_cache:
+ if service.token == token:
+ return defer.succeed(service)
+ return defer.succeed(None)
def get_app_service_rooms(self, service):
"""Get a list of RoomsForUser for this application service.
@@ -277,12 +144,7 @@ class ApplicationServiceStore(SQLBaseStore):
return rooms_for_user_matching_user_id
- @defer.inlineCallbacks
- def _populate_cache(self):
- """Populates the ApplicationServiceCache from the database."""
- sql = ("SELECT * FROM application_services LEFT JOIN "
- "application_services_regex ON application_services.id = "
- "application_services_regex.as_id")
+ def _parse_services_dict(self, results):
# SQL results in the form:
# [
# {
@@ -296,12 +158,14 @@ class ApplicationServiceStore(SQLBaseStore):
# }
# ]
services = {}
- results = yield self._execute_and_decode("_populate_cache", sql)
for res in results:
as_token = res["token"]
+ if as_token is None:
+ continue
if as_token not in services:
# add the service
services[as_token] = {
+ "id": res["id"],
"url": res["url"],
"token": as_token,
"hs_token": res["hs_token"],
@@ -319,20 +183,287 @@ class ApplicationServiceStore(SQLBaseStore):
try:
services[as_token]["namespaces"][
ApplicationService.NS_LIST[ns_int]].append(
- simplejson.loads(res["regex"])
+ json.loads(res["regex"])
)
except IndexError:
logger.error("Bad namespace enum '%s'. %s", ns_int, res)
except JSONDecodeError:
logger.error("Bad regex object '%s'", res["regex"])
- # TODO get last successful txn id f.e. service
+ service_list = []
for service in services.values():
- logger.info("Found application service: %s", service)
- self.services_cache.append(ApplicationService(
+ service_list.append(ApplicationService(
token=service["token"],
url=service["url"],
namespaces=service["namespaces"],
hs_token=service["hs_token"],
- sender=service["sender"]
+ sender=service["sender"],
+ id=service["id"]
))
+ return service_list
+
+ def _load_appservice(self, as_info):
+ required_string_fields = [
+ "url", "as_token", "hs_token", "sender_localpart"
+ ]
+ for field in required_string_fields:
+ if not isinstance(as_info.get(field), basestring):
+ raise KeyError("Required string field: '%s'", field)
+
+ localpart = as_info["sender_localpart"]
+ if urllib.quote(localpart) != localpart:
+ raise ValueError(
+ "sender_localpart needs characters which are not URL encoded."
+ )
+ user = UserID(localpart, self.hostname)
+ user_id = user.to_string()
+
+ # namespace checks
+ if not isinstance(as_info.get("namespaces"), dict):
+ raise KeyError("Requires 'namespaces' object.")
+ for ns in ApplicationService.NS_LIST:
+ # specific namespaces are optional
+ if ns in as_info["namespaces"]:
+ # expect a list of dicts with exclusive and regex keys
+ for regex_obj in as_info["namespaces"][ns]:
+ if not isinstance(regex_obj, dict):
+ raise ValueError(
+ "Expected namespace entry in %s to be an object,"
+ " but got %s", ns, regex_obj
+ )
+ if not isinstance(regex_obj.get("regex"), basestring):
+ raise ValueError(
+ "Missing/bad type 'regex' key in %s", regex_obj
+ )
+ if not isinstance(regex_obj.get("exclusive"), bool):
+ raise ValueError(
+ "Missing/bad type 'exclusive' key in %s", regex_obj
+ )
+ return ApplicationService(
+ token=as_info["as_token"],
+ url=as_info["url"],
+ namespaces=as_info["namespaces"],
+ hs_token=as_info["hs_token"],
+ sender=user_id,
+ id=as_info["as_token"] # the token is the only unique thing here
+ )
+
+ def _populate_appservice_cache(self, config_files):
+ """Populates a cache of Application Services from the config files."""
+ if not isinstance(config_files, list):
+ logger.warning(
+ "Expected %s to be a list of AS config files.", config_files
+ )
+ return
+
+ for config_file in config_files:
+ try:
+ with open(config_file, 'r') as f:
+ appservice = self._load_appservice(yaml.load(f))
+ logger.info("Loaded application service: %s", appservice)
+ self.services_cache.append(appservice)
+ except Exception as e:
+ logger.error("Failed to load appservice from '%s'", config_file)
+ logger.exception(e)
+
+
+class ApplicationServiceTransactionStore(SQLBaseStore):
+
+ def __init__(self, hs):
+ super(ApplicationServiceTransactionStore, self).__init__(hs)
+
+ @defer.inlineCallbacks
+ def get_appservices_by_state(self, state):
+ """Get a list of application services based on their state.
+
+ Args:
+ state(ApplicationServiceState): The state to filter on.
+ Returns:
+ A Deferred which resolves to a list of ApplicationServices, which
+ may be empty.
+ """
+ results = yield self._simple_select_list(
+ "application_services_state",
+ dict(state=state),
+ ["as_id"]
+ )
+ # NB: This assumes this class is linked with ApplicationServiceStore
+ as_list = yield self.get_app_services()
+ services = []
+
+ for res in results:
+ for service in as_list:
+ if service.id == res["as_id"]:
+ services.append(service)
+ defer.returnValue(services)
+
+ @defer.inlineCallbacks
+ def get_appservice_state(self, service):
+ """Get the application service state.
+
+ Args:
+ service(ApplicationService): The service whose state to set.
+ Returns:
+ A Deferred which resolves to ApplicationServiceState.
+ """
+ result = yield self._simple_select_one(
+ "application_services_state",
+ dict(as_id=service.id),
+ ["state"],
+ allow_none=True
+ )
+ if result:
+ defer.returnValue(result.get("state"))
+ return
+ defer.returnValue(None)
+
+ def set_appservice_state(self, service, state):
+ """Set the application service state.
+
+ Args:
+ service(ApplicationService): The service whose state to set.
+ state(ApplicationServiceState): The connectivity state to apply.
+ Returns:
+ A Deferred which resolves when the state was set successfully.
+ """
+ return self._simple_upsert(
+ "application_services_state",
+ dict(as_id=service.id),
+ dict(state=state)
+ )
+
+ def create_appservice_txn(self, service, events):
+ """Atomically creates a new transaction for this application service
+ with the given list of events.
+
+ Args:
+ service(ApplicationService): The service who the transaction is for.
+ events(list<Event>): A list of events to put in the transaction.
+ Returns:
+ AppServiceTransaction: A new transaction.
+ """
+ return self.runInteraction(
+ "create_appservice_txn",
+ self._create_appservice_txn,
+ service, events
+ )
+
+ def _create_appservice_txn(self, txn, service, events):
+ # work out new txn id (highest txn id for this service += 1)
+ # The highest id may be the last one sent (in which case it is last_txn)
+ # or it may be the highest in the txns list (which are waiting to be/are
+ # being sent)
+ last_txn_id = self._get_last_txn(txn, service.id)
+
+ result = txn.execute(
+ "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
+ (service.id,)
+ )
+ highest_txn_id = result.fetchone()[0]
+ if highest_txn_id is None:
+ highest_txn_id = 0
+
+ new_txn_id = max(highest_txn_id, last_txn_id) + 1
+
+ # Insert new txn into txn table
+ event_ids = [e.event_id for e in events]
+ txn.execute(
+ "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
+ "VALUES(?,?,?)",
+ (service.id, new_txn_id, json.dumps(event_ids))
+ )
+ return AppServiceTransaction(
+ service=service, id=new_txn_id, events=events
+ )
+
+ def complete_appservice_txn(self, txn_id, service):
+ """Completes an application service transaction.
+
+ Args:
+ txn_id(str): The transaction ID being completed.
+ service(ApplicationService): The application service which was sent
+ this transaction.
+ Returns:
+ A Deferred which resolves if this transaction was stored
+ successfully.
+ """
+ return self.runInteraction(
+ "complete_appservice_txn",
+ self._complete_appservice_txn,
+ txn_id, service
+ )
+
+ def _complete_appservice_txn(self, txn, txn_id, service):
+ txn_id = int(txn_id)
+
+ # Debugging query: Make sure the txn being completed is EXACTLY +1 from
+ # what was there before. If it isn't, we've got problems (e.g. the AS
+ # has probably missed some events), so whine loudly but still continue,
+ # since it shouldn't fail completion of the transaction.
+ last_txn_id = self._get_last_txn(txn, service.id)
+ if (last_txn_id + 1) != txn_id:
+ logger.error(
+ "appservice: Completing a transaction which has an ID > 1 from "
+ "the last ID sent to this AS. We've either dropped events or "
+ "sent it to the AS out of order. FIX ME. last_txn=%s "
+ "completing_txn=%s service_id=%s", last_txn_id, txn_id,
+ service.id
+ )
+
+ # Set current txn_id for AS to 'txn_id'
+ self._simple_upsert_txn(
+ txn, "application_services_state", dict(as_id=service.id),
+ dict(last_txn=txn_id)
+ )
+
+ # Delete txn
+ self._simple_delete_txn(
+ txn, "application_services_txns",
+ dict(txn_id=txn_id, as_id=service.id)
+ )
+
+ def get_oldest_unsent_txn(self, service):
+ """Get the oldest transaction which has not been sent for this
+ service.
+
+ Args:
+ service(ApplicationService): The app service to get the oldest txn.
+ Returns:
+ A Deferred which resolves to an AppServiceTransaction or
+ None.
+ """
+ return self.runInteraction(
+ "get_oldest_unsent_appservice_txn",
+ self._get_oldest_unsent_txn,
+ service
+ )
+
+ def _get_oldest_unsent_txn(self, txn, service):
+ # Monotonically increasing txn ids, so just select the smallest
+ # one in the txns table (we delete them when they are sent)
+ result = txn.execute(
+ "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?",
+ (service.id,)
+ )
+ entry = self.cursor_to_dict(result)[0]
+ if not entry or entry["txn_id"] is None:
+ # the min(txn_id) part will force a row, so entry may not be None
+ return None
+
+ event_ids = json.loads(entry["event_ids"])
+ events = self._get_events_txn(txn, event_ids)
+
+ return AppServiceTransaction(
+ service=service, id=entry["txn_id"], events=events
+ )
+
+ def _get_last_txn(self, txn, service_id):
+ result = txn.execute(
+ "SELECT last_txn FROM application_services_state WHERE as_id=?",
+ (service_id,)
+ )
+ last_txn_id = result.fetchone()
+ if last_txn_id is None or last_txn_id[0] is None: # no row exists
+ return 0
+ else:
+ return int(last_txn_id[0]) # select 'last_txn' col
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 68b7d59693..0199539fea 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, cached
from synapse.api.errors import SynapseError
@@ -48,6 +48,7 @@ class DirectoryStore(SQLBaseStore):
{"room_alias": room_alias.to_string()},
"room_id",
allow_none=True,
+ desc="get_association_from_room_alias",
)
if not room_id:
@@ -58,6 +59,7 @@ class DirectoryStore(SQLBaseStore):
"room_alias_servers",
{"room_alias": room_alias.to_string()},
"server",
+ desc="get_association_from_room_alias",
)
if not servers:
@@ -87,6 +89,7 @@ class DirectoryStore(SQLBaseStore):
"room_alias": room_alias.to_string(),
"room_id": room_id,
},
+ desc="create_room_alias_association",
)
except sqlite3.IntegrityError:
raise SynapseError(
@@ -100,16 +103,22 @@ class DirectoryStore(SQLBaseStore):
{
"room_alias": room_alias.to_string(),
"server": server,
- }
+ },
+ desc="create_room_alias_association",
)
+ self.get_aliases_for_room.invalidate(room_id)
+ @defer.inlineCallbacks
def delete_room_alias(self, room_alias):
- return self.runInteraction(
+ room_id = yield self.runInteraction(
"delete_room_alias",
self._delete_room_alias_txn,
room_alias,
)
+ self.get_aliases_for_room.invalidate(room_id)
+ defer.returnValue(room_id)
+
def _delete_room_alias_txn(self, txn, room_alias):
cursor = txn.execute(
"SELECT room_id FROM room_aliases WHERE room_alias = ?",
@@ -134,9 +143,11 @@ class DirectoryStore(SQLBaseStore):
return room_id
+ @cached()
def get_aliases_for_room(self, room_id):
return self._simple_select_onecol(
"room_aliases",
{"room_id": room_id},
"room_alias",
+ desc="get_aliases_for_room",
)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
new file mode 100644
index 0000000000..a86230d92c
--- /dev/null
+++ b/synapse/storage/events.py
@@ -0,0 +1,395 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from _base import SQLBaseStore, _RollbackButIsFineException
+
+from twisted.internet import defer
+
+from synapse.util.logutils import log_function
+from synapse.api.constants import EventTypes
+from synapse.crypto.event_signing import compute_event_reference_hash
+
+from syutil.base64util import decode_base64
+from syutil.jsonutil import encode_canonical_json
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class EventsStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ @log_function
+ def persist_event(self, event, context, backfilled=False,
+ is_new_state=True, current_state=None):
+ stream_ordering = None
+ if backfilled:
+ if not self.min_token_deferred.called:
+ yield self.min_token_deferred
+ self.min_token -= 1
+ stream_ordering = self.min_token
+
+ try:
+ yield self.runInteraction(
+ "persist_event",
+ self._persist_event_txn,
+ event=event,
+ context=context,
+ backfilled=backfilled,
+ stream_ordering=stream_ordering,
+ is_new_state=is_new_state,
+ current_state=current_state,
+ )
+ self.get_room_events_max_id.invalidate()
+ except _RollbackButIsFineException:
+ pass
+
+ @defer.inlineCallbacks
+ def get_event(self, event_id, check_redacted=True,
+ get_prev_content=False, allow_rejected=False,
+ allow_none=False):
+ """Get an event from the database by event_id.
+
+ Args:
+ event_id (str): The event_id of the event to fetch
+ check_redacted (bool): If True, check if event has been redacted
+ and redact it.
+ get_prev_content (bool): If True and event is a state event,
+ include the previous states content in the unsigned field.
+ allow_rejected (bool): If True return rejected events.
+ allow_none (bool): If True, return None if no event found, if
+ False throw an exception.
+
+ Returns:
+ Deferred : A FrozenEvent.
+ """
+ event = yield self.runInteraction(
+ "get_event", self._get_event_txn,
+ event_id,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ allow_rejected=allow_rejected,
+ )
+
+ if not event and not allow_none:
+ raise RuntimeError("Could not find event %s" % (event_id,))
+
+ defer.returnValue(event)
+
+ @log_function
+ def _persist_event_txn(self, txn, event, context, backfilled,
+ stream_ordering=None, is_new_state=True,
+ current_state=None):
+
+ # Remove the any existing cache entries for the event_id
+ self._get_event_cache.pop(event.event_id)
+
+ # We purposefully do this first since if we include a `current_state`
+ # key, we *want* to update the `current_state_events` table
+ if current_state:
+ txn.execute(
+ "DELETE FROM current_state_events WHERE room_id = ?",
+ (event.room_id,)
+ )
+
+ for s in current_state:
+ self._simple_insert_txn(
+ txn,
+ "current_state_events",
+ {
+ "event_id": s.event_id,
+ "room_id": s.room_id,
+ "type": s.type,
+ "state_key": s.state_key,
+ },
+ or_replace=True,
+ )
+
+ if event.is_state() and is_new_state:
+ if not backfilled and not context.rejected:
+ self._simple_insert_txn(
+ txn,
+ table="state_forward_extremities",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ or_replace=True,
+ )
+
+ for prev_state_id, _ in event.prev_state:
+ self._simple_delete_txn(
+ txn,
+ table="state_forward_extremities",
+ keyvalues={
+ "event_id": prev_state_id,
+ }
+ )
+
+ outlier = event.internal_metadata.is_outlier()
+
+ if not outlier:
+ self._store_state_groups_txn(txn, event, context)
+
+ self._update_min_depth_for_room_txn(
+ txn,
+ event.room_id,
+ event.depth
+ )
+
+ self._handle_prev_events(
+ txn,
+ outlier=outlier,
+ event_id=event.event_id,
+ prev_events=event.prev_events,
+ room_id=event.room_id,
+ )
+
+ have_persisted = self._simple_select_one_onecol_txn(
+ txn,
+ table="event_json",
+ keyvalues={"event_id": event.event_id},
+ retcol="event_id",
+ allow_none=True,
+ )
+
+ metadata_json = encode_canonical_json(
+ event.internal_metadata.get_dict()
+ )
+
+ # If we have already persisted this event, we don't need to do any
+ # more processing.
+ # The processing above must be done on every call to persist event,
+ # since they might not have happened on previous calls. For example,
+ # if we are persisting an event that we had persisted as an outlier,
+ # but is no longer one.
+ if have_persisted:
+ if not outlier:
+ sql = (
+ "UPDATE event_json SET internal_metadata = ?"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (metadata_json.decode("UTF-8"), event.event_id,)
+ )
+
+ sql = (
+ "UPDATE events SET outlier = 0"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (event.event_id,)
+ )
+ return
+
+ if event.type == EventTypes.Member:
+ self._store_room_member_txn(txn, event)
+ elif event.type == EventTypes.Feedback:
+ self._store_feedback_txn(txn, event)
+ elif event.type == EventTypes.Name:
+ self._store_room_name_txn(txn, event)
+ elif event.type == EventTypes.Topic:
+ self._store_room_topic_txn(txn, event)
+ elif event.type == EventTypes.Redaction:
+ self._store_redaction(txn, event)
+
+ event_dict = {
+ k: v
+ for k, v in event.get_dict().items()
+ if k not in [
+ "redacted",
+ "redacted_because",
+ ]
+ }
+
+ self._simple_insert_txn(
+ txn,
+ table="event_json",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "internal_metadata": metadata_json.decode("UTF-8"),
+ "json": encode_canonical_json(event_dict).decode("UTF-8"),
+ },
+ or_replace=True,
+ )
+
+ content = encode_canonical_json(
+ event.content
+ ).decode("UTF-8")
+
+ vals = {
+ "topological_ordering": event.depth,
+ "event_id": event.event_id,
+ "type": event.type,
+ "room_id": event.room_id,
+ "content": content,
+ "processed": True,
+ "outlier": outlier,
+ "depth": event.depth,
+ }
+
+ if stream_ordering is not None:
+ vals["stream_ordering"] = stream_ordering
+
+ unrec = {
+ k: v
+ for k, v in event.get_dict().items()
+ if k not in vals.keys() and k not in [
+ "redacted",
+ "redacted_because",
+ "signatures",
+ "hashes",
+ "prev_events",
+ ]
+ }
+
+ vals["unrecognized_keys"] = encode_canonical_json(
+ unrec
+ ).decode("UTF-8")
+
+ try:
+ self._simple_insert_txn(
+ txn,
+ "events",
+ vals,
+ or_replace=(not outlier),
+ or_ignore=bool(outlier),
+ )
+ except:
+ logger.warn(
+ "Failed to persist, probably duplicate: %s",
+ event.event_id,
+ exc_info=True,
+ )
+ raise _RollbackButIsFineException("_persist_event")
+
+ if context.rejected:
+ self._store_rejections_txn(txn, event.event_id, context.rejected)
+
+ if event.is_state():
+ vals = {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ }
+
+ # TODO: How does this work with backfilling?
+ if hasattr(event, "replaces_state"):
+ vals["prev_state"] = event.replaces_state
+
+ self._simple_insert_txn(
+ txn,
+ "state_events",
+ vals,
+ )
+
+ if is_new_state and not context.rejected:
+ self._simple_insert_txn(
+ txn,
+ "current_state_events",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ )
+
+ for e_id, h in event.prev_state:
+ self._simple_insert_txn(
+ txn,
+ table="event_edges",
+ values={
+ "event_id": event.event_id,
+ "prev_event_id": e_id,
+ "room_id": event.room_id,
+ "is_state": 1,
+ },
+ )
+
+ for hash_alg, hash_base64 in event.hashes.items():
+ hash_bytes = decode_base64(hash_base64)
+ self._store_event_content_hash_txn(
+ txn, event.event_id, hash_alg, hash_bytes,
+ )
+
+ for prev_event_id, prev_hashes in event.prev_events:
+ for alg, hash_base64 in prev_hashes.items():
+ hash_bytes = decode_base64(hash_base64)
+ self._store_prev_event_hash_txn(
+ txn, event.event_id, prev_event_id, alg, hash_bytes
+ )
+
+ for auth_id, _ in event.auth_events:
+ self._simple_insert_txn(
+ txn,
+ table="event_auth",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "auth_id": auth_id,
+ },
+ )
+
+ (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+ self._store_event_reference_hash_txn(
+ txn, event.event_id, ref_alg, ref_hash_bytes
+ )
+
+ def _store_redaction(self, txn, event):
+ # invalidate the cache for the redacted event
+ self._get_event_cache.pop(event.redacts)
+ txn.execute(
+ "INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
+ (event.event_id, event.redacts)
+ )
+
+ def have_events(self, event_ids):
+ """Given a list of event ids, check if we have already processed them.
+
+ Returns:
+ dict: Has an entry for each event id we already have seen. Maps to
+ the rejected reason string if we rejected the event, else maps to
+ None.
+ """
+ if not event_ids:
+ return defer.succeed({})
+
+ def f(txn):
+ sql = (
+ "SELECT e.event_id, reason FROM events as e "
+ "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+ "WHERE e.event_id = ?"
+ )
+
+ res = {}
+ for event_id in event_ids:
+ txn.execute(sql, (event_id,))
+ row = txn.fetchone()
+ if row:
+ _, rejected = row
+ res[event_id] = rejected
+
+ return res
+
+ return self.runInteraction(
+ "have_events", f,
+ )
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
deleted file mode 100644
index 8eab769b71..0000000000
--- a/synapse/storage/feedback.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from twisted.internet import defer
-
-from ._base import SQLBaseStore
-
-
-class FeedbackStore(SQLBaseStore):
-
- def _store_feedback_txn(self, txn, event):
- self._simple_insert_txn(txn, "feedback", {
- "event_id": event.event_id,
- "feedback_type": event.content["type"],
- "room_id": event.room_id,
- "target_event_id": event.content["target_event_id"],
- "sender": event.user_id,
- })
-
- @defer.inlineCallbacks
- def get_feedback_for_event(self, event_id):
- sql = (
- "SELECT events.* FROM events INNER JOIN feedback "
- "ON events.event_id = feedback.event_id "
- "WHERE feedback.target_event_id = ? "
- )
-
- rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id)
-
- defer.returnValue(
- [
- (yield self._parse_events(r))
- for r in rows
- ]
- )
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 457a11fd02..8800116570 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -31,6 +31,7 @@ class FilteringStore(SQLBaseStore):
},
retcol="filter_json",
allow_none=False,
+ desc="get_user_filter",
)
defer.returnValue(json.loads(def_json))
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 7101d2beec..7bf57234f6 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -32,6 +32,7 @@ class MediaRepositoryStore(SQLBaseStore):
{"media_id": media_id},
("media_type", "media_length", "upload_name", "created_ts"),
allow_none=True,
+ desc="get_local_media",
)
def store_local_media(self, media_id, media_type, time_now_ms, upload_name,
@@ -45,7 +46,8 @@ class MediaRepositoryStore(SQLBaseStore):
"upload_name": upload_name,
"media_length": media_length,
"user_id": user_id.to_string(),
- }
+ },
+ desc="store_local_media",
)
def get_local_media_thumbnails(self, media_id):
@@ -55,7 +57,8 @@ class MediaRepositoryStore(SQLBaseStore):
(
"thumbnail_width", "thumbnail_height", "thumbnail_method",
"thumbnail_type", "thumbnail_length",
- )
+ ),
+ desc="get_local_media_thumbnails",
)
def store_local_thumbnail(self, media_id, thumbnail_width,
@@ -70,7 +73,8 @@ class MediaRepositoryStore(SQLBaseStore):
"thumbnail_method": thumbnail_method,
"thumbnail_type": thumbnail_type,
"thumbnail_length": thumbnail_length,
- }
+ },
+ desc="store_local_thumbnail",
)
def get_cached_remote_media(self, origin, media_id):
@@ -82,6 +86,7 @@ class MediaRepositoryStore(SQLBaseStore):
"filesystem_id",
),
allow_none=True,
+ desc="get_cached_remote_media",
)
def store_cached_remote_media(self, origin, media_id, media_type,
@@ -97,7 +102,8 @@ class MediaRepositoryStore(SQLBaseStore):
"created_ts": time_now_ms,
"upload_name": upload_name,
"filesystem_id": filesystem_id,
- }
+ },
+ desc="store_cached_remote_media",
)
def get_remote_media_thumbnails(self, origin, media_id):
@@ -107,7 +113,8 @@ class MediaRepositoryStore(SQLBaseStore):
(
"thumbnail_width", "thumbnail_height", "thumbnail_method",
"thumbnail_type", "thumbnail_length", "filesystem_id",
- )
+ ),
+ desc="get_remote_media_thumbnails",
)
def store_remote_media_thumbnail(self, origin, media_id, filesystem_id,
@@ -125,5 +132,6 @@ class MediaRepositoryStore(SQLBaseStore):
"thumbnail_type": thumbnail_type,
"thumbnail_length": thumbnail_length,
"filesystem_id": filesystem_id,
- }
+ },
+ desc="store_remote_media_thumbnail",
)
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 1dcd34723b..87fba55439 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -21,6 +21,7 @@ class PresenceStore(SQLBaseStore):
return self._simple_insert(
table="presence",
values={"user_id": user_localpart},
+ desc="create_presence",
)
def has_presence_state(self, user_localpart):
@@ -29,6 +30,7 @@ class PresenceStore(SQLBaseStore):
keyvalues={"user_id": user_localpart},
retcols=["user_id"],
allow_none=True,
+ desc="has_presence_state",
)
def get_presence_state(self, user_localpart):
@@ -36,6 +38,7 @@ class PresenceStore(SQLBaseStore):
table="presence",
keyvalues={"user_id": user_localpart},
retcols=["state", "status_msg", "mtime"],
+ desc="get_presence_state",
)
def set_presence_state(self, user_localpart, new_state):
@@ -45,7 +48,7 @@ class PresenceStore(SQLBaseStore):
updatevalues={"state": new_state["state"],
"status_msg": new_state["status_msg"],
"mtime": self._clock.time_msec()},
- retcols=["state"],
+ desc="set_presence_state",
)
def allow_presence_visible(self, observed_localpart, observer_userid):
@@ -53,6 +56,7 @@ class PresenceStore(SQLBaseStore):
table="presence_allow_inbound",
values={"observed_user_id": observed_localpart,
"observer_user_id": observer_userid},
+ desc="allow_presence_visible",
)
def disallow_presence_visible(self, observed_localpart, observer_userid):
@@ -60,6 +64,7 @@ class PresenceStore(SQLBaseStore):
table="presence_allow_inbound",
keyvalues={"observed_user_id": observed_localpart,
"observer_user_id": observer_userid},
+ desc="disallow_presence_visible",
)
def is_presence_visible(self, observed_localpart, observer_userid):
@@ -69,6 +74,7 @@ class PresenceStore(SQLBaseStore):
"observer_user_id": observer_userid},
retcols=["observed_user_id"],
allow_none=True,
+ desc="is_presence_visible",
)
def add_presence_list_pending(self, observer_localpart, observed_userid):
@@ -77,6 +83,7 @@ class PresenceStore(SQLBaseStore):
values={"user_id": observer_localpart,
"observed_user_id": observed_userid,
"accepted": False},
+ desc="add_presence_list_pending",
)
def set_presence_list_accepted(self, observer_localpart, observed_userid):
@@ -85,6 +92,7 @@ class PresenceStore(SQLBaseStore):
keyvalues={"user_id": observer_localpart,
"observed_user_id": observed_userid},
updatevalues={"accepted": True},
+ desc="set_presence_list_accepted",
)
def get_presence_list(self, observer_localpart, accepted=None):
@@ -96,6 +104,7 @@ class PresenceStore(SQLBaseStore):
table="presence_list",
keyvalues=keyvalues,
retcols=["observed_user_id", "accepted"],
+ desc="get_presence_list",
)
def del_presence_list(self, observer_localpart, observed_userid):
@@ -103,4 +112,5 @@ class PresenceStore(SQLBaseStore):
table="presence_list",
keyvalues={"user_id": observer_localpart,
"observed_user_id": observed_userid},
+ desc="del_presence_list",
)
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 153c7ad027..a6e52cb248 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -21,6 +21,7 @@ class ProfileStore(SQLBaseStore):
return self._simple_insert(
table="profiles",
values={"user_id": user_localpart},
+ desc="create_profile",
)
def get_profile_displayname(self, user_localpart):
@@ -28,6 +29,7 @@ class ProfileStore(SQLBaseStore):
table="profiles",
keyvalues={"user_id": user_localpart},
retcol="displayname",
+ desc="get_profile_displayname",
)
def set_profile_displayname(self, user_localpart, new_displayname):
@@ -35,6 +37,7 @@ class ProfileStore(SQLBaseStore):
table="profiles",
keyvalues={"user_id": user_localpart},
updatevalues={"displayname": new_displayname},
+ desc="set_profile_displayname",
)
def get_profile_avatar_url(self, user_localpart):
@@ -42,6 +45,7 @@ class ProfileStore(SQLBaseStore):
table="profiles",
keyvalues={"user_id": user_localpart},
retcol="avatar_url",
+ desc="get_profile_avatar_url",
)
def set_profile_avatar_url(self, user_localpart, new_avatar_url):
@@ -49,4 +53,5 @@ class ProfileStore(SQLBaseStore):
table="profiles",
keyvalues={"user_id": user_localpart},
updatevalues={"avatar_url": new_avatar_url},
+ desc="set_profile_avatar_url",
)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index d769db2c78..c47bdc2861 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -50,7 +50,8 @@ class PushRuleStore(SQLBaseStore):
results = yield self._simple_select_list(
PushRuleEnableTable.table_name,
{'user_name': user_name},
- PushRuleEnableTable.fields
+ PushRuleEnableTable.fields,
+ desc="get_push_rules_enabled_for_user",
)
defer.returnValue(
{r['rule_id']: False if r['enabled'] == 0 else True for r in results}
@@ -201,7 +202,8 @@ class PushRuleStore(SQLBaseStore):
"""
yield self._simple_delete_one(
PushRuleTable.table_name,
- {'user_name': user_name, 'rule_id': rule_id}
+ {'user_name': user_name, 'rule_id': rule_id},
+ desc="delete_push_rule",
)
@defer.inlineCallbacks
@@ -209,7 +211,8 @@ class PushRuleStore(SQLBaseStore):
yield self._simple_upsert(
PushRuleEnableTable.table_name,
{'user_name': user_name, 'rule_id': rule_id},
- {'enabled': enabled}
+ {'enabled': enabled},
+ desc="set_push_rule_enabled",
)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 587dada68f..000502b4ff 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -114,7 +114,9 @@ class PusherStore(SQLBaseStore):
ts=pushkey_ts,
lang=lang,
data=data
- ))
+ ),
+ desc="add_pusher",
+ )
except Exception as e:
logger.error("create_pusher with failed: %s", e)
raise StoreError(500, "Problem creating pusher.")
@@ -123,7 +125,8 @@ class PusherStore(SQLBaseStore):
def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
yield self._simple_delete_one(
PushersTable.table_name,
- dict(app_id=app_id, pushkey=pushkey)
+ {"app_id": app_id, "pushkey": pushkey},
+ desc="delete_pusher_by_app_id_pushkey",
)
@defer.inlineCallbacks
@@ -131,7 +134,8 @@ class PusherStore(SQLBaseStore):
yield self._simple_update_one(
PushersTable.table_name,
{'app_id': app_id, 'pushkey': pushkey},
- {'last_token': last_token}
+ {'last_token': last_token},
+ desc="update_pusher_last_token",
)
@defer.inlineCallbacks
@@ -140,7 +144,8 @@ class PusherStore(SQLBaseStore):
yield self._simple_update_one(
PushersTable.table_name,
{'app_id': app_id, 'pushkey': pushkey},
- {'last_token': last_token, 'last_success': last_success}
+ {'last_token': last_token, 'last_success': last_success},
+ desc="update_pusher_last_token_and_success",
)
@defer.inlineCallbacks
@@ -148,7 +153,8 @@ class PusherStore(SQLBaseStore):
yield self._simple_update_one(
PushersTable.table_name,
{'app_id': app_id, 'pushkey': pushkey},
- {'failing_since': failing_since}
+ {'failing_since': failing_since},
+ desc="update_pusher_failing_since",
)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 3c2f1d6a15..f24154f146 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -39,7 +39,10 @@ class RegistrationStore(SQLBaseStore):
Raises:
StoreError if there was a problem adding this.
"""
- row = yield self._simple_select_one("users", {"name": user_id}, ["id"])
+ row = yield self._simple_select_one(
+ "users", {"name": user_id}, ["id"],
+ desc="add_access_token_to_user",
+ )
if not row:
raise StoreError(400, "Bad user ID supplied.")
row_id = row["id"]
@@ -48,7 +51,8 @@ class RegistrationStore(SQLBaseStore):
{
"user_id": row_id,
"token": token
- }
+ },
+ desc="add_access_token_to_user",
)
@defer.inlineCallbacks
@@ -120,6 +124,7 @@ class RegistrationStore(SQLBaseStore):
keyvalues={"name": user.to_string()},
retcol="admin",
allow_none=True,
+ desc="is_server_admin",
)
defer.returnValue(res if res else False)
diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py
index 4e1a9a2783..0838eb3d12 100644
--- a/synapse/storage/rejections.py
+++ b/synapse/storage/rejections.py
@@ -29,7 +29,7 @@ class RejectionsStore(SQLBaseStore):
"event_id": event_id,
"reason": reason,
"last_check": self._clock.time_msec(),
- }
+ },
)
def get_rejection_reason(self, event_id):
@@ -40,4 +40,5 @@ class RejectionsStore(SQLBaseStore):
"event_id": event_id,
},
allow_none=True,
+ desc="get_rejection_reason",
)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 549c9af393..be3e28c2ea 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -15,11 +15,9 @@
from twisted.internet import defer
-from sqlite3 import IntegrityError
-
from synapse.api.errors import StoreError
-from ._base import SQLBaseStore, Table
+from ._base import SQLBaseStore
import collections
import logging
@@ -27,8 +25,9 @@ import logging
logger = logging.getLogger(__name__)
-OpsLevel = collections.namedtuple("OpsLevel", (
- "ban_level", "kick_level", "redact_level")
+OpsLevel = collections.namedtuple(
+ "OpsLevel",
+ ("ban_level", "kick_level", "redact_level",)
)
@@ -47,13 +46,15 @@ class RoomStore(SQLBaseStore):
StoreError if the room could not be stored.
"""
try:
- yield self._simple_insert(RoomsTable.table_name, dict(
- room_id=room_id,
- creator=room_creator_user_id,
- is_public=is_public
- ))
- except IntegrityError:
- raise StoreError(409, "Room ID in use.")
+ yield self._simple_insert(
+ RoomsTable.table_name,
+ {
+ "room_id": room_id,
+ "creator": room_creator_user_id,
+ "is_public": is_public,
+ },
+ desc="store_room",
+ )
except Exception as e:
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
@@ -66,9 +67,11 @@ class RoomStore(SQLBaseStore):
Returns:
A namedtuple containing the room information, or an empty list.
"""
- query = RoomsTable.select_statement("room_id=?")
- return self._execute(
- "get_room", RoomsTable.decode_single_result, query, room_id,
+ return self._simple_select_one(
+ table=RoomsTable.table_name,
+ keyvalues={"room_id": room_id},
+ retcols=RoomsTable.fields,
+ desc="get_room",
)
@defer.inlineCallbacks
@@ -143,7 +146,7 @@ class RoomStore(SQLBaseStore):
"event_id": event.event_id,
"room_id": event.room_id,
"topic": event.content["topic"],
- }
+ },
)
def _store_room_name_txn(self, txn, event):
@@ -158,8 +161,45 @@ class RoomStore(SQLBaseStore):
}
)
+ @defer.inlineCallbacks
+ def get_room_name_and_aliases(self, room_id):
+ del_sql = (
+ "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+ "LIMIT 1"
+ )
+
+ sql = (
+ "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+ "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+ "INNER JOIN state_events as s ON e.event_id = s.event_id "
+ "WHERE c.room_id = ? "
+ ) % {
+ "redacted": del_sql,
+ }
+
+ sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
+ sql += " OR s.type = 'm.room.aliases')"
+ args = (room_id,)
-class RoomsTable(Table):
+ results = yield self._execute_and_decode("get_current_state", sql, *args)
+
+ events = yield self._parse_events(results)
+
+ name = None
+ aliases = []
+
+ for e in events:
+ if e.type == 'm.room.name':
+ if 'name' in e.content:
+ name = e.content['name']
+ elif e.type == 'm.room.aliases':
+ if 'aliases' in e.content:
+ aliases.extend(e.content['aliases'])
+
+ defer.returnValue((name, aliases))
+
+
+class RoomsTable(object):
table_name = "rooms"
fields = [
@@ -167,5 +207,3 @@ class RoomsTable(Table):
"is_public",
"creator"
]
-
- EntryType = collections.namedtuple("RoomEntry", fields)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 65ffb4627f..52c37c76f5 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -212,7 +212,8 @@ class RoomMemberStore(SQLBaseStore):
return self._simple_select_onecol(
"room_hosts",
{"room_id": room_id},
- "host"
+ "host",
+ desc="get_joined_hosts_for_room",
)
def _get_members_by_dict(self, where_dict):
diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql
new file mode 100644
index 0000000000..2b27e2a429
--- /dev/null
+++ b/synapse/storage/schema/delta/15/appservice_txns.sql
@@ -0,0 +1,30 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS application_services_state(
+ as_id TEXT PRIMARY KEY,
+ state TEXT,
+ last_txn TEXT
+);
+
+CREATE TABLE IF NOT EXISTS application_services_txns(
+ as_id TEXT NOT NULL,
+ txn_id INTEGER NOT NULL,
+ event_ids TEXT NOT NULL,
+ UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK
+);
+
+
+
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 456e4bd45d..58dbf2802b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -15,6 +15,8 @@
from ._base import SQLBaseStore
+from twisted.internet import defer
+
import logging
logger = logging.getLogger(__name__)
@@ -122,3 +124,33 @@ class StateStore(SQLBaseStore):
},
or_replace=True,
)
+
+ @defer.inlineCallbacks
+ def get_current_state(self, room_id, event_type=None, state_key=""):
+ del_sql = (
+ "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+ "LIMIT 1"
+ )
+
+ sql = (
+ "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+ "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+ "INNER JOIN state_events as s ON e.event_id = s.event_id "
+ "WHERE c.room_id = ? "
+ ) % {
+ "redacted": del_sql,
+ }
+
+ if event_type and state_key is not None:
+ sql += " AND s.type = ? AND s.state_key = ? "
+ args = (room_id, event_type, state_key)
+ elif event_type:
+ sql += " AND s.type = ?"
+ args = (room_id, event_type)
+ else:
+ args = (room_id, )
+
+ results = yield self._execute_and_decode("get_current_state", sql, *args)
+
+ events = yield self._parse_events(results)
+ defer.returnValue(events)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 09bc522210..66f307e640 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,7 +35,7 @@ what sort order was used:
from twisted.internet import defer
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, cached
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.util.logutils import log_function
@@ -413,12 +413,32 @@ class StreamStore(SQLBaseStore):
"get_recent_events_for_room", get_recent_events_for_room_txn
)
+ @cached(num_args=0)
def get_room_events_max_id(self):
return self.runInteraction(
"get_room_events_max_id",
self._get_room_events_max_id_txn
)
+ @defer.inlineCallbacks
+ def _get_min_token(self):
+ row = yield self._execute(
+ "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
+ )
+
+ self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
+ self.min_token = min(self.min_token, -1)
+
+ logger.debug("min_token is: %s", self.min_token)
+
+ defer.returnValue(self.min_token)
+
+ def get_next_stream_id(self):
+ with self._next_stream_id_lock:
+ i = self._next_stream_id
+ self._next_stream_id += 1
+ return i
+
def _get_room_events_max_id_txn(self, txn):
txn.execute(
"SELECT MAX(stream_ordering) as m FROM events"
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 0b8a3b7a07..b777395e06 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -46,15 +46,19 @@ class TransactionStore(SQLBaseStore):
)
def _get_received_txn_response(self, txn, transaction_id, origin):
- where_clause = "transaction_id = ? AND origin = ?"
- query = ReceivedTransactionsTable.select_statement(where_clause)
-
- txn.execute(query, (transaction_id, origin))
-
- results = ReceivedTransactionsTable.decode_results(txn.fetchall())
+ result = self._simple_select_one_txn(
+ txn,
+ table=ReceivedTransactionsTable.table_name,
+ keyvalues={
+ "transaction_id": transaction_id,
+ "origin": origin,
+ },
+ retcols=ReceivedTransactionsTable.fields,
+ allow_none=True,
+ )
- if results and results[0].response_code:
- return (results[0].response_code, results[0].response_json)
+ if result and result.response_code:
+ return result["response_code"], result["response_json"]
else:
return None
diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py
index 65d5792907..2f7b615f78 100644
--- a/synapse/util/lrucache.py
+++ b/synapse/util/lrucache.py
@@ -90,12 +90,16 @@ class LruCache(object):
def cache_len():
return len(cache)
+ def cache_contains(key):
+ return key in cache
+
self.sentinel = object()
self.get = cache_get
self.set = cache_set
self.setdefault = cache_set_default
self.pop = cache_pop
self.len = cache_len
+ self.contains = cache_contains
def __getitem__(self, key):
result = self.get(key, self.sentinel)
@@ -114,3 +118,6 @@ class LruCache(object):
def __len__(self):
return self.len()
+
+ def __contains__(self, key):
+ return self.contains(key)
|