diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 945707b0ec..e284454b66 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -203,19 +203,29 @@ class ApplicationServiceTransactionWorkerStore(
"""Get the application service state.
Args:
- service: The service whose state to set.
+ service: The service whose state to get.
Returns:
- An ApplicationServiceState or none.
+ An ApplicationServiceState, or None if we have yet to attempt any
+ transactions to the AS.
"""
- result = await self.db_pool.simple_select_one(
+ # if we have created transactions for this AS but not yet attempted to send
+ # them, we will have a row in the table with state=NULL (recording the stream
+ # positions we have processed up to).
+ #
+ # On the other hand, if we have yet to create any transactions for this AS at
+ # all, then there will be no row for the AS.
+ #
+ # In either case, we return None to indicate "we don't yet know the state of
+ # this AS".
+ result = await self.db_pool.simple_select_one_onecol(
"application_services_state",
{"as_id": service.id},
- ["state"],
+ retcol="state",
allow_none=True,
desc="get_appservice_state",
)
if result:
- return ApplicationServiceState(result.get("state"))
+ return ApplicationServiceState(result)
return None
async def set_appservice_state(
@@ -296,14 +306,6 @@ class ApplicationServiceTransactionWorkerStore(
"""
def _complete_appservice_txn(txn: LoggingTransaction) -> None:
- # Set current txn_id for AS to 'txn_id'
- self.db_pool.simple_upsert_txn(
- txn,
- "application_services_state",
- {"as_id": service.id},
- {"last_txn": txn_id},
- )
-
# Delete txn
self.db_pool.simple_delete_txn(
txn,
@@ -452,16 +454,15 @@ class ApplicationServiceTransactionWorkerStore(
% (stream_type,)
)
- def set_appservice_stream_type_pos_txn(txn: LoggingTransaction) -> None:
- stream_id_type = "%s_stream_id" % stream_type
- txn.execute(
- "UPDATE application_services_state SET %s = ? WHERE as_id=?"
- % stream_id_type,
- (pos, service.id),
- )
-
- await self.db_pool.runInteraction(
- "set_appservice_stream_type_pos", set_appservice_stream_type_pos_txn
+ # this may be the first time that we're recording any state for this AS, so
+ # we don't yet know if a row for it exists; hence we have to upsert here.
+ await self.db_pool.simple_upsert(
+ table="application_services_state",
+ keyvalues={"as_id": service.id},
+ values={f"{stream_type}_stream_id": pos},
+ # no need to lock when emulating upsert: as_id is a unique key
+ lock=False,
+ desc="set_appservice_stream_type_pos",
)
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 20c344faea..da98f05e03 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -61,7 +61,9 @@ Changes in SCHEMA_VERSION = 68:
Changes in SCHEMA_VERSION = 69:
- We now write to `device_lists_changes_in_room` table.
- - Use sequence to generate future `application_services_txns.txn_id`s
+ - We now use a PostgreSQL sequence to generate future txn_ids for
+ `application_services_txns`. `application_services_state.last_txn` is no longer
+ updated.
Changes in SCHEMA_VERSION = 70:
- event_reference_hashes is no longer written to.
@@ -71,6 +73,7 @@ Changes in SCHEMA_VERSION = 70:
SCHEMA_COMPAT_VERSION = (
# We now assume that `device_lists_changes_in_room` has been filled out for
# recent device_list_updates.
+ # ... and that `application_services_state.last_txn` is not used.
69
)
"""Limit on how far the synapse codebase can be rolled back without breaking db compat
|