diff options
author | Erik Johnston <erik@matrix.org> | 2019-10-21 12:56:42 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-10-21 16:05:06 +0100 |
commit | c66a06ac6b69b0a03f5c6284ded980399e9df94e (patch) | |
tree | 01dfd3b9098a9ace759403744d122c18efbd97ff /synapse/storage/devices.py | |
parent | Merge branch 'master' into develop (diff) | |
download | synapse-c66a06ac6b69b0a03f5c6284ded980399e9df94e.tar.xz |
Move storage classes into a main "data store".
This is in preparation for having multiple data stores that offer different functionality, e.g. splitting out state or event storage.
Diffstat (limited to 'synapse/storage/devices.py')
-rw-r--r-- | synapse/storage/devices.py | 937 |
1 files changed, 0 insertions, 937 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py deleted file mode 100644 index ac5239e50a..0000000000 --- a/synapse/storage/devices.py +++ /dev/null @@ -1,937 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import logging - -from six import iteritems - -from canonicaljson import json - -from twisted.internet import defer - -from synapse.api.errors import StoreError -from synapse.logging.opentracing import ( - get_active_span_text_map, - set_tag, - trace, - whitelisted_homeserver, -) -from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage._base import ( - Cache, - SQLBaseStore, - db_to_json, - make_in_list_sql_clause, -) -from synapse.storage.background_updates import BackgroundUpdateStore -from synapse.util import batch_iter -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList - -logger = logging.getLogger(__name__) - -DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( - "drop_device_list_streams_non_unique_indexes" -) - - -class DeviceWorkerStore(SQLBaseStore): - def get_device(self, user_id, device_id): - """Retrieve a device. - - Args: - user_id (str): The ID of the user which owns the device - device_id (str): The ID of the device to retrieve - Returns: - defer.Deferred for a dict containing the device information - Raises: - StoreError: if the device is not found - """ - return self._simple_select_one( - table="devices", - keyvalues={"user_id": user_id, "device_id": device_id}, - retcols=("user_id", "device_id", "display_name"), - desc="get_device", - ) - - @defer.inlineCallbacks - def get_devices_by_user(self, user_id): - """Retrieve all of a user's registered devices. - - Args: - user_id (str): - Returns: - defer.Deferred: resolves to a dict from device_id to a dict - containing "device_id", "user_id" and "display_name" for each - device. - """ - devices = yield self._simple_select_list( - table="devices", - keyvalues={"user_id": user_id}, - retcols=("user_id", "device_id", "display_name"), - desc="get_devices_by_user", - ) - - return {d["device_id"]: d for d in devices} - - @trace - @defer.inlineCallbacks - def get_devices_by_remote(self, destination, from_stream_id, limit): - """Get stream of updates to send to remote servers - - Returns: - Deferred[tuple[int, list[dict]]]: - current stream id (ie, the stream id of the last update included in the - response), and the list of updates - """ - now_stream_id = self._device_list_id_gen.get_current_token() - - has_changed = self._device_list_federation_stream_cache.has_entity_changed( - destination, int(from_stream_id) - ) - if not has_changed: - return now_stream_id, [] - - # We retrieve n+1 devices from the list of outbound pokes where n is - # our outbound device update limit. We then check if the very last - # device has the same stream_id as the second-to-last device. If so, - # then we ignore all devices with that stream_id and only send the - # devices with a lower stream_id. - # - # If when culling the list we end up with no devices afterwards, we - # consider the device update to be too large, and simply skip the - # stream_id; the rationale being that such a large device list update - # is likely an error. - updates = yield self.runInteraction( - "get_devices_by_remote", - self._get_devices_by_remote_txn, - destination, - from_stream_id, - now_stream_id, - limit + 1, - ) - - # Return an empty list if there are no updates - if not updates: - return now_stream_id, [] - - # if we have exceeded the limit, we need to exclude any results with the - # same stream_id as the last row. - if len(updates) > limit: - stream_id_cutoff = updates[-1][2] - now_stream_id = stream_id_cutoff - 1 - else: - stream_id_cutoff = None - - # Perform the equivalent of a GROUP BY - # - # Iterate through the updates list and copy non-duplicate - # (user_id, device_id) entries into a map, with the value being - # the max stream_id across each set of duplicate entries - # - # maps (user_id, device_id) -> (stream_id, opentracing_context) - # as long as their stream_id does not match that of the last row - # - # opentracing_context contains the opentracing metadata for the request - # that created the poke - # - # The most recent request's opentracing_context is used as the - # context which created the Edu. - - query_map = {} - for update in updates: - if stream_id_cutoff is not None and update[2] >= stream_id_cutoff: - # Stop processing updates - break - - key = (update[0], update[1]) - - update_context = update[3] - update_stream_id = update[2] - - previous_update_stream_id, _ = query_map.get(key, (0, None)) - - if update_stream_id > previous_update_stream_id: - query_map[key] = (update_stream_id, update_context) - - # If we didn't find any updates with a stream_id lower than the cutoff, it - # means that there are more than limit updates all of which have the same - # steam_id. - - # That should only happen if a client is spamming the server with new - # devices, in which case E2E isn't going to work well anyway. We'll just - # skip that stream_id and return an empty list, and continue with the next - # stream_id next time. - if not query_map: - return stream_id_cutoff, [] - - results = yield self._get_device_update_edus_by_remote( - destination, from_stream_id, query_map - ) - - return now_stream_id, results - - def _get_devices_by_remote_txn( - self, txn, destination, from_stream_id, now_stream_id, limit - ): - """Return device update information for a given remote destination - - Args: - txn (LoggingTransaction): The transaction to execute - destination (str): The host the device updates are intended for - from_stream_id (int): The minimum stream_id to filter updates by, exclusive - now_stream_id (int): The maximum stream_id to filter updates by, inclusive - limit (int): Maximum number of device updates to return - - Returns: - List: List of device updates - """ - sql = """ - SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes - WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? - ORDER BY stream_id - LIMIT ? - """ - txn.execute(sql, (destination, from_stream_id, now_stream_id, False, limit)) - - return list(txn) - - @defer.inlineCallbacks - def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_map): - """Returns a list of device update EDUs as well as E2EE keys - - Args: - destination (str): The host the device updates are intended for - from_stream_id (int): The minimum stream_id to filter updates by, exclusive - query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping - user_id/device_id to update stream_id and the relevent json-encoded - opentracing context - - Returns: - List[Dict]: List of objects representing an device update EDU - - """ - devices = yield self.runInteraction( - "_get_e2e_device_keys_txn", - self._get_e2e_device_keys_txn, - query_map.keys(), - include_all_devices=True, - include_deleted_devices=True, - ) - - results = [] - for user_id, user_devices in iteritems(devices): - # The prev_id for the first row is always the last row before - # `from_stream_id` - prev_id = yield self._get_last_device_update_for_remote_user( - destination, user_id, from_stream_id - ) - for device_id, device in iteritems(user_devices): - stream_id, opentracing_context = query_map[(user_id, device_id)] - result = { - "user_id": user_id, - "device_id": device_id, - "prev_id": [prev_id] if prev_id else [], - "stream_id": stream_id, - "org.matrix.opentracing_context": opentracing_context, - } - - prev_id = stream_id - - if device is not None: - key_json = device.get("key_json", None) - if key_json: - result["keys"] = db_to_json(key_json) - device_display_name = device.get("device_display_name", None) - if device_display_name: - result["device_display_name"] = device_display_name - else: - result["deleted"] = True - - results.append(result) - - return results - - def _get_last_device_update_for_remote_user( - self, destination, user_id, from_stream_id - ): - def f(txn): - prev_sent_id_sql = """ - SELECT coalesce(max(stream_id), 0) as stream_id - FROM device_lists_outbound_last_success - WHERE destination = ? AND user_id = ? AND stream_id <= ? - """ - txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id)) - rows = txn.fetchall() - return rows[0][0] - - return self.runInteraction("get_last_device_update_for_remote_user", f) - - def mark_as_sent_devices_by_remote(self, destination, stream_id): - """Mark that updates have successfully been sent to the destination. - """ - return self.runInteraction( - "mark_as_sent_devices_by_remote", - self._mark_as_sent_devices_by_remote_txn, - destination, - stream_id, - ) - - def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): - # We update the device_lists_outbound_last_success with the successfully - # poked users. We do the join to see which users need to be inserted and - # which updated. - sql = """ - SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL) - FROM device_lists_outbound_pokes as o - LEFT JOIN device_lists_outbound_last_success as s - USING (destination, user_id) - WHERE destination = ? AND o.stream_id <= ? - GROUP BY user_id - """ - txn.execute(sql, (destination, stream_id)) - rows = txn.fetchall() - - sql = """ - UPDATE device_lists_outbound_last_success - SET stream_id = ? - WHERE destination = ? AND user_id = ? - """ - txn.executemany(sql, ((row[1], destination, row[0]) for row in rows if row[2])) - - sql = """ - INSERT INTO device_lists_outbound_last_success - (destination, user_id, stream_id) VALUES (?, ?, ?) - """ - txn.executemany( - sql, ((destination, row[0], row[1]) for row in rows if not row[2]) - ) - - # Delete all sent outbound pokes - sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE destination = ? AND stream_id <= ? - """ - txn.execute(sql, (destination, stream_id)) - - def get_device_stream_token(self): - return self._device_list_id_gen.get_current_token() - - @trace - @defer.inlineCallbacks - def get_user_devices_from_cache(self, query_list): - """Get the devices (and keys if any) for remote users from the cache. - - Args: - query_list(list): List of (user_id, device_ids), if device_ids is - falsey then return all device ids for that user. - - Returns: - (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is - a set of user_ids and results_map is a mapping of - user_id -> device_id -> device_info - """ - user_ids = set(user_id for user_id, _ in query_list) - user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids)) - user_ids_in_cache = set( - user_id for user_id, stream_id in user_map.items() if stream_id - ) - user_ids_not_in_cache = user_ids - user_ids_in_cache - - results = {} - for user_id, device_id in query_list: - if user_id not in user_ids_in_cache: - continue - - if device_id: - device = yield self._get_cached_user_device(user_id, device_id) - results.setdefault(user_id, {})[device_id] = device - else: - results[user_id] = yield self._get_cached_devices_for_user(user_id) - - set_tag("in_cache", results) - set_tag("not_in_cache", user_ids_not_in_cache) - - return user_ids_not_in_cache, results - - @cachedInlineCallbacks(num_args=2, tree=True) - def _get_cached_user_device(self, user_id, device_id): - content = yield self._simple_select_one_onecol( - table="device_lists_remote_cache", - keyvalues={"user_id": user_id, "device_id": device_id}, - retcol="content", - desc="_get_cached_user_device", - ) - return db_to_json(content) - - @cachedInlineCallbacks() - def _get_cached_devices_for_user(self, user_id): - devices = yield self._simple_select_list( - table="device_lists_remote_cache", - keyvalues={"user_id": user_id}, - retcols=("device_id", "content"), - desc="_get_cached_devices_for_user", - ) - return { - device["device_id"]: db_to_json(device["content"]) for device in devices - } - - def get_devices_with_keys_by_user(self, user_id): - """Get all devices (with any device keys) for a user - - Returns: - (stream_id, devices) - """ - return self.runInteraction( - "get_devices_with_keys_by_user", - self._get_devices_with_keys_by_user_txn, - user_id, - ) - - def _get_devices_with_keys_by_user_txn(self, txn, user_id): - now_stream_id = self._device_list_id_gen.get_current_token() - - devices = self._get_e2e_device_keys_txn( - txn, [(user_id, None)], include_all_devices=True - ) - - if devices: - user_devices = devices[user_id] - results = [] - for device_id, device in iteritems(user_devices): - result = {"device_id": device_id} - - key_json = device.get("key_json", None) - if key_json: - result["keys"] = db_to_json(key_json) - device_display_name = device.get("device_display_name", None) - if device_display_name: - result["device_display_name"] = device_display_name - - results.append(result) - - return now_stream_id, results - - return now_stream_id, [] - - def get_users_whose_devices_changed(self, from_key, user_ids): - """Get set of users whose devices have changed since `from_key` that - are in the given list of user_ids. - - Args: - from_key (str): The device lists stream token - user_ids (Iterable[str]) - - Returns: - Deferred[set[str]]: The set of user_ids whose devices have changed - since `from_key` - """ - from_key = int(from_key) - - # Get set of users who *may* have changed. Users not in the returned - # list have definitely not changed. - to_check = list( - self._device_list_stream_cache.get_entities_changed(user_ids, from_key) - ) - - if not to_check: - return defer.succeed(set()) - - def _get_users_whose_devices_changed_txn(txn): - changes = set() - - sql = """ - SELECT DISTINCT user_id FROM device_lists_stream - WHERE stream_id > ? - AND - """ - - for chunk in batch_iter(to_check, 100): - clause, args = make_in_list_sql_clause( - txn.database_engine, "user_id", chunk - ) - txn.execute(sql + clause, (from_key,) + tuple(args)) - changes.update(user_id for user_id, in txn) - - return changes - - return self.runInteraction( - "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn - ) - - def get_all_device_list_changes_for_remotes(self, from_key, to_key): - """Return a list of `(stream_id, user_id, destination)` which is the - combined list of changes to devices, and which destinations need to be - poked. `destination` may be None if no destinations need to be poked. - """ - # We do a group by here as there can be a large number of duplicate - # entries, since we throw away device IDs. - sql = """ - SELECT MAX(stream_id) AS stream_id, user_id, destination - FROM device_lists_stream - LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) - WHERE ? < stream_id AND stream_id <= ? - GROUP BY user_id, destination - """ - return self._execute( - "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key - ) - - @cached(max_entries=10000) - def get_device_list_last_stream_id_for_remote(self, user_id): - """Get the last stream_id we got for a user. May be None if we haven't - got any information for them. - """ - return self._simple_select_one_onecol( - table="device_lists_remote_extremeties", - keyvalues={"user_id": user_id}, - retcol="stream_id", - desc="get_device_list_last_stream_id_for_remote", - allow_none=True, - ) - - @cachedList( - cached_method_name="get_device_list_last_stream_id_for_remote", - list_name="user_ids", - inlineCallbacks=True, - ) - def get_device_list_last_stream_id_for_remotes(self, user_ids): - rows = yield self._simple_select_many_batch( - table="device_lists_remote_extremeties", - column="user_id", - iterable=user_ids, - retcols=("user_id", "stream_id"), - desc="get_device_list_last_stream_id_for_remotes", - ) - - results = {user_id: None for user_id in user_ids} - results.update({row["user_id"]: row["stream_id"] for row in rows}) - - return results - - -class DeviceBackgroundUpdateStore(BackgroundUpdateStore): - def __init__(self, db_conn, hs): - super(DeviceBackgroundUpdateStore, self).__init__(db_conn, hs) - - self.register_background_index_update( - "device_lists_stream_idx", - index_name="device_lists_stream_user_id", - table="device_lists_stream", - columns=["user_id", "device_id"], - ) - - # create a unique index on device_lists_remote_cache - self.register_background_index_update( - "device_lists_remote_cache_unique_idx", - index_name="device_lists_remote_cache_unique_id", - table="device_lists_remote_cache", - columns=["user_id", "device_id"], - unique=True, - ) - - # And one on device_lists_remote_extremeties - self.register_background_index_update( - "device_lists_remote_extremeties_unique_idx", - index_name="device_lists_remote_extremeties_unique_idx", - table="device_lists_remote_extremeties", - columns=["user_id"], - unique=True, - ) - - # once they complete, we can remove the old non-unique indexes. - self.register_background_update_handler( - DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES, - self._drop_device_list_streams_non_unique_indexes, - ) - - @defer.inlineCallbacks - def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size): - def f(conn): - txn = conn.cursor() - txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id") - txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id") - txn.close() - - yield self.runWithConnection(f) - yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES) - return 1 - - -class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): - def __init__(self, db_conn, hs): - super(DeviceStore, self).__init__(db_conn, hs) - - # Map of (user_id, device_id) -> bool. If there is an entry that implies - # the device exists. - self.device_id_exists_cache = Cache( - name="device_id_exists", keylen=2, max_entries=10000 - ) - - self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000) - - @defer.inlineCallbacks - def store_device(self, user_id, device_id, initial_device_display_name): - """Ensure the given device is known; add it to the store if not - - Args: - user_id (str): id of user associated with the device - device_id (str): id of device - initial_device_display_name (str): initial displayname of the - device. Ignored if device exists. - Returns: - defer.Deferred: boolean whether the device was inserted or an - existing device existed with that ID. - """ - key = (user_id, device_id) - if self.device_id_exists_cache.get(key, None): - return False - - try: - inserted = yield self._simple_insert( - "devices", - values={ - "user_id": user_id, - "device_id": device_id, - "display_name": initial_device_display_name, - }, - desc="store_device", - or_ignore=True, - ) - self.device_id_exists_cache.prefill(key, True) - return inserted - except Exception as e: - logger.error( - "store_device with device_id=%s(%r) user_id=%s(%r)" - " display_name=%s(%r) failed: %s", - type(device_id).__name__, - device_id, - type(user_id).__name__, - user_id, - type(initial_device_display_name).__name__, - initial_device_display_name, - e, - ) - raise StoreError(500, "Problem storing device.") - - @defer.inlineCallbacks - def delete_device(self, user_id, device_id): - """Delete a device. - - Args: - user_id (str): The ID of the user which owns the device - device_id (str): The ID of the device to delete - Returns: - defer.Deferred - """ - yield self._simple_delete_one( - table="devices", - keyvalues={"user_id": user_id, "device_id": device_id}, - desc="delete_device", - ) - - self.device_id_exists_cache.invalidate((user_id, device_id)) - - @defer.inlineCallbacks - def delete_devices(self, user_id, device_ids): - """Deletes several devices. - - Args: - user_id (str): The ID of the user which owns the devices - device_ids (list): The IDs of the devices to delete - Returns: - defer.Deferred - """ - yield self._simple_delete_many( - table="devices", - column="device_id", - iterable=device_ids, - keyvalues={"user_id": user_id}, - desc="delete_devices", - ) - for device_id in device_ids: - self.device_id_exists_cache.invalidate((user_id, device_id)) - - def update_device(self, user_id, device_id, new_display_name=None): - """Update a device. - - Args: - user_id (str): The ID of the user which owns the device - device_id (str): The ID of the device to update - new_display_name (str|None): new displayname for device; None - to leave unchanged - Raises: - StoreError: if the device is not found - Returns: - defer.Deferred - """ - updates = {} - if new_display_name is not None: - updates["display_name"] = new_display_name - if not updates: - return defer.succeed(None) - return self._simple_update_one( - table="devices", - keyvalues={"user_id": user_id, "device_id": device_id}, - updatevalues=updates, - desc="update_device", - ) - - @defer.inlineCallbacks - def mark_remote_user_device_list_as_unsubscribed(self, user_id): - """Mark that we no longer track device lists for remote user. - """ - yield self._simple_delete( - table="device_lists_remote_extremeties", - keyvalues={"user_id": user_id}, - desc="mark_remote_user_device_list_as_unsubscribed", - ) - self.get_device_list_last_stream_id_for_remote.invalidate((user_id,)) - - def update_remote_device_list_cache_entry( - self, user_id, device_id, content, stream_id - ): - """Updates a single device in the cache of a remote user's devicelist. - - Note: assumes that we are the only thread that can be updating this user's - device list. - - Args: - user_id (str): User to update device list for - device_id (str): ID of decivice being updated - content (dict): new data on this device - stream_id (int): the version of the device list - - Returns: - Deferred[None] - """ - return self.runInteraction( - "update_remote_device_list_cache_entry", - self._update_remote_device_list_cache_entry_txn, - user_id, - device_id, - content, - stream_id, - ) - - def _update_remote_device_list_cache_entry_txn( - self, txn, user_id, device_id, content, stream_id - ): - if content.get("deleted"): - self._simple_delete_txn( - txn, - table="device_lists_remote_cache", - keyvalues={"user_id": user_id, "device_id": device_id}, - ) - - txn.call_after(self.device_id_exists_cache.invalidate, (user_id, device_id)) - else: - self._simple_upsert_txn( - txn, - table="device_lists_remote_cache", - keyvalues={"user_id": user_id, "device_id": device_id}, - values={"content": json.dumps(content)}, - # we don't need to lock, because we assume we are the only thread - # updating this user's devices. - lock=False, - ) - - txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id)) - txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,)) - txn.call_after( - self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,) - ) - - self._simple_upsert_txn( - txn, - table="device_lists_remote_extremeties", - keyvalues={"user_id": user_id}, - values={"stream_id": stream_id}, - # again, we can assume we are the only thread updating this user's - # extremity. - lock=False, - ) - - def update_remote_device_list_cache(self, user_id, devices, stream_id): - """Replace the entire cache of the remote user's devices. - - Note: assumes that we are the only thread that can be updating this user's - device list. - - Args: - user_id (str): User to update device list for - devices (list[dict]): list of device objects supplied over federation - stream_id (int): the version of the device list - - Returns: - Deferred[None] - """ - return self.runInteraction( - "update_remote_device_list_cache", - self._update_remote_device_list_cache_txn, - user_id, - devices, - stream_id, - ) - - def _update_remote_device_list_cache_txn(self, txn, user_id, devices, stream_id): - self._simple_delete_txn( - txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id} - ) - - self._simple_insert_many_txn( - txn, - table="device_lists_remote_cache", - values=[ - { - "user_id": user_id, - "device_id": content["device_id"], - "content": json.dumps(content), - } - for content in devices - ], - ) - - txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,)) - txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,)) - txn.call_after( - self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,) - ) - - self._simple_upsert_txn( - txn, - table="device_lists_remote_extremeties", - keyvalues={"user_id": user_id}, - values={"stream_id": stream_id}, - # we don't need to lock, because we can assume we are the only thread - # updating this user's extremity. - lock=False, - ) - - @defer.inlineCallbacks - def add_device_change_to_streams(self, user_id, device_ids, hosts): - """Persist that a user's devices have been updated, and which hosts - (if any) should be poked. - """ - with self._device_list_id_gen.get_next() as stream_id: - yield self.runInteraction( - "add_device_change_to_streams", - self._add_device_change_txn, - user_id, - device_ids, - hosts, - stream_id, - ) - return stream_id - - def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id): - now = self._clock.time_msec() - - txn.call_after( - self._device_list_stream_cache.entity_has_changed, user_id, stream_id - ) - for host in hosts: - txn.call_after( - self._device_list_federation_stream_cache.entity_has_changed, - host, - stream_id, - ) - - # Delete older entries in the table, as we really only care about - # when the latest change happened. - txn.executemany( - """ - DELETE FROM device_lists_stream - WHERE user_id = ? AND device_id = ? AND stream_id < ? - """, - [(user_id, device_id, stream_id) for device_id in device_ids], - ) - - self._simple_insert_many_txn( - txn, - table="device_lists_stream", - values=[ - {"stream_id": stream_id, "user_id": user_id, "device_id": device_id} - for device_id in device_ids - ], - ) - - context = get_active_span_text_map() - - self._simple_insert_many_txn( - txn, - table="device_lists_outbound_pokes", - values=[ - { - "destination": destination, - "stream_id": stream_id, - "user_id": user_id, - "device_id": device_id, - "sent": False, - "ts": now, - "opentracing_context": json.dumps(context) - if whitelisted_homeserver(destination) - else "{}", - } - for destination in hosts - for device_id in device_ids - ], - ) - - def _prune_old_outbound_device_pokes(self): - """Delete old entries out of the device_lists_outbound_pokes to ensure - that we don't fill up due to dead servers. We keep one entry per - (destination, user_id) tuple to ensure that the prev_ids remain correct - if the server does come back. - """ - yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000 - - def _prune_txn(txn): - select_sql = """ - SELECT destination, user_id, max(stream_id) as stream_id - FROM device_lists_outbound_pokes - GROUP BY destination, user_id - HAVING min(ts) < ? AND count(*) > 1 - """ - - txn.execute(select_sql, (yesterday,)) - rows = txn.fetchall() - - if not rows: - return - - delete_sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? - """ - - txn.executemany( - delete_sql, ((yesterday, row[0], row[1], row[2]) for row in rows) - ) - - # Since we've deleted unsent deltas, we need to remove the entry - # of last successful sent so that the prev_ids are correctly set. - sql = """ - DELETE FROM device_lists_outbound_last_success - WHERE destination = ? AND user_id = ? - """ - txn.executemany(sql, ((row[0], row[1]) for row in rows)) - - logger.info("Pruned %d device list outbound pokes", txn.rowcount) - - return run_as_background_process( - "prune_old_outbound_device_pokes", - self.runInteraction, - "_prune_old_outbound_device_pokes", - _prune_txn, - ) |