diff options
Diffstat (limited to 'synapse')
39 files changed, 1617 insertions, 1068 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 64f605b962..18f3d117b3 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -486,7 +486,7 @@ class Auth(object): send_level = send_level_event.content.get("events", {}).get( event.type ) - if not send_level: + if send_level is None: if hasattr(event, "state_key"): send_level = send_level_event.content.get( "state_default", 50 diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 500cae05fb..afb46d2e23 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -32,15 +32,13 @@ from twisted.web.resource import Resource from twisted.web.static import File from twisted.web.server import Site from synapse.http.server import JsonResource, RootRedirect -from synapse.rest.appservice.v1 import AppServiceRestResource from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.http.server_key_resource import LocalKey from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.api.urls import ( CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, - SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, APP_SERVICE_PREFIX, - STATIC_PREFIX + SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, STATIC_PREFIX ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory @@ -78,9 +76,6 @@ class SynapseHomeServer(HomeServer): def build_resource_for_federation(self): return JsonResource(self) - def build_resource_for_app_services(self): - return AppServiceRestResource(self) - def build_resource_for_web_client(self): import syweb syweb_path = os.path.dirname(syweb.__file__) @@ -141,7 +136,6 @@ class SynapseHomeServer(HomeServer): (CONTENT_REPO_PREFIX, self.get_resource_for_content_repo()), (SERVER_KEY_PREFIX, self.get_resource_for_server_key()), (MEDIA_PREFIX, self.get_resource_for_media_repository()), - (APP_SERVICE_PREFIX, self.get_resource_for_app_services()), (STATIC_PREFIX, self.get_resource_for_static_content()), ] diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index a268a6bcc4..63a18b802b 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -20,6 +20,50 @@ import re logger = logging.getLogger(__name__) +class ApplicationServiceState(object): + DOWN = "down" + UP = "up" + + +class AppServiceTransaction(object): + """Represents an application service transaction.""" + + def __init__(self, service, id, events): + self.service = service + self.id = id + self.events = events + + def send(self, as_api): + """Sends this transaction using the provided AS API interface. + + Args: + as_api(ApplicationServiceApi): The API to use to send. + Returns: + A Deferred which resolves to True if the transaction was sent. + """ + return as_api.push_bulk( + service=self.service, + events=self.events, + txn_id=self.id + ) + + def complete(self, store): + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + Returns: + A Deferred which resolves to True if the transaction was completed. + """ + return store.complete_appservice_txn( + service=self.service, + txn_id=self.id + ) + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. @@ -35,13 +79,13 @@ class ApplicationService(object): NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS] def __init__(self, token, url=None, namespaces=None, hs_token=None, - sender=None, txn_id=None): + sender=None, id=None): self.token = token self.url = url self.hs_token = hs_token self.sender = sender self.namespaces = self._check_namespaces(namespaces) - self.txn_id = txn_id + self.id = id def _check_namespaces(self, namespaces): # Sanity check that it is of the form: @@ -51,7 +95,7 @@ class ApplicationService(object): # rooms: [ {regex: "[A-z]+.*", exclusive: true}, ...], # } if not namespaces: - return None + namespaces = {} for ns in ApplicationService.NS_LIST: if ns not in namespaces: @@ -155,7 +199,10 @@ class ApplicationService(object): return self._matches_user(event, member_list) def is_interested_in_user(self, user_id): - return self._matches_regex(user_id, ApplicationService.NS_USERS) + return ( + self._matches_regex(user_id, ApplicationService.NS_USERS) + or user_id == self.sender + ) def is_interested_in_alias(self, alias): return self._matches_regex(alias, ApplicationService.NS_ALIASES) @@ -164,7 +211,10 @@ class ApplicationService(object): return self._matches_regex(room_id, ApplicationService.NS_ROOMS) def is_exclusive_user(self, user_id): - return self._is_exclusive(ApplicationService.NS_USERS, user_id) + return ( + self._is_exclusive(ApplicationService.NS_USERS, user_id) + or user_id == self.sender + ) def is_exclusive_alias(self, alias): return self._is_exclusive(ApplicationService.NS_ALIASES, alias) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index c2179f8d55..2a9becccb3 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -72,14 +72,19 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue(False) @defer.inlineCallbacks - def push_bulk(self, service, events): + def push_bulk(self, service, events, txn_id=None): events = self._serialize(events) + if txn_id is None: + logger.warning("push_bulk: Missing txn ID sending events to %s", + service.url) + txn_id = str(0) + txn_id = str(txn_id) + uri = service.url + ("/transactions/%s" % - urllib.quote(str(0))) # TODO txn_ids - response = None + urllib.quote(txn_id)) try: - response = yield self.put_json( + yield self.put_json( uri=uri, json_body={ "events": events @@ -87,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient): args={ "access_token": service.hs_token }) - if response: # just an empty json object - # TODO: Mark txn as sent successfully - defer.returnValue(True) + defer.returnValue(True) + return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: @@ -97,8 +101,8 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue(False) @defer.inlineCallbacks - def push(self, service, event): - response = yield self.push_bulk(service, [event]) + def push(self, service, event, txn_id=None): + response = yield self.push_bulk(service, [event], txn_id) defer.returnValue(response) def _serialize(self, events): diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py new file mode 100644 index 0000000000..59b0b1f4ac --- /dev/null +++ b/synapse/appservice/scheduler.py @@ -0,0 +1,254 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +This module controls the reliability for application service transactions. + +The nominal flow through this module looks like: + __________ +1---ASa[e]-->| Service |--> Queue ASa[f] +2----ASb[e]->| Queuer | +3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e] + V + -````````- +------------+ + |````````|<--StoreTxn-|Transaction | + |Database| | Controller |---> SEND TO AS + `--------` +------------+ +What happens on SEND TO AS depends on the state of the Application Service: + - If the AS is marked as DOWN, do nothing. + - If the AS is marked as UP, send the transaction. + * SUCCESS : Increment where the AS is up to txn-wise and nuke the txn + contents from the db. + * FAILURE : Marked AS as DOWN and start Recoverer. + +Recoverer attempts to recover ASes who have died. The flow for this looks like: + ,--------------------- backoff++ --------------. + V | + START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE + backoff DB and try to send it + ^ |___________ +Mark AS as | V +UP & quit +---------- YES SUCCESS + | | | + NO <--- Have more txns? <------ Mark txn success & nuke <-+ + from db; incr AS pos. + Reset backoff. + +This is all tied together by the AppServiceScheduler which DIs the required +components. +""" + +from synapse.appservice import ApplicationServiceState +from twisted.internet import defer +import logging + +logger = logging.getLogger(__name__) + + +class AppServiceScheduler(object): + """ Public facing API for this module. Does the required DI to tie the + components together. This also serves as the "event_pool", which in this + case is a simple array. + """ + + def __init__(self, clock, store, as_api): + self.clock = clock + self.store = store + self.as_api = as_api + + def create_recoverer(service, callback): + return _Recoverer(clock, store, as_api, service, callback) + + self.txn_ctrl = _TransactionController( + clock, store, as_api, create_recoverer + ) + self.queuer = _ServiceQueuer(self.txn_ctrl) + + @defer.inlineCallbacks + def start(self): + logger.info("Starting appservice scheduler") + # check for any DOWN ASes and start recoverers for them. + recoverers = yield _Recoverer.start( + self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered + ) + self.txn_ctrl.add_recoverers(recoverers) + + def submit_event_for_as(self, service, event): + self.queuer.enqueue(service, event) + + +class _ServiceQueuer(object): + """Queues events for the same application service together, sending + transactions as soon as possible. Once a transaction is sent successfully, + this schedules any other events in the queue to run. + """ + + def __init__(self, txn_ctrl): + self.queued_events = {} # dict of {service_id: [events]} + self.pending_requests = {} # dict of {service_id: Deferred} + self.txn_ctrl = txn_ctrl + + def enqueue(self, service, event): + # if this service isn't being sent something + if not self.pending_requests.get(service.id): + self._send_request(service, [event]) + else: + # add to queue for this service + if service.id not in self.queued_events: + self.queued_events[service.id] = [] + self.queued_events[service.id].append(event) + + def _send_request(self, service, events): + # send request and add callbacks + d = self.txn_ctrl.send(service, events) + d.addBoth(self._on_request_finish) + d.addErrback(self._on_request_fail) + self.pending_requests[service.id] = d + + def _on_request_finish(self, service): + self.pending_requests[service.id] = None + # if there are queued events, then send them. + if (service.id in self.queued_events + and len(self.queued_events[service.id]) > 0): + self._send_request(service, self.queued_events[service.id]) + self.queued_events[service.id] = [] + + def _on_request_fail(self, err): + logger.error("AS request failed: %s", err) + + +class _TransactionController(object): + + def __init__(self, clock, store, as_api, recoverer_fn): + self.clock = clock + self.store = store + self.as_api = as_api + self.recoverer_fn = recoverer_fn + # keep track of how many recoverers there are + self.recoverers = [] + + @defer.inlineCallbacks + def send(self, service, events): + try: + txn = yield self.store.create_appservice_txn( + service=service, + events=events + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: + txn.complete(self.store) + else: + self._start_recoverer(service) + except Exception as e: + logger.exception(e) + self._start_recoverer(service) + # request has finished + defer.returnValue(service) + + @defer.inlineCallbacks + def on_recovered(self, recoverer): + self.recoverers.remove(recoverer) + logger.info("Successfully recovered application service AS ID %s", + recoverer.service.id) + logger.info("Remaining active recoverers: %s", len(self.recoverers)) + yield self.store.set_appservice_state( + recoverer.service, + ApplicationServiceState.UP + ) + + def add_recoverers(self, recoverers): + for r in recoverers: + self.recoverers.append(r) + if len(recoverers) > 0: + logger.info("New active recoverers: %s", len(self.recoverers)) + + @defer.inlineCallbacks + def _start_recoverer(self, service): + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() + + @defer.inlineCallbacks + def _is_service_up(self, service): + state = yield self.store.get_appservice_state(service) + defer.returnValue(state == ApplicationServiceState.UP or state is None) + + +class _Recoverer(object): + + @staticmethod + @defer.inlineCallbacks + def start(clock, store, as_api, callback): + services = yield store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) + recoverers = [ + _Recoverer(clock, store, as_api, s, callback) for s in services + ] + for r in recoverers: + logger.info("Starting recoverer for AS ID %s which was marked as " + "DOWN", r.service.id) + r.recover() + defer.returnValue(recoverers) + + def __init__(self, clock, store, as_api, service, callback): + self.clock = clock + self.store = store + self.as_api = as_api + self.service = service + self.callback = callback + self.backoff_counter = 1 + + def recover(self): + self.clock.call_later((2 ** self.backoff_counter), self.retry) + + def _backoff(self): + # cap the backoff to be around 18h => (2^16) = 65536 secs + if self.backoff_counter < 16: + self.backoff_counter += 1 + self.recover() + + @defer.inlineCallbacks + def retry(self): + try: + txn = yield self.store.get_oldest_unsent_txn(self.service) + if txn: + logger.info("Retrying transaction %s for AS ID %s", + txn.id, txn.service.id) + sent = yield txn.send(self.as_api) + if sent: + yield txn.complete(self.store) + # reset the backoff counter and retry immediately + self.backoff_counter = 1 + yield self.retry() + else: + self._backoff() + else: + self._set_service_recovered() + except Exception as e: + logger.exception(e) + self._backoff() + + def _set_service_recovered(self): + self.callback(self) diff --git a/synapse/rest/appservice/v1/__init__.py b/synapse/config/appservice.py index a7877609ad..399a716d80 100644 --- a/synapse/rest/appservice/v1/__init__.py +++ b/synapse/config/appservice.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,18 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from . import register -from synapse.http.server import JsonResource +from ._base import Config -class AppServiceRestResource(JsonResource): - """A resource for version 1 of the matrix application service API.""" +class AppServiceConfig(Config): - def __init__(self, hs): - JsonResource.__init__(self, hs) - self.register_servlets(self, hs) + def __init__(self, args): + super(AppServiceConfig, self).__init__(args) + self.app_service_config_files = args.app_service_config_files - @staticmethod - def register_servlets(appservice_resource, hs): - register.register_servlets(hs, appservice_resource) + @classmethod + def add_arguments(cls, parser): + super(AppServiceConfig, cls).add_arguments(parser) + group = parser.add_argument_group("appservice") + group.add_argument( + "--app-service-config-files", type=str, nargs='+', + help="A list of application service config files to use." + ) diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 241afdf872..3edfadb98b 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -24,12 +24,13 @@ from .email import EmailConfig from .voip import VoipConfig from .registration import RegistrationConfig from .metrics import MetricsConfig +from .appservice import AppServiceConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, EmailConfig, VoipConfig, RegistrationConfig, - MetricsConfig,): + MetricsConfig, AppServiceConfig,): pass diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 4401e774d1..d5c8f4bf7b 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -25,11 +25,11 @@ class RegistrationConfig(Config): def __init__(self, args): super(RegistrationConfig, self).__init__(args) - # `args.disable_registration` may either be a bool or a string depending - # on if the option was given a value (e.g. --disable-registration=false - # would set `args.disable_registration` to "false" not False.) - self.disable_registration = bool( - distutils.util.strtobool(str(args.disable_registration)) + # `args.enable_registration` may either be a bool or a string depending + # on if the option was given a value (e.g. --enable-registration=true + # would set `args.enable_registration` to "true" not True.) + self.disable_registration = not bool( + distutils.util.strtobool(str(args.enable_registration)) ) self.registration_shared_secret = args.registration_shared_secret @@ -39,11 +39,11 @@ class RegistrationConfig(Config): reg_group = parser.add_argument_group("registration") reg_group.add_argument( - "--disable-registration", + "--enable-registration", const=True, - default=True, + default=False, nargs='?', - help="Disable registration of new users.", + help="Enable registration for new users.", ) reg_group.add_argument( "--registration-shared-secret", type=str, @@ -53,8 +53,8 @@ class RegistrationConfig(Config): @classmethod def generate_config(cls, args, config_dir_path): - if args.disable_registration is None: - args.disable_registration = True + if args.enable_registration is None: + args.enable_registration = False if args.registration_shared_secret is None: args.registration_shared_secret = random_string_with_symbols(50) diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 8d345bf936..0c51d615ec 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.appservice.scheduler import AppServiceScheduler from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( @@ -54,7 +55,12 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + asapi = ApplicationServiceApi(hs) self.appservice_handler = ApplicationServicesHandler( - hs, ApplicationServiceApi(hs) + hs, asapi, AppServiceScheduler( + clock=hs.get_clock(), + store=hs.get_datastore(), + as_api=asapi + ) ) self.sync_handler = SyncHandler(hs) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2c488a46f6..492a630fdc 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -16,57 +16,36 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import Codes, StoreError, SynapseError from synapse.appservice import ApplicationService from synapse.types import UserID -import synapse.util.stringutils as stringutils import logging logger = logging.getLogger(__name__) +def log_failure(failure): + logger.error( + "Application Services Failure", + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() + ) + ) + + # NB: Purposefully not inheriting BaseHandler since that contains way too much # setup code which this handler does not need or use. This makes testing a lot # easier. class ApplicationServicesHandler(object): - def __init__(self, hs, appservice_api): + def __init__(self, hs, appservice_api, appservice_scheduler): self.store = hs.get_datastore() self.hs = hs self.appservice_api = appservice_api - - @defer.inlineCallbacks - def register(self, app_service): - logger.info("Register -> %s", app_service) - # check the token is recognised - try: - stored_service = yield self.store.get_app_service_by_token( - app_service.token - ) - if not stored_service: - raise StoreError(404, "Application service not found") - except StoreError: - raise SynapseError( - 403, "Unrecognised application services token. " - "Consult the home server admin.", - errcode=Codes.FORBIDDEN - ) - - app_service.hs_token = self._generate_hs_token() - - # create a sender for this application service which is used when - # creating rooms, etc.. - account = yield self.hs.get_handlers().registration_handler.register() - app_service.sender = account[0] - - yield self.store.update_app_service(app_service) - defer.returnValue(app_service) - - @defer.inlineCallbacks - def unregister(self, token): - logger.info("Unregister as_token=%s", token) - yield self.store.unregister_app_service(token) + self.scheduler = appservice_scheduler + self.started_scheduler = False @defer.inlineCallbacks def notify_interested_services(self, event): @@ -90,9 +69,13 @@ class ApplicationServicesHandler(object): if event.type == EventTypes.Member: yield self._check_user_exists(event.state_key) - # Fork off pushes to these services - XXX First cut, best effort + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services for service in services: - self.appservice_api.push(service, event) + self.scheduler.submit_event_for_as(service, event) @defer.inlineCallbacks def query_user_exists(self, user_id): @@ -197,7 +180,14 @@ class ApplicationServicesHandler(object): return user_info = yield self.store.get_user_by_id(user_id) - defer.returnValue(len(user_info) == 0) + if len(user_info) > 0: + defer.returnValue(False) + return + + # user not found; could be the AS though, so check. + services = yield self.store.get_app_services() + service_list = [s for s in services if s.sender == user_id] + defer.returnValue(len(service_list) == 0) @defer.inlineCallbacks def _check_user_exists(self, user_id): @@ -206,6 +196,3 @@ class ApplicationServicesHandler(object): exists = yield self.query_user_exists(user_id) defer.returnValue(exists) defer.returnValue(True) - - def _generate_hs_token(self): - return stringutils.random_string(24) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 15ba417e06..8aceac28cf 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -201,10 +201,18 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - yield self.notifier.on_new_room_event( + d = self.notifier.on_new_room_event( event, extra_users=extra_users ) + def log_failure(f): + logger.warn( + "Failed to notify about %s: %s", + event.event_id, f.value + ) + + d.addErrback(log_failure) + if event.type == EventTypes.Member: if event.membership == Membership.JOIN: user = UserID.from_string(event.state_key) @@ -427,10 +435,18 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - yield self.notifier.on_new_room_event( + d = self.notifier.on_new_room_event( new_event, extra_users=[joinee] ) + def log_failure(f): + logger.warn( + "Failed to notify about %s: %s", + new_event.event_id, f.value + ) + + d.addErrback(log_failure) + logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] @@ -500,10 +516,18 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - yield self.notifier.on_new_room_event( + d = self.notifier.on_new_room_event( event, extra_users=extra_users ) + def log_failure(f): + logger.warn( + "Failed to notify about %s: %s", + event.event_id, f.value + ) + + d.addErrback(log_failure) + if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) @@ -574,10 +598,18 @@ class FederationHandler(BaseHandler): ) target_user = UserID.from_string(event.state_key) - yield self.notifier.on_new_room_event( + d = self.notifier.on_new_room_event( event, extra_users=[target_user], ) + def log_failure(f): + logger.warn( + "Failed to notify about %s: %s", + event.event_id, f.value + ) + + d.addErrback(log_failure) + defer.returnValue(event) @defer.inlineCallbacks diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 731df00648..bbc7a0f200 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -33,6 +33,10 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) +# Don't bother bumping "last active" time if it differs by less than 60 seconds +LAST_ACTIVE_GRANULARITY = 60*1000 + + # TODO(paul): Maybe there's one of these I can steal from somewhere def partition(l, func): """Partition the list by the result of func applied to each element.""" @@ -282,6 +286,10 @@ class PresenceHandler(BaseHandler): if now is None: now = self.clock.time_msec() + prev_state = self._get_or_make_usercache(user) + if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY: + return + self.changed_presencelike_data(user, {"last_active": now}) def changed_presencelike_data(self, user, state): diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index dffb8a4861..9233ea3da9 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -18,6 +18,8 @@ from __future__ import absolute_import import logging from resource import getrusage, getpagesize, RUSAGE_SELF +import os +import stat from .metric import ( CounterMetric, CallbackMetric, DistributionMetric, CacheMetric @@ -109,3 +111,36 @@ resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000) # pages resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * PAGE_SIZE) + +TYPES = { + stat.S_IFSOCK: "SOCK", + stat.S_IFLNK: "LNK", + stat.S_IFREG: "REG", + stat.S_IFBLK: "BLK", + stat.S_IFDIR: "DIR", + stat.S_IFCHR: "CHR", + stat.S_IFIFO: "FIFO", +} + + +def _process_fds(): + counts = {(k,): 0 for k in TYPES.values()} + counts[("other",)] = 0 + + for fd in os.listdir("/proc/self/fd"): + try: + s = os.stat("/proc/self/fd/%s" % (fd)) + fmt = stat.S_IFMT(s.st_mode) + if fmt in TYPES: + t = TYPES[fmt] + else: + t = "other" + + counts[(t,)] += 1 + except OSError: + # the dirh itself used by listdir() is usually missing by now + pass + + return counts + +get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"]) diff --git a/synapse/notifier.py b/synapse/notifier.py index 7121d659d0..12573f3f59 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -62,7 +62,8 @@ class _NotificationListener(object): self.rooms = rooms - self.pending_notifications = [] + def notified(self): + return self.deferred.called def notify(self, notifier, events, start_token, end_token): """ Inform whoever is listening about the new events. This will @@ -78,11 +79,15 @@ class _NotificationListener(object): except defer.AlreadyCalledError: pass + # Should the following be done be using intrusively linked lists? + # -- erikj + for room in self.rooms: lst = notifier.room_to_listeners.get(room, set()) lst.discard(self) notifier.user_to_listeners.get(self.user, set()).discard(self) + if self.appservice: notifier.appservice_to_listeners.get( self.appservice, set() @@ -161,10 +166,18 @@ class Notifier(object): room_source = self.event_sources.sources["room"] - listeners = self.room_to_listeners.get(room_id, set()).copy() + room_listeners = self.room_to_listeners.get(room_id, set()) + + _discard_if_notified(room_listeners) + + listeners = room_listeners.copy() for user in extra_users: - listeners |= self.user_to_listeners.get(user, set()).copy() + user_listeners = self.user_to_listeners.get(user, set()) + + _discard_if_notified(user_listeners) + + listeners |= user_listeners for appservice in self.appservice_to_listeners: # TODO (kegan): Redundant appservice listener checks? @@ -173,9 +186,13 @@ class Notifier(object): # receive *invites* for users they are interested in. Does this # make the room_to_listeners check somewhat obselete? if appservice.is_interested(event): - listeners |= self.appservice_to_listeners.get( + app_listeners = self.appservice_to_listeners.get( appservice, set() - ).copy() + ) + + _discard_if_notified(app_listeners) + + listeners |= app_listeners logger.debug("on_new_room_event listeners %s", listeners) @@ -226,10 +243,18 @@ class Notifier(object): listeners = set() for user in users: - listeners |= self.user_to_listeners.get(user, set()).copy() + user_listeners = self.user_to_listeners.get(user, set()) + + _discard_if_notified(user_listeners) + + listeners |= user_listeners for room in rooms: - listeners |= self.room_to_listeners.get(room, set()).copy() + room_listeners = self.room_to_listeners.get(room, set()) + + _discard_if_notified(room_listeners) + + listeners |= room_listeners @defer.inlineCallbacks def notify(listener): @@ -427,3 +452,17 @@ class Notifier(object): listeners = self.room_to_listeners.setdefault(room_id, set()) listeners |= new_listeners + + for l in new_listeners: + l.rooms.add(room_id) + + +def _discard_if_notified(listener_set): + """Remove any 'stale' listeners from the given set. + """ + to_discard = set() + for l in listener_set: + if l.notified(): + to_discard.add(l) + + listener_set -= to_discard diff --git a/synapse/rest/appservice/__init__.py b/synapse/rest/appservice/__init__.py deleted file mode 100644 index 1a84d94cd9..0000000000 --- a/synapse/rest/appservice/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/synapse/rest/appservice/v1/base.py b/synapse/rest/appservice/v1/base.py deleted file mode 100644 index 65d5bcf9be..0000000000 --- a/synapse/rest/appservice/v1/base.py +++ /dev/null @@ -1,48 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains base REST classes for constructing client v1 servlets. -""" - -from synapse.http.servlet import RestServlet -from synapse.api.urls import APP_SERVICE_PREFIX -import re - -import logging - - -logger = logging.getLogger(__name__) - - -def as_path_pattern(path_regex): - """Creates a regex compiled appservice path with the correct path - prefix. - - Args: - path_regex (str): The regex string to match. This should NOT have a ^ - as this will be prefixed. - Returns: - SRE_Pattern - """ - return re.compile("^" + APP_SERVICE_PREFIX + path_regex) - - -class AppServiceRestServlet(RestServlet): - """A base Synapse REST Servlet for the application services version 1 API. - """ - - def __init__(self, hs): - self.hs = hs - self.handler = hs.get_handlers().appservice_handler diff --git a/synapse/rest/appservice/v1/register.py b/synapse/rest/appservice/v1/register.py deleted file mode 100644 index ea24d88f79..0000000000 --- a/synapse/rest/appservice/v1/register.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains REST servlets to do with registration: /register""" -from twisted.internet import defer - -from base import AppServiceRestServlet, as_path_pattern -from synapse.api.errors import CodeMessageException, SynapseError -from synapse.storage.appservice import ApplicationService - -import json -import logging - -logger = logging.getLogger(__name__) - - -class RegisterRestServlet(AppServiceRestServlet): - """Handles AS registration with the home server. - """ - - PATTERN = as_path_pattern("/register$") - - @defer.inlineCallbacks - def on_POST(self, request): - params = _parse_json(request) - - # sanity check required params - try: - as_token = params["as_token"] - as_url = params["url"] - if (not isinstance(as_token, basestring) or - not isinstance(as_url, basestring)): - raise ValueError - except (KeyError, ValueError): - raise SynapseError( - 400, "Missed required keys: as_token(str) / url(str)." - ) - - try: - app_service = ApplicationService( - as_token, as_url, params["namespaces"] - ) - except ValueError as e: - raise SynapseError(400, e.message) - - app_service = yield self.handler.register(app_service) - hs_token = app_service.hs_token - - defer.returnValue((200, { - "hs_token": hs_token - })) - - -class UnregisterRestServlet(AppServiceRestServlet): - """Handles AS registration with the home server. - """ - - PATTERN = as_path_pattern("/unregister$") - - def on_POST(self, request): - params = _parse_json(request) - try: - as_token = params["as_token"] - if not isinstance(as_token, basestring): - raise ValueError - except (KeyError, ValueError): - raise SynapseError(400, "Missing required key: as_token(str)") - - yield self.handler.unregister(as_token) - - raise CodeMessageException(500, "Not implemented") - - -def _parse_json(request): - try: - content = json.loads(request.content.read()) - if type(content) != dict: - raise SynapseError(400, "Content must be a JSON object.") - return content - except ValueError as e: - logger.warn(e) - raise SynapseError(400, "Content not JSON.") - - -def register_servlets(hs, http_server): - RegisterRestServlet(hs).register(http_server) - UnregisterRestServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index c7772244ba..0bd87bdd77 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -79,7 +79,6 @@ class BaseHomeServer(object): 'resource_for_content_repo', 'resource_for_server_key', 'resource_for_media_repository', - 'resource_for_app_services', 'resource_for_metrics', 'event_sources', 'ratelimiter', diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4b16f445d6..f4dec70393 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -14,13 +14,12 @@ # limitations under the License. from twisted.internet import defer - -from synapse.util.logutils import log_function -from synapse.api.constants import EventTypes - -from .appservice import ApplicationServiceStore +from .appservice import ( + ApplicationServiceStore, ApplicationServiceTransactionStore +) +from ._base import Cache from .directory import DirectoryStore -from .feedback import FeedbackStore +from .events import EventsStore from .presence import PresenceStore from .profile import ProfileStore from .registration import RegistrationStore @@ -39,11 +38,6 @@ from .state import StateStore from .signatures import SignatureStore from .filtering import FilteringStore -from syutil.base64util import decode_base64 -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import compute_event_reference_hash - import fnmatch import imp @@ -57,20 +51,18 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 14 +SCHEMA_VERSION = 15 dir_path = os.path.abspath(os.path.dirname(__file__)) - -class _RollbackButIsFineException(Exception): - """ This exception is used to rollback a transaction without implying - something went wrong. - """ - pass +# Number of msec of granularity to store the user IP 'last seen' time. Smaller +# times give more inserts into the database even for readonly API hits +# 120 seconds == 2 minutes +LAST_SEEN_GRANULARITY = 120*1000 class DataStore(RoomMemberStore, RoomStore, - RegistrationStore, StreamStore, ProfileStore, FeedbackStore, + RegistrationStore, StreamStore, ProfileStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, ApplicationServiceStore, @@ -79,7 +71,9 @@ class DataStore(RoomMemberStore, RoomStore, RejectionsStore, FilteringStore, PusherStore, - PushRuleStore + PushRuleStore, + ApplicationServiceTransactionStore, + EventsStore, ): def __init__(self, hs): @@ -89,424 +83,28 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token_deferred = self._get_min_token() self.min_token = None - @defer.inlineCallbacks - @log_function - def persist_event(self, event, context, backfilled=False, - is_new_state=True, current_state=None): - stream_ordering = None - if backfilled: - if not self.min_token_deferred.called: - yield self.min_token_deferred - self.min_token -= 1 - stream_ordering = self.min_token - - try: - yield self.runInteraction( - "persist_event", - self._persist_event_txn, - event=event, - context=context, - backfilled=backfilled, - stream_ordering=stream_ordering, - is_new_state=is_new_state, - current_state=current_state, - ) - except _RollbackButIsFineException: - pass - - @defer.inlineCallbacks - def get_event(self, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False, - allow_none=False): - """Get an event from the database by event_id. - - Args: - event_id (str): The event_id of the event to fetch - check_redacted (bool): If True, check if event has been redacted - and redact it. - get_prev_content (bool): If True and event is a state event, - include the previous states content in the unsigned field. - allow_rejected (bool): If True return rejected events. - allow_none (bool): If True, return None if no event found, if - False throw an exception. - - Returns: - Deferred : A FrozenEvent. - """ - event = yield self.runInteraction( - "get_event", self._get_event_txn, - event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - if not event and not allow_none: - raise RuntimeError("Could not find event %s" % (event_id,)) - - defer.returnValue(event) - - @log_function - def _persist_event_txn(self, txn, event, context, backfilled, - stream_ordering=None, is_new_state=True, - current_state=None): - - # Remove the any existing cache entries for the event_id - self._get_event_cache.pop(event.event_id) - - # We purposefully do this first since if we include a `current_state` - # key, we *want* to update the `current_state_events` table - if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - }, - or_replace=True, - ) - - if event.is_state() and is_new_state: - if not backfilled and not context.rejected: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - - outlier = event.internal_metadata.is_outlier() - - if not outlier: - self._store_state_groups_txn(txn, event, context) - - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - - have_persisted = self._simple_select_one_onecol_txn( - txn, - table="event_json", - keyvalues={"event_id": event.event_id}, - retcol="event_id", - allow_none=True, - ) - - metadata_json = encode_canonical_json( - event.internal_metadata.get_dict() - ) - - # If we have already persisted this event, we don't need to do any - # more processing. - # The processing above must be done on every call to persist event, - # since they might not have happened on previous calls. For example, - # if we are persisting an event that we had persisted as an outlier, - # but is no longer one. - if have_persisted: - if not outlier: - sql = ( - "UPDATE event_json SET internal_metadata = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (metadata_json.decode("UTF-8"), event.event_id,) - ) - - sql = ( - "UPDATE events SET outlier = 0" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (event.event_id,) - ) - return - - if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) - elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) - - event_dict = { - k: v - for k, v in event.get_dict().items() - if k not in [ - "redacted", - "redacted_because", - ] - } - - self._simple_insert_txn( - txn, - table="event_json", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "internal_metadata": metadata_json.decode("UTF-8"), - "json": encode_canonical_json(event_dict).decode("UTF-8"), - }, - or_replace=True, - ) - - content = encode_canonical_json( - event.content - ).decode("UTF-8") - - vals = { - "topological_ordering": event.depth, - "event_id": event.event_id, - "type": event.type, - "room_id": event.room_id, - "content": content, - "processed": True, - "outlier": outlier, - "depth": event.depth, - } - - if stream_ordering is not None: - vals["stream_ordering"] = stream_ordering - - unrec = { - k: v - for k, v in event.get_dict().items() - if k not in vals.keys() and k not in [ - "redacted", - "redacted_because", - "signatures", - "hashes", - "prev_events", - ] - } - - vals["unrecognized_keys"] = encode_canonical_json( - unrec - ).decode("UTF-8") - - try: - self._simple_insert_txn( - txn, - "events", - vals, - or_replace=(not outlier), - or_ignore=bool(outlier), - ) - except: - logger.warn( - "Failed to persist, probably duplicate: %s", - event.event_id, - exc_info=True, - ) - raise _RollbackButIsFineException("_persist_event") - - if context.rejected: - self._store_rejections_txn(txn, event.event_id, context.rejected) - - if event.is_state(): - vals = { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - } - - # TODO: How does this work with backfilling? - if hasattr(event, "replaces_state"): - vals["prev_state"] = event.replaces_state - - self._simple_insert_txn( - txn, - "state_events", - vals, - or_replace=True, - ) - - if is_new_state and not context.rejected: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for e_id, h in event.prev_state: - self._simple_insert_txn( - txn, - table="event_edges", - values={ - "event_id": event.event_id, - "prev_event_id": e_id, - "room_id": event.room_id, - "is_state": 1, - }, - or_ignore=True, - ) - - for hash_alg, hash_base64 in event.hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_event_content_hash_txn( - txn, event.event_id, hash_alg, hash_bytes, - ) - - for prev_event_id, prev_hashes in event.prev_events: - for alg, hash_base64 in prev_hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_prev_event_hash_txn( - txn, event.event_id, prev_event_id, alg, hash_bytes - ) - - for auth_id, _ in event.auth_events: - self._simple_insert_txn( - txn, - table="event_auth", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - }, - or_ignore=True, - ) - - (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) - self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes - ) - - def _store_redaction(self, txn, event): - # invalidate the cache for the redacted event - self._get_event_cache.pop(event.redacts) - txn.execute( - "INSERT OR IGNORE INTO redactions " - "(event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts) - ) - - @defer.inlineCallbacks - def get_current_state(self, room_id, event_type=None, state_key=""): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " - "WHERE c.room_id = ? " - ) % { - "redacted": del_sql, - } - - if event_type and state_key is not None: - sql += " AND s.type = ? AND s.state_key = ? " - args = (room_id, event_type, state_key) - elif event_type: - sql += " AND s.type = ?" - args = (room_id, event_type) - else: - args = (room_id, ) - - results = yield self._execute_and_decode("get_current_state", sql, *args) - - events = yield self._parse_events(results) - defer.returnValue(events) - - @defer.inlineCallbacks - def get_room_name_and_aliases(self, room_id): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" + self.client_ip_last_seen = Cache( + name="client_ip_last_seen", + keylen=4, ) - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " - "WHERE c.room_id = ? " - ) % { - "redacted": del_sql, - } - - sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" - sql += " OR s.type = 'm.room.aliases')" - args = (room_id,) - - results = yield self._execute_and_decode("get_current_state", sql, *args) - - events = yield self._parse_events(results) - - name = None - aliases = [] - - for e in events: - if e.type == 'm.room.name': - if 'name' in e.content: - name = e.content['name'] - elif e.type == 'm.room.aliases': - if 'aliases' in e.content: - aliases.extend(e.content['aliases']) - - defer.returnValue((name, aliases)) - @defer.inlineCallbacks - def _get_min_token(self): - row = yield self._execute( - "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" - ) + def insert_client_ip(self, user, access_token, device_id, ip, user_agent): + now = int(self._clock.time_msec()) + key = (user.to_string(), access_token, device_id, ip) - self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 - self.min_token = min(self.min_token, -1) + try: + last_seen = self.client_ip_last_seen.get(*key) + except KeyError: + last_seen = None - logger.debug("min_token is: %s", self.min_token) + # Rate-limited inserts + if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: + defer.returnValue(None) - defer.returnValue(self.min_token) + self.client_ip_last_seen.prefill(*key + (now,)) - def insert_client_ip(self, user, access_token, device_id, ip, user_agent): - return self._simple_insert( + yield self._simple_insert( "user_ips", { "user": user.to_string(), @@ -514,8 +112,9 @@ class DataStore(RoomMemberStore, RoomStore, "device_id": device_id, "ip": ip, "user_agent": user_agent, - "last_seen": int(self._clock.time_msec()), - } + "last_seen": now, + }, + desc="insert_client_ip", ) def get_user_ip_and_agents(self, user): @@ -525,38 +124,7 @@ class DataStore(RoomMemberStore, RoomStore, retcols=[ "device_id", "access_token", "ip", "user_agent", "last_seen" ], - ) - - def have_events(self, event_ids): - """Given a list of event ids, check if we have already processed them. - - Returns: - dict: Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps to - None. - """ - if not event_ids: - return defer.succeed({}) - - def f(txn): - sql = ( - "SELECT e.event_id, reason FROM events as e " - "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE e.event_id = ?" - ) - - res = {} - for event_id in event_ids: - txn.execute(sql, (event_id,)) - row = txn.fetchone() - if row: - _, rejected = row - res[event_id] = rejected - - return res - - return self.runInteraction( - "have_events", f, + desc="get_user_ip_and_agents", ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 9125bb1198..53eee10d51 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -35,6 +35,7 @@ logger = logging.getLogger(__name__) sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") +perf_logger = logging.getLogger("synapse.storage.TIME") metrics = synapse.metrics.get_metrics_for("synapse.storage") @@ -53,14 +54,57 @@ cache_counter = metrics.register_cache( ) -# TODO(paul): -# * more generic key management -# * consider other eviction strategies - LRU? -def cached(max_entries=1000): +class Cache(object): + + def __init__(self, name, max_entries=1000, keylen=1, lru=False): + if lru: + self.cache = LruCache(max_size=max_entries) + self.max_entries = None + else: + self.cache = OrderedDict() + self.max_entries = max_entries + + self.name = name + self.keylen = keylen + + caches_by_name[name] = self.cache + + def get(self, *keyargs): + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + if keyargs in self.cache: + cache_counter.inc_hits(self.name) + return self.cache[keyargs] + + cache_counter.inc_misses(self.name) + raise KeyError() + + def prefill(self, *args): # because I can't *keyargs, value + keyargs = args[:-1] + value = args[-1] + + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + if self.max_entries is not None: + while len(self.cache) >= self.max_entries: + self.cache.popitem(last=False) + + self.cache[keyargs] = value + + def invalidate(self, *keyargs): + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + self.cache.pop(keyargs, None) + + +def cached(max_entries=1000, num_args=1, lru=False): """ A method decorator that applies a memoizing cache around the function. - The function is presumed to take one additional argument, which is used as - the key for the cache. Cache hits are served directly from the cache; + The function is presumed to take zero or more arguments, which are used in + a tuple as the key for the cache. Hits are served directly from the cache; misses use the function body to generate the value. The wrapped function has an additional member, a callable called @@ -71,34 +115,27 @@ def cached(max_entries=1000): calling the calculation function. """ def wrap(orig): - cache = OrderedDict() - name = orig.__name__ - - caches_by_name[name] = cache - - def prefill(key, value): - while len(cache) > max_entries: - cache.popitem(last=False) - - cache[key] = value + cache = Cache( + name=orig.__name__, + max_entries=max_entries, + keylen=num_args, + lru=lru, + ) @functools.wraps(orig) @defer.inlineCallbacks - def wrapped(self, key): - if key in cache: - cache_counter.inc_hits(name) - defer.returnValue(cache[key]) + def wrapped(self, *keyargs): + try: + defer.returnValue(cache.get(*keyargs)) + except KeyError: + ret = yield orig(self, *keyargs) - cache_counter.inc_misses(name) - ret = yield orig(self, key) - prefill(key, ret) - defer.returnValue(ret) + cache.prefill(*keyargs + (ret,)) - def invalidate(key): - cache.pop(key, None) + defer.returnValue(ret) - wrapped.invalidate = invalidate - wrapped.prefill = prefill + wrapped.invalidate = cache.invalidate + wrapped.prefill = cache.prefill return wrapped return wrap @@ -232,7 +269,7 @@ class SQLBaseStore(object): time_now - time_then, limit=3 ) - logger.info( + perf_logger.info( "Total database time: %.3f%% {%s} {%s}", ratio * 100, top_three_counters, top_3_event_counters ) @@ -321,7 +358,8 @@ class SQLBaseStore(object): # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. - def _simple_insert(self, table, values, or_replace=False, or_ignore=False): + def _simple_insert(self, table, values, or_replace=False, or_ignore=False, + desc="_simple_insert"): """Executes an INSERT query on the named table. Args: @@ -330,7 +368,7 @@ class SQLBaseStore(object): or_replace : bool; if True performs an INSERT OR REPLACE """ return self.runInteraction( - "_simple_insert", + desc, self._simple_insert_txn, table, values, or_replace=or_replace, or_ignore=or_ignore, ) @@ -354,7 +392,7 @@ class SQLBaseStore(object): txn.execute(sql, values.values()) return txn.lastrowid - def _simple_upsert(self, table, keyvalues, values): + def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"): """ Args: table (str): The table to upsert into @@ -363,7 +401,7 @@ class SQLBaseStore(object): Returns: A deferred """ return self.runInteraction( - "_simple_upsert", + desc, self._simple_upsert_txn, table, keyvalues, values ) @@ -399,7 +437,7 @@ class SQLBaseStore(object): txn.execute(sql, allvalues.values()) def _simple_select_one(self, table, keyvalues, retcols, - allow_none=False): + allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to return a single row, returning a single column from it. @@ -411,12 +449,15 @@ class SQLBaseStore(object): allow_none : If true, return None instead of failing if the SELECT statement returns no rows """ - return self._simple_selectupdate_one( - table, keyvalues, retcols=retcols, allow_none=allow_none + return self.runInteraction( + desc, + self._simple_select_one_txn, + table, keyvalues, retcols, allow_none, ) def _simple_select_one_onecol(self, table, keyvalues, retcol, - allow_none=False): + allow_none=False, + desc="_simple_select_one_onecol"): """Executes a SELECT query on the named table, which is expected to return a single row, returning a single column from it." @@ -426,7 +467,7 @@ class SQLBaseStore(object): retcol : string giving the name of the column to return """ return self.runInteraction( - "_simple_select_one_onecol", + desc, self._simple_select_one_onecol_txn, table, keyvalues, retcol, allow_none=allow_none, ) @@ -462,7 +503,8 @@ class SQLBaseStore(object): return [r[0] for r in txn.fetchall()] - def _simple_select_onecol(self, table, keyvalues, retcol): + def _simple_select_onecol(self, table, keyvalues, retcol, + desc="_simple_select_onecol"): """Executes a SELECT query on the named table, which returns a list comprising of the values of the named column from the selected rows. @@ -475,12 +517,13 @@ class SQLBaseStore(object): Deferred: Results in a list """ return self.runInteraction( - "_simple_select_onecol", + desc, self._simple_select_onecol_txn, table, keyvalues, retcol ) - def _simple_select_list(self, table, keyvalues, retcols): + def _simple_select_list(self, table, keyvalues, retcols, + desc="_simple_select_list"): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -491,7 +534,7 @@ class SQLBaseStore(object): retcols : list of strings giving the names of the columns to return """ return self.runInteraction( - "_simple_select_list", + desc, self._simple_select_list_txn, table, keyvalues, retcols ) @@ -523,7 +566,7 @@ class SQLBaseStore(object): return self.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, - retcols=None): + desc="_simple_update_one"): """Executes an UPDATE query on the named table, setting new values for columns in a row matching the key values. @@ -541,56 +584,76 @@ class SQLBaseStore(object): get-and-set. This can be used to implement compare-and-set by putting the update column in the 'keyvalues' dict as well. """ - return self._simple_selectupdate_one(table, keyvalues, updatevalues, - retcols=retcols) + return self.runInteraction( + desc, + self._simple_update_one_txn, + table, keyvalues, updatevalues, + ) - def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None, - retcols=None, allow_none=False): - """ Combined SELECT then UPDATE.""" - if retcols: - select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( - ", ".join(retcols), - table, - " AND ".join("%s = ?" % (k) for k in keyvalues) - ) + def _simple_update_one_txn(self, txn, table, keyvalues, updatevalues): + update_sql = "UPDATE %s SET %s WHERE %s" % ( + table, + ", ".join("%s = ?" % (k,) for k in updatevalues), + " AND ".join("%s = ?" % (k,) for k in keyvalues) + ) - if updatevalues: - update_sql = "UPDATE %s SET %s WHERE %s" % ( - table, - ", ".join("%s = ?" % (k,) for k in updatevalues), - " AND ".join("%s = ?" % (k,) for k in keyvalues) - ) + txn.execute( + update_sql, + updatevalues.values() + keyvalues.values() + ) + if txn.rowcount == 0: + raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + + def _simple_select_one_txn(self, txn, table, keyvalues, retcols, + allow_none=False): + select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + ", ".join(retcols), + table, + " AND ".join("%s = ?" % (k) for k in keyvalues) + ) + + txn.execute(select_sql, keyvalues.values()) + + row = txn.fetchone() + if not row: + if allow_none: + return None + raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + + return dict(zip(retcols, row)) + + def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None, + retcols=None, allow_none=False, + desc="_simple_selectupdate_one"): + """ Combined SELECT then UPDATE.""" def func(txn): ret = None if retcols: - txn.execute(select_sql, keyvalues.values()) - - row = txn.fetchone() - if not row: - if allow_none: - return None - raise StoreError(404, "No row found") - if txn.rowcount > 1: - raise StoreError(500, "More than one row matched") - - ret = dict(zip(retcols, row)) + ret = self._simple_select_one_txn( + txn, + table=table, + keyvalues=keyvalues, + retcols=retcols, + allow_none=allow_none, + ) if updatevalues: - txn.execute( - update_sql, - updatevalues.values() + keyvalues.values() + self._simple_update_one_txn( + txn, + table=table, + keyvalues=keyvalues, + updatevalues=updatevalues, ) - if txn.rowcount == 0: - raise StoreError(404, "No row found") - if txn.rowcount > 1: - raise StoreError(500, "More than one row matched") - return ret - return self.runInteraction("_simple_selectupdate_one", func) + return self.runInteraction(desc, func) - def _simple_delete_one(self, table, keyvalues): + def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"): """Executes a DELETE query on the named table, expecting to delete a single row. @@ -609,9 +672,9 @@ class SQLBaseStore(object): raise StoreError(404, "No row found") if txn.rowcount > 1: raise StoreError(500, "more than one row matched") - return self.runInteraction("_simple_delete_one", func) + return self.runInteraction(desc, func) - def _simple_delete(self, table, keyvalues): + def _simple_delete(self, table, keyvalues, desc="_simple_delete"): """Executes a DELETE query on the named table. Args: @@ -619,7 +682,7 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the row with """ - return self.runInteraction("_simple_delete", self._simple_delete_txn) + return self.runInteraction(desc, self._simple_delete_txn) def _simple_delete_txn(self, txn, table, keyvalues): sql = "DELETE FROM %s WHERE %s" % ( @@ -789,6 +852,13 @@ class SQLBaseStore(object): return result[0] if result else None +class _RollbackButIsFineException(Exception): + """ This exception is used to rollback a transaction without implying + something went wrong. + """ + pass + + class Table(object): """ A base class used to store information about a particular table. """ diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 850676ce6c..f8cbb3f323 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,154 +13,35 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import simplejson +import urllib +import yaml from simplejson import JSONDecodeError +import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.errors import StoreError -from synapse.appservice import ApplicationService +from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser +from synapse.types import UserID from ._base import SQLBaseStore logger = logging.getLogger(__name__) -def log_failure(failure): - logger.error("Failed to detect application services: %s", failure.value) - logger.error(failure.getTraceback()) - - class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) + self.hostname = hs.hostname self.services_cache = [] - self.cache_defer = self._populate_cache() - self.cache_defer.addErrback(log_failure) - - @defer.inlineCallbacks - def unregister_app_service(self, token): - """Unregisters this service. - - This removes all AS specific regex and the base URL. The token is the - only thing preserved for future registration attempts. - """ - yield self.cache_defer # make sure the cache is ready - yield self.runInteraction( - "unregister_app_service", - self._unregister_app_service_txn, - token, - ) - # update cache TODO: Should this be in the txn? - for service in self.services_cache: - if service.token == token: - service.url = None - service.namespaces = None - service.hs_token = None - - def _unregister_app_service_txn(self, txn, token): - # kill the url to prevent pushes - txn.execute( - "UPDATE application_services SET url=NULL WHERE token=?", - (token,) - ) - - # cleanup regex - as_id = self._get_as_id_txn(txn, token) - if not as_id: - logger.warning( - "unregister_app_service_txn: Failed to find as_id for token=", - token - ) - return False - - txn.execute( - "DELETE FROM application_services_regex WHERE as_id=?", - (as_id,) + self._populate_appservice_cache( + hs.config.app_service_config_files ) - return True - @defer.inlineCallbacks - def update_app_service(self, service): - """Update an application service, clobbering what was previously there. - - Args: - service(ApplicationService): The updated service. - """ - yield self.cache_defer # make sure the cache is ready - - # NB: There is no "insert" since we provide no public-facing API to - # allocate new ASes. It relies on the server admin inserting the AS - # token into the database manually. - - if not service.token or not service.url: - raise StoreError(400, "Token and url must be specified.") - - if not service.hs_token: - raise StoreError(500, "No HS token") - - yield self.runInteraction( - "update_app_service", - self._update_app_service_txn, - service - ) - - # update cache TODO: Should this be in the txn? - for (index, cache_service) in enumerate(self.services_cache): - if service.token == cache_service.token: - self.services_cache[index] = service - logger.info("Updated: %s", service) - return - # new entry - self.services_cache.append(service) - logger.info("Updated(new): %s", service) - - def _update_app_service_txn(self, txn, service): - as_id = self._get_as_id_txn(txn, service.token) - if not as_id: - logger.warning( - "update_app_service_txn: Failed to find as_id for token=", - service.token - ) - return False - - txn.execute( - "UPDATE application_services SET url=?, hs_token=?, sender=? " - "WHERE id=?", - (service.url, service.hs_token, service.sender, as_id,) - ) - # cleanup regex - txn.execute( - "DELETE FROM application_services_regex WHERE as_id=?", - (as_id,) - ) - for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST): - if ns_str in service.namespaces: - for regex_obj in service.namespaces[ns_str]: - txn.execute( - "INSERT INTO application_services_regex(" - "as_id, namespace, regex) values(?,?,?)", - (as_id, ns_int, simplejson.dumps(regex_obj)) - ) - return True - - def _get_as_id_txn(self, txn, token): - cursor = txn.execute( - "SELECT id FROM application_services WHERE token=?", - (token,) - ) - res = cursor.fetchone() - if res: - return res[0] - - @defer.inlineCallbacks def get_app_services(self): - yield self.cache_defer # make sure the cache is ready - defer.returnValue(self.services_cache) + return defer.succeed(self.services_cache) - @defer.inlineCallbacks def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. @@ -174,37 +55,23 @@ class ApplicationServiceStore(SQLBaseStore): Returns: synapse.appservice.ApplicationService or None. """ - - yield self.cache_defer # make sure the cache is ready - for service in self.services_cache: if service.sender == user_id: - defer.returnValue(service) - return - defer.returnValue(None) + return defer.succeed(service) + return defer.succeed(None) - @defer.inlineCallbacks - def get_app_service_by_token(self, token, from_cache=True): + def get_app_service_by_token(self, token): """Get the application service with the given appservice token. Args: token (str): The application service token. - from_cache (bool): True to get this service from the cache, False to - check the database. - Raises: - StoreError if there was a problem retrieving this service. + Returns: + synapse.appservice.ApplicationService or None. """ - yield self.cache_defer # make sure the cache is ready - - if from_cache: - for service in self.services_cache: - if service.token == token: - defer.returnValue(service) - return - defer.returnValue(None) - - # TODO: The from_cache=False impl - # TODO: This should be JOINed with the application_services_regex table. + for service in self.services_cache: + if service.token == token: + return defer.succeed(service) + return defer.succeed(None) def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -277,12 +144,7 @@ class ApplicationServiceStore(SQLBaseStore): return rooms_for_user_matching_user_id - @defer.inlineCallbacks - def _populate_cache(self): - """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT * FROM application_services LEFT JOIN " - "application_services_regex ON application_services.id = " - "application_services_regex.as_id") + def _parse_services_dict(self, results): # SQL results in the form: # [ # { @@ -296,12 +158,14 @@ class ApplicationServiceStore(SQLBaseStore): # } # ] services = {} - results = yield self._execute_and_decode("_populate_cache", sql) for res in results: as_token = res["token"] + if as_token is None: + continue if as_token not in services: # add the service services[as_token] = { + "id": res["id"], "url": res["url"], "token": as_token, "hs_token": res["hs_token"], @@ -319,20 +183,287 @@ class ApplicationServiceStore(SQLBaseStore): try: services[as_token]["namespaces"][ ApplicationService.NS_LIST[ns_int]].append( - simplejson.loads(res["regex"]) + json.loads(res["regex"]) ) except IndexError: logger.error("Bad namespace enum '%s'. %s", ns_int, res) except JSONDecodeError: logger.error("Bad regex object '%s'", res["regex"]) - # TODO get last successful txn id f.e. service + service_list = [] for service in services.values(): - logger.info("Found application service: %s", service) - self.services_cache.append(ApplicationService( + service_list.append(ApplicationService( token=service["token"], url=service["url"], namespaces=service["namespaces"], hs_token=service["hs_token"], - sender=service["sender"] + sender=service["sender"], + id=service["id"] )) + return service_list + + def _load_appservice(self, as_info): + required_string_fields = [ + "url", "as_token", "hs_token", "sender_localpart" + ] + for field in required_string_fields: + if not isinstance(as_info.get(field), basestring): + raise KeyError("Required string field: '%s'", field) + + localpart = as_info["sender_localpart"] + if urllib.quote(localpart) != localpart: + raise ValueError( + "sender_localpart needs characters which are not URL encoded." + ) + user = UserID(localpart, self.hostname) + user_id = user.to_string() + + # namespace checks + if not isinstance(as_info.get("namespaces"), dict): + raise KeyError("Requires 'namespaces' object.") + for ns in ApplicationService.NS_LIST: + # specific namespaces are optional + if ns in as_info["namespaces"]: + # expect a list of dicts with exclusive and regex keys + for regex_obj in as_info["namespaces"][ns]: + if not isinstance(regex_obj, dict): + raise ValueError( + "Expected namespace entry in %s to be an object," + " but got %s", ns, regex_obj + ) + if not isinstance(regex_obj.get("regex"), basestring): + raise ValueError( + "Missing/bad type 'regex' key in %s", regex_obj + ) + if not isinstance(regex_obj.get("exclusive"), bool): + raise ValueError( + "Missing/bad type 'exclusive' key in %s", regex_obj + ) + return ApplicationService( + token=as_info["as_token"], + url=as_info["url"], + namespaces=as_info["namespaces"], + hs_token=as_info["hs_token"], + sender=user_id, + id=as_info["as_token"] # the token is the only unique thing here + ) + + def _populate_appservice_cache(self, config_files): + """Populates a cache of Application Services from the config files.""" + if not isinstance(config_files, list): + logger.warning( + "Expected %s to be a list of AS config files.", config_files + ) + return + + for config_file in config_files: + try: + with open(config_file, 'r') as f: + appservice = self._load_appservice(yaml.load(f)) + logger.info("Loaded application service: %s", appservice) + self.services_cache.append(appservice) + except Exception as e: + logger.error("Failed to load appservice from '%s'", config_file) + logger.exception(e) + + +class ApplicationServiceTransactionStore(SQLBaseStore): + + def __init__(self, hs): + super(ApplicationServiceTransactionStore, self).__init__(hs) + + @defer.inlineCallbacks + def get_appservices_by_state(self, state): + """Get a list of application services based on their state. + + Args: + state(ApplicationServiceState): The state to filter on. + Returns: + A Deferred which resolves to a list of ApplicationServices, which + may be empty. + """ + results = yield self._simple_select_list( + "application_services_state", + dict(state=state), + ["as_id"] + ) + # NB: This assumes this class is linked with ApplicationServiceStore + as_list = yield self.get_app_services() + services = [] + + for res in results: + for service in as_list: + if service.id == res["as_id"]: + services.append(service) + defer.returnValue(services) + + @defer.inlineCallbacks + def get_appservice_state(self, service): + """Get the application service state. + + Args: + service(ApplicationService): The service whose state to set. + Returns: + A Deferred which resolves to ApplicationServiceState. + """ + result = yield self._simple_select_one( + "application_services_state", + dict(as_id=service.id), + ["state"], + allow_none=True + ) + if result: + defer.returnValue(result.get("state")) + return + defer.returnValue(None) + + def set_appservice_state(self, service, state): + """Set the application service state. + + Args: + service(ApplicationService): The service whose state to set. + state(ApplicationServiceState): The connectivity state to apply. + Returns: + A Deferred which resolves when the state was set successfully. + """ + return self._simple_upsert( + "application_services_state", + dict(as_id=service.id), + dict(state=state) + ) + + def create_appservice_txn(self, service, events): + """Atomically creates a new transaction for this application service + with the given list of events. + + Args: + service(ApplicationService): The service who the transaction is for. + events(list<Event>): A list of events to put in the transaction. + Returns: + AppServiceTransaction: A new transaction. + """ + return self.runInteraction( + "create_appservice_txn", + self._create_appservice_txn, + service, events + ) + + def _create_appservice_txn(self, txn, service, events): + # work out new txn id (highest txn id for this service += 1) + # The highest id may be the last one sent (in which case it is last_txn) + # or it may be the highest in the txns list (which are waiting to be/are + # being sent) + last_txn_id = self._get_last_txn(txn, service.id) + + result = txn.execute( + "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + highest_txn_id = result.fetchone()[0] + if highest_txn_id is None: + highest_txn_id = 0 + + new_txn_id = max(highest_txn_id, last_txn_id) + 1 + + # Insert new txn into txn table + event_ids = [e.event_id for e in events] + txn.execute( + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " + "VALUES(?,?,?)", + (service.id, new_txn_id, json.dumps(event_ids)) + ) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events + ) + + def complete_appservice_txn(self, txn_id, service): + """Completes an application service transaction. + + Args: + txn_id(str): The transaction ID being completed. + service(ApplicationService): The application service which was sent + this transaction. + Returns: + A Deferred which resolves if this transaction was stored + successfully. + """ + return self.runInteraction( + "complete_appservice_txn", + self._complete_appservice_txn, + txn_id, service + ) + + def _complete_appservice_txn(self, txn, txn_id, service): + txn_id = int(txn_id) + + # Debugging query: Make sure the txn being completed is EXACTLY +1 from + # what was there before. If it isn't, we've got problems (e.g. the AS + # has probably missed some events), so whine loudly but still continue, + # since it shouldn't fail completion of the transaction. + last_txn_id = self._get_last_txn(txn, service.id) + if (last_txn_id + 1) != txn_id: + logger.error( + "appservice: Completing a transaction which has an ID > 1 from " + "the last ID sent to this AS. We've either dropped events or " + "sent it to the AS out of order. FIX ME. last_txn=%s " + "completing_txn=%s service_id=%s", last_txn_id, txn_id, + service.id + ) + + # Set current txn_id for AS to 'txn_id' + self._simple_upsert_txn( + txn, "application_services_state", dict(as_id=service.id), + dict(last_txn=txn_id) + ) + + # Delete txn + self._simple_delete_txn( + txn, "application_services_txns", + dict(txn_id=txn_id, as_id=service.id) + ) + + def get_oldest_unsent_txn(self, service): + """Get the oldest transaction which has not been sent for this + service. + + Args: + service(ApplicationService): The app service to get the oldest txn. + Returns: + A Deferred which resolves to an AppServiceTransaction or + None. + """ + return self.runInteraction( + "get_oldest_unsent_appservice_txn", + self._get_oldest_unsent_txn, + service + ) + + def _get_oldest_unsent_txn(self, txn, service): + # Monotonically increasing txn ids, so just select the smallest + # one in the txns table (we delete them when they are sent) + result = txn.execute( + "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + entry = self.cursor_to_dict(result)[0] + if not entry or entry["txn_id"] is None: + # the min(txn_id) part will force a row, so entry may not be None + return None + + event_ids = json.loads(entry["event_ids"]) + events = self._get_events_txn(txn, event_ids) + + return AppServiceTransaction( + service=service, id=entry["txn_id"], events=events + ) + + def _get_last_txn(self, txn, service_id): + result = txn.execute( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service_id,) + ) + last_txn_id = result.fetchone() + if last_txn_id is None or last_txn_id[0] is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) # select 'last_txn' col diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 68b7d59693..0199539fea 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from synapse.api.errors import SynapseError @@ -48,6 +48,7 @@ class DirectoryStore(SQLBaseStore): {"room_alias": room_alias.to_string()}, "room_id", allow_none=True, + desc="get_association_from_room_alias", ) if not room_id: @@ -58,6 +59,7 @@ class DirectoryStore(SQLBaseStore): "room_alias_servers", {"room_alias": room_alias.to_string()}, "server", + desc="get_association_from_room_alias", ) if not servers: @@ -87,6 +89,7 @@ class DirectoryStore(SQLBaseStore): "room_alias": room_alias.to_string(), "room_id": room_id, }, + desc="create_room_alias_association", ) except sqlite3.IntegrityError: raise SynapseError( @@ -100,16 +103,22 @@ class DirectoryStore(SQLBaseStore): { "room_alias": room_alias.to_string(), "server": server, - } + }, + desc="create_room_alias_association", ) + self.get_aliases_for_room.invalidate(room_id) + @defer.inlineCallbacks def delete_room_alias(self, room_alias): - return self.runInteraction( + room_id = yield self.runInteraction( "delete_room_alias", self._delete_room_alias_txn, room_alias, ) + self.get_aliases_for_room.invalidate(room_id) + defer.returnValue(room_id) + def _delete_room_alias_txn(self, txn, room_alias): cursor = txn.execute( "SELECT room_id FROM room_aliases WHERE room_alias = ?", @@ -134,9 +143,11 @@ class DirectoryStore(SQLBaseStore): return room_id + @cached() def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", {"room_id": room_id}, "room_alias", + desc="get_aliases_for_room", ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py new file mode 100644 index 0000000000..a86230d92c --- /dev/null +++ b/synapse/storage/events.py @@ -0,0 +1,395 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from _base import SQLBaseStore, _RollbackButIsFineException + +from twisted.internet import defer + +from synapse.util.logutils import log_function +from synapse.api.constants import EventTypes +from synapse.crypto.event_signing import compute_event_reference_hash + +from syutil.base64util import decode_base64 +from syutil.jsonutil import encode_canonical_json + +import logging + +logger = logging.getLogger(__name__) + + +class EventsStore(SQLBaseStore): + @defer.inlineCallbacks + @log_function + def persist_event(self, event, context, backfilled=False, + is_new_state=True, current_state=None): + stream_ordering = None + if backfilled: + if not self.min_token_deferred.called: + yield self.min_token_deferred + self.min_token -= 1 + stream_ordering = self.min_token + + try: + yield self.runInteraction( + "persist_event", + self._persist_event_txn, + event=event, + context=context, + backfilled=backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + current_state=current_state, + ) + self.get_room_events_max_id.invalidate() + except _RollbackButIsFineException: + pass + + @defer.inlineCallbacks + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + """Get an event from the database by event_id. + + Args: + event_id (str): The event_id of the event to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + allow_none (bool): If True, return None if no event found, if + False throw an exception. + + Returns: + Deferred : A FrozenEvent. + """ + event = yield self.runInteraction( + "get_event", self._get_event_txn, + event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + if not event and not allow_none: + raise RuntimeError("Could not find event %s" % (event_id,)) + + defer.returnValue(event) + + @log_function + def _persist_event_txn(self, txn, event, context, backfilled, + stream_ordering=None, is_new_state=True, + current_state=None): + + # Remove the any existing cache entries for the event_id + self._get_event_cache.pop(event.event_id) + + # We purposefully do this first since if we include a `current_state` + # key, we *want* to update the `current_state_events` table + if current_state: + txn.execute( + "DELETE FROM current_state_events WHERE room_id = ?", + (event.room_id,) + ) + + for s in current_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + }, + or_replace=True, + ) + + if event.is_state() and is_new_state: + if not backfilled and not context.rejected: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + or_replace=True, + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) + + outlier = event.internal_metadata.is_outlier() + + if not outlier: + self._store_state_groups_txn(txn, event, context) + + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) + + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + + have_persisted = self._simple_select_one_onecol_txn( + txn, + table="event_json", + keyvalues={"event_id": event.event_id}, + retcol="event_id", + allow_none=True, + ) + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + + # If we have already persisted this event, we don't need to do any + # more processing. + # The processing above must be done on every call to persist event, + # since they might not have happened on previous calls. For example, + # if we are persisting an event that we had persisted as an outlier, + # but is no longer one. + if have_persisted: + if not outlier: + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json.decode("UTF-8"), event.event_id,) + ) + + sql = ( + "UPDATE events SET outlier = 0" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (event.event_id,) + ) + return + + if event.type == EventTypes.Member: + self._store_room_member_txn(txn, event) + elif event.type == EventTypes.Feedback: + self._store_feedback_txn(txn, event) + elif event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) + + event_dict = { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } + + self._simple_insert_txn( + txn, + table="event_json", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "internal_metadata": metadata_json.decode("UTF-8"), + "json": encode_canonical_json(event_dict).decode("UTF-8"), + }, + or_replace=True, + ) + + content = encode_canonical_json( + event.content + ).decode("UTF-8") + + vals = { + "topological_ordering": event.depth, + "event_id": event.event_id, + "type": event.type, + "room_id": event.room_id, + "content": content, + "processed": True, + "outlier": outlier, + "depth": event.depth, + } + + if stream_ordering is not None: + vals["stream_ordering"] = stream_ordering + + unrec = { + k: v + for k, v in event.get_dict().items() + if k not in vals.keys() and k not in [ + "redacted", + "redacted_because", + "signatures", + "hashes", + "prev_events", + ] + } + + vals["unrecognized_keys"] = encode_canonical_json( + unrec + ).decode("UTF-8") + + try: + self._simple_insert_txn( + txn, + "events", + vals, + or_replace=(not outlier), + or_ignore=bool(outlier), + ) + except: + logger.warn( + "Failed to persist, probably duplicate: %s", + event.event_id, + exc_info=True, + ) + raise _RollbackButIsFineException("_persist_event") + + if context.rejected: + self._store_rejections_txn(txn, event.event_id, context.rejected) + + if event.is_state(): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + + # TODO: How does this work with backfilling? + if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state + + self._simple_insert_txn( + txn, + "state_events", + vals, + ) + + if is_new_state and not context.rejected: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + ) + + for e_id, h in event.prev_state: + self._simple_insert_txn( + txn, + table="event_edges", + values={ + "event_id": event.event_id, + "prev_event_id": e_id, + "room_id": event.room_id, + "is_state": 1, + }, + ) + + for hash_alg, hash_base64 in event.hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_event_content_hash_txn( + txn, event.event_id, hash_alg, hash_bytes, + ) + + for prev_event_id, prev_hashes in event.prev_events: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_event_hash_txn( + txn, event.event_id, prev_event_id, alg, hash_bytes + ) + + for auth_id, _ in event.auth_events: + self._simple_insert_txn( + txn, + table="event_auth", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + }, + ) + + (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) + self._store_event_reference_hash_txn( + txn, event.event_id, ref_alg, ref_hash_bytes + ) + + def _store_redaction(self, txn, event): + # invalidate the cache for the redacted event + self._get_event_cache.pop(event.redacts) + txn.execute( + "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", + (event.event_id, event.redacts) + ) + + def have_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. + + Returns: + dict: Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps to + None. + """ + if not event_ids: + return defer.succeed({}) + + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" + ) + + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected + + return res + + return self.runInteraction( + "have_events", f, + ) diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py deleted file mode 100644 index 8eab769b71..0000000000 --- a/synapse/storage/feedback.py +++ /dev/null @@ -1,47 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from twisted.internet import defer - -from ._base import SQLBaseStore - - -class FeedbackStore(SQLBaseStore): - - def _store_feedback_txn(self, txn, event): - self._simple_insert_txn(txn, "feedback", { - "event_id": event.event_id, - "feedback_type": event.content["type"], - "room_id": event.room_id, - "target_event_id": event.content["target_event_id"], - "sender": event.user_id, - }) - - @defer.inlineCallbacks - def get_feedback_for_event(self, event_id): - sql = ( - "SELECT events.* FROM events INNER JOIN feedback " - "ON events.event_id = feedback.event_id " - "WHERE feedback.target_event_id = ? " - ) - - rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id) - - defer.returnValue( - [ - (yield self._parse_events(r)) - for r in rows - ] - ) diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 457a11fd02..8800116570 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -31,6 +31,7 @@ class FilteringStore(SQLBaseStore): }, retcol="filter_json", allow_none=False, + desc="get_user_filter", ) defer.returnValue(json.loads(def_json)) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 7101d2beec..7bf57234f6 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -32,6 +32,7 @@ class MediaRepositoryStore(SQLBaseStore): {"media_id": media_id}, ("media_type", "media_length", "upload_name", "created_ts"), allow_none=True, + desc="get_local_media", ) def store_local_media(self, media_id, media_type, time_now_ms, upload_name, @@ -45,7 +46,8 @@ class MediaRepositoryStore(SQLBaseStore): "upload_name": upload_name, "media_length": media_length, "user_id": user_id.to_string(), - } + }, + desc="store_local_media", ) def get_local_media_thumbnails(self, media_id): @@ -55,7 +57,8 @@ class MediaRepositoryStore(SQLBaseStore): ( "thumbnail_width", "thumbnail_height", "thumbnail_method", "thumbnail_type", "thumbnail_length", - ) + ), + desc="get_local_media_thumbnails", ) def store_local_thumbnail(self, media_id, thumbnail_width, @@ -70,7 +73,8 @@ class MediaRepositoryStore(SQLBaseStore): "thumbnail_method": thumbnail_method, "thumbnail_type": thumbnail_type, "thumbnail_length": thumbnail_length, - } + }, + desc="store_local_thumbnail", ) def get_cached_remote_media(self, origin, media_id): @@ -82,6 +86,7 @@ class MediaRepositoryStore(SQLBaseStore): "filesystem_id", ), allow_none=True, + desc="get_cached_remote_media", ) def store_cached_remote_media(self, origin, media_id, media_type, @@ -97,7 +102,8 @@ class MediaRepositoryStore(SQLBaseStore): "created_ts": time_now_ms, "upload_name": upload_name, "filesystem_id": filesystem_id, - } + }, + desc="store_cached_remote_media", ) def get_remote_media_thumbnails(self, origin, media_id): @@ -107,7 +113,8 @@ class MediaRepositoryStore(SQLBaseStore): ( "thumbnail_width", "thumbnail_height", "thumbnail_method", "thumbnail_type", "thumbnail_length", "filesystem_id", - ) + ), + desc="get_remote_media_thumbnails", ) def store_remote_media_thumbnail(self, origin, media_id, filesystem_id, @@ -125,5 +132,6 @@ class MediaRepositoryStore(SQLBaseStore): "thumbnail_type": thumbnail_type, "thumbnail_length": thumbnail_length, "filesystem_id": filesystem_id, - } + }, + desc="store_remote_media_thumbnail", ) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 1dcd34723b..87fba55439 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -21,6 +21,7 @@ class PresenceStore(SQLBaseStore): return self._simple_insert( table="presence", values={"user_id": user_localpart}, + desc="create_presence", ) def has_presence_state(self, user_localpart): @@ -29,6 +30,7 @@ class PresenceStore(SQLBaseStore): keyvalues={"user_id": user_localpart}, retcols=["user_id"], allow_none=True, + desc="has_presence_state", ) def get_presence_state(self, user_localpart): @@ -36,6 +38,7 @@ class PresenceStore(SQLBaseStore): table="presence", keyvalues={"user_id": user_localpart}, retcols=["state", "status_msg", "mtime"], + desc="get_presence_state", ) def set_presence_state(self, user_localpart, new_state): @@ -45,7 +48,7 @@ class PresenceStore(SQLBaseStore): updatevalues={"state": new_state["state"], "status_msg": new_state["status_msg"], "mtime": self._clock.time_msec()}, - retcols=["state"], + desc="set_presence_state", ) def allow_presence_visible(self, observed_localpart, observer_userid): @@ -53,6 +56,7 @@ class PresenceStore(SQLBaseStore): table="presence_allow_inbound", values={"observed_user_id": observed_localpart, "observer_user_id": observer_userid}, + desc="allow_presence_visible", ) def disallow_presence_visible(self, observed_localpart, observer_userid): @@ -60,6 +64,7 @@ class PresenceStore(SQLBaseStore): table="presence_allow_inbound", keyvalues={"observed_user_id": observed_localpart, "observer_user_id": observer_userid}, + desc="disallow_presence_visible", ) def is_presence_visible(self, observed_localpart, observer_userid): @@ -69,6 +74,7 @@ class PresenceStore(SQLBaseStore): "observer_user_id": observer_userid}, retcols=["observed_user_id"], allow_none=True, + desc="is_presence_visible", ) def add_presence_list_pending(self, observer_localpart, observed_userid): @@ -77,6 +83,7 @@ class PresenceStore(SQLBaseStore): values={"user_id": observer_localpart, "observed_user_id": observed_userid, "accepted": False}, + desc="add_presence_list_pending", ) def set_presence_list_accepted(self, observer_localpart, observed_userid): @@ -85,6 +92,7 @@ class PresenceStore(SQLBaseStore): keyvalues={"user_id": observer_localpart, "observed_user_id": observed_userid}, updatevalues={"accepted": True}, + desc="set_presence_list_accepted", ) def get_presence_list(self, observer_localpart, accepted=None): @@ -96,6 +104,7 @@ class PresenceStore(SQLBaseStore): table="presence_list", keyvalues=keyvalues, retcols=["observed_user_id", "accepted"], + desc="get_presence_list", ) def del_presence_list(self, observer_localpart, observed_userid): @@ -103,4 +112,5 @@ class PresenceStore(SQLBaseStore): table="presence_list", keyvalues={"user_id": observer_localpart, "observed_user_id": observed_userid}, + desc="del_presence_list", ) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 153c7ad027..a6e52cb248 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -21,6 +21,7 @@ class ProfileStore(SQLBaseStore): return self._simple_insert( table="profiles", values={"user_id": user_localpart}, + desc="create_profile", ) def get_profile_displayname(self, user_localpart): @@ -28,6 +29,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, retcol="displayname", + desc="get_profile_displayname", ) def set_profile_displayname(self, user_localpart, new_displayname): @@ -35,6 +37,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, updatevalues={"displayname": new_displayname}, + desc="set_profile_displayname", ) def get_profile_avatar_url(self, user_localpart): @@ -42,6 +45,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, retcol="avatar_url", + desc="get_profile_avatar_url", ) def set_profile_avatar_url(self, user_localpart, new_avatar_url): @@ -49,4 +53,5 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, updatevalues={"avatar_url": new_avatar_url}, + desc="set_profile_avatar_url", ) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index d769db2c78..c47bdc2861 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -50,7 +50,8 @@ class PushRuleStore(SQLBaseStore): results = yield self._simple_select_list( PushRuleEnableTable.table_name, {'user_name': user_name}, - PushRuleEnableTable.fields + PushRuleEnableTable.fields, + desc="get_push_rules_enabled_for_user", ) defer.returnValue( {r['rule_id']: False if r['enabled'] == 0 else True for r in results} @@ -201,7 +202,8 @@ class PushRuleStore(SQLBaseStore): """ yield self._simple_delete_one( PushRuleTable.table_name, - {'user_name': user_name, 'rule_id': rule_id} + {'user_name': user_name, 'rule_id': rule_id}, + desc="delete_push_rule", ) @defer.inlineCallbacks @@ -209,7 +211,8 @@ class PushRuleStore(SQLBaseStore): yield self._simple_upsert( PushRuleEnableTable.table_name, {'user_name': user_name, 'rule_id': rule_id}, - {'enabled': enabled} + {'enabled': enabled}, + desc="set_push_rule_enabled", ) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 587dada68f..000502b4ff 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -114,7 +114,9 @@ class PusherStore(SQLBaseStore): ts=pushkey_ts, lang=lang, data=data - )) + ), + desc="add_pusher", + ) except Exception as e: logger.error("create_pusher with failed: %s", e) raise StoreError(500, "Problem creating pusher.") @@ -123,7 +125,8 @@ class PusherStore(SQLBaseStore): def delete_pusher_by_app_id_pushkey(self, app_id, pushkey): yield self._simple_delete_one( PushersTable.table_name, - dict(app_id=app_id, pushkey=pushkey) + {"app_id": app_id, "pushkey": pushkey}, + desc="delete_pusher_by_app_id_pushkey", ) @defer.inlineCallbacks @@ -131,7 +134,8 @@ class PusherStore(SQLBaseStore): yield self._simple_update_one( PushersTable.table_name, {'app_id': app_id, 'pushkey': pushkey}, - {'last_token': last_token} + {'last_token': last_token}, + desc="update_pusher_last_token", ) @defer.inlineCallbacks @@ -140,7 +144,8 @@ class PusherStore(SQLBaseStore): yield self._simple_update_one( PushersTable.table_name, {'app_id': app_id, 'pushkey': pushkey}, - {'last_token': last_token, 'last_success': last_success} + {'last_token': last_token, 'last_success': last_success}, + desc="update_pusher_last_token_and_success", ) @defer.inlineCallbacks @@ -148,7 +153,8 @@ class PusherStore(SQLBaseStore): yield self._simple_update_one( PushersTable.table_name, {'app_id': app_id, 'pushkey': pushkey}, - {'failing_since': failing_since} + {'failing_since': failing_since}, + desc="update_pusher_failing_since", ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 3c2f1d6a15..f24154f146 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -39,7 +39,10 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if there was a problem adding this. """ - row = yield self._simple_select_one("users", {"name": user_id}, ["id"]) + row = yield self._simple_select_one( + "users", {"name": user_id}, ["id"], + desc="add_access_token_to_user", + ) if not row: raise StoreError(400, "Bad user ID supplied.") row_id = row["id"] @@ -48,7 +51,8 @@ class RegistrationStore(SQLBaseStore): { "user_id": row_id, "token": token - } + }, + desc="add_access_token_to_user", ) @defer.inlineCallbacks @@ -120,6 +124,7 @@ class RegistrationStore(SQLBaseStore): keyvalues={"name": user.to_string()}, retcol="admin", allow_none=True, + desc="is_server_admin", ) defer.returnValue(res if res else False) diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index 4e1a9a2783..0838eb3d12 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -29,7 +29,7 @@ class RejectionsStore(SQLBaseStore): "event_id": event_id, "reason": reason, "last_check": self._clock.time_msec(), - } + }, ) def get_rejection_reason(self, event_id): @@ -40,4 +40,5 @@ class RejectionsStore(SQLBaseStore): "event_id": event_id, }, allow_none=True, + desc="get_rejection_reason", ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 549c9af393..be3e28c2ea 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -15,11 +15,9 @@ from twisted.internet import defer -from sqlite3 import IntegrityError - from synapse.api.errors import StoreError -from ._base import SQLBaseStore, Table +from ._base import SQLBaseStore import collections import logging @@ -27,8 +25,9 @@ import logging logger = logging.getLogger(__name__) -OpsLevel = collections.namedtuple("OpsLevel", ( - "ban_level", "kick_level", "redact_level") +OpsLevel = collections.namedtuple( + "OpsLevel", + ("ban_level", "kick_level", "redact_level",) ) @@ -47,13 +46,15 @@ class RoomStore(SQLBaseStore): StoreError if the room could not be stored. """ try: - yield self._simple_insert(RoomsTable.table_name, dict( - room_id=room_id, - creator=room_creator_user_id, - is_public=is_public - )) - except IntegrityError: - raise StoreError(409, "Room ID in use.") + yield self._simple_insert( + RoomsTable.table_name, + { + "room_id": room_id, + "creator": room_creator_user_id, + "is_public": is_public, + }, + desc="store_room", + ) except Exception as e: logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") @@ -66,9 +67,11 @@ class RoomStore(SQLBaseStore): Returns: A namedtuple containing the room information, or an empty list. """ - query = RoomsTable.select_statement("room_id=?") - return self._execute( - "get_room", RoomsTable.decode_single_result, query, room_id, + return self._simple_select_one( + table=RoomsTable.table_name, + keyvalues={"room_id": room_id}, + retcols=RoomsTable.fields, + desc="get_room", ) @defer.inlineCallbacks @@ -143,7 +146,7 @@ class RoomStore(SQLBaseStore): "event_id": event.event_id, "room_id": event.room_id, "topic": event.content["topic"], - } + }, ) def _store_room_name_txn(self, txn, event): @@ -158,8 +161,45 @@ class RoomStore(SQLBaseStore): } ) + @defer.inlineCallbacks + def get_room_name_and_aliases(self, room_id): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + + sql = ( + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) % { + "redacted": del_sql, + } + + sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" + sql += " OR s.type = 'm.room.aliases')" + args = (room_id,) -class RoomsTable(Table): + results = yield self._execute_and_decode("get_current_state", sql, *args) + + events = yield self._parse_events(results) + + name = None + aliases = [] + + for e in events: + if e.type == 'm.room.name': + if 'name' in e.content: + name = e.content['name'] + elif e.type == 'm.room.aliases': + if 'aliases' in e.content: + aliases.extend(e.content['aliases']) + + defer.returnValue((name, aliases)) + + +class RoomsTable(object): table_name = "rooms" fields = [ @@ -167,5 +207,3 @@ class RoomsTable(Table): "is_public", "creator" ] - - EntryType = collections.namedtuple("RoomEntry", fields) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 65ffb4627f..52c37c76f5 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -212,7 +212,8 @@ class RoomMemberStore(SQLBaseStore): return self._simple_select_onecol( "room_hosts", {"room_id": room_id}, - "host" + "host", + desc="get_joined_hosts_for_room", ) def _get_members_by_dict(self, where_dict): diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql new file mode 100644 index 0000000000..2b27e2a429 --- /dev/null +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -0,0 +1,30 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS application_services_state( + as_id TEXT PRIMARY KEY, + state TEXT, + last_txn TEXT +); + +CREATE TABLE IF NOT EXISTS application_services_txns( + as_id TEXT NOT NULL, + txn_id INTEGER NOT NULL, + event_ids TEXT NOT NULL, + UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK +); + + + diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 456e4bd45d..58dbf2802b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -15,6 +15,8 @@ from ._base import SQLBaseStore +from twisted.internet import defer + import logging logger = logging.getLogger(__name__) @@ -122,3 +124,33 @@ class StateStore(SQLBaseStore): }, or_replace=True, ) + + @defer.inlineCallbacks + def get_current_state(self, room_id, event_type=None, state_key=""): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + + sql = ( + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) % { + "redacted": del_sql, + } + + if event_type and state_key is not None: + sql += " AND s.type = ? AND s.state_key = ? " + args = (room_id, event_type, state_key) + elif event_type: + sql += " AND s.type = ?" + args = (room_id, event_type) + else: + args = (room_id, ) + + results = yield self._execute_and_decode("get_current_state", sql, *args) + + events = yield self._parse_events(results) + defer.returnValue(events) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 09bc522210..66f307e640 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -35,7 +35,7 @@ what sort order was used: from twisted.internet import defer -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.util.logutils import log_function @@ -413,12 +413,32 @@ class StreamStore(SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) + @cached(num_args=0) def get_room_events_max_id(self): return self.runInteraction( "get_room_events_max_id", self._get_room_events_max_id_txn ) + @defer.inlineCallbacks + def _get_min_token(self): + row = yield self._execute( + "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" + ) + + self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 + self.min_token = min(self.min_token, -1) + + logger.debug("min_token is: %s", self.min_token) + + defer.returnValue(self.min_token) + + def get_next_stream_id(self): + with self._next_stream_id_lock: + i = self._next_stream_id + self._next_stream_id += 1 + return i + def _get_room_events_max_id_txn(self, txn): txn.execute( "SELECT MAX(stream_ordering) as m FROM events" diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 0b8a3b7a07..b777395e06 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -46,15 +46,19 @@ class TransactionStore(SQLBaseStore): ) def _get_received_txn_response(self, txn, transaction_id, origin): - where_clause = "transaction_id = ? AND origin = ?" - query = ReceivedTransactionsTable.select_statement(where_clause) - - txn.execute(query, (transaction_id, origin)) - - results = ReceivedTransactionsTable.decode_results(txn.fetchall()) + result = self._simple_select_one_txn( + txn, + table=ReceivedTransactionsTable.table_name, + keyvalues={ + "transaction_id": transaction_id, + "origin": origin, + }, + retcols=ReceivedTransactionsTable.fields, + allow_none=True, + ) - if results and results[0].response_code: - return (results[0].response_code, results[0].response_json) + if result and result.response_code: + return result["response_code"], result["response_json"] else: return None diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py index 65d5792907..2f7b615f78 100644 --- a/synapse/util/lrucache.py +++ b/synapse/util/lrucache.py @@ -90,12 +90,16 @@ class LruCache(object): def cache_len(): return len(cache) + def cache_contains(key): + return key in cache + self.sentinel = object() self.get = cache_get self.set = cache_set self.setdefault = cache_set_default self.pop = cache_pop self.len = cache_len + self.contains = cache_contains def __getitem__(self, key): result = self.get(key, self.sentinel) @@ -114,3 +118,6 @@ class LruCache(object): def __len__(self): return self.len() + + def __contains__(self, key): + return self.contains(key) |