diff options
Diffstat (limited to 'synapse/storage/databases/main/appservice.py')
-rw-r--r-- | synapse/storage/databases/main/appservice.py | 47 |
1 files changed, 24 insertions, 23 deletions
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 945707b0ec..e284454b66 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -203,19 +203,29 @@ class ApplicationServiceTransactionWorkerStore( """Get the application service state. Args: - service: The service whose state to set. + service: The service whose state to get. Returns: - An ApplicationServiceState or none. + An ApplicationServiceState, or None if we have yet to attempt any + transactions to the AS. """ - result = await self.db_pool.simple_select_one( + # if we have created transactions for this AS but not yet attempted to send + # them, we will have a row in the table with state=NULL (recording the stream + # positions we have processed up to). + # + # On the other hand, if we have yet to create any transactions for this AS at + # all, then there will be no row for the AS. + # + # In either case, we return None to indicate "we don't yet know the state of + # this AS". + result = await self.db_pool.simple_select_one_onecol( "application_services_state", {"as_id": service.id}, - ["state"], + retcol="state", allow_none=True, desc="get_appservice_state", ) if result: - return ApplicationServiceState(result.get("state")) + return ApplicationServiceState(result) return None async def set_appservice_state( @@ -296,14 +306,6 @@ class ApplicationServiceTransactionWorkerStore( """ def _complete_appservice_txn(txn: LoggingTransaction) -> None: - # Set current txn_id for AS to 'txn_id' - self.db_pool.simple_upsert_txn( - txn, - "application_services_state", - {"as_id": service.id}, - {"last_txn": txn_id}, - ) - # Delete txn self.db_pool.simple_delete_txn( txn, @@ -452,16 +454,15 @@ class ApplicationServiceTransactionWorkerStore( % (stream_type,) ) - def set_appservice_stream_type_pos_txn(txn: LoggingTransaction) -> None: - stream_id_type = "%s_stream_id" % stream_type - txn.execute( - "UPDATE application_services_state SET %s = ? WHERE as_id=?" - % stream_id_type, - (pos, service.id), - ) - - await self.db_pool.runInteraction( - "set_appservice_stream_type_pos", set_appservice_stream_type_pos_txn + # this may be the first time that we're recording any state for this AS, so + # we don't yet know if a row for it exists; hence we have to upsert here. + await self.db_pool.simple_upsert( + table="application_services_state", + keyvalues={"as_id": service.id}, + values={f"{stream_type}_stream_id": pos}, + # no need to lock when emulating upsert: as_id is a unique key + lock=False, + desc="set_appservice_stream_type_pos", ) |