summary refs log tree commit diff
path: root/synapse/storage/devices.py
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2018-11-02 10:50:53 +0000
committerGitHub <noreply@github.com>2018-11-02 10:50:53 +0000
commitefb9343c8ce31441387898fb7987c44f2677be46 (patch)
tree5ca4d9d79884c1e14d909ef8f3b1a937a4744c4c /synapse/storage/devices.py
parentMerge pull request #4133 from matrix-org/travis/fix-terms-auth (diff)
parentchangelog (diff)
downloadsynapse-efb9343c8ce31441387898fb7987c44f2677be46.tar.xz
Merge pull request #4132 from matrix-org/rav/fix_device_list_locking
Fix locked upsert on device_lists_remote_cache
Diffstat (limited to 'synapse/storage/devices.py')
-rw-r--r--synapse/storage/devices.py94
1 files changed, 87 insertions, 7 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 62497ab63f..ecdab34e7d 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -22,14 +22,19 @@ from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
-from ._base import Cache, SQLBaseStore, db_to_json
+from ._base import Cache, db_to_json
 
 logger = logging.getLogger(__name__)
 
+DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
+    "drop_device_list_streams_non_unique_indexes"
+)
 
-class DeviceStore(SQLBaseStore):
+
+class DeviceStore(BackgroundUpdateStore):
     def __init__(self, db_conn, hs):
         super(DeviceStore, self).__init__(db_conn, hs)
 
@@ -52,6 +57,30 @@ class DeviceStore(SQLBaseStore):
             columns=["user_id", "device_id"],
         )
 
+        # create a unique index on device_lists_remote_cache
+        self.register_background_index_update(
+            "device_lists_remote_cache_unique_idx",
+            index_name="device_lists_remote_cache_unique_id",
+            table="device_lists_remote_cache",
+            columns=["user_id", "device_id"],
+            unique=True,
+        )
+
+        # And one on device_lists_remote_extremeties
+        self.register_background_index_update(
+            "device_lists_remote_extremeties_unique_idx",
+            index_name="device_lists_remote_extremeties_unique_idx",
+            table="device_lists_remote_extremeties",
+            columns=["user_id"],
+            unique=True,
+        )
+
+        # once they complete, we can remove the old non-unique indexes.
+        self.register_background_update_handler(
+            DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
+            self._drop_device_list_streams_non_unique_indexes,
+        )
+
     @defer.inlineCallbacks
     def store_device(self, user_id, device_id,
                      initial_device_display_name):
@@ -239,7 +268,19 @@ class DeviceStore(SQLBaseStore):
 
     def update_remote_device_list_cache_entry(self, user_id, device_id, content,
                                               stream_id):
-        """Updates a single user's device in the cache.
+        """Updates a single device in the cache of a remote user's devicelist.
+
+        Note: assumes that we are the only thread that can be updating this user's
+        device list.
+
+        Args:
+            user_id (str): User to update device list for
+            device_id (str): ID of decivice being updated
+            content (dict): new data on this device
+            stream_id (int): the version of the device list
+
+        Returns:
+            Deferred[None]
         """
         return self.runInteraction(
             "update_remote_device_list_cache_entry",
@@ -272,7 +313,11 @@ class DeviceStore(SQLBaseStore):
                 },
                 values={
                     "content": json.dumps(content),
-                }
+                },
+
+                # we don't need to lock, because we assume we are the only thread
+                # updating this user's devices.
+                lock=False,
             )
 
         txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
@@ -289,11 +334,26 @@ class DeviceStore(SQLBaseStore):
             },
             values={
                 "stream_id": stream_id,
-            }
+            },
+
+            # again, we can assume we are the only thread updating this user's
+            # extremity.
+            lock=False,
         )
 
     def update_remote_device_list_cache(self, user_id, devices, stream_id):
-        """Replace the cache of the remote user's devices.
+        """Replace the entire cache of the remote user's devices.
+
+        Note: assumes that we are the only thread that can be updating this user's
+        device list.
+
+        Args:
+            user_id (str): User to update device list for
+            devices (list[dict]): list of device objects supplied over federation
+            stream_id (int): the version of the device list
+
+        Returns:
+            Deferred[None]
         """
         return self.runInteraction(
             "update_remote_device_list_cache",
@@ -338,7 +398,11 @@ class DeviceStore(SQLBaseStore):
             },
             values={
                 "stream_id": stream_id,
-            }
+            },
+
+            # we don't need to lock, because we can assume we are the only thread
+            # updating this user's extremity.
+            lock=False,
         )
 
     def get_devices_by_remote(self, destination, from_stream_id):
@@ -722,3 +786,19 @@ class DeviceStore(SQLBaseStore):
             "_prune_old_outbound_device_pokes",
             _prune_txn,
         )
+
+    @defer.inlineCallbacks
+    def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
+        def f(conn):
+            txn = conn.cursor()
+            txn.execute(
+                "DROP INDEX IF EXISTS device_lists_remote_cache_id"
+            )
+            txn.execute(
+                "DROP INDEX IF EXISTS device_lists_remote_extremeties_id"
+            )
+            txn.close()
+
+        yield self.runWithConnection(f)
+        yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
+        defer.returnValue(1)