summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-12-20 17:56:57 +0000
committerOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-12-20 18:21:31 +0000
commit712144e7688333d24e2d6a650d6d9e6676de90fc (patch)
tree0ef3189a93c343fabf246e085d71f16004bfee81
parentSplit out the marking of failed (diff)
downloadsynapse-712144e7688333d24e2d6a650d6d9e6676de90fc.tar.xz
Batch up the DB writes when marking failures
-rw-r--r--synapse/handlers/device.py13
-rw-r--r--synapse/handlers/devicemessage.py2
-rw-r--r--synapse/handlers/federation_event.py2
-rw-r--r--synapse/storage/databases/main/devices.py30
-rw-r--r--synapse/types/__init__.py4
5 files changed, 39 insertions, 12 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py

index 0a74e4d266..68a0c8ccb4 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -1212,9 +1212,18 @@ class DeviceListUpdater(DeviceListWorkerUpdater): raise InvalidAPICallError(f"Only one origin permitted, got {origins!r}") result = {} + failed = set() # TODO(Perf): Actually batch these up for user_id in user_ids: - result[user_id] = await self.user_device_resync(user_id) + user_result, user_failed = await self._user_device_resync_returning_failed( + user_id + ) + result[user_id] = user_result + if user_failed: + failed.add(user_id) + + if mark_failed_as_stale: + await self.store.mark_remote_users_device_caches_as_stale(failed) return result @@ -1226,7 +1235,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): if failed and mark_failed_as_stale: # Mark the remote user's device list as stale so we know we need to retry # it later. - await self.store.mark_remote_user_device_cache_as_stale(user_id) + await self.store.mark_remote_users_device_caches_as_stale((user_id,)) return result diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 75e89850f5..00c403db49 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py
@@ -195,7 +195,7 @@ class DeviceMessageHandler: sender_user_id, unknown_devices, ) - await self.store.mark_remote_user_device_cache_as_stale(sender_user_id) + await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,)) # Immediately attempt a resync in the background run_in_background(self._user_device_resync, user_id=sender_user_id) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 31df7f55cc..6df000faaf 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py
@@ -1423,7 +1423,7 @@ class FederationEventHandler: """ try: - await self._store.mark_remote_user_device_cache_as_stale(sender) + await self._store.mark_remote_users_device_caches_as_stale((sender,)) # Immediately attempt a resync in the background if self._config.worker.worker_app: diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index a5bb4d404e..a921332cb0 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -54,7 +54,7 @@ from synapse.storage.util.id_generators import ( AbstractStreamIdTracker, StreamIdGenerator, ) -from synapse.types import JsonDict, get_verify_key_from_cross_signing_key +from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache @@ -1062,16 +1062,30 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): return {row["user_id"] for row in rows} - async def mark_remote_user_device_cache_as_stale(self, user_id: str) -> None: + async def mark_remote_users_device_caches_as_stale( + self, user_ids: StrCollection + ) -> None: """Records that the server has reason to believe the cache of the devices for the remote users is out of date. """ - await self.db_pool.simple_upsert( - table="device_lists_remote_resync", - keyvalues={"user_id": user_id}, - values={}, - insertion_values={"added_ts": self._clock.time_msec()}, - desc="mark_remote_user_device_cache_as_stale", + + def _mark_remote_users_device_caches_as_stale_txn( + txn: LoggingTransaction, + ) -> None: + # TODO add insertion_values support to simple_upsert_many and use + # that! + for user_id in user_ids: + self.db_pool.simple_upsert_txn( + txn, + table="device_lists_remote_resync", + keyvalues={"user_id": user_id}, + values={}, + insertion_values={"added_ts": self._clock.time_msec()}, + ) + + await self.db_pool.runInteraction( + "mark_remote_users_device_caches_as_stale", + _mark_remote_users_device_caches_as_stale_txn, ) async def mark_remote_user_device_cache_as_valid(self, user_id: str) -> None: diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index f2d436ddc3..0c725eb967 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py
@@ -77,6 +77,10 @@ JsonMapping = Mapping[str, Any] # A JSON-serialisable object. JsonSerializable = object +# Collection[str] that does not include str itself; str being a Sequence[str] +# is very misleading and results in bugs. +StrCollection = Union[Tuple[str, ...], List[str], Set[str]] + # Note that this seems to require inheriting *directly* from Interface in order # for mypy-zope to realize it is an interface.