diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 37078f9ef0..1360a00eae 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -17,9 +17,10 @@ from simplejson import JSONDecodeError
import simplejson as json
from twisted.internet import defer
+from syutil.jsonutil import encode_canonical_json
from synapse.api.constants import Membership
from synapse.api.errors import StoreError
-from synapse.appservice import ApplicationService, ApplicationServiceState
+from synapse.appservice import ApplicationService, AppServiceTransaction
from synapse.storage.roommember import RoomsForUser
from ._base import SQLBaseStore
@@ -417,9 +418,46 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
Returns:
AppServiceTransaction: A new transaction.
"""
- # TODO: work out txn id (highest txn id for this service += 1)
- # TODO: Within same db transaction, Insert new txn into txn table
- pass
+ return self.runInteraction(
+ "create_appservice_txn",
+ self._create_appservice_txn,
+ service, events
+ )
+
+ def _create_appservice_txn(self, txn, service, events):
+ # work out new txn id (highest txn id for this service += 1)
+ # The highest id may be the last one sent (in which case it is last_txn)
+ # or it may be the highest in the txns list (which are waiting to be/are
+ # being sent)
+ result = txn.execute(
+ "SELECT last_txn FROM application_services_state WHERE as_id=?",
+ (service.id,)
+ )
+ last_txn_id = result.fetchone()
+ if last_txn_id is None: # no row exists
+ last_txn_id = 0
+ else:
+ last_txn_id = int(last_txn_id[0]) # select 'last_txn' col
+
+ result = txn.execute(
+ "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
+ (service.id,)
+ )
+ highest_txn_id = result.fetchone()[0]
+ if highest_txn_id is None:
+ highest_txn_id = 0
+
+ new_txn_id = max(highest_txn_id, last_txn_id) + 1
+
+ # Insert new txn into txn table
+ txn.execute(
+ "INSERT INTO application_services_txns(as_id, txn_id, content) "
+ "VALUES(?,?,?)",
+ (service.id, new_txn_id, encode_canonical_json(events))
+ )
+ return AppServiceTransaction(
+ service=service, id=new_txn_id, events=events
+ )
def complete_appservice_txn(self, txn_id, service):
"""Completes an application service transaction.
|