summary refs log tree commit diff
path: root/synapse/storage/databases/main/deviceinbox.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/deviceinbox.py')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py284
1 files changed, 119 insertions, 165 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py

index 264e625bd7..ab8766c75b 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py
@@ -1,4 +1,5 @@ # Copyright 2016 OpenMarket Ltd +# Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,9 +20,17 @@ from synapse.logging import issue9533_logger from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage._base import SQLBaseStore, db_to_json -from synapse.storage.database import DatabasePool, LoggingTransaction +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) from synapse.storage.engines import PostgresEngine -from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator +from synapse.storage.util.id_generators import ( + AbstractStreamIdGenerator, + MultiWriterIdGenerator, + StreamIdGenerator, +) from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache @@ -34,14 +43,21 @@ logger = logging.getLogger(__name__) class DeviceInboxWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): super().__init__(database, db_conn, hs) self._instance_name = hs.get_instance_name() # Map of (user_id, device_id) to the last stream_id that has been # deleted up to. This is so that we can no op deletions. - self._last_device_delete_cache = ExpiringCache( + self._last_device_delete_cache: ExpiringCache[ + Tuple[str, Optional[str]], int + ] = ExpiringCache( cache_name="last_device_delete_cache", clock=self._clock, max_len=10000, @@ -53,14 +69,16 @@ class DeviceInboxWorkerStore(SQLBaseStore): self._instance_name in hs.config.worker.writers.to_device ) - self._device_inbox_id_gen = MultiWriterIdGenerator( - db_conn=db_conn, - db=database, - stream_name="to_device", - instance_name=self._instance_name, - tables=[("device_inbox", "instance_name", "stream_id")], - sequence_name="device_inbox_sequence", - writers=hs.config.worker.writers.to_device, + self._device_inbox_id_gen: AbstractStreamIdGenerator = ( + MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="to_device", + instance_name=self._instance_name, + tables=[("device_inbox", "instance_name", "stream_id")], + sequence_name="device_inbox_sequence", + writers=hs.config.worker.writers.to_device, + ) ) else: self._can_write_to_device = True @@ -101,6 +119,8 @@ class DeviceInboxWorkerStore(SQLBaseStore): def process_replication_rows(self, stream_name, instance_name, token, rows): if stream_name == ToDeviceStream.NAME: + # If replication is happening than postgres must be being used. + assert isinstance(self._device_inbox_id_gen, MultiWriterIdGenerator) self._device_inbox_id_gen.advance(instance_name, token) for row in rows: if row.entity.startswith("@"): @@ -134,7 +154,10 @@ class DeviceInboxWorkerStore(SQLBaseStore): limit: The maximum number of messages to retrieve. Returns: - A list of messages for the device and where in the stream the messages got to. + A tuple containing: + * A list of messages for the device. + * The max stream token of these messages. There may be more to retrieve + if the given limit was reached. """ has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_stream_id @@ -153,12 +176,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): txn.execute( sql, (user_id, device_id, last_stream_id, current_stream_id, limit) ) + messages = [] + stream_pos = current_stream_id + for row in txn: stream_pos = row[0] messages.append(db_to_json(row[1])) + + # If the limit was not reached we know that there's no more data for this + # user/device pair up to current_stream_id. if len(messages) < limit: stream_pos = current_stream_id + return messages, stream_pos return await self.db_pool.runInteraction( @@ -210,11 +240,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": f"deleted {count} messages for device", "count": count}) # Update the cache, ensuring that we only ever increase the value - last_deleted_stream_id = self._last_device_delete_cache.get( + updated_last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0 ) self._last_device_delete_cache[(user_id, device_id)] = max( - last_deleted_stream_id, up_to_stream_id + updated_last_deleted_stream_id, up_to_stream_id ) return count @@ -260,13 +290,20 @@ class DeviceInboxWorkerStore(SQLBaseStore): " LIMIT ?" ) txn.execute(sql, (destination, last_stream_id, current_stream_id, limit)) + messages = [] + stream_pos = current_stream_id + for row in txn: stream_pos = row[0] messages.append(db_to_json(row[1])) + + # If the limit was not reached we know that there's no more data for this + # user/device pair up to current_stream_id. if len(messages) < limit: log_kv({"message": "Set stream position to current position"}) stream_pos = current_stream_id + return messages, stream_pos return await self.db_pool.runInteraction( @@ -372,8 +409,8 @@ class DeviceInboxWorkerStore(SQLBaseStore): """Used to send messages from this server. Args: - local_messages_by_user_and_device: - Dictionary of user_id to device_id to message. + local_messages_by_user_then_device: + Dictionary of recipient user_id to recipient device_id to message. remote_messages_by_destination: Dictionary of destination server_name to the EDU JSON to send. @@ -415,7 +452,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) async with self._device_inbox_id_gen.get_next() as stream_id: - now_ms = self.clock.time_msec() + now_ms = self._clock.time_msec() await self.db_pool.runInteraction( "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id ) @@ -466,7 +503,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) async with self._device_inbox_id_gen.get_next() as stream_id: - now_ms = self.clock.time_msec() + now_ms = self._clock.time_msec() await self.db_pool.runInteraction( "add_messages_from_remote_to_device_inbox", add_messages_txn, @@ -562,6 +599,7 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox" REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox" + REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox" def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) @@ -577,14 +615,18 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox ) - self.db_pool.updates.register_background_update_handler( - self.REMOVE_DELETED_DEVICES, - self._remove_deleted_devices_from_device_inbox, + # Used to be a background update that deletes all device_inboxes for deleted + # devices. + self.db_pool.updates.register_noop_background_update( + self.REMOVE_DELETED_DEVICES ) + # Used to be a background update that deletes all device_inboxes for hidden + # devices. + self.db_pool.updates.register_noop_background_update(self.REMOVE_HIDDEN_DEVICES) self.db_pool.updates.register_background_update_handler( - self.REMOVE_HIDDEN_DEVICES, - self._remove_hidden_devices_from_device_inbox, + self.REMOVE_DEAD_DEVICES_FROM_INBOX, + self._remove_dead_devices_from_device_inbox, ) async def _background_drop_index_device_inbox(self, progress, batch_size): @@ -599,171 +641,83 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): return 1 - async def _remove_deleted_devices_from_device_inbox( - self, progress: JsonDict, batch_size: int + async def _remove_dead_devices_from_device_inbox( + self, + progress: JsonDict, + batch_size: int, ) -> int: - """A background update that deletes all device_inboxes for deleted devices. - - This should only need to be run once (when users upgrade to v1.47.0) + """A background update to remove devices that were either deleted or hidden from + the device_inbox table. Args: - progress: JsonDict used to store progress of this background update - batch_size: the maximum number of rows to retrieve in a single select query + progress: The update's progress dict. + batch_size: The batch size for this update. Returns: - The number of deleted rows + The number of rows deleted. """ - def _remove_deleted_devices_from_device_inbox_txn( + def _remove_dead_devices_from_device_inbox_txn( txn: LoggingTransaction, - ) -> int: - """stream_id is not unique - we need to use an inclusive `stream_id >= ?` clause, - since we might not have deleted all dead device messages for the stream_id - returned from the previous query + ) -> Tuple[int, bool]: - Then delete only rows matching the `(user_id, device_id, stream_id)` tuple, - to avoid problems of deleting a large number of rows all at once - due to a single device having lots of device messages. - """ + if "max_stream_id" in progress: + max_stream_id = progress["max_stream_id"] + else: + txn.execute("SELECT max(stream_id) FROM device_inbox") + # There's a type mismatch here between how we want to type the row and + # what fetchone says it returns, but we silence it because we know that + # res can't be None. + res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment] + if res[0] is None: + # this can only happen if the `device_inbox` table is empty, in which + # case we have no work to do. + return 0, True + else: + max_stream_id = res[0] - last_stream_id = progress.get("stream_id", 0) + start = progress.get("stream_id", 0) + stop = start + batch_size + # delete rows in `device_inbox` which do *not* correspond to a known, + # unhidden device. sql = """ - SELECT device_id, user_id, stream_id - FROM device_inbox + DELETE FROM device_inbox WHERE - stream_id >= ? - AND (device_id, user_id) NOT IN ( - SELECT device_id, user_id FROM devices + stream_id >= ? AND stream_id < ? + AND NOT EXISTS ( + SELECT * FROM devices d + WHERE + d.device_id=device_inbox.device_id + AND d.user_id=device_inbox.user_id + AND NOT hidden ) - ORDER BY stream_id - LIMIT ? - """ + """ - txn.execute(sql, (last_stream_id, batch_size)) - rows = txn.fetchall() - - num_deleted = 0 - for row in rows: - num_deleted += self.db_pool.simple_delete_txn( - txn, - "device_inbox", - {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, - ) - - if rows: - # send more than stream_id to progress - # otherwise it can happen in large deployments that - # no change of status is visible in the log file - # it may be that the stream_id does not change in several runs - self.db_pool.updates._background_update_progress_txn( - txn, - self.REMOVE_DELETED_DEVICES, - { - "device_id": rows[-1][0], - "user_id": rows[-1][1], - "stream_id": rows[-1][2], - }, - ) - - return num_deleted - - number_deleted = await self.db_pool.runInteraction( - "_remove_deleted_devices_from_device_inbox", - _remove_deleted_devices_from_device_inbox_txn, - ) + txn.execute(sql, (start, stop)) - # The task is finished when no more lines are deleted. - if not number_deleted: - await self.db_pool.updates._end_background_update( - self.REMOVE_DELETED_DEVICES + self.db_pool.updates._background_update_progress_txn( + txn, + self.REMOVE_DEAD_DEVICES_FROM_INBOX, + { + "stream_id": stop, + "max_stream_id": max_stream_id, + }, ) - return number_deleted - - async def _remove_hidden_devices_from_device_inbox( - self, progress: JsonDict, batch_size: int - ) -> int: - """A background update that deletes all device_inboxes for hidden devices. - - This should only need to be run once (when users upgrade to v1.47.0) - - Args: - progress: JsonDict used to store progress of this background update - batch_size: the maximum number of rows to retrieve in a single select query - - Returns: - The number of deleted rows - """ - - def _remove_hidden_devices_from_device_inbox_txn( - txn: LoggingTransaction, - ) -> int: - """stream_id is not unique - we need to use an inclusive `stream_id >= ?` clause, - since we might not have deleted all hidden device messages for the stream_id - returned from the previous query - - Then delete only rows matching the `(user_id, device_id, stream_id)` tuple, - to avoid problems of deleting a large number of rows all at once - due to a single device having lots of device messages. - """ - - last_stream_id = progress.get("stream_id", 0) - - sql = """ - SELECT device_id, user_id, stream_id - FROM device_inbox - WHERE - stream_id >= ? - AND (device_id, user_id) IN ( - SELECT device_id, user_id FROM devices WHERE hidden = ? - ) - ORDER BY stream_id - LIMIT ? - """ - - txn.execute(sql, (last_stream_id, True, batch_size)) - rows = txn.fetchall() - - num_deleted = 0 - for row in rows: - num_deleted += self.db_pool.simple_delete_txn( - txn, - "device_inbox", - {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, - ) - - if rows: - # We don't just save the `stream_id` in progress as - # otherwise it can happen in large deployments that - # no change of status is visible in the log file, as - # it may be that the stream_id does not change in several runs - self.db_pool.updates._background_update_progress_txn( - txn, - self.REMOVE_HIDDEN_DEVICES, - { - "device_id": rows[-1][0], - "user_id": rows[-1][1], - "stream_id": rows[-1][2], - }, - ) - - return num_deleted + return stop > max_stream_id - number_deleted = await self.db_pool.runInteraction( - "_remove_hidden_devices_from_device_inbox", - _remove_hidden_devices_from_device_inbox_txn, + finished = await self.db_pool.runInteraction( + "_remove_devices_from_device_inbox_txn", + _remove_dead_devices_from_device_inbox_txn, ) - # The task is finished when no more lines are deleted. - if not number_deleted: + if finished: await self.db_pool.updates._end_background_update( - self.REMOVE_HIDDEN_DEVICES + self.REMOVE_DEAD_DEVICES_FROM_INBOX, ) - return number_deleted + return batch_size class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):