summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
author: Patrick Cloke <clokep@users.noreply.github.com> 2020-07-16 11:32:19 -0400
committer: GitHub <noreply@github.com> 2020-07-16 11:32:19 -0400
commit: f460da6031d01b2b271ded097ed6be65fd1b24f9 (patch)
tree: 46e683fe3355ae429e9c4cae4b24d85086479270 /synapse/storage
parent: Combine nginx federation server blocks (#7823) (diff)
download: synapse-f460da6031d01b2b271ded097ed6be65fd1b24f9.tar.xz
Consistently use `db_to_json` to convert from database values to JSON objects. (#7849)
Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/_base.py                             |  4
-rw-r--r--  synapse/storage/background_updates.py                |  5
-rw-r--r--  synapse/storage/data_stores/main/account_data.py     | 16
-rw-r--r--  synapse/storage/data_stores/main/appservice.py       |  4
-rw-r--r--  synapse/storage/data_stores/main/deviceinbox.py      |  6
-rw-r--r--  synapse/storage/data_stores/main/devices.py          |  2
-rw-r--r--  synapse/storage/data_stores/main/e2e_room_keys.py    | 10
-rw-r--r--  synapse/storage/data_stores/main/end_to_end_keys.py  |  2
-rw-r--r--  synapse/storage/data_stores/main/event_push_actions.py |  4
-rw-r--r--  synapse/storage/data_stores/main/events.py           |  9
-rw-r--r--  synapse/storage/data_stores/main/events_bg_updates.py | 14
-rw-r--r--  synapse/storage/data_stores/main/events_worker.py    |  7
-rw-r--r--  synapse/storage/data_stores/main/group_server.py     | 22
-rw-r--r--  synapse/storage/data_stores/main/push_rule.py        |  6
-rw-r--r--  synapse/storage/data_stores/main/pusher.py           |  6
-rw-r--r--  synapse/storage/data_stores/main/receipts.py         |  8
-rw-r--r--  synapse/storage/data_stores/main/room.py             |  8
-rw-r--r--  synapse/storage/data_stores/main/roommember.py       |  5
-rw-r--r--  synapse/storage/data_stores/main/search.py           |  6
-rw-r--r--  synapse/storage/data_stores/main/tags.py             |  5
-rw-r--r--  synapse/storage/data_stores/main/ui_auth.py          | 12
21 files changed, 79 insertions, 82 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index bfce541ca7..985a042869 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -100,8 +100,8 @@ def db_to_json(db_content):
     if isinstance(db_content, memoryview):
         db_content = db_content.tobytes()
 
-    # Decode it to a Unicode string before feeding it to json.loads, so we
-    # consistenty get a Unicode-containing object out.
+    # Decode it to a Unicode string before feeding it to json.loads, since
+    # Python 3.5 does not support deserializing bytes.
     if isinstance(db_content, (bytes, bytearray)):
         db_content = db_content.decode("utf8")
 
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 59f3394b0a..018826ef69 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -249,7 +249,10 @@ class BackgroundUpdater(object):
             retcol="progress_json",
         )
 
-        progress = json.loads(progress_json)
+        # Avoid a circular import.
+        from synapse.storage._base import db_to_json
+
+        progress = db_to_json(progress_json)
 
         time_start = self._clock.time_msec()
         items_updated = await update_handler(progress, batch_size)
diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py
index b58f04d00d..33cc372dfd 100644
--- a/synapse/storage/data_stores/main/account_data.py
+++ b/synapse/storage/data_stores/main/account_data.py
@@ -22,7 +22,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
@@ -77,7 +77,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             )
 
             global_account_data = {
-                row["account_data_type"]: json.loads(row["content"]) for row in rows
+                row["account_data_type"]: db_to_json(row["content"]) for row in rows
             }
 
             rows = self.db.simple_select_list_txn(
@@ -90,7 +90,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             by_room = {}
             for row in rows:
                 room_data = by_room.setdefault(row["room_id"], {})
-                room_data[row["account_data_type"]] = json.loads(row["content"])
+                room_data[row["account_data_type"]] = db_to_json(row["content"])
 
             return global_account_data, by_room
 
@@ -113,7 +113,7 @@ class AccountDataWorkerStore(SQLBaseStore):
         )
 
         if result:
-            return json.loads(result)
+            return db_to_json(result)
         else:
             return None
 
@@ -137,7 +137,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             )
 
             return {
-                row["account_data_type"]: json.loads(row["content"]) for row in rows
+                row["account_data_type"]: db_to_json(row["content"]) for row in rows
             }
 
         return self.db.runInteraction(
@@ -170,7 +170,7 @@ class AccountDataWorkerStore(SQLBaseStore):
                 allow_none=True,
             )
 
-            return json.loads(content_json) if content_json else None
+            return db_to_json(content_json) if content_json else None
 
         return self.db.runInteraction(
             "get_account_data_for_room_and_type", get_account_data_for_room_and_type_txn
@@ -255,7 +255,7 @@ class AccountDataWorkerStore(SQLBaseStore):
 
             txn.execute(sql, (user_id, stream_id))
 
-            global_account_data = {row[0]: json.loads(row[1]) for row in txn}
+            global_account_data = {row[0]: db_to_json(row[1]) for row in txn}
 
             sql = (
                 "SELECT room_id, account_data_type, content FROM room_account_data"
@@ -267,7 +267,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             account_data_by_room = {}
             for row in txn:
                 room_account_data = account_data_by_room.setdefault(row[0], {})
-                room_account_data[row[1]] = json.loads(row[2])
+                room_account_data[row[1]] = db_to_json(row[2])
 
             return global_account_data, account_data_by_room
 
diff --git a/synapse/storage/data_stores/main/appservice.py b/synapse/storage/data_stores/main/appservice.py
index 7a1fe8cdd2..56659fed37 100644
--- a/synapse/storage/data_stores/main/appservice.py
+++ b/synapse/storage/data_stores/main/appservice.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 from synapse.storage.database import Database
 
@@ -303,7 +303,7 @@ class ApplicationServiceTransactionWorkerStore(
         if not entry:
             return None
 
-        event_ids = json.loads(entry["event_ids"])
+        event_ids = db_to_json(entry["event_ids"])
 
         events = yield self.get_events_as_list(event_ids)
 
diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index d313b9705f..ff86f18d40 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -21,7 +21,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.logging.opentracing import log_kv, set_tag, trace
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.util.caches.expiringcache import ExpiringCache
 
@@ -65,7 +65,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(json.loads(row[1]))
+                messages.append(db_to_json(row[1]))
             if len(messages) < limit:
                 stream_pos = current_stream_id
             return messages, stream_pos
@@ -173,7 +173,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(json.loads(row[1]))
+                messages.append(db_to_json(row[1]))
             if len(messages) < limit:
                 log_kv({"message": "Set stream position to current position"})
                 stream_pos = current_stream_id
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index 343cf9a2d5..45581a6500 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -577,7 +577,7 @@ class DeviceWorkerStore(SQLBaseStore):
             rows = yield self.db.execute(
                 "get_users_whose_signatures_changed", None, sql, user_id, from_key
             )
-            return {user for row in rows for user in json.loads(row[0])}
+            return {user for row in rows for user in db_to_json(row[0])}
         else:
             return set()
 
diff --git a/synapse/storage/data_stores/main/e2e_room_keys.py b/synapse/storage/data_stores/main/e2e_room_keys.py
index 23f4570c4b..615364f018 100644
--- a/synapse/storage/data_stores/main/e2e_room_keys.py
+++ b/synapse/storage/data_stores/main/e2e_room_keys.py
@@ -14,13 +14,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import json
+from canonicaljson import json
 
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 from synapse.logging.opentracing import log_kv, trace
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 
 
 class EndToEndRoomKeyStore(SQLBaseStore):
@@ -148,7 +148,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 "forwarded_count": row["forwarded_count"],
                 # is_verified must be returned to the client as a boolean
                 "is_verified": bool(row["is_verified"]),
-                "session_data": json.loads(row["session_data"]),
+                "session_data": db_to_json(row["session_data"]),
             }
 
         return sessions
@@ -222,7 +222,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 "first_message_index": row[2],
                 "forwarded_count": row[3],
                 "is_verified": row[4],
-                "session_data": json.loads(row[5]),
+                "session_data": db_to_json(row[5]),
             }
 
         return ret
@@ -319,7 +319,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 keyvalues={"user_id": user_id, "version": this_version, "deleted": 0},
                 retcols=("version", "algorithm", "auth_data", "etag"),
             )
-            result["auth_data"] = json.loads(result["auth_data"])
+            result["auth_data"] = db_to_json(result["auth_data"])
             result["version"] = str(result["version"])
             if result["etag"] is None:
                 result["etag"] = 0
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 6c3cff82e1..317c07a829 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -366,7 +366,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             for row in rows:
                 user_id = row["user_id"]
                 key_type = row["keytype"]
-                key = json.loads(row["keydata"])
+                key = db_to_json(row["keydata"])
                 user_info = result.setdefault(user_id, {})
                 user_info[key_type] = key
 
diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index bc9f4f08ea..504babaa7e 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -21,7 +21,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
 from synapse.storage.database import Database
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
@@ -58,7 +58,7 @@ def _deserialize_action(actions, is_highlight):
     """Custom deserializer for actions. This allows us to "compress" common actions
     """
     if actions:
-        return json.loads(actions)
+        return db_to_json(actions)
 
     if is_highlight:
         return DEFAULT_HIGHLIGHT_ACTION
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 66f01aad84..6f2e0d15cc 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -20,7 +20,6 @@ from collections import OrderedDict, namedtuple
 from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
 
 import attr
-from canonicaljson import json
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -32,7 +31,7 @@ from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.logging.utils import log_function
-from synapse.storage._base import make_in_list_sql_clause
+from synapse.storage._base import db_to_json, make_in_list_sql_clause
 from synapse.storage.data_stores.main.search import SearchEntry
 from synapse.storage.database import Database, LoggingTransaction
 from synapse.storage.util.id_generators import StreamIdGenerator
@@ -236,7 +235,7 @@ class PersistEventsStore:
             )
 
             txn.execute(sql + clause, args)
-            results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
+            results.extend(r[0] for r in txn if not db_to_json(r[1]).get("soft_failed"))
 
         for chunk in batch_iter(event_ids, 100):
             yield self.db.runInteraction(
@@ -297,7 +296,7 @@ class PersistEventsStore:
                     if prev_event_id in existing_prevs:
                         continue
 
-                    soft_failed = json.loads(metadata).get("soft_failed")
+                    soft_failed = db_to_json(metadata).get("soft_failed")
                     if soft_failed or rejected:
                         to_recursively_check.append(prev_event_id)
                         existing_prevs.add(prev_event_id)
@@ -583,7 +582,7 @@ class PersistEventsStore:
         txn.execute(sql, (room_id, EventTypes.Create, ""))
         row = txn.fetchone()
         if row:
-            event_json = json.loads(row[0])
+            event_json = db_to_json(row[0])
             content = event_json.get("content", {})
             creator = content.get("creator")
             room_version_id = content.get("room_version", RoomVersions.V1.identifier)
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index 62d28f44dc..663c94b24f 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -15,12 +15,10 @@
 
 import logging
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventContentFields
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
 
 logger = logging.getLogger(__name__)
@@ -125,7 +123,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             for row in rows:
                 try:
                     event_id = row[1]
-                    event_json = json.loads(row[2])
+                    event_json = db_to_json(row[2])
                     sender = event_json["sender"]
                     content = event_json["content"]
 
@@ -208,7 +206,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
                 for row in ev_rows:
                     event_id = row["event_id"]
-                    event_json = json.loads(row["json"])
+                    event_json = db_to_json(row["json"])
                     try:
                         origin_server_ts = event_json["origin_server_ts"]
                     except (KeyError, AttributeError):
@@ -317,7 +315,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
                 soft_failed = False
                 if metadata:
-                    soft_failed = json.loads(metadata).get("soft_failed")
+                    soft_failed = db_to_json(metadata).get("soft_failed")
 
                 if soft_failed or rejected:
                     soft_failed_events_to_lookup.add(event_id)
@@ -358,7 +356,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
                     graph[event_id] = {prev_event_id}
 
-                    soft_failed = json.loads(metadata).get("soft_failed")
+                    soft_failed = db_to_json(metadata).get("soft_failed")
                     if soft_failed or rejected:
                         soft_failed_events_to_lookup.add(event_id)
                     else:
@@ -543,7 +541,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             last_row_event_id = ""
             for (event_id, event_json_raw) in results:
                 try:
-                    event_json = json.loads(event_json_raw)
+                    event_json = db_to_json(event_json_raw)
 
                     self.db.simple_insert_many_txn(
                         txn=txn,
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 01cad7d4fa..a7d685f630 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -21,7 +21,6 @@ import threading
 from collections import namedtuple
 from typing import List, Optional, Tuple
 
-from canonicaljson import json
 from constantly import NamedConstant, Names
 
 from twisted.internet import defer
@@ -40,7 +39,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import get_domain_from_id
@@ -611,8 +610,8 @@ class EventsWorkerStore(SQLBaseStore):
             if not allow_rejected and rejected_reason:
                 continue
 
-            d = json.loads(row["json"])
-            internal_metadata = json.loads(row["internal_metadata"])
+            d = db_to_json(row["json"])
+            internal_metadata = db_to_json(row["internal_metadata"])
 
             format_version = row["format_version"]
             if format_version is None:
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index 4fb9f9850c..01ff561e1a 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -21,7 +21,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 
 # The category ID for the "default" category. We don't store as null in the
 # database to avoid the fun of null != null
@@ -197,7 +197,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             categories = {
                 row[0]: {
                     "is_public": row[1],
-                    "profile": json.loads(row[2]),
+                    "profile": db_to_json(row[2]),
                     "order": row[3],
                 }
                 for row in txn
@@ -221,7 +221,7 @@ class GroupServerWorkerStore(SQLBaseStore):
         return {
             row["category_id"]: {
                 "is_public": row["is_public"],
-                "profile": json.loads(row["profile"]),
+                "profile": db_to_json(row["profile"]),
             }
             for row in rows
         }
@@ -235,7 +235,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             desc="get_group_category",
         )
 
-        category["profile"] = json.loads(category["profile"])
+        category["profile"] = db_to_json(category["profile"])
 
         return category
 
@@ -251,7 +251,7 @@ class GroupServerWorkerStore(SQLBaseStore):
         return {
             row["role_id"]: {
                 "is_public": row["is_public"],
-                "profile": json.loads(row["profile"]),
+                "profile": db_to_json(row["profile"]),
             }
             for row in rows
         }
@@ -265,7 +265,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             desc="get_group_role",
         )
 
-        role["profile"] = json.loads(role["profile"])
+        role["profile"] = db_to_json(role["profile"])
 
         return role
 
@@ -333,7 +333,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             roles = {
                 row[0]: {
                     "is_public": row[1],
-                    "profile": json.loads(row[2]),
+                    "profile": db_to_json(row[2]),
                     "order": row[3],
                 }
                 for row in txn
@@ -462,7 +462,7 @@ class GroupServerWorkerStore(SQLBaseStore):
 
         now = int(self._clock.time_msec())
         if row and now < row["valid_until_ms"]:
-            return json.loads(row["attestation_json"])
+            return db_to_json(row["attestation_json"])
 
         return None
 
@@ -489,7 +489,7 @@ class GroupServerWorkerStore(SQLBaseStore):
                     "group_id": row[0],
                     "type": row[1],
                     "membership": row[2],
-                    "content": json.loads(row[3]),
+                    "content": db_to_json(row[3]),
                 }
                 for row in txn
             ]
@@ -519,7 +519,7 @@ class GroupServerWorkerStore(SQLBaseStore):
                     "group_id": group_id,
                     "membership": membership,
                     "type": gtype,
