diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index bc3f575b1e..ecdab34e7d 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -22,14 +22,19 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
-from ._base import Cache, SQLBaseStore, db_to_json
+from ._base import Cache, db_to_json
logger = logging.getLogger(__name__)
+DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
+ "drop_device_list_streams_non_unique_indexes"
+)
-class DeviceStore(SQLBaseStore):
+
+class DeviceStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
super(DeviceStore, self).__init__(db_conn, hs)
@@ -52,6 +57,30 @@ class DeviceStore(SQLBaseStore):
columns=["user_id", "device_id"],
)
+ # create a unique index on device_lists_remote_cache
+ self.register_background_index_update(
+ "device_lists_remote_cache_unique_idx",
+ index_name="device_lists_remote_cache_unique_id",
+ table="device_lists_remote_cache",
+ columns=["user_id", "device_id"],
+ unique=True,
+ )
+
+ # And one on device_lists_remote_extremeties
+ self.register_background_index_update(
+ "device_lists_remote_extremeties_unique_idx",
+ index_name="device_lists_remote_extremeties_unique_idx",
+ table="device_lists_remote_extremeties",
+ columns=["user_id"],
+ unique=True,
+ )
+
+ # once they complete, we can remove the old non-unique indexes.
+ self.register_background_update_handler(
+ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
+ self._drop_device_list_streams_non_unique_indexes,
+ )
+
@defer.inlineCallbacks
def store_device(self, user_id, device_id,
initial_device_display_name):
@@ -757,3 +786,19 @@ class DeviceStore(SQLBaseStore):
"_prune_old_outbound_device_pokes",
_prune_txn,
)
+
+ @defer.inlineCallbacks
+ def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
+ def f(conn):
+ txn = conn.cursor()
+ txn.execute(
+ "DROP INDEX IF EXISTS device_lists_remote_cache_id"
+ )
+ txn.execute(
+ "DROP INDEX IF EXISTS device_lists_remote_extremeties_id"
+ )
+ txn.close()
+
+ yield self.runWithConnection(f)
+ yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
+ defer.returnValue(1)
|