diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/appservice.py | 47 | ||||
-rw-r--r-- | synapse/storage/schema/__init__.py | 5 |
2 files changed, 28 insertions, 24 deletions
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 945707b0ec..e284454b66 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -203,19 +203,29 @@ class ApplicationServiceTransactionWorkerStore( """Get the application service state. Args: - service: The service whose state to set. + service: The service whose state to get. Returns: - An ApplicationServiceState or none. + An ApplicationServiceState, or None if we have yet to attempt any + transactions to the AS. """ - result = await self.db_pool.simple_select_one( + # if we have created transactions for this AS but not yet attempted to send + # them, we will have a row in the table with state=NULL (recording the stream + # positions we have processed up to). + # + # On the other hand, if we have yet to create any transactions for this AS at + # all, then there will be no row for the AS. + # + # In either case, we return None to indicate "we don't yet know the state of + # this AS". + result = await self.db_pool.simple_select_one_onecol( "application_services_state", {"as_id": service.id}, - ["state"], + retcol="state", allow_none=True, desc="get_appservice_state", ) if result: - return ApplicationServiceState(result.get("state")) + return ApplicationServiceState(result) return None async def set_appservice_state( @@ -296,14 +306,6 @@ class ApplicationServiceTransactionWorkerStore( """ def _complete_appservice_txn(txn: LoggingTransaction) -> None: - # Set current txn_id for AS to 'txn_id' - self.db_pool.simple_upsert_txn( - txn, - "application_services_state", - {"as_id": service.id}, - {"last_txn": txn_id}, - ) - # Delete txn self.db_pool.simple_delete_txn( txn, @@ -452,16 +454,15 @@ class ApplicationServiceTransactionWorkerStore( % (stream_type,) ) - def set_appservice_stream_type_pos_txn(txn: LoggingTransaction) -> None: - stream_id_type = "%s_stream_id" % stream_type - txn.execute( - "UPDATE application_services_state SET %s = ? WHERE as_id=?" - % stream_id_type, - (pos, service.id), - ) - - await self.db_pool.runInteraction( - "set_appservice_stream_type_pos", set_appservice_stream_type_pos_txn + # this may be the first time that we're recording any state for this AS, so + # we don't yet know if a row for it exists; hence we have to upsert here. + await self.db_pool.simple_upsert( + table="application_services_state", + keyvalues={"as_id": service.id}, + values={f"{stream_type}_stream_id": pos}, + # no need to lock when emulating upsert: as_id is a unique key + lock=False, + desc="set_appservice_stream_type_pos", ) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 20c344faea..da98f05e03 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -61,7 +61,9 @@ Changes in SCHEMA_VERSION = 68: Changes in SCHEMA_VERSION = 69: - We now write to `device_lists_changes_in_room` table. - - Use sequence to generate future `application_services_txns.txn_id`s + - We now use a PostgreSQL sequence to generate future txn_ids for + `application_services_txns`. `application_services_state.last_txn` is no longer + updated. Changes in SCHEMA_VERSION = 70: - event_reference_hashes is no longer written to. @@ -71,6 +73,7 @@ Changes in SCHEMA_VERSION = 70: SCHEMA_COMPAT_VERSION = ( # We now assume that `device_lists_changes_in_room` has been filled out for # recent device_list_updates. + # ... and that `application_services_state.last_txn` is not used. 69 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat |