From f460da6031d01b2b271ded097ed6be65fd1b24f9 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 16 Jul 2020 11:32:19 -0400 Subject: Consistently use `db_to_json` to convert from database values to JSON objects. (#7849) --- synapse/storage/data_stores/main/deviceinbox.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage/data_stores/main/deviceinbox.py') diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py index d313b9705f..ff86f18d40 100644 --- a/synapse/storage/data_stores/main/deviceinbox.py +++ b/synapse/storage/data_stores/main/deviceinbox.py @@ -21,7 +21,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.logging.opentracing import log_kv, set_tag, trace -from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause +from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import Database from synapse.util.caches.expiringcache import ExpiringCache @@ -65,7 +65,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): messages = [] for row in txn: stream_pos = row[0] - messages.append(json.loads(row[1])) + messages.append(db_to_json(row[1])) if len(messages) < limit: stream_pos = current_stream_id return messages, stream_pos @@ -173,7 +173,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): messages = [] for row in txn: stream_pos = row[0] - messages.append(json.loads(row[1])) + messages.append(db_to_json(row[1])) if len(messages) < limit: log_kv({"message": "Set stream position to current position"}) stream_pos = current_stream_id -- cgit 1.5.1 From 2d2acc1cf2f2c8caa1272a14658b28ede23b664f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Jul 2020 17:03:27 +0100 Subject: Stop using 'device_max_stream_id' (#7882) It serves no purpose and updating everytime we write to the device inbox stream means all such transactions will conflict, causing lots of transaction failures and retries. --- changelog.d/7882.misc | 1 + synapse/replication/slave/storage/deviceinbox.py | 2 +- synapse/storage/data_stores/main/__init__.py | 2 +- synapse/storage/data_stores/main/deviceinbox.py | 3 --- 4 files changed, 3 insertions(+), 5 deletions(-) create mode 100644 changelog.d/7882.misc (limited to 'synapse/storage/data_stores/main/deviceinbox.py') diff --git a/changelog.d/7882.misc b/changelog.d/7882.misc new file mode 100644 index 0000000000..9002749335 --- /dev/null +++ b/changelog.d/7882.misc @@ -0,0 +1 @@ +Stop using `device_max_stream_id` table and just use `device_inbox.stream_id`. diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index bd394f6b00..a8a16dbc71 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -26,7 +26,7 @@ class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore): def __init__(self, database: Database, db_conn, hs): super(SlavedDeviceInboxStore, self).__init__(database, db_conn, hs) self._device_inbox_id_gen = SlavedIdTracker( - db_conn, "device_max_stream_id", "stream_id" + db_conn, "device_inbox", "stream_id" ) self._device_inbox_stream_cache = StreamChangeCache( "DeviceInboxStreamChangeCache", diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index 4b4763c701..932458f651 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -128,7 +128,7 @@ class DataStore( db_conn, "presence_stream", "stream_id" ) self._device_inbox_id_gen = StreamIdGenerator( - db_conn, "device_max_stream_id", "stream_id" + db_conn, "device_inbox", "stream_id" ) self._public_room_id_gen = StreamIdGenerator( db_conn, "public_room_list_stream", "stream_id" diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py index ff86f18d40..da297b31fb 100644 --- a/synapse/storage/data_stores/main/deviceinbox.py +++ b/synapse/storage/data_stores/main/deviceinbox.py @@ -424,9 +424,6 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) def _add_messages_to_local_device_inbox_txn( self, txn, stream_id, messages_by_user_then_device ): - sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?" - txn.execute(sql, (stream_id, stream_id)) - local_by_user_then_device = {} for user_id, messages_by_device in messages_by_user_then_device.items(): messages_json_for_user = {} -- cgit 1.5.1