diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/_base.py | 2 | ||||
-rw-r--r-- | synapse/storage/database.py | 1 | ||||
-rw-r--r-- | synapse/storage/databases/main/devices.py | 32 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 2 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 13 |
5 files changed, 31 insertions, 19 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 41d9111019..481fec72fe 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -37,6 +37,8 @@ class SQLBaseStore(metaclass=ABCMeta): per data store (and not one per physical database). """ + db_pool: DatabasePool + def __init__( self, database: DatabasePool, diff --git a/synapse/storage/database.py b/synapse/storage/database.py index e20c5c5302..feaa6cdd07 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -499,6 +499,7 @@ class DatabasePool: """ _TXN_ID = 0 + engine: BaseDatabaseEngine def __init__( self, diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index e8b6cc6b80..85c1778a81 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -100,6 +100,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): ("device_lists_outbound_pokes", "stream_id"), ("device_lists_changes_in_room", "stream_id"), ("device_lists_remote_pending", "stream_id"), + ("device_lists_changes_converted_stream_position", "stream_id"), ], is_writer=hs.config.worker.worker_app is None, ) @@ -745,42 +746,45 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): @trace @cancellable async def get_user_devices_from_cache( - self, query_list: List[Tuple[str, Optional[str]]] + self, user_ids: Set[str], user_and_device_ids: List[Tuple[str, str]] ) -> Tuple[Set[str], Dict[str, Dict[str, JsonDict]]]: """Get the devices (and keys if any) for remote users from the cache. Args: - query_list: List of (user_id, device_ids), if device_ids is - falsey then return all device ids for that user. + user_ids: users which should have all device IDs returned + user_and_device_ids: List of (user_id, device_ids) Returns: A tuple of (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is a set of user_ids and results_map is a mapping of user_id -> device_id -> device_info. """ - user_ids = {user_id for user_id, _ in query_list} - user_map = await self.get_device_list_last_stream_id_for_remotes(list(user_ids)) + unique_user_ids = user_ids | {user_id for user_id, _ in user_and_device_ids} + user_map = await self.get_device_list_last_stream_id_for_remotes( + list(unique_user_ids) + ) # We go and check if any of the users need to have their device lists # resynced. If they do then we remove them from the cached list. users_needing_resync = await self.get_user_ids_requiring_device_list_resync( - user_ids + unique_user_ids ) user_ids_in_cache = { user_id for user_id, stream_id in user_map.items() if stream_id } - users_needing_resync - user_ids_not_in_cache = user_ids - user_ids_in_cache + user_ids_not_in_cache = unique_user_ids - user_ids_in_cache + # First fetch all the users which all devices are to be returned. results: Dict[str, Dict[str, JsonDict]] = {} - for user_id, device_id in query_list: - if user_id not in user_ids_in_cache: - continue - - if device_id: + for user_id in user_ids: + if user_id in user_ids_in_cache: + results[user_id] = await self.get_cached_devices_for_user(user_id) + # Then fetch all device-specific requests, but skip users we've already + # fetched all devices for. + for user_id, device_id in user_and_device_ids: + if user_id in user_ids_in_cache and user_id not in user_ids: device = await self._get_cached_user_device(user_id, device_id) results.setdefault(user_id, {})[device_id] = device - else: - results[user_id] = await self.get_cached_devices_for_user(user_id) set_tag("in_cache", str(results)) set_tag("not_in_cache", str(user_ids_not_in_cache)) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1536937b67..cb66376fb4 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -306,7 +306,7 @@ class PersistEventsStore: # The set of event_ids to return. This includes all soft-failed events # and their prev events. - existing_prevs = set() + existing_prevs: Set[str] = set() def _get_prevs_before_rejected_txn( txn: LoggingTransaction, batch: Collection[str] diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 3acdb39da7..6c335a9315 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -23,7 +23,7 @@ from typing_extensions import Counter as CounterType from synapse.config.homeserver import HomeServerConfig from synapse.storage.database import LoggingDatabaseConnection -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION from synapse.storage.types import Cursor @@ -108,9 +108,14 @@ def prepare_database( # so we start one before running anything. This ensures that any upgrades # are either applied completely, or not at all. # - # (psycopg2 automatically starts a transaction as soon as we run any statements - # at all, so this is redundant but harmless there.) - cur.execute("BEGIN TRANSACTION") + # psycopg2 does not automatically start transactions when in autocommit mode. + # While it is technically harmless to nest transactions in postgres, doing so + # results in a warning in Postgres' logs per query. And we'd rather like to + # avoid doing that. + if isinstance(database_engine, Sqlite3Engine) or ( + isinstance(database_engine, PostgresEngine) and db_conn.autocommit + ): + cur.execute("BEGIN TRANSACTION") logger.info("%r: Checking existing schema version", databases) version_info = _get_or_create_schema_state(cur, database_engine) |