diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 5cf1a88399..454c0bc50c 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -16,13 +16,12 @@
import logging
import re
-from canonicaljson import json
-
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.util import json_encoder
logger = logging.getLogger(__name__)
@@ -162,20 +161,18 @@ class ApplicationServiceTransactionWorkerStore(
return result.get("state")
return None
- def set_appservice_state(self, service, state):
+ async def set_appservice_state(self, service, state) -> None:
"""Set the application service state.
Args:
service(ApplicationService): The service whose state to set.
state(ApplicationServiceState): The connectivity state to apply.
- Returns:
- A Deferred which resolves when the state was set successfully.
"""
- return self.db_pool.simple_upsert(
+ await self.db_pool.simple_upsert(
"application_services_state", {"as_id": service.id}, {"state": state}
)
- def create_appservice_txn(self, service, events):
+ async def create_appservice_txn(self, service, events):
"""Atomically creates a new transaction for this application service
with the given list of events.
@@ -204,7 +201,7 @@ class ApplicationServiceTransactionWorkerStore(
new_txn_id = max(highest_txn_id, last_txn_id) + 1
# Insert new txn into txn table
- event_ids = json.dumps([e.event_id for e in events])
+ event_ids = json_encoder.encode([e.event_id for e in events])
txn.execute(
"INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
"VALUES(?,?,?)",
@@ -212,20 +209,17 @@ class ApplicationServiceTransactionWorkerStore(
)
return AppServiceTransaction(service=service, id=new_txn_id, events=events)
- return self.db_pool.runInteraction(
+ return await self.db_pool.runInteraction(
"create_appservice_txn", _create_appservice_txn
)
- def complete_appservice_txn(self, txn_id, service):
+ async def complete_appservice_txn(self, txn_id, service) -> None:
"""Completes an application service transaction.
Args:
txn_id(str): The transaction ID being completed.
service(ApplicationService): The application service which was sent
this transaction.
- Returns:
- A Deferred which resolves if this transaction was stored
- successfully.
"""
txn_id = int(txn_id)
@@ -261,7 +255,7 @@ class ApplicationServiceTransactionWorkerStore(
{"txn_id": txn_id, "as_id": service.id},
)
- return self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
"complete_appservice_txn", _complete_appservice_txn
)
@@ -315,13 +309,13 @@ class ApplicationServiceTransactionWorkerStore(
else:
return int(last_txn_id[0]) # select 'last_txn' col
- def set_appservice_last_pos(self, pos):
+ async def set_appservice_last_pos(self, pos) -> None:
def set_appservice_last_pos_txn(txn):
txn.execute(
"UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
)
- return self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
"set_appservice_last_pos", set_appservice_last_pos_txn
)
|