diff options
Diffstat (limited to 'synapse/storage/appservice.py')
-rw-r--r-- | synapse/storage/appservice.py | 67 |
1 files changed, 30 insertions, 37 deletions
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 31248d5e06..6092f600ba 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -51,8 +51,7 @@ def _make_exclusive_regex(services_cache): class ApplicationServiceWorkerStore(SQLBaseStore): def __init__(self, db_conn, hs): self.services_cache = load_appservices( - hs.hostname, - hs.config.app_service_config_files + hs.hostname, hs.config.app_service_config_files ) self.exclusive_user_regex = _make_exclusive_regex(self.services_cache) @@ -122,8 +121,9 @@ class ApplicationServiceStore(ApplicationServiceWorkerStore): pass -class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, - EventsWorkerStore): +class ApplicationServiceTransactionWorkerStore( + ApplicationServiceWorkerStore, EventsWorkerStore +): @defer.inlineCallbacks def get_appservices_by_state(self, state): """Get a list of application services based on their state. @@ -135,9 +135,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, may be empty. """ results = yield self._simple_select_list( - "application_services_state", - dict(state=state), - ["as_id"] + "application_services_state", dict(state=state), ["as_id"] ) # NB: This assumes this class is linked with ApplicationServiceStore as_list = self.get_app_services() @@ -180,9 +178,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, A Deferred which resolves when the state was set successfully. """ return self._simple_upsert( - "application_services_state", - dict(as_id=service.id), - dict(state=state) + "application_services_state", dict(as_id=service.id), dict(state=state) ) def create_appservice_txn(self, service, events): @@ -195,6 +191,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, Returns: AppServiceTransaction: A new transaction. """ + def _create_appservice_txn(txn): # work out new txn id (highest txn id for this service += 1) # The highest id may be the last one sent (in which case it is last_txn) @@ -204,7 +201,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, txn.execute( "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", - (service.id,) + (service.id,), ) highest_txn_id = txn.fetchone()[0] if highest_txn_id is None: @@ -217,16 +214,11 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, txn.execute( "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " "VALUES(?,?,?)", - (service.id, new_txn_id, event_ids) - ) - return AppServiceTransaction( - service=service, id=new_txn_id, events=events + (service.id, new_txn_id, event_ids), ) + return AppServiceTransaction(service=service, id=new_txn_id, events=events) - return self.runInteraction( - "create_appservice_txn", - _create_appservice_txn, - ) + return self.runInteraction("create_appservice_txn", _create_appservice_txn) def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. @@ -252,26 +244,26 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, "appservice: Completing a transaction which has an ID > 1 from " "the last ID sent to this AS. We've either dropped events or " "sent it to the AS out of order. FIX ME. last_txn=%s " - "completing_txn=%s service_id=%s", last_txn_id, txn_id, - service.id + "completing_txn=%s service_id=%s", + last_txn_id, + txn_id, + service.id, ) # Set current txn_id for AS to 'txn_id' self._simple_upsert_txn( - txn, "application_services_state", dict(as_id=service.id), - dict(last_txn=txn_id) + txn, + "application_services_state", + dict(as_id=service.id), + dict(last_txn=txn_id), ) # Delete txn self._simple_delete_txn( - txn, "application_services_txns", - dict(txn_id=txn_id, as_id=service.id) + txn, "application_services_txns", dict(txn_id=txn_id, as_id=service.id) ) - return self.runInteraction( - "complete_appservice_txn", - _complete_appservice_txn, - ) + return self.runInteraction("complete_appservice_txn", _complete_appservice_txn) @defer.inlineCallbacks def get_oldest_unsent_txn(self, service): @@ -284,13 +276,14 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, A Deferred which resolves to an AppServiceTransaction or None. """ + def _get_oldest_unsent_txn(txn): # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) txn.execute( "SELECT * FROM application_services_txns WHERE as_id=?" " ORDER BY txn_id ASC LIMIT 1", - (service.id,) + (service.id,), ) rows = self.cursor_to_dict(txn) if not rows: @@ -301,8 +294,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, return entry entry = yield self.runInteraction( - "get_oldest_unsent_appservice_txn", - _get_oldest_unsent_txn, + "get_oldest_unsent_appservice_txn", _get_oldest_unsent_txn ) if not entry: @@ -312,14 +304,14 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, events = yield self._get_events(event_ids) - defer.returnValue(AppServiceTransaction( - service=service, id=entry["txn_id"], events=events - )) + defer.returnValue( + AppServiceTransaction(service=service, id=entry["txn_id"], events=events) + ) def _get_last_txn(self, txn, service_id): txn.execute( "SELECT last_txn FROM application_services_state WHERE as_id=?", - (service_id,) + (service_id,), ) last_txn_id = txn.fetchone() if last_txn_id is None or last_txn_id[0] is None: # no row exists @@ -332,6 +324,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, txn.execute( "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,) ) + return self.runInteraction( "set_appservice_last_pos", set_appservice_last_pos_txn ) @@ -362,7 +355,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, return upper_bound, [row[1] for row in rows] upper_bound, event_ids = yield self.runInteraction( - "get_new_events_for_appservice", get_new_events_for_appservice_txn, + "get_new_events_for_appservice", get_new_events_for_appservice_txn ) events = yield self._get_events(event_ids) |