summary refs log tree commit diff
path: root/synapse/storage/databases/main/deviceinbox.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/deviceinbox.py')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py60
1 files changed, 32 insertions, 28 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py

index 744e98c6d0..3e7425d4a6 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py
@@ -344,18 +344,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): # Note that this is more efficient than just dropping `device_id` from the query, # since device_inbox has an index on `(user_id, device_id, stream_id)` if not device_ids_to_query: - user_device_dicts = self.db_pool.simple_select_many_txn( - txn, - table="devices", - column="user_id", - iterable=user_ids_to_query, - keyvalues={"hidden": False}, - retcols=("device_id",), + user_device_dicts = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="devices", + column="user_id", + iterable=user_ids_to_query, + keyvalues={"hidden": False}, + retcols=("device_id",), + ), ) - device_ids_to_query.update( - {row["device_id"] for row in user_device_dicts} - ) + device_ids_to_query.update({row[0] for row in user_device_dicts}) if not device_ids_to_query: # We've ended up with no devices to query. @@ -449,7 +450,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): user_id: str, device_id: Optional[str], up_to_stream_id: int, - limit: int, + limit: Optional[int] = None, ) -> int: """ Args: @@ -477,17 +478,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": "No changes in cache since last check"}) return 0 - ROW_ID_NAME = self.database_engine.row_id_name - def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: + limit_statement = "" if limit is None else f"LIMIT {limit}" sql = f""" - DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN ( - SELECT {ROW_ID_NAME} FROM device_inbox - WHERE user_id = ? AND device_id = ? AND stream_id <= ? - LIMIT {limit} + DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= ( + SELECT MAX(stream_id) FROM ( + SELECT stream_id FROM device_inbox + WHERE user_id = ? AND device_id = ? AND stream_id <= ? + ORDER BY stream_id + {limit_statement} + ) AS q1 ) """ - txn.execute(sql, (user_id, device_id, up_to_stream_id)) + txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id)) return txn.rowcount count = await self.db_pool.runInteraction( @@ -845,20 +848,21 @@ class DeviceInboxWorkerStore(SQLBaseStore): # We exclude hidden devices (such as cross-signing keys) here as they are # not expected to receive to-device messages. - rows = self.db_pool.simple_select_many_txn( - txn, - table="devices", - keyvalues={"user_id": user_id, "hidden": False}, - column="device_id", - iterable=devices, - retcols=("device_id",), + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="devices", + keyvalues={"user_id": user_id, "hidden": False}, + column="device_id", + iterable=devices, + retcols=("device_id",), + ), ) - for row in rows: + for (device_id,) in rows: # Only insert into the local inbox if the device exists on # this server - device_id = row["device_id"] - with start_active_span("serialise_to_device_message"): msg = messages_by_device[device_id] set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])