diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 0b6d1f2b05..3f0b2f5d84 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -282,9 +282,10 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
"count_devices_by_users", count_devices_by_users_txn, user_ids
)
+ @cached()
async def get_device(
self, user_id: str, device_id: str
- ) -> Optional[Dict[str, Any]]:
+ ) -> Optional[Mapping[str, Any]]:
"""Retrieve a device. Only returns devices that are not marked as
hidden.
@@ -1817,6 +1818,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
},
desc="store_device",
)
+ await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
+
if not inserted:
# if the device already exists, check if it's a real device, or
# if the device ID is reserved by something else
@@ -1882,6 +1885,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
values=device_ids,
keyvalues={"user_id": user_id},
)
+ self._invalidate_cache_and_stream_bulk(
+ txn, self.get_device, [(user_id, device_id) for device_id in device_ids]
+ )
for batch in batch_iter(device_ids, 100):
await self.db_pool.runInteraction(
@@ -1915,6 +1921,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
updatevalues=updates,
desc="update_device",
)
+ await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
async def update_remote_device_list_cache_entry(
self, user_id: str, device_id: str, content: JsonDict, stream_id: str
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 8380930c70..eadbf4901c 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -759,6 +759,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
external_id: id on that system
user_id: complete mxid that it is mapped to
"""
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_external_id, (auth_provider, external_id)
+ )
self.db_pool.simple_insert_txn(
txn,
@@ -789,6 +792,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
},
desc="remove_user_external_id",
)
+ await self.invalidate_cache_and_stream(
+ "get_user_by_external_id", (auth_provider, external_id)
+ )
async def replace_user_external_id(
self,
@@ -809,29 +815,20 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
ExternalIDReuseException if the new external_id could not be mapped.
"""
- def _remove_user_external_ids_txn(
+ def _replace_user_external_id_txn(
txn: LoggingTransaction,
- user_id: str,
) -> None:
- """Remove all mappings from external user ids to a mxid
- If these mappings are not found, this method does nothing.
-
- Args:
- user_id: complete mxid that it is mapped to
- """
-
self.db_pool.simple_delete_txn(
txn,
table="user_external_ids",
keyvalues={"user_id": user_id},
)
- def _replace_user_external_id_txn(
- txn: LoggingTransaction,
- ) -> None:
- _remove_user_external_ids_txn(txn, user_id)
-
for auth_provider, external_id in record_external_ids:
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_external_id, (auth_provider, external_id)
+ )
+
self._record_user_external_id_txn(
txn,
auth_provider,
@@ -847,6 +844,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
except self.database_engine.module.IntegrityError:
raise ExternalIDReuseException()
+ @cached()
async def get_user_by_external_id(
self, auth_provider: str, external_id: str
) -> Optional[str]:
|