diff --git a/changelog.d/14400.misc b/changelog.d/14400.misc
new file mode 100644
index 0000000000..6e025329c4
--- /dev/null
+++ b/changelog.d/14400.misc
@@ -0,0 +1 @@
+Remove the `worker_main_http_uri` configuration setting. This is now handled via internal replication.
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 62b1bab297..c1e1544536 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -213,10 +213,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
"shared_extra_conf": {},
- "worker_extra_conf": (
- "worker_main_http_uri: http://127.0.0.1:%d"
- % (MAIN_PROCESS_HTTP_LISTENER_PORT,)
- ),
+ "worker_extra_conf": "",
},
"account_data": {
"app": "synapse.app.generic_worker",
diff --git a/docs/workers.md b/docs/workers.md
index 7ee8801161..4604650803 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -135,8 +135,8 @@ In the config file for each worker, you must specify:
[`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)).
* If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option
with an `http` listener.
- * If handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
- the main process (`worker_main_http_uri`).
+ * **Synapse 1.71 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
+ the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.72 and newer.
For example:
@@ -221,7 +221,6 @@ information.
^/_matrix/client/(api/v1|r0|v3|unstable)/search$
# Encryption requests
- # Note that ^/_matrix/client/(r0|v3|unstable)/keys/upload/ requires `worker_main_http_uri`
^/_matrix/client/(r0|v3|unstable)/keys/query$
^/_matrix/client/(r0|v3|unstable)/keys/changes$
^/_matrix/client/(r0|v3|unstable)/keys/claim$
@@ -376,7 +375,7 @@ responsible for
- persisting them to the DB, and finally
- updating the events stream.
-Because load is sharded in this way, you *must* restart all worker instances when
+Because load is sharded in this way, you *must* restart all worker instances when
adding or removing event persisters.
An `event_persister` should not be mistaken for an `event_creator`.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 1d9aef45c2..74909b7d4a 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -14,14 +14,12 @@
# limitations under the License.
import logging
import sys
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List
-from twisted.internet import address
from twisted.web.resource import Resource
import synapse
import synapse.events
-from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.api.urls import (
CLIENT_API_PREFIX,
FEDERATION_PREFIX,
@@ -43,8 +41,6 @@ from synapse.config.logger import setup_logging
from synapse.config.server import ListenerConfig
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.server import JsonResource, OptionsResource
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
-from synapse.http.site import SynapseRequest
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
@@ -70,12 +66,12 @@ from synapse.rest.client import (
versions,
voip,
)
-from synapse.rest.client._base import client_patterns
from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet
from synapse.rest.client.devices import DevicesRestServlet
from synapse.rest.client.keys import (
KeyChangesServlet,
KeyQueryServlet,
+ KeyUploadServlet,
OneTimeKeyServlet,
)
from synapse.rest.client.register import (
@@ -132,107 +128,12 @@ from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
-from synapse.types import JsonDict
from synapse.util import SYNAPSE_VERSION
from synapse.util.httpresourcetree import create_resource_tree
logger = logging.getLogger("synapse.app.generic_worker")
-class KeyUploadServlet(RestServlet):
- """An implementation of the `KeyUploadServlet` that responds to read only
- requests, but otherwise proxies through to the master instance.
- """
-
- PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
-
- def __init__(self, hs: HomeServer):
- """
- Args:
- hs: server
- """
- super().__init__()
- self.auth = hs.get_auth()
- self.store = hs.get_datastores().main
- self.http_client = hs.get_simple_http_client()
- self.main_uri = hs.config.worker.worker_main_http_uri
-
- async def on_POST(
- self, request: SynapseRequest, device_id: Optional[str]
- ) -> Tuple[int, JsonDict]:
- requester = await self.auth.get_user_by_req(request, allow_guest=True)
- user_id = requester.user.to_string()
- body = parse_json_object_from_request(request)
-
- if device_id is not None:
- # passing the device_id here is deprecated; however, we allow it
- # for now for compatibility with older clients.
- if requester.device_id is not None and device_id != requester.device_id:
- logger.warning(
- "Client uploading keys for a different device "
- "(logged in as %s, uploading for %s)",
- requester.device_id,
- device_id,
- )
- else:
- device_id = requester.device_id
-
- if device_id is None:
- raise SynapseError(
- 400, "To upload keys, you must pass device_id when authenticating"
- )
-
- if body:
- # They're actually trying to upload something, proxy to main synapse.
-
- # Proxy headers from the original request, such as the auth headers
- # (in case the access token is there) and the original IP /
- # User-Agent of the request.
- headers: Dict[bytes, List[bytes]] = {
- header: list(request.requestHeaders.getRawHeaders(header, []))
- for header in (b"Authorization", b"User-Agent")
- }
- # Add the previous hop to the X-Forwarded-For header.
- x_forwarded_for = list(
- request.requestHeaders.getRawHeaders(b"X-Forwarded-For", [])
- )
- # we use request.client here, since we want the previous hop, not the
- # original client (as returned by request.getClientAddress()).
- if isinstance(request.client, (address.IPv4Address, address.IPv6Address)):
- previous_host = request.client.host.encode("ascii")
- # If the header exists, add to the comma-separated list of the first
- # instance of the header. Otherwise, generate a new header.
- if x_forwarded_for:
- x_forwarded_for = [x_forwarded_for[0] + b", " + previous_host]
- x_forwarded_for.extend(x_forwarded_for[1:])
- else:
- x_forwarded_for = [previous_host]
- headers[b"X-Forwarded-For"] = x_forwarded_for
-
- # Replicate the original X-Forwarded-Proto header. Note that
- # XForwardedForRequest overrides isSecure() to give us the original protocol
- # used by the client, as opposed to the protocol used by our upstream proxy
- # - which is what we want here.
- headers[b"X-Forwarded-Proto"] = [
- b"https" if request.isSecure() else b"http"
- ]
-
- try:
- result = await self.http_client.post_json_get_json(
- self.main_uri + request.uri.decode("ascii"), body, headers=headers
- )
- except HttpResponseException as e:
- raise e.to_synapse_error() from e
- except RequestSendFailed as e:
- raise SynapseError(502, "Failed to talk to master") from e
-
- return 200, result
- else:
- # Just interested in counts.
- result = await self.store.count_e2e_one_time_keys(user_id, device_id)
- return 200, {"one_time_key_counts": result}
-
-
class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 88b3168cbc..c4e2273a95 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -162,7 +162,13 @@ class WorkerConfig(Config):
self.worker_name = config.get("worker_name", self.worker_app)
self.instance_name = self.worker_name or "master"
+ # FIXME: Remove this check after a suitable amount of time.
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+ if self.worker_main_http_uri is not None:
+ logger.warning(
+ "The config option worker_main_http_uri is unused since Synapse 1.72. "
+ "It can be safely removed from your configuration."
+ )
# This option is really only here to support `--manhole` command line
# argument.
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 3d63645726..c21629def8 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Tuple
from twisted.web.server import Request
from synapse.http.server import HttpServer
+from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict
@@ -78,5 +79,71 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
return 200, user_devices
+class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
+ """Ask master to upload keys for the user and send them out over federation to
+ update other servers.
+
+ For now, only the master is permitted to handle key upload requests;
+ any worker can handle key query requests (since they're read-only).
+
+ Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on
+ the main process to accomplish this.
+
+ Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload
+ Request format(borrowed and expanded from KeyUploadServlet):
+
+ POST /_synapse/replication/upload_keys_for_user
+
+ {
+ "user_id": "<user_id>",
+ "device_id": "<device_id>",
+ "keys": {
+ ....this part can be found in KeyUploadServlet in rest/client/keys.py....
+ }
+ }
+
+ Response is equivalent to ` /_matrix/client/v3/keys/upload` found in KeyUploadServlet
+
+ """
+
+ NAME = "upload_keys_for_user"
+ PATH_ARGS = ()
+ CACHE = False
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.e2e_keys_handler = hs.get_e2e_keys_handler()
+ self.store = hs.get_datastores().main
+ self.clock = hs.get_clock()
+
+ @staticmethod
+ async def _serialize_payload( # type: ignore[override]
+ user_id: str, device_id: str, keys: JsonDict
+ ) -> JsonDict:
+
+ return {
+ "user_id": user_id,
+ "device_id": device_id,
+ "keys": keys,
+ }
+
+ async def _handle_request( # type: ignore[override]
+ self, request: Request
+ ) -> Tuple[int, JsonDict]:
+ content = parse_json_object_from_request(request)
+
+ user_id = content["user_id"]
+ device_id = content["device_id"]
+ keys = content["keys"]
+
+ results = await self.e2e_keys_handler.upload_keys_for_user(
+ user_id, device_id, keys
+ )
+
+ return 200, results
+
+
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
+ ReplicationUploadKeysForUserRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index f653d2a3e1..ee038c7192 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -27,6 +27,7 @@ from synapse.http.servlet import (
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import log_kv, set_tag
+from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet
from synapse.rest.client._base import client_patterns, interactive_auth_handler
from synapse.types import JsonDict, StreamToken
from synapse.util.cancellation import cancellable
@@ -43,24 +44,48 @@ class KeyUploadServlet(RestServlet):
Content-Type: application/json
{
- "device_keys": {
- "user_id": "<user_id>",
- "device_id": "<device_id>",
- "valid_until_ts": <millisecond_timestamp>,
- "algorithms": [
- "m.olm.curve25519-aes-sha2",
- ]
- "keys": {
- "<algorithm>:<device_id>": "<key_base64>",
+ "device_keys": {
+ "user_id": "<user_id>",
+ "device_id": "<device_id>",
+ "valid_until_ts": <millisecond_timestamp>,
+ "algorithms": [
+ "m.olm.curve25519-aes-sha2",
+ ]
+ "keys": {
+ "<algorithm>:<device_id>": "<key_base64>",
+ },
+ "signatures:" {
+ "<user_id>" {
+ "<algorithm>:<device_id>": "<signature_base64>"
+ }
+ }
+ },
+ "fallback_keys": {
+ "<algorithm>:<device_id>": "<key_base64>",
+ "signed_<algorithm>:<device_id>": {
+ "fallback": true,
+ "key": "<key_base64>",
+ "signatures": {
+ "<user_id>": {
+ "<algorithm>:<device_id>": "<key_base64>"
+ }
+ }
+ }
+ }
+ "one_time_keys": {
+ "<algorithm>:<key_id>": "<key_base64>"
},
- "signatures:" {
- "<user_id>" {
- "<algorithm>:<device_id>": "<signature_base64>"
- } } },
- "one_time_keys": {
- "<algorithm>:<key_id>": "<key_base64>"
- },
}
+
+ response, e.g.:
+
+ {
+ "one_time_key_counts": {
+ "curve25519": 10,
+ "signed_curve25519": 20
+ }
+ }
+
"""
PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -71,6 +96,13 @@ class KeyUploadServlet(RestServlet):
self.e2e_keys_handler = hs.get_e2e_keys_handler()
self.device_handler = hs.get_device_handler()
+ if hs.config.worker.worker_app is None:
+ # if main process
+ self.key_uploader = self.e2e_keys_handler.upload_keys_for_user
+ else:
+ # then a worker
+ self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs)
+
async def on_POST(
self, request: SynapseRequest, device_id: Optional[str]
) -> Tuple[int, JsonDict]:
@@ -109,8 +141,8 @@ class KeyUploadServlet(RestServlet):
400, "To upload keys, you must pass device_id when authenticating"
)
- result = await self.e2e_keys_handler.upload_keys_for_user(
- user_id, device_id, body
+ result = await self.key_uploader(
+ user_id=user_id, device_id=device_id, keys=body
)
return 200, result
|