-                    "content": json.loads(content_json),
+                    "content": db_to_json(content_json),
                 }
                 for group_id, membership, gtype, content_json in txn
             ]
@@ -567,7 +567,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             """
             txn.execute(sql, (last_id, current_id, limit))
             updates = [
-                (stream_id, (group_id, user_id, gtype, json.loads(content_json)))
+                (stream_id, (group_id, user_id, gtype, db_to_json(content_json)))
                 for stream_id, group_id, user_id, gtype, content_json in txn
             ]
 
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index f6e78ca590..d181488db7 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -24,7 +24,7 @@ from twisted.internet import defer
 
 from synapse.push.baserules import list_with_base_rules
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.data_stores.main.appservice import ApplicationServiceWorkerStore
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 from synapse.storage.data_stores.main.pusher import PusherWorkerStore
@@ -43,8 +43,8 @@ def _load_rules(rawrules, enabled_map):
     ruleslist = []
     for rawrule in rawrules:
         rule = dict(rawrule)
-        rule["conditions"] = json.loads(rawrule["conditions"])
-        rule["actions"] = json.loads(rawrule["actions"])
+        rule["conditions"] = db_to_json(rawrule["conditions"])
+        rule["actions"] = db_to_json(rawrule["actions"])
         rule["default"] = False
         ruleslist.append(rule)
 
diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py
index 5461016240..e18f1ca87c 100644
--- a/synapse/storage/data_stores/main/pusher.py
+++ b/synapse/storage/data_stores/main/pusher.py
@@ -17,11 +17,11 @@
 import logging
 from typing import Iterable, Iterator, List, Tuple
 
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
 
 from twisted.internet import defer
 
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 
 logger = logging.getLogger(__name__)
@@ -36,7 +36,7 @@ class PusherWorkerStore(SQLBaseStore):
         for r in rows:
             dataJson = r["data"]
             try:
-                r["data"] = json.loads(dataJson)
+                r["data"] = db_to_json(dataJson)
             except Exception as e:
                 logger.warning(
                     "Invalid JSON in data for pusher %d: %s, %s",
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 8f5505bd67..1d723f2d34 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -22,7 +22,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.util.async_helpers import ObservableDeferred
@@ -203,7 +203,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
         for row in rows:
             content.setdefault(row["event_id"], {}).setdefault(row["receipt_type"], {})[
                 row["user_id"]
-            ] = json.loads(row["data"])
+            ] = db_to_json(row["data"])
 
         return [{"type": "m.receipt", "room_id": room_id, "content": content}]
 
@@ -260,7 +260,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             event_entry = room_event["content"].setdefault(row["event_id"], {})
             receipt_type = event_entry.setdefault(row["receipt_type"], {})
 
-            receipt_type[row["user_id"]] = json.loads(row["data"])
+            receipt_type[row["user_id"]] = db_to_json(row["data"])
 
         results = {
             room_id: [results[room_id]] if room_id in results else []
@@ -329,7 +329,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             """
             txn.execute(sql, (last_id, current_id, limit))
 
