summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/appservice.py46
-rw-r--r--tests/storage/test_appservice.py67
2 files changed, 109 insertions, 4 deletions
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 37078f9ef0..1360a00eae 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -17,9 +17,10 @@ from simplejson import JSONDecodeError
 import simplejson as json
 from twisted.internet import defer
 
+from syutil.jsonutil import encode_canonical_json
 from synapse.api.constants import Membership
 from synapse.api.errors import StoreError
-from synapse.appservice import ApplicationService, ApplicationServiceState
+from synapse.appservice import ApplicationService, AppServiceTransaction
 from synapse.storage.roommember import RoomsForUser
 from ._base import SQLBaseStore
 
@@ -417,9 +418,46 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         Returns:
             AppServiceTransaction: A new transaction.
         """
-        # TODO: work out txn id (highest txn id for this service += 1)
-        # TODO: Within same db transaction, Insert new txn into txn table
-        pass
+        return self.runInteraction(
+            "create_appservice_txn",
+            self._create_appservice_txn,
+            service, events
+        )
+
+    def _create_appservice_txn(self, txn, service, events):
+        # work out new txn id (highest txn id for this service += 1)
+        # The highest id may be the last one sent (in which case it is last_txn)
+        # or it may be the highest in the txns list (which are waiting to be/are
+        # being sent)
+        result = txn.execute(
+            "SELECT last_txn FROM application_services_state WHERE as_id=?",
+            (service.id,)
+        )
+        last_txn_id = result.fetchone()
+        if last_txn_id is None:  # no row exists
+            last_txn_id = 0
+        else:
+            last_txn_id = int(last_txn_id[0])  # select 'last_txn' col
+
+        result = txn.execute(
+            "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
+            (service.id,)
+        )
+        highest_txn_id = result.fetchone()[0]
+        if highest_txn_id is None:
+            highest_txn_id = 0
+
+        new_txn_id = max(highest_txn_id, last_txn_id) + 1
+
+        # Insert new txn into txn table
+        txn.execute(
+            "INSERT INTO application_services_txns(as_id, txn_id, content) "
+            "VALUES(?,?,?)",
+            (service.id, new_txn_id, encode_canonical_json(events))
+        )
+        return AppServiceTransaction(
+            service=service, id=new_txn_id, events=events
+        )
 
     def complete_appservice_txn(self, txn_id, service):
         """Completes an application service transaction.
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index 30c0b43d96..7a8cdb5593 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -21,6 +21,7 @@ from synapse.storage.appservice import (
     ApplicationServiceStore, ApplicationServiceTransactionStore
 )
 
+import json
 from mock import Mock
 from tests.utils import SQLiteMemoryDbPool, MockClock
 
@@ -166,6 +167,20 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
             (id, state, txn)
         )
 
+    def _insert_txn(self, as_id, txn_id, content):
+        return self.db_pool.runQuery(
+            "INSERT INTO application_services_txns(as_id, txn_id, content) "
+            "VALUES(?,?,?)",
+            (as_id, txn_id, json.dumps(content))
+        )
+
+    def _set_last_txn(self, as_id, txn_id):
+        return self.db_pool.runQuery(
+            "INSERT INTO application_services_state(as_id, last_txn, state) "
+            "VALUES(?,?,?)",
+            (as_id, txn_id, ApplicationServiceState.UP)
+        )
+
     @defer.inlineCallbacks
     def test_get_appservice_state_none(self):
         service = Mock(id=999)
@@ -238,6 +253,58 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         self.assertEquals(service.id, rows[0][0])
 
     @defer.inlineCallbacks
+    def test_create_appservice_txn_first(self):
+        service = Mock(id=self.as_list[0]["id"])
+        events = [{"type": "nothing"}, {"type": "here"}]
+        txn = yield self.store.create_appservice_txn(service, events)
+        self.assertEquals(txn.id, 1)
+        self.assertEquals(txn.events, events)
+        self.assertEquals(txn.service, service)
+
+    @defer.inlineCallbacks
+    def test_create_appservice_txn_older_last_txn(self):
+        service = Mock(id=self.as_list[0]["id"])
+        events = [{"type": "nothing"}, {"type": "here"}]
+        yield self._set_last_txn(service.id, 9643)  # AS is falling behind
+        yield self._insert_txn(service.id, 9644, events)
+        yield self._insert_txn(service.id, 9645, events)
+        txn = yield self.store.create_appservice_txn(service, events)
+        self.assertEquals(txn.id, 9646)
+        self.assertEquals(txn.events, events)
+        self.assertEquals(txn.service, service)
+
+    @defer.inlineCallbacks
+    def test_create_appservice_txn_up_to_date_last_txn(self):
+        service = Mock(id=self.as_list[0]["id"])
+        events = [{"type": "nothing"}, {"type": "here"}]
+        yield self._set_last_txn(service.id, 9643)
+        txn = yield self.store.create_appservice_txn(service, events)
+        self.assertEquals(txn.id, 9644)
+        self.assertEquals(txn.events, events)
+        self.assertEquals(txn.service, service)
+
+    @defer.inlineCallbacks
+    def test_create_appservice_txn_up_fuzzing(self):
+        service = Mock(id=self.as_list[0]["id"])
+        events = [{"type": "nothing"}, {"type": "here"}]
+        yield self._set_last_txn(service.id, 9643)
+
+        # dump in rows with higher IDs to make sure the queries aren't wrong.
+        yield self._set_last_txn(self.as_list[1]["id"], 119643)
+        yield self._set_last_txn(self.as_list[2]["id"], 9)
+        yield self._set_last_txn(self.as_list[3]["id"], 9643)
+        yield self._insert_txn(self.as_list[1]["id"], 119644, events)
+        yield self._insert_txn(self.as_list[1]["id"], 119645, events)
+        yield self._insert_txn(self.as_list[1]["id"], 119646, events)
+        yield self._insert_txn(self.as_list[2]["id"], 10, events)
+        yield self._insert_txn(self.as_list[3]["id"], 9643, events)
+
+        txn = yield self.store.create_appservice_txn(service, events)
+        self.assertEquals(txn.id, 9644)
+        self.assertEquals(txn.events, events)
+        self.assertEquals(txn.service, service)
+
+    @defer.inlineCallbacks
     def test_get_appservices_by_state_single(self):
         yield self._set_state(
             self.as_list[0]["id"], ApplicationServiceState.DOWN