diff options
-rw-r--r-- | changelog.d/12680.misc | 1 | ||||
-rw-r--r-- | synapse/storage/databases/main/appservice.py | 47 | ||||
-rw-r--r-- | synapse/storage/schema/__init__.py | 5 | ||||
-rw-r--r-- | tests/handlers/test_appservice.py | 10 | ||||
-rw-r--r-- | tests/storage/test_appservice.py | 27 |
5 files changed, 35 insertions, 55 deletions
diff --git a/changelog.d/12680.misc b/changelog.d/12680.misc new file mode 100644 index 0000000000..dfd1f0a6c6 --- /dev/null +++ b/changelog.d/12680.misc @@ -0,0 +1 @@ +Remove code which updates unused database column `application_services_state.last_txn`. diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 945707b0ec..e284454b66 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -203,19 +203,29 @@ class ApplicationServiceTransactionWorkerStore( """Get the application service state. Args: - service: The service whose state to set. + service: The service whose state to get. Returns: - An ApplicationServiceState or none. + An ApplicationServiceState, or None if we have yet to attempt any + transactions to the AS. """ - result = await self.db_pool.simple_select_one( + # if we have created transactions for this AS but not yet attempted to send + # them, we will have a row in the table with state=NULL (recording the stream + # positions we have processed up to). + # + # On the other hand, if we have yet to create any transactions for this AS at + # all, then there will be no row for the AS. + # + # In either case, we return None to indicate "we don't yet know the state of + # this AS". + result = await self.db_pool.simple_select_one_onecol( "application_services_state", {"as_id": service.id}, - ["state"], + retcol="state", allow_none=True, desc="get_appservice_state", ) if result: - return ApplicationServiceState(result.get("state")) + return ApplicationServiceState(result) return None async def set_appservice_state( @@ -296,14 +306,6 @@ class ApplicationServiceTransactionWorkerStore( """ def _complete_appservice_txn(txn: LoggingTransaction) -> None: - # Set current txn_id for AS to 'txn_id' - self.db_pool.simple_upsert_txn( - txn, - "application_services_state", - {"as_id": service.id}, - {"last_txn": txn_id}, - ) - # Delete txn self.db_pool.simple_delete_txn( txn, @@ -452,16 +454,15 @@ class ApplicationServiceTransactionWorkerStore( % (stream_type,) ) - def set_appservice_stream_type_pos_txn(txn: LoggingTransaction) -> None: - stream_id_type = "%s_stream_id" % stream_type - txn.execute( - "UPDATE application_services_state SET %s = ? WHERE as_id=?" - % stream_id_type, - (pos, service.id), - ) - - await self.db_pool.runInteraction( - "set_appservice_stream_type_pos", set_appservice_stream_type_pos_txn + # this may be the first time that we're recording any state for this AS, so + # we don't yet know if a row for it exists; hence we have to upsert here. + await self.db_pool.simple_upsert( + table="application_services_state", + keyvalues={"as_id": service.id}, + values={f"{stream_type}_stream_id": pos}, + # no need to lock when emulating upsert: as_id is a unique key + lock=False, + desc="set_appservice_stream_type_pos", ) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 20c344faea..da98f05e03 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -61,7 +61,9 @@ Changes in SCHEMA_VERSION = 68: Changes in SCHEMA_VERSION = 69: - We now write to `device_lists_changes_in_room` table. - - Use sequence to generate future `application_services_txns.txn_id`s + - We now use a PostgreSQL sequence to generate future txn_ids for + `application_services_txns`. `application_services_state.last_txn` is no longer + updated. Changes in SCHEMA_VERSION = 70: - event_reference_hashes is no longer written to. @@ -71,6 +73,7 @@ Changes in SCHEMA_VERSION = 70: SCHEMA_COMPAT_VERSION = ( # We now assume that `device_lists_changes_in_room` has been filled out for # recent device_list_updates. + # ... and that `application_services_state.last_txn` is not used. 69 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 5b0cd1ab86..53e7a5d81b 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -434,16 +434,6 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): }, ) - # "Complete" a transaction. - # All this really does for us is make an entry in the application_services_state - # database table, which tracks the current stream_token per stream ID per AS. - self.get_success( - self.hs.get_datastores().main.complete_appservice_txn( - 0, - interested_appservice, - ) - ) - # Now, pretend that we receive a large burst of read receipts (300 total) that # all come in at once. for i in range(300): diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 1bf93e79a7..1047ed09c8 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -14,7 +14,7 @@ import json import os import tempfile -from typing import List, Optional, cast +from typing import List, cast from unittest.mock import Mock import yaml @@ -149,15 +149,12 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase): outfile.write(yaml.dump(as_yaml)) self.as_yaml_files.append(as_token) - def _set_state( - self, id: str, state: ApplicationServiceState, txn: Optional[int] = None - ): + def _set_state(self, id: str, state: ApplicationServiceState): return self.db_pool.runOperation( self.engine.convert_param_style( - "INSERT INTO application_services_state(as_id, state, last_txn) " - "VALUES(?,?,?)" + "INSERT INTO application_services_state(as_id, state) VALUES(?,?)" ), - (id, state.value, txn), + (id, state.value), ) def _insert_txn(self, as_id, txn_id, events): @@ -283,17 +280,6 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase): res = self.get_success( self.db_pool.runQuery( self.engine.convert_param_style( - "SELECT last_txn FROM application_services_state WHERE as_id=?" - ), - (service.id,), - ) - ) - self.assertEqual(1, len(res)) - self.assertEqual(txn_id, res[0][0]) - - res = self.get_success( - self.db_pool.runQuery( - self.engine.convert_param_style( "SELECT * FROM application_services_txns WHERE txn_id=?" ), (txn_id,), @@ -316,14 +302,13 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase): res = self.get_success( self.db_pool.runQuery( self.engine.convert_param_style( - "SELECT last_txn, state FROM application_services_state WHERE as_id=?" + "SELECT state FROM application_services_state WHERE as_id=?" ), (service.id,), ) ) self.assertEqual(1, len(res)) - self.assertEqual(txn_id, res[0][0]) - self.assertEqual(ApplicationServiceState.UP.value, res[0][1]) + self.assertEqual(ApplicationServiceState.UP.value, res[0][0]) res = self.get_success( self.db_pool.runQuery( |