summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/appservice.py85
-rw-r--r--synapse/storage/schema/delta/15/appservice_txns.sql2
-rw-r--r--tests/storage/test_appservice.py68
3 files changed, 139 insertions, 16 deletions
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 1360a00eae..d89b0cc8c9 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -429,15 +429,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         # The highest id may be the last one sent (in which case it is last_txn)
         # or it may be the highest in the txns list (which are waiting to be/are
         # being sent)
-        result = txn.execute(
-            "SELECT last_txn FROM application_services_state WHERE as_id=?",
-            (service.id,)
-        )
-        last_txn_id = result.fetchone()
-        if last_txn_id is None:  # no row exists
-            last_txn_id = 0
-        else:
-            last_txn_id = int(last_txn_id[0])  # select 'last_txn' col
+        last_txn_id = self._get_last_txn(txn, service.id)
 
         result = txn.execute(
             "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
@@ -467,12 +459,43 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
             service(ApplicationService): The application service which was sent
             this transaction.
         Returns:
-            A Deferred which resolves to True if this transaction was completed
+            A Deferred which resolves if this transaction was stored
             successfully.
         """
-        # TODO: Set current txn_id for AS to 'txn_id'
-        # TODO: Delete txn contents
-        pass
+        return self.runInteraction(
+            "complete_appservice_txn",
+            self._complete_appservice_txn,
+            txn_id, service
+        )
+
+    def _complete_appservice_txn(self, txn, txn_id, service):
+        txn_id = int(txn_id)
+
+        # Debugging query: Make sure the txn being completed is EXACTLY +1 from
+        # what was there before. If it isn't, we've got problems (e.g. the AS
+        # has probably missed some events), so whine loudly but still continue,
+        # since it shouldn't fail completion of the transaction.
+        last_txn_id = self._get_last_txn(txn, service.id)
+        if (last_txn_id + 1) != txn_id:
+            logger.error(
+                "appservice: Completing a transaction which has an ID > 1 from "
+                "the last ID sent to this AS. We've either dropped events or "
+                "sent it to the AS out of order. FIX ME. last_txn=%s "
+                "completing_txn=%s service_id=%s", last_txn_id, txn_id,
+                service.id
+            )
+
+        # Set current txn_id for AS to 'txn_id'
+        self._simple_upsert_txn(
+            txn, "application_services_state", dict(as_id=service.id),
+            dict(last_txn=txn_id)
+        )
+
+        # Delete txn contents
+        self._simple_delete_txn(
+            txn, "application_services_txns",
+            dict(txn_id=txn_id, as_id=service.id)
+        )
 
     def get_oldest_unsent_txn(self, service):
         """Get the oldest transaction which has not been sent for this
@@ -484,6 +507,38 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
             A Deferred which resolves to an AppServiceTransaction or
             None.
         """
-        # TODO: Monotonically increasing txn ids, so just select the smallest
+        return self.runInteraction(
+            "get_oldest_unsent_appservice_txn",
+            self._get_oldest_unsent_txn,
+            service
+        )
+
+    def _get_oldest_unsent_txn(self, txn, service):
+        # Monotonically increasing txn ids, so just select the smallest
         # one in the txns table (we delete them when they are sent)
-        pass
+        result = txn.execute(
+            "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?",
+            (service.id,)
+        )
+        entry = self.cursor_to_dict(result)[0]
+
+        if not entry or entry["txn_id"] is None:
+            # the min(txn_id) part will force a row, so entry may not be None
+            return None
+
+        return AppServiceTransaction(
+            service=service, id=entry["txn_id"], events=json.loads(
+                entry["content"]
+            )
+        )
+
+    def _get_last_txn(self, txn, service_id):
+        result = txn.execute(
+            "SELECT last_txn FROM application_services_state WHERE as_id=?",
+            (service_id,)
+        )
+        last_txn_id = result.fetchone()
+        if last_txn_id is None:  # no row exists
+            return 0
+        else:
+            return int(last_txn_id[0])  # select 'last_txn' col
diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql
index 11f0c799aa..ff15aa019e 100644
--- a/synapse/storage/schema/delta/15/appservice_txns.sql
+++ b/synapse/storage/schema/delta/15/appservice_txns.sql
@@ -15,7 +15,7 @@
 
 CREATE TABLE IF NOT EXISTS application_services_state(
     as_id INTEGER PRIMARY KEY,
-    state TEXT NOT NULL,
+    state TEXT,
     last_txn TEXT,
     FOREIGN KEY(as_id) REFERENCES application_services(id)
 );
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index 7a8cdb5593..d1809c7f3b 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -305,6 +305,74 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         self.assertEquals(txn.service, service)
 
     @defer.inlineCallbacks
+    def test_complete_appservice_txn_first_txn(self):
+        service = Mock(id=self.as_list[0]["id"])
+        events = [{"foo": "bar"}]
+        txn_id = 1
+
+        yield self._insert_txn(service.id, txn_id, events)
+        yield self.store.complete_appservice_txn(txn_id=txn_id, service=service)
+
+        res = yield self.db_pool.runQuery(
+            "SELECT last_txn FROM application_services_state WHERE as_id=?",
+            (service.id,)
+        )
+        self.assertEquals(1, len(res))
+        self.assertEquals(str(txn_id), res[0][0])
+
+        res = yield self.db_pool.runQuery(
+            "SELECT * FROM application_services_txns WHERE txn_id=?",
+            (txn_id,)
+        )
+        self.assertEquals(0, len(res))
+
+    @defer.inlineCallbacks
+    def test_complete_appservice_txn_existing_in_state_table(self):
+        service = Mock(id=self.as_list[0]["id"])
+        events = [{"foo": "bar"}]
+        txn_id = 5
+        yield self._set_last_txn(service.id, 4)
+        yield self._insert_txn(service.id, txn_id, events)
+        yield self.store.complete_appservice_txn(txn_id=txn_id, service=service)
+
+        res = yield self.db_pool.runQuery(
+            "SELECT last_txn, state FROM application_services_state WHERE "
+            "as_id=?",
+            (service.id,)
+        )
+        self.assertEquals(1, len(res))
+        self.assertEquals(str(txn_id), res[0][0])
+        self.assertEquals(ApplicationServiceState.UP, res[0][1])
+
+        res = yield self.db_pool.runQuery(
+            "SELECT * FROM application_services_txns WHERE txn_id=?",
+            (txn_id,)
+        )
+        self.assertEquals(0, len(res))
+
+    @defer.inlineCallbacks
+    def test_get_oldest_unsent_txn_none(self):
+        service = Mock(id=self.as_list[0]["id"])
+
+        txn = yield self.store.get_oldest_unsent_txn(service)
+        self.assertEquals(None, txn)
+
+    @defer.inlineCallbacks
+    def test_get_oldest_unsent_txn(self):
+        service = Mock(id=self.as_list[0]["id"])
+        events = [{"type": "nothing"}, {"type": "here"}]
+
+        yield self._insert_txn(self.as_list[1]["id"], 9, {"badger": "mushroom"})
+        yield self._insert_txn(service.id, 10, events)
+        yield self._insert_txn(service.id, 11, [{"foo":"bar"}])
+        yield self._insert_txn(service.id, 12, [{"argh":"bargh"}])
+
+        txn = yield self.store.get_oldest_unsent_txn(service)
+        self.assertEquals(service, txn.service)
+        self.assertEquals(10, txn.id)
+        self.assertEquals(events, txn.events)
+
+    @defer.inlineCallbacks
     def test_get_appservices_by_state_single(self):
         yield self._set_state(
             self.as_list[0]["id"], ApplicationServiceState.DOWN