-            updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn]
+            updates = [(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn]
 
             limited = False
             upper_bound = current_id
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index b4817d693f..d2e1e36e7f 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -28,7 +28,7 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import RoomVersion, RoomVersions
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.data_stores.main.search import SearchStore
 from synapse.storage.database import Database, LoggingTransaction
 from synapse.types import ThirdPartyInstanceID
@@ -670,7 +670,7 @@ class RoomWorkerStore(SQLBaseStore):
             next_token = None
             for stream_ordering, content_json in txn:
                 next_token = stream_ordering
-                event_json = json.loads(content_json)
+                event_json = db_to_json(content_json)
                 content = event_json["content"]
                 content_url = content.get("url")
                 thumbnail_url = content.get("info", {}).get("thumbnail_url")
@@ -915,7 +915,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
                 if not row["json"]:
                     retention_policy = {}
                 else:
-                    ev = json.loads(row["json"])
+                    ev = db_to_json(row["json"])
                     retention_policy = ev["content"]
 
                 self.db.simple_insert_txn(
@@ -971,7 +971,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
             updates = []
             for room_id, event_json in txn:
-                event_dict = json.loads(event_json)
+                event_dict = db_to_json(event_json)
                 room_version_id = event_dict.get("content", {}).get(
                     "room_version", RoomVersions.V1.identifier
                 )
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 44bab65eac..29765890ee 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -17,8 +17,6 @@
 import logging
 from typing import Iterable, List, Set
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
@@ -27,6 +25,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import (
     LoggingTransaction,
     SQLBaseStore,
+    db_to_json,
     make_in_list_sql_clause,
 )
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
@@ -938,7 +937,7 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
                 event_id = row["event_id"]
                 room_id = row["room_id"]
                 try:
-                    event_json = json.loads(row["json"])
+                    event_json = db_to_json(row["json"])
                     content = event_json["content"]
                 except Exception:
                     continue
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index a8381dc577..d52228297c 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -17,12 +17,10 @@ import logging
 import re
 from collections import namedtuple
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
 from synapse.storage.database import Database
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
@@ -157,7 +155,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                     stream_ordering = row["stream_ordering"]
                     origin_server_ts = row["origin_server_ts"]
                     try:
-                        event_json = json.loads(row["json"])
+                        event_json = db_to_json(row["json"])
                         content = event_json["content"]
                     except Exception:
                         continue
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 290317fd94..bd7227773a 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -21,6 +21,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.storage._base import db_to_json
 from synapse.storage.data_stores.main.account_data import AccountDataWorkerStore
 from synapse.util.caches.descriptors import cached
 
@@ -49,7 +50,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             tags_by_room = {}
             for row in rows:
                 room_tags = tags_by_room.setdefault(row["room_id"], {})
-                room_tags[row["tag"]] = json.loads(row["content"])
+                room_tags[row["tag"]] = db_to_json(row["content"])
             return tags_by_room
 
         return deferred
@@ -180,7 +181,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             retcols=("tag", "content"),
             desc="get_tags_for_room",
         ).addCallback(
-            lambda rows: {row["tag"]: json.loads(row["content"]) for row in rows}
+            lambda rows: {row["tag"]: db_to_json(row["content"]) for row in rows}
         )
 
 
diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py
index 4c044b1a15..5f1b919748 100644
--- a/synapse/storage/data_stores/main/ui_auth.py
+++ b/synapse/storage/data_stores/main/ui_auth.py
@@ -12,13 +12,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import json
 from typing import Any, Dict, Optional, Union
 
 import attr
+from canonicaljson import json
 
 from synapse.api.errors import StoreError
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.types import JsonDict
 from synapse.util import stringutils as stringutils
 
@@ -118,7 +118,7 @@ class UIAuthWorkerStore(SQLBaseStore):
             desc="get_ui_auth_session",
         )
 
-        result["clientdict"] = json.loads(result["clientdict"])
+        result["clientdict"] = db_to_json(result["clientdict"])
 
         return UIAuthSessionData(session_id, **result)
 
@@ -168,7 +168,7 @@ class UIAuthWorkerStore(SQLBaseStore):
             retcols=("stage_type", "result"),
             desc="get_completed_ui_auth_stages",
         ):
-            results[row["stage_type"]] = json.loads(row["result"])
+            results[row["stage_type"]] = db_to_json(row["result"])
 
         return results
 
@@ -224,7 +224,7 @@ class UIAuthWorkerStore(SQLBaseStore):
         )
 
         # Update it and add it back to the database.
-        serverdict = json.loads(result["serverdict"])
+        serverdict = db_to_json(result["serverdict"])
         serverdict[key] = value
 
         self.db.simple_update_one_txn(
@@ -254,7 +254,7 @@ class UIAuthWorkerStore(SQLBaseStore):
             desc="get_ui_auth_session_data",
         )
 
-        serverdict = json.loads(result["serverdict"])
+        serverdict = db_to_json(result["serverdict"])
 
         return serverdict.get(key, default)