summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/12680.misc1
-rw-r--r--synapse/storage/databases/main/appservice.py47
-rw-r--r--synapse/storage/schema/__init__.py5
-rw-r--r--tests/handlers/test_appservice.py10
-rw-r--r--tests/storage/test_appservice.py27
5 files changed, 35 insertions, 55 deletions
diff --git a/changelog.d/12680.misc b/changelog.d/12680.misc
new file mode 100644
index 0000000000..dfd1f0a6c6
--- /dev/null
+++ b/changelog.d/12680.misc
@@ -0,0 +1 @@
+Remove code which updates unused database column `application_services_state.last_txn`.
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 945707b0ec..e284454b66 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -203,19 +203,29 @@ class ApplicationServiceTransactionWorkerStore(
         """Get the application service state.
 
         Args:
-            service: The service whose state to set.
+            service: The service whose state to get.
         Returns:
-            An ApplicationServiceState or none.
+            An ApplicationServiceState, or None if we have yet to attempt any
+            transactions to the AS.
         """
-        result = await self.db_pool.simple_select_one(
+        # if we have created transactions for this AS but not yet attempted to send
+        # them, we will have a row in the table with state=NULL (recording the stream
+        # positions we have processed up to).
+        #
+        # On the other hand, if we have yet to create any transactions for this AS at
+        # all, then there will be no row for the AS.
+        #
+        # In either case, we return None to indicate "we don't yet know the state of
+        # this AS".
+        result = await self.db_pool.simple_select_one_onecol(
             "application_services_state",
             {"as_id": service.id},
-            ["state"],
+            retcol="state",
             allow_none=True,
             desc="get_appservice_state",
         )
         if result:
-            return ApplicationServiceState(result.get("state"))
+            return ApplicationServiceState(result)
         return None
 
     async def set_appservice_state(
@@ -296,14 +306,6 @@ class ApplicationServiceTransactionWorkerStore(
         """
 
         def _complete_appservice_txn(txn: LoggingTransaction) -> None:
-            # Set current txn_id for AS to 'txn_id'
-            self.db_pool.simple_upsert_txn(
-                txn,
-                "application_services_state",
-                {"as_id": service.id},
-                {"last_txn": txn_id},
-            )
-
             # Delete txn
             self.db_pool.simple_delete_txn(
                 txn,
@@ -452,16 +454,15 @@ class ApplicationServiceTransactionWorkerStore(
                 % (stream_type,)
             )
 
-        def set_appservice_stream_type_pos_txn(txn: LoggingTransaction) -> None:
-            stream_id_type = "%s_stream_id" % stream_type
-            txn.execute(
-                "UPDATE application_services_state SET %s = ? WHERE as_id=?"
-                % stream_id_type,
-                (pos, service.id),
-            )
-
-        await self.db_pool.runInteraction(
-            "set_appservice_stream_type_pos", set_appservice_stream_type_pos_txn
+        # this may be the first time that we're recording any state for this AS, so
+        # we don't yet know if a row for it exists; hence we have to upsert here.
+        await self.db_pool.simple_upsert(
+            table="application_services_state",
+            keyvalues={"as_id": service.id},
+            values={f"{stream_type}_stream_id": pos},
+            # no need to lock when emulating upsert: as_id is a unique key
+            lock=False,
+            desc="set_appservice_stream_type_pos",
         )
 
 
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 20c344faea..da98f05e03 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -61,7 +61,9 @@ Changes in SCHEMA_VERSION = 68:
 
 Changes in SCHEMA_VERSION = 69:
     - We now write to `device_lists_changes_in_room` table.
-    - Use sequence to generate future `application_services_txns.txn_id`s
+    - We now use a PostgreSQL sequence to generate future txn_ids for
+      `application_services_txns`. `application_services_state.last_txn` is no longer
+      updated.
 
 Changes in SCHEMA_VERSION = 70:
     - event_reference_hashes is no longer written to.
@@ -71,6 +73,7 @@ Changes in SCHEMA_VERSION = 70:
 SCHEMA_COMPAT_VERSION = (
     # We now assume that `device_lists_changes_in_room` has been filled out for
     # recent device_list_updates.
+    # ... and that `application_services_state.last_txn` is not used.
     69
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 5b0cd1ab86..53e7a5d81b 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -434,16 +434,6 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
             },
         )
 
-        # "Complete" a transaction.
-        # All this really does for us is make an entry in the application_services_state
-        # database table, which tracks the current stream_token per stream ID per AS.
-        self.get_success(
-            self.hs.get_datastores().main.complete_appservice_txn(
-                0,
-                interested_appservice,
-            )
-        )
-
         # Now, pretend that we receive a large burst of read receipts (300 total) that
         # all come in at once.
         for i in range(300):
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index 1bf93e79a7..1047ed09c8 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -14,7 +14,7 @@
 import json
 import os
 import tempfile
-from typing import List, Optional, cast
+from typing import List, cast
 from unittest.mock import Mock
 
 import yaml
@@ -149,15 +149,12 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
             outfile.write(yaml.dump(as_yaml))
             self.as_yaml_files.append(as_token)
 
-    def _set_state(
-        self, id: str, state: ApplicationServiceState, txn: Optional[int] = None
-    ):
+    def _set_state(self, id: str, state: ApplicationServiceState):
         return self.db_pool.runOperation(
             self.engine.convert_param_style(
-                "INSERT INTO application_services_state(as_id, state, last_txn) "
-                "VALUES(?,?,?)"
+                "INSERT INTO application_services_state(as_id, state) VALUES(?,?)"
             ),
-            (id, state.value, txn),
+            (id, state.value),
         )
 
     def _insert_txn(self, as_id, txn_id, events):
@@ -283,17 +280,6 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
         res = self.get_success(
             self.db_pool.runQuery(
                 self.engine.convert_param_style(
-                    "SELECT last_txn FROM application_services_state WHERE as_id=?"
-                ),
-                (service.id,),
-            )
-        )
-        self.assertEqual(1, len(res))
-        self.assertEqual(txn_id, res[0][0])
-
-        res = self.get_success(
-            self.db_pool.runQuery(
-                self.engine.convert_param_style(
                     "SELECT * FROM application_services_txns WHERE txn_id=?"
                 ),
                 (txn_id,),
@@ -316,14 +302,13 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
         res = self.get_success(
             self.db_pool.runQuery(
                 self.engine.convert_param_style(
-                    "SELECT last_txn, state FROM application_services_state WHERE as_id=?"
+                    "SELECT state FROM application_services_state WHERE as_id=?"
                 ),
                 (service.id,),
             )
         )
         self.assertEqual(1, len(res))
-        self.assertEqual(txn_id, res[0][0])
-        self.assertEqual(ApplicationServiceState.UP.value, res[0][1])
+        self.assertEqual(ApplicationServiceState.UP.value, res[0][0])
 
         res = self.get_success(
             self.db_pool.runQuery(