diff options
author | Erik Johnston <erik@matrix.org> | 2017-03-20 17:25:41 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-03-20 17:25:41 +0000 |
commit | e0e214556a82370528ef1f9943773c42270c5336 (patch) | |
tree | 7fcbb016a63202eb1733e6b69a4999ba94193e8c /synapse/storage/devices.py | |
parent | Merge pull request #2028 from majewsky/readme-fix-1 (diff) | |
parent | Bump changelog and version (diff) | |
download | synapse-e0e214556a82370528ef1f9943773c42270c5336.tar.xz |
Merge branch 'release-v0.19.3' of github.com:matrix-org/synapse v0.19.3
Diffstat (limited to 'synapse/storage/devices.py')
-rw-r--r-- | synapse/storage/devices.py | 139 |
1 files changed, 92 insertions, 47 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 8e17800364..bd56ba2515 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -19,6 +19,8 @@ from twisted.internet import defer from synapse.api.errors import StoreError from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks + logger = logging.getLogger(__name__) @@ -31,6 +33,13 @@ class DeviceStore(SQLBaseStore): self._prune_old_outbound_device_pokes, 60 * 60 * 1000 ) + self.register_background_index_update( + "device_lists_stream_idx", + index_name="device_lists_stream_user_id", + table="device_lists_stream", + columns=["user_id", "device_id"], + ) + @defer.inlineCallbacks def store_device(self, user_id, device_id, initial_device_display_name): @@ -144,6 +153,7 @@ class DeviceStore(SQLBaseStore): defer.returnValue({d["device_id"]: d for d in devices}) + @cached(max_entries=10000) def get_device_list_last_stream_id_for_remote(self, user_id): """Get the last stream_id we got for a user. May be None if we haven't got any information for them. @@ -156,16 +166,36 @@ class DeviceStore(SQLBaseStore): allow_none=True, ) + @cachedList(cached_method_name="get_device_list_last_stream_id_for_remote", + list_name="user_ids", inlineCallbacks=True) + def get_device_list_last_stream_id_for_remotes(self, user_ids): + rows = yield self._simple_select_many_batch( + table="device_lists_remote_extremeties", + column="user_id", + iterable=user_ids, + retcols=("user_id", "stream_id",), + desc="get_user_devices_from_cache", + ) + + results = {user_id: None for user_id in user_ids} + results.update({ + row["user_id"]: row["stream_id"] for row in rows + }) + + defer.returnValue(results) + + @defer.inlineCallbacks def mark_remote_user_device_list_as_unsubscribed(self, user_id): """Mark that we no longer track device lists for remote user. """ - return self._simple_delete( + yield self._simple_delete( table="device_lists_remote_extremeties", keyvalues={ "user_id": user_id, }, desc="mark_remote_user_device_list_as_unsubscribed", ) + self.get_device_list_last_stream_id_for_remote.invalidate((user_id,)) def update_remote_device_list_cache_entry(self, user_id, device_id, content, stream_id): @@ -191,6 +221,12 @@ class DeviceStore(SQLBaseStore): } ) + txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,)) + txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,)) + txn.call_after( + self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,) + ) + self._simple_upsert_txn( txn, table="device_lists_remote_extremeties", @@ -234,6 +270,12 @@ class DeviceStore(SQLBaseStore): ] ) + txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,)) + txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,)) + txn.call_after( + self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,) + ) + self._simple_upsert_txn( txn, table="device_lists_remote_extremeties", @@ -320,6 +362,7 @@ class DeviceStore(SQLBaseStore): return (now_stream_id, results) + @defer.inlineCallbacks def get_user_devices_from_cache(self, query_list): """Get the devices (and keys if any) for remote users from the cache. @@ -332,27 +375,11 @@ class DeviceStore(SQLBaseStore): a set of user_ids and results_map is a mapping of user_id -> device_id -> device_info """ - return self.runInteraction( - "get_user_devices_from_cache", self._get_user_devices_from_cache_txn, - query_list, + user_ids = set(user_id for user_id, _ in query_list) + user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids)) + user_ids_in_cache = set( + user_id for user_id, stream_id in user_map.items() if stream_id ) - - def _get_user_devices_from_cache_txn(self, txn, query_list): - user_ids = {user_id for user_id, _ in query_list} - - user_ids_in_cache = set() - for user_id in user_ids: - stream_ids = self._simple_select_onecol_txn( - txn, - table="device_lists_remote_extremeties", - keyvalues={ - "user_id": user_id, - }, - retcol="stream_id", - ) - if stream_ids: - user_ids_in_cache.add(user_id) - user_ids_not_in_cache = user_ids - user_ids_in_cache results = {} @@ -361,32 +388,40 @@ class DeviceStore(SQLBaseStore): continue if device_id: - content = self._simple_select_one_onecol_txn( - txn, - table="device_lists_remote_cache", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - }, - retcol="content", - ) - results.setdefault(user_id, {})[device_id] = json.loads(content) + device = yield self._get_cached_user_device(user_id, device_id) + results.setdefault(user_id, {})[device_id] = device else: - devices = self._simple_select_list_txn( - txn, - table="device_lists_remote_cache", - keyvalues={ - "user_id": user_id, - }, - retcols=("device_id", "content"), - ) - results[user_id] = { - device["device_id"]: json.loads(device["content"]) - for device in devices - } - user_ids_in_cache.discard(user_id) + results[user_id] = yield self._get_cached_devices_for_user(user_id) + + defer.returnValue((user_ids_not_in_cache, results)) + + @cachedInlineCallbacks(num_args=2, tree=True) + def _get_cached_user_device(self, user_id, device_id): + content = yield self._simple_select_one_onecol( + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + retcol="content", + desc="_get_cached_user_device", + ) + defer.returnValue(json.loads(content)) - return user_ids_not_in_cache, results + @cachedInlineCallbacks() + def _get_cached_devices_for_user(self, user_id): + devices = yield self._simple_select_list( + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + }, + retcols=("device_id", "content"), + desc="_get_cached_devices_for_user", + ) + defer.returnValue({ + device["device_id"]: json.loads(device["content"]) + for device in devices + }) def get_devices_with_keys_by_user(self, user_id): """Get all devices (with any device keys) for a user @@ -473,7 +508,7 @@ class DeviceStore(SQLBaseStore): defer.returnValue(set(changed)) sql = """ - SELECT user_id FROM device_lists_stream WHERE stream_id > ? + SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ? """ rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) defer.returnValue(set(row[0] for row in rows)) @@ -489,7 +524,7 @@ class DeviceStore(SQLBaseStore): WHERE stream_id > ? """ return self._execute( - "get_users_and_hosts_device_list", None, + "get_all_device_list_changes_for_remotes", None, sql, from_key, ) @@ -518,6 +553,16 @@ class DeviceStore(SQLBaseStore): host, stream_id, ) + # Delete older entries in the table, as we really only care about + # when the latest change happened. + txn.executemany( + """ + DELETE FROM device_lists_stream + WHERE user_id = ? AND device_id = ? AND stream_id < ? + """, + [(user_id, device_id, stream_id) for device_id in device_ids] + ) + self._simple_insert_many_txn( txn, table="device_lists_stream", |