diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 1360a00eae..d89b0cc8c9 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -429,15 +429,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
# The highest id may be the last one sent (in which case it is last_txn)
# or it may be the highest in the txns list (which are waiting to be/are
# being sent)
- result = txn.execute(
- "SELECT last_txn FROM application_services_state WHERE as_id=?",
- (service.id,)
- )
- last_txn_id = result.fetchone()
- if last_txn_id is None: # no row exists
- last_txn_id = 0
- else:
- last_txn_id = int(last_txn_id[0]) # select 'last_txn' col
+ last_txn_id = self._get_last_txn(txn, service.id)
result = txn.execute(
"SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
@@ -467,12 +459,43 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
service(ApplicationService): The application service which was sent
this transaction.
Returns:
- A Deferred which resolves to True if this transaction was completed
+ A Deferred which resolves if this transaction was stored
successfully.
"""
- # TODO: Set current txn_id for AS to 'txn_id'
- # TODO: Delete txn contents
- pass
+ return self.runInteraction(
+ "complete_appservice_txn",
+ self._complete_appservice_txn,
+ txn_id, service
+ )
+
+ def _complete_appservice_txn(self, txn, txn_id, service):
+ txn_id = int(txn_id)
+
+ # Debugging query: Make sure the txn being completed is EXACTLY +1 from
+ # what was there before. If it isn't, we've got problems (e.g. the AS
+ # has probably missed some events), so whine loudly but still continue,
+ # since it shouldn't fail completion of the transaction.
+ last_txn_id = self._get_last_txn(txn, service.id)
+ if (last_txn_id + 1) != txn_id:
+ logger.error(
+ "appservice: Completing a transaction which has an ID > 1 from "
+ "the last ID sent to this AS. We've either dropped events or "
+ "sent it to the AS out of order. FIX ME. last_txn=%s "
+ "completing_txn=%s service_id=%s", last_txn_id, txn_id,
+ service.id
+ )
+
+ # Set current txn_id for AS to 'txn_id'
+ self._simple_upsert_txn(
+ txn, "application_services_state", dict(as_id=service.id),
+ dict(last_txn=txn_id)
+ )
+
+ # Delete txn contents
+ self._simple_delete_txn(
+ txn, "application_services_txns",
+ dict(txn_id=txn_id, as_id=service.id)
+ )
def get_oldest_unsent_txn(self, service):
"""Get the oldest transaction which has not been sent for this
@@ -484,6 +507,38 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
A Deferred which resolves to an AppServiceTransaction or
None.
"""
- # TODO: Monotonically increasing txn ids, so just select the smallest
+ return self.runInteraction(
+ "get_oldest_unsent_appservice_txn",
+ self._get_oldest_unsent_txn,
+ service
+ )
+
+ def _get_oldest_unsent_txn(self, txn, service):
+ # Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
- pass
+ result = txn.execute(
+ "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?",
+ (service.id,)
+ )
+ entry = self.cursor_to_dict(result)[0]
+
+ if not entry or entry["txn_id"] is None:
+ # the min(txn_id) part will force a row, so entry may not be None
+ return None
+
+ return AppServiceTransaction(
+ service=service, id=entry["txn_id"], events=json.loads(
+ entry["content"]
+ )
+ )
+
+ def _get_last_txn(self, txn, service_id):
+ result = txn.execute(
+ "SELECT last_txn FROM application_services_state WHERE as_id=?",
+ (service_id,)
+ )
+ last_txn_id = result.fetchone()
+ if last_txn_id is None: # no row exists
+ return 0
+ else:
+ return int(last_txn_id[0]) # select 'last_txn' col
diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql
index 11f0c799aa..ff15aa019e 100644
--- a/synapse/storage/schema/delta/15/appservice_txns.sql
+++ b/synapse/storage/schema/delta/15/appservice_txns.sql
@@ -15,7 +15,7 @@
CREATE TABLE IF NOT EXISTS application_services_state(
as_id INTEGER PRIMARY KEY,
- state TEXT NOT NULL,
+ state TEXT,
last_txn TEXT,
FOREIGN KEY(as_id) REFERENCES application_services(id)
);
|