diff --git a/changelog.d/9068.feature b/changelog.d/9068.feature
new file mode 100644
index 0000000000..cdf1844fa7
--- /dev/null
+++ b/changelog.d/9068.feature
@@ -0,0 +1 @@
+Add experimental support for handling `/keys/claim` and `/room_keys` APIs on worker processes.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index a57535989a..f24c648ac7 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -100,14 +100,18 @@ from synapse.rest.client.v1.profile import (
)
from synapse.rest.client.v1.push_rule import PushRuleRestServlet
from synapse.rest.client.v1.voip import VoipRestServlet
-from synapse.rest.client.v2_alpha import groups, sync, user_directory
+from synapse.rest.client.v2_alpha import groups, room_keys, sync, user_directory
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.account_data import (
AccountDataServlet,
RoomAccountDataServlet,
)
-from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
+from synapse.rest.client.v2_alpha.keys import (
+ KeyChangesServlet,
+ KeyQueryServlet,
+ OneTimeKeyServlet,
+)
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet
from synapse.rest.client.versions import VersionsRestServlet
@@ -116,6 +120,7 @@ from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer, cache_in_self
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
+from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
@@ -447,6 +452,7 @@ class GenericWorkerSlavedStore(
UserDirectoryStore,
StatsStore,
UIAuthWorkerStore,
+ EndToEndRoomKeyStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
@@ -504,6 +510,7 @@ class GenericWorkerServer(HomeServer):
LoginRestServlet(self).register(resource)
ThreepidRestServlet(self).register(resource)
KeyQueryServlet(self).register(resource)
+ OneTimeKeyServlet(self).register(resource)
KeyChangesServlet(self).register(resource)
VoipRestServlet(self).register(resource)
PushRuleRestServlet(self).register(resource)
@@ -521,6 +528,7 @@ class GenericWorkerServer(HomeServer):
room.register_servlets(self, resource, True)
room.register_deprecated_servlets(self, resource)
InitialSyncRestServlet(self).register(resource)
+ room_keys.register_servlets(self, resource)
SendToDeviceRestServlet(self).register(resource)
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 4d1b92d1aa..1b6ccd51c8 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -707,50 +707,6 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
"""Get the current stream id from the _device_list_id_gen"""
...
-
-class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
- async def set_e2e_device_keys(
- self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
- ) -> bool:
- """Stores device keys for a device. Returns whether there was a change
- or the keys were already in the database.
- """
-
- def _set_e2e_device_keys_txn(txn):
- set_tag("user_id", user_id)
- set_tag("device_id", device_id)
- set_tag("time_now", time_now)
- set_tag("device_keys", device_keys)
-
- old_key_json = self.db_pool.simple_select_one_onecol_txn(
- txn,
- table="e2e_device_keys_json",
- keyvalues={"user_id": user_id, "device_id": device_id},
- retcol="key_json",
- allow_none=True,
- )
-
- # In py3 we need old_key_json to match new_key_json type. The DB
- # returns unicode while encode_canonical_json returns bytes.
- new_key_json = encode_canonical_json(device_keys).decode("utf-8")
-
- if old_key_json == new_key_json:
- log_kv({"Message": "Device key already stored."})
- return False
-
- self.db_pool.simple_upsert_txn(
- txn,
- table="e2e_device_keys_json",
- keyvalues={"user_id": user_id, "device_id": device_id},
- values={"ts_added_ms": time_now, "key_json": new_key_json},
- )
- log_kv({"message": "Device keys stored."})
- return True
-
- return await self.db_pool.runInteraction(
- "set_e2e_device_keys", _set_e2e_device_keys_txn
- )
-
async def claim_e2e_one_time_keys(
self, query_list: Iterable[Tuple[str, str, str]]
) -> Dict[str, Dict[str, Dict[str, bytes]]]:
@@ -840,6 +796,50 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"claim_e2e_one_time_keys", _claim_e2e_one_time_keys
)
+
+class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
+ async def set_e2e_device_keys(
+ self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
+ ) -> bool:
+ """Stores device keys for a device. Returns whether there was a change
+ or the keys were already in the database.
+ """
+
+ def _set_e2e_device_keys_txn(txn):
+ set_tag("user_id", user_id)
+ set_tag("device_id", device_id)
+ set_tag("time_now", time_now)
+ set_tag("device_keys", device_keys)
+
+ old_key_json = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="e2e_device_keys_json",
+ keyvalues={"user_id": user_id, "device_id": device_id},
+ retcol="key_json",
+ allow_none=True,
+ )
+
+ # In py3 we need old_key_json to match new_key_json type. The DB
+ # returns unicode while encode_canonical_json returns bytes.
+ new_key_json = encode_canonical_json(device_keys).decode("utf-8")
+
+ if old_key_json == new_key_json:
+ log_kv({"Message": "Device key already stored."})
+ return False
+
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="e2e_device_keys_json",
+ keyvalues={"user_id": user_id, "device_id": device_id},
+ values={"ts_added_ms": time_now, "key_json": new_key_json},
+ )
+ log_kv({"message": "Device keys stored."})
+ return True
+
+ return await self.db_pool.runInteraction(
+ "set_e2e_device_keys", _set_e2e_device_keys_txn
+ )
+
async def delete_e2e_keys_by_device(self, user_id: str, device_id: str) -> None:
def delete_e2e_keys_by_device_txn(txn):
log_kv(
|