diff options
author | Kegsay <kegsay@gmail.com> | 2015-03-26 10:30:47 +0000 |
---|---|---|
committer | Kegsay <kegsay@gmail.com> | 2015-03-26 10:30:47 +0000 |
commit | 5b999e206e0dd29bfe3ef0dd647d3a367e83d953 (patch) | |
tree | 7e842d9ff6d7581a63425ee076ec5d5240c12701 /synapse | |
parent | Allow a choice of LRU behaviour for Cache() by using LruCache() or OrderedDict() (diff) | |
parent | Fixes from PR comments (diff) | |
download | synapse-5b999e206e0dd29bfe3ef0dd647d3a367e83d953.tar.xz |
Merge pull request #106 from matrix-org/application-services-txn-reliability
Application services transaction reliability (PR #106)
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/appservice/__init__.py | 48 | ||||
-rw-r--r-- | synapse/appservice/api.py | 22 | ||||
-rw-r--r-- | synapse/appservice/scheduler.py | 254 | ||||
-rw-r--r-- | synapse/handlers/__init__.py | 8 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 25 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 8 | ||||
-rw-r--r-- | synapse/storage/appservice.py | 248 | ||||
-rw-r--r-- | synapse/storage/schema/delta/15/appservice_txns.sql | 31 |
8 files changed, 606 insertions, 38 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index a268a6bcc4..c60db16b74 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -20,6 +20,50 @@ import re logger = logging.getLogger(__name__) +class ApplicationServiceState(object): + DOWN = "down" + UP = "up" + + +class AppServiceTransaction(object): + """Represents an application service transaction.""" + + def __init__(self, service, id, events): + self.service = service + self.id = id + self.events = events + + def send(self, as_api): + """Sends this transaction using the provided AS API interface. + + Args: + as_api(ApplicationServiceApi): The API to use to send. + Returns: + A Deferred which resolves to True if the transaction was sent. + """ + return as_api.push_bulk( + service=self.service, + events=self.events, + txn_id=self.id + ) + + def complete(self, store): + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + Returns: + A Deferred which resolves to True if the transaction was completed. + """ + return store.complete_appservice_txn( + service=self.service, + txn_id=self.id + ) + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. @@ -35,13 +79,13 @@ class ApplicationService(object): NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS] def __init__(self, token, url=None, namespaces=None, hs_token=None, - sender=None, txn_id=None): + sender=None, id=None): self.token = token self.url = url self.hs_token = hs_token self.sender = sender self.namespaces = self._check_namespaces(namespaces) - self.txn_id = txn_id + self.id = id def _check_namespaces(self, namespaces): # Sanity check that it is of the form: diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index c2179f8d55..2a9becccb3 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -72,14 +72,19 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue(False) @defer.inlineCallbacks - def push_bulk(self, service, events): + def push_bulk(self, service, events, txn_id=None): events = self._serialize(events) + if txn_id is None: + logger.warning("push_bulk: Missing txn ID sending events to %s", + service.url) + txn_id = str(0) + txn_id = str(txn_id) + uri = service.url + ("/transactions/%s" % - urllib.quote(str(0))) # TODO txn_ids - response = None + urllib.quote(txn_id)) try: - response = yield self.put_json( + yield self.put_json( uri=uri, json_body={ "events": events @@ -87,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient): args={ "access_token": service.hs_token }) - if response: # just an empty json object - # TODO: Mark txn as sent successfully - defer.returnValue(True) + defer.returnValue(True) + return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: @@ -97,8 +101,8 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue(False) @defer.inlineCallbacks - def push(self, service, event): - response = yield self.push_bulk(service, [event]) + def push(self, service, event, txn_id=None): + response = yield self.push_bulk(service, [event], txn_id) defer.returnValue(response) def _serialize(self, events): diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py new file mode 100644 index 0000000000..59b0b1f4ac --- /dev/null +++ b/synapse/appservice/scheduler.py @@ -0,0 +1,254 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +This module controls the reliability for application service transactions. + +The nominal flow through this module looks like: + __________ +1---ASa[e]-->| Service |--> Queue ASa[f] +2----ASb[e]->| Queuer | +3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e] + V + -````````- +------------+ + |````````|<--StoreTxn-|Transaction | + |Database| | Controller |---> SEND TO AS + `--------` +------------+ +What happens on SEND TO AS depends on the state of the Application Service: + - If the AS is marked as DOWN, do nothing. + - If the AS is marked as UP, send the transaction. + * SUCCESS : Increment where the AS is up to txn-wise and nuke the txn + contents from the db. + * FAILURE : Marked AS as DOWN and start Recoverer. + +Recoverer attempts to recover ASes who have died. The flow for this looks like: + ,--------------------- backoff++ --------------. + V | + START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE + backoff DB and try to send it + ^ |___________ +Mark AS as | V +UP & quit +---------- YES SUCCESS + | | | + NO <--- Have more txns? <------ Mark txn success & nuke <-+ + from db; incr AS pos. + Reset backoff. + +This is all tied together by the AppServiceScheduler which DIs the required +components. +""" + +from synapse.appservice import ApplicationServiceState +from twisted.internet import defer +import logging + +logger = logging.getLogger(__name__) + + +class AppServiceScheduler(object): + """ Public facing API for this module. Does the required DI to tie the + components together. This also serves as the "event_pool", which in this + case is a simple array. + """ + + def __init__(self, clock, store, as_api): + self.clock = clock + self.store = store + self.as_api = as_api + + def create_recoverer(service, callback): + return _Recoverer(clock, store, as_api, service, callback) + + self.txn_ctrl = _TransactionController( + clock, store, as_api, create_recoverer + ) + self.queuer = _ServiceQueuer(self.txn_ctrl) + + @defer.inlineCallbacks + def start(self): + logger.info("Starting appservice scheduler") + # check for any DOWN ASes and start recoverers for them. + recoverers = yield _Recoverer.start( + self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered + ) + self.txn_ctrl.add_recoverers(recoverers) + + def submit_event_for_as(self, service, event): + self.queuer.enqueue(service, event) + + +class _ServiceQueuer(object): + """Queues events for the same application service together, sending + transactions as soon as possible. Once a transaction is sent successfully, + this schedules any other events in the queue to run. + """ + + def __init__(self, txn_ctrl): + self.queued_events = {} # dict of {service_id: [events]} + self.pending_requests = {} # dict of {service_id: Deferred} + self.txn_ctrl = txn_ctrl + + def enqueue(self, service, event): + # if this service isn't being sent something + if not self.pending_requests.get(service.id): + self._send_request(service, [event]) + else: + # add to queue for this service + if service.id not in self.queued_events: + self.queued_events[service.id] = [] + self.queued_events[service.id].append(event) + + def _send_request(self, service, events): + # send request and add callbacks + d = self.txn_ctrl.send(service, events) + d.addBoth(self._on_request_finish) + d.addErrback(self._on_request_fail) + self.pending_requests[service.id] = d + + def _on_request_finish(self, service): + self.pending_requests[service.id] = None + # if there are queued events, then send them. + if (service.id in self.queued_events + and len(self.queued_events[service.id]) > 0): + self._send_request(service, self.queued_events[service.id]) + self.queued_events[service.id] = [] + + def _on_request_fail(self, err): + logger.error("AS request failed: %s", err) + + +class _TransactionController(object): + + def __init__(self, clock, store, as_api, recoverer_fn): + self.clock = clock + self.store = store + self.as_api = as_api + self.recoverer_fn = recoverer_fn + # keep track of how many recoverers there are + self.recoverers = [] + + @defer.inlineCallbacks + def send(self, service, events): + try: + txn = yield self.store.create_appservice_txn( + service=service, + events=events + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: + txn.complete(self.store) + else: + self._start_recoverer(service) + except Exception as e: + logger.exception(e) + self._start_recoverer(service) + # request has finished + defer.returnValue(service) + + @defer.inlineCallbacks + def on_recovered(self, recoverer): + self.recoverers.remove(recoverer) + logger.info("Successfully recovered application service AS ID %s", + recoverer.service.id) + logger.info("Remaining active recoverers: %s", len(self.recoverers)) + yield self.store.set_appservice_state( + recoverer.service, + ApplicationServiceState.UP + ) + + def add_recoverers(self, recoverers): + for r in recoverers: + self.recoverers.append(r) + if len(recoverers) > 0: + logger.info("New active recoverers: %s", len(self.recoverers)) + + @defer.inlineCallbacks + def _start_recoverer(self, service): + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() + + @defer.inlineCallbacks + def _is_service_up(self, service): + state = yield self.store.get_appservice_state(service) + defer.returnValue(state == ApplicationServiceState.UP or state is None) + + +class _Recoverer(object): + + @staticmethod + @defer.inlineCallbacks + def start(clock, store, as_api, callback): + services = yield store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) + recoverers = [ + _Recoverer(clock, store, as_api, s, callback) for s in services + ] + for r in recoverers: + logger.info("Starting recoverer for AS ID %s which was marked as " + "DOWN", r.service.id) + r.recover() + defer.returnValue(recoverers) + + def __init__(self, clock, store, as_api, service, callback): + self.clock = clock + self.store = store + self.as_api = as_api + self.service = service + self.callback = callback + self.backoff_counter = 1 + + def recover(self): + self.clock.call_later((2 ** self.backoff_counter), self.retry) + + def _backoff(self): + # cap the backoff to be around 18h => (2^16) = 65536 secs + if self.backoff_counter < 16: + self.backoff_counter += 1 + self.recover() + + @defer.inlineCallbacks + def retry(self): + try: + txn = yield self.store.get_oldest_unsent_txn(self.service) + if txn: + logger.info("Retrying transaction %s for AS ID %s", + txn.id, txn.service.id) + sent = yield txn.send(self.as_api) + if sent: + yield txn.complete(self.store) + # reset the backoff counter and retry immediately + self.backoff_counter = 1 + yield self.retry() + else: + self._backoff() + else: + self._set_service_recovered() + except Exception as e: + logger.exception(e) + self._backoff() + + def _set_service_recovered(self): + self.callback(self) diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 8d345bf936..0c51d615ec 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.appservice.scheduler import AppServiceScheduler from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( @@ -54,7 +55,12 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + asapi = ApplicationServiceApi(hs) self.appservice_handler = ApplicationServicesHandler( - hs, ApplicationServiceApi(hs) + hs, asapi, AppServiceScheduler( + clock=hs.get_clock(), + store=hs.get_datastore(), + as_api=asapi + ) ) self.sync_handler = SyncHandler(hs) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2c488a46f6..58b5b60bb7 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -26,15 +26,28 @@ import logging logger = logging.getLogger(__name__) +def log_failure(failure): + logger.error( + "Application Services Failure", + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() + ) + ) + + # NB: Purposefully not inheriting BaseHandler since that contains way too much # setup code which this handler does not need or use. This makes testing a lot # easier. class ApplicationServicesHandler(object): - def __init__(self, hs, appservice_api): + def __init__(self, hs, appservice_api, appservice_scheduler): self.store = hs.get_datastore() self.hs = hs self.appservice_api = appservice_api + self.scheduler = appservice_scheduler + self.started_scheduler = False @defer.inlineCallbacks def register(self, app_service): @@ -46,13 +59,13 @@ class ApplicationServicesHandler(object): ) if not stored_service: raise StoreError(404, "Application service not found") + app_service.id = stored_service.id except StoreError: raise SynapseError( 403, "Unrecognised application services token. " "Consult the home server admin.", errcode=Codes.FORBIDDEN ) - app_service.hs_token = self._generate_hs_token() # create a sender for this application service which is used when @@ -90,9 +103,13 @@ class ApplicationServicesHandler(object): if event.type == EventTypes.Member: yield self._check_user_exists(event.state_key) - # Fork off pushes to these services - XXX First cut, best effort + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services for service in services: - self.appservice_api.push(service, event) + self.scheduler.submit_event_for_as(service, event) @defer.inlineCallbacks def query_user_exists(self, user_id): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c69d11261c..f4dec70393 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -14,9 +14,10 @@ # limitations under the License. from twisted.internet import defer - +from .appservice import ( + ApplicationServiceStore, ApplicationServiceTransactionStore +) from ._base import Cache -from .appservice import ApplicationServiceStore from .directory import DirectoryStore from .events import EventsStore from .presence import PresenceStore @@ -50,7 +51,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 14 +SCHEMA_VERSION = 15 dir_path = os.path.abspath(os.path.dirname(__file__)) @@ -71,6 +72,7 @@ class DataStore(RoomMemberStore, RoomStore, FilteringStore, PusherStore, PushRuleStore, + ApplicationServiceTransactionStore, EventsStore, ): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 850676ce6c..93304a745f 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import simplejson from simplejson import JSONDecodeError +import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership from synapse.api.errors import StoreError -from synapse.appservice import ApplicationService +from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) self.services_cache = [] - self.cache_defer = self._populate_cache() + self.cache_defer = self._populate_appservice_cache() self.cache_defer.addErrback(log_failure) @defer.inlineCallbacks @@ -101,11 +101,12 @@ class ApplicationServiceStore(SQLBaseStore): if not service.hs_token: raise StoreError(500, "No HS token") - yield self.runInteraction( + as_id = yield self.runInteraction( "update_app_service", self._update_app_service_txn, service ) + service.id = as_id # update cache TODO: Should this be in the txn? for (index, cache_service) in enumerate(self.services_cache): @@ -124,7 +125,7 @@ class ApplicationServiceStore(SQLBaseStore): "update_app_service_txn: Failed to find as_id for token=", service.token ) - return False + return txn.execute( "UPDATE application_services SET url=?, hs_token=?, sender=? " @@ -142,9 +143,9 @@ class ApplicationServiceStore(SQLBaseStore): txn.execute( "INSERT INTO application_services_regex(" "as_id, namespace, regex) values(?,?,?)", - (as_id, ns_int, simplejson.dumps(regex_obj)) + (as_id, ns_int, json.dumps(regex_obj)) ) - return True + return as_id def _get_as_id_txn(self, txn, token): cursor = txn.execute( @@ -277,12 +278,7 @@ class ApplicationServiceStore(SQLBaseStore): return rooms_for_user_matching_user_id - @defer.inlineCallbacks - def _populate_cache(self): - """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT * FROM application_services LEFT JOIN " - "application_services_regex ON application_services.id = " - "application_services_regex.as_id") + def _parse_services_dict(self, results): # SQL results in the form: # [ # { @@ -296,12 +292,14 @@ class ApplicationServiceStore(SQLBaseStore): # } # ] services = {} - results = yield self._execute_and_decode("_populate_cache", sql) for res in results: as_token = res["token"] + if as_token is None: + continue if as_token not in services: # add the service services[as_token] = { + "id": res["id"], "url": res["url"], "token": as_token, "hs_token": res["hs_token"], @@ -319,20 +317,232 @@ class ApplicationServiceStore(SQLBaseStore): try: services[as_token]["namespaces"][ ApplicationService.NS_LIST[ns_int]].append( - simplejson.loads(res["regex"]) + json.loads(res["regex"]) ) except IndexError: logger.error("Bad namespace enum '%s'. %s", ns_int, res) except JSONDecodeError: logger.error("Bad regex object '%s'", res["regex"]) - # TODO get last successful txn id f.e. service + service_list = [] for service in services.values(): - logger.info("Found application service: %s", service) - self.services_cache.append(ApplicationService( + service_list.append(ApplicationService( token=service["token"], url=service["url"], namespaces=service["namespaces"], hs_token=service["hs_token"], - sender=service["sender"] + sender=service["sender"], + id=service["id"] )) + return service_list + + @defer.inlineCallbacks + def _populate_appservice_cache(self): + """Populates the ApplicationServiceCache from the database.""" + sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN " + "application_services_regex AS r ON a.id = r.as_id") + + results = yield self._execute_and_decode("appservice_cache", sql) + services = self._parse_services_dict(results) + + for service in services: + logger.info("Found application service: %s", service) + self.services_cache.append(service) + + +class ApplicationServiceTransactionStore(SQLBaseStore): + + def __init__(self, hs): + super(ApplicationServiceTransactionStore, self).__init__(hs) + + @defer.inlineCallbacks + def get_appservices_by_state(self, state): + """Get a list of application services based on their state. + + Args: + state(ApplicationServiceState): The state to filter on. + Returns: + A Deferred which resolves to a list of ApplicationServices, which + may be empty. + """ + sql = ( + "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN" + " application_services AS a ON a.id=s.as_id LEFT JOIN" + " application_services_regex AS r ON r.as_id=a.id WHERE state = ?" + ) + results = yield self._execute_and_decode( + "get_appservices_by_state", sql, state + ) + # NB: This assumes this class is linked with ApplicationServiceStore + defer.returnValue(self._parse_services_dict(results)) + + @defer.inlineCallbacks + def get_appservice_state(self, service): + """Get the application service state. + + Args: + service(ApplicationService): The service whose state to set. + Returns: + A Deferred which resolves to ApplicationServiceState. + """ + result = yield self._simple_select_one( + "application_services_state", + dict(as_id=service.id), + ["state"], + allow_none=True + ) + if result: + defer.returnValue(result.get("state")) + return + defer.returnValue(None) + + def set_appservice_state(self, service, state): + """Set the application service state. + + Args: + service(ApplicationService): The service whose state to set. + state(ApplicationServiceState): The connectivity state to apply. + Returns: + A Deferred which resolves when the state was set successfully. + """ + return self._simple_upsert( + "application_services_state", + dict(as_id=service.id), + dict(state=state) + ) + + def create_appservice_txn(self, service, events): + """Atomically creates a new transaction for this application service + with the given list of events. + + Args: + service(ApplicationService): The service who the transaction is for. + events(list<Event>): A list of events to put in the transaction. + Returns: + AppServiceTransaction: A new transaction. + """ + return self.runInteraction( + "create_appservice_txn", + self._create_appservice_txn, + service, events + ) + + def _create_appservice_txn(self, txn, service, events): + # work out new txn id (highest txn id for this service += 1) + # The highest id may be the last one sent (in which case it is last_txn) + # or it may be the highest in the txns list (which are waiting to be/are + # being sent) + last_txn_id = self._get_last_txn(txn, service.id) + + result = txn.execute( + "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + highest_txn_id = result.fetchone()[0] + if highest_txn_id is None: + highest_txn_id = 0 + + new_txn_id = max(highest_txn_id, last_txn_id) + 1 + + # Insert new txn into txn table + event_ids = [e.event_id for e in events] + txn.execute( + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " + "VALUES(?,?,?)", + (service.id, new_txn_id, json.dumps(event_ids)) + ) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events + ) + + def complete_appservice_txn(self, txn_id, service): + """Completes an application service transaction. + + Args: + txn_id(str): The transaction ID being completed. + service(ApplicationService): The application service which was sent + this transaction. + Returns: + A Deferred which resolves if this transaction was stored + successfully. + """ + return self.runInteraction( + "complete_appservice_txn", + self._complete_appservice_txn, + txn_id, service + ) + + def _complete_appservice_txn(self, txn, txn_id, service): + txn_id = int(txn_id) + + # Debugging query: Make sure the txn being completed is EXACTLY +1 from + # what was there before. If it isn't, we've got problems (e.g. the AS + # has probably missed some events), so whine loudly but still continue, + # since it shouldn't fail completion of the transaction. + last_txn_id = self._get_last_txn(txn, service.id) + if (last_txn_id + 1) != txn_id: + logger.error( + "appservice: Completing a transaction which has an ID > 1 from " + "the last ID sent to this AS. We've either dropped events or " + "sent it to the AS out of order. FIX ME. last_txn=%s " + "completing_txn=%s service_id=%s", last_txn_id, txn_id, + service.id + ) + + # Set current txn_id for AS to 'txn_id' + self._simple_upsert_txn( + txn, "application_services_state", dict(as_id=service.id), + dict(last_txn=txn_id) + ) + + # Delete txn + self._simple_delete_txn( + txn, "application_services_txns", + dict(txn_id=txn_id, as_id=service.id) + ) + + def get_oldest_unsent_txn(self, service): + """Get the oldest transaction which has not been sent for this + service. + + Args: + service(ApplicationService): The app service to get the oldest txn. + Returns: + A Deferred which resolves to an AppServiceTransaction or + None. + """ + return self.runInteraction( + "get_oldest_unsent_appservice_txn", + self._get_oldest_unsent_txn, + service + ) + + def _get_oldest_unsent_txn(self, txn, service): + # Monotonically increasing txn ids, so just select the smallest + # one in the txns table (we delete them when they are sent) + result = txn.execute( + "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + entry = self.cursor_to_dict(result)[0] + if not entry or entry["txn_id"] is None: + # the min(txn_id) part will force a row, so entry may not be None + return None + + event_ids = json.loads(entry["event_ids"]) + events = self._get_events_txn(txn, event_ids) + + return AppServiceTransaction( + service=service, id=entry["txn_id"], events=events + ) + + def _get_last_txn(self, txn, service_id): + result = txn.execute( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service_id,) + ) + last_txn_id = result.fetchone() + if last_txn_id is None or last_txn_id[0] is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) # select 'last_txn' col diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql new file mode 100644 index 0000000000..13bbb2de2e --- /dev/null +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -0,0 +1,31 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS application_services_state( + as_id INTEGER PRIMARY KEY, + state TEXT, + last_txn TEXT, + FOREIGN KEY(as_id) REFERENCES application_services(id) +); + +CREATE TABLE IF NOT EXISTS application_services_txns( + as_id INTEGER NOT NULL, + txn_id INTEGER NOT NULL, + event_ids TEXT NOT NULL, + UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK +); + + + |