diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/devices.py | 94 | ||||
-rw-r--r-- | synapse/storage/schema/delta/40/device_list_streams.sql | 9 | ||||
-rw-r--r-- | synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql | 36 |
3 files changed, 128 insertions, 11 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 62497ab63f..ecdab34e7d 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -22,14 +22,19 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList -from ._base import Cache, SQLBaseStore, db_to_json +from ._base import Cache, db_to_json logger = logging.getLogger(__name__) +DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( + "drop_device_list_streams_non_unique_indexes" +) -class DeviceStore(SQLBaseStore): + +class DeviceStore(BackgroundUpdateStore): def __init__(self, db_conn, hs): super(DeviceStore, self).__init__(db_conn, hs) @@ -52,6 +57,30 @@ class DeviceStore(SQLBaseStore): columns=["user_id", "device_id"], ) + # create a unique index on device_lists_remote_cache + self.register_background_index_update( + "device_lists_remote_cache_unique_idx", + index_name="device_lists_remote_cache_unique_id", + table="device_lists_remote_cache", + columns=["user_id", "device_id"], + unique=True, + ) + + # And one on device_lists_remote_extremeties + self.register_background_index_update( + "device_lists_remote_extremeties_unique_idx", + index_name="device_lists_remote_extremeties_unique_idx", + table="device_lists_remote_extremeties", + columns=["user_id"], + unique=True, + ) + + # once they complete, we can remove the old non-unique indexes. + self.register_background_update_handler( + DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES, + self._drop_device_list_streams_non_unique_indexes, + ) + @defer.inlineCallbacks def store_device(self, user_id, device_id, initial_device_display_name): @@ -239,7 +268,19 @@ class DeviceStore(SQLBaseStore): def update_remote_device_list_cache_entry(self, user_id, device_id, content, stream_id): - """Updates a single user's device in the cache. + """Updates a single device in the cache of a remote user's devicelist. + + Note: assumes that we are the only thread that can be updating this user's + device list. + + Args: + user_id (str): User to update device list for + device_id (str): ID of decivice being updated + content (dict): new data on this device + stream_id (int): the version of the device list + + Returns: + Deferred[None] """ return self.runInteraction( "update_remote_device_list_cache_entry", @@ -272,7 +313,11 @@ class DeviceStore(SQLBaseStore): }, values={ "content": json.dumps(content), - } + }, + + # we don't need to lock, because we assume we are the only thread + # updating this user's devices. + lock=False, ) txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,)) @@ -289,11 +334,26 @@ class DeviceStore(SQLBaseStore): }, values={ "stream_id": stream_id, - } + }, + + # again, we can assume we are the only thread updating this user's + # extremity. + lock=False, ) def update_remote_device_list_cache(self, user_id, devices, stream_id): - """Replace the cache of the remote user's devices. + """Replace the entire cache of the remote user's devices. + + Note: assumes that we are the only thread that can be updating this user's + device list. + + Args: + user_id (str): User to update device list for + devices (list[dict]): list of device objects supplied over federation + stream_id (int): the version of the device list + + Returns: + Deferred[None] """ return self.runInteraction( "update_remote_device_list_cache", @@ -338,7 +398,11 @@ class DeviceStore(SQLBaseStore): }, values={ "stream_id": stream_id, - } + }, + + # we don't need to lock, because we can assume we are the only thread + # updating this user's extremity. + lock=False, ) def get_devices_by_remote(self, destination, from_stream_id): @@ -722,3 +786,19 @@ class DeviceStore(SQLBaseStore): "_prune_old_outbound_device_pokes", _prune_txn, ) + + @defer.inlineCallbacks + def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size): + def f(conn): + txn = conn.cursor() + txn.execute( + "DROP INDEX IF EXISTS device_lists_remote_cache_id" + ) + txn.execute( + "DROP INDEX IF EXISTS device_lists_remote_extremeties_id" + ) + txn.close() + + yield self.runWithConnection(f) + yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES) + defer.returnValue(1) diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql index 54841b3843..dd6dcb65f1 100644 --- a/synapse/storage/schema/delta/40/device_list_streams.sql +++ b/synapse/storage/schema/delta/40/device_list_streams.sql @@ -20,9 +20,6 @@ CREATE TABLE device_lists_remote_cache ( content TEXT NOT NULL ); -CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id); - - -- The last update we got for a user. Empty if we're not receiving updates for -- that user. CREATE TABLE device_lists_remote_extremeties ( @@ -30,7 +27,11 @@ CREATE TABLE device_lists_remote_extremeties ( stream_id TEXT NOT NULL ); -CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id); +-- we used to create non-unique indexes on these tables, but as of update 52 we create +-- unique indexes concurrently: +-- +-- CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id); +-- CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id); -- Stream of device lists updates. Includes both local and remotes diff --git a/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql new file mode 100644 index 0000000000..bfa49e6f92 --- /dev/null +++ b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql @@ -0,0 +1,36 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- register a background update which will create a unique index on +-- device_lists_remote_cache +INSERT into background_updates (update_name, progress_json) + VALUES ('device_lists_remote_cache_unique_idx', '{}'); + +-- and one on device_lists_remote_extremeties +INSERT into background_updates (update_name, progress_json, depends_on) + VALUES ( + 'device_lists_remote_extremeties_unique_idx', '{}', + + -- doesn't really depend on this, but we need to make sure both happen + -- before we drop the old indexes. + 'device_lists_remote_cache_unique_idx' + ); + +-- once they complete, we can drop the old indexes. +INSERT into background_updates (update_name, progress_json, depends_on) + VALUES ( + 'drop_device_list_streams_non_unique_indexes', '{}', + 'device_lists_remote_extremeties_unique_idx' + ); |