summary refs log tree commit diff
path: root/synapse/storage/devices.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/devices.py')
-rw-r--r--synapse/storage/devices.py100
1 files changed, 92 insertions, 8 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d10ff9e4b9..ecdab34e7d 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -22,14 +22,19 @@ from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
-from ._base import Cache, SQLBaseStore, db_to_json
+from ._base import Cache, db_to_json
 
 logger = logging.getLogger(__name__)
 
+DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
+    "drop_device_list_streams_non_unique_indexes"
+)
 
-class DeviceStore(SQLBaseStore):
+
+class DeviceStore(BackgroundUpdateStore):
     def __init__(self, db_conn, hs):
         super(DeviceStore, self).__init__(db_conn, hs)
 
@@ -52,6 +57,30 @@ class DeviceStore(SQLBaseStore):
             columns=["user_id", "device_id"],
         )
 
+        # create a unique index on device_lists_remote_cache
+        self.register_background_index_update(
+            "device_lists_remote_cache_unique_idx",
+            index_name="device_lists_remote_cache_unique_id",
+            table="device_lists_remote_cache",
+            columns=["user_id", "device_id"],
+            unique=True,
+        )
+
+        # And one on device_lists_remote_extremeties
+        self.register_background_index_update(
+            "device_lists_remote_extremeties_unique_idx",
+            index_name="device_lists_remote_extremeties_unique_idx",
+            table="device_lists_remote_extremeties",
+            columns=["user_id"],
+            unique=True,
+        )
+
+        # once they complete, we can remove the old non-unique indexes.
+        self.register_background_update_handler(
+            DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
+            self._drop_device_list_streams_non_unique_indexes,
+        )
+
     @defer.inlineCallbacks
     def store_device(self, user_id, device_id,
                      initial_device_display_name):
@@ -239,7 +268,19 @@ class DeviceStore(SQLBaseStore):
 
     def update_remote_device_list_cache_entry(self, user_id, device_id, content,
                                               stream_id):
-        """Updates a single user's device in the cache.
+        """Updates a single device in the cache of a remote user's devicelist.
+
+        Note: assumes that we are the only thread that can be updating this user's
+        device list.
+
+        Args:
+            user_id (str): User to update device list for
+            device_id (str): ID of decivice being updated
+            content (dict): new data on this device
+            stream_id (int): the version of the device list
+
+        Returns:
+            Deferred[None]
         """
         return self.runInteraction(
             "update_remote_device_list_cache_entry",
@@ -272,7 +313,11 @@ class DeviceStore(SQLBaseStore):
                 },
                 values={
                     "content": json.dumps(content),
-                }
+                },
+
+                # we don't need to lock, because we assume we are the only thread
+                # updating this user's devices.
+                lock=False,
             )
 
         txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
@@ -289,11 +334,26 @@ class DeviceStore(SQLBaseStore):
             },
             values={
                 "stream_id": stream_id,
-            }
+            },
+
+            # again, we can assume we are the only thread updating this user's
+            # extremity.
+            lock=False,
         )
 
     def update_remote_device_list_cache(self, user_id, devices, stream_id):
-        """Replace the cache of the remote user's devices.
+        """Replace the entire cache of the remote user's devices.
+
+        Note: assumes that we are the only thread that can be updating this user's
+        device list.
+
+        Args:
+            user_id (str): User to update device list for
+            devices (list[dict]): list of device objects supplied over federation
+            stream_id (int): the version of the device list
+
+        Returns:
+            Deferred[None]
         """
         return self.runInteraction(
             "update_remote_device_list_cache",
@@ -338,7 +398,11 @@ class DeviceStore(SQLBaseStore):
             },
             values={
                 "stream_id": stream_id,
-            }
+            },
+
+            # we don't need to lock, because we can assume we are the only thread
+            # updating this user's extremity.
+            lock=False,
         )
 
     def get_devices_by_remote(self, destination, from_stream_id):
@@ -589,10 +653,14 @@ class DeviceStore(SQLBaseStore):
         combined list of changes to devices, and which destinations need to be
         poked. `destination` may be None if no destinations need to be poked.
         """
+        # We do a group by here as there can be a large number of duplicate
+        # entries, since we throw away device IDs.
         sql = """
-            SELECT stream_id, user_id, destination FROM device_lists_stream
+            SELECT MAX(stream_id) AS stream_id, user_id, destination
+            FROM device_lists_stream
             LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
             WHERE ? < stream_id AND stream_id <= ?
+            GROUP BY user_id, destination
         """
         return self._execute(
             "get_all_device_list_changes_for_remotes", None,
@@ -718,3 +786,19 @@ class DeviceStore(SQLBaseStore):
             "_prune_old_outbound_device_pokes",
             _prune_txn,
         )
+
+    @defer.inlineCallbacks
+    def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
+        def f(conn):
+            txn = conn.cursor()
+            txn.execute(
+                "DROP INDEX IF EXISTS device_lists_remote_cache_id"
+            )
+            txn.execute(
+                "DROP INDEX IF EXISTS device_lists_remote_extremeties_id"
+            )
+            txn.close()
+
+        yield self.runWithConnection(f)
+        yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
+        defer.returnValue(1)