summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/generic_worker.py103
-rw-r--r--synapse/config/workers.py6
-rw-r--r--synapse/replication/http/devices.py67
-rw-r--r--synapse/rest/client/keys.py68
4 files changed, 125 insertions, 119 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 1d9aef45c2..74909b7d4a 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -14,14 +14,12 @@
 # limitations under the License.
 import logging
 import sys
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List
 
-from twisted.internet import address
 from twisted.web.resource import Resource
 
 import synapse
 import synapse.events
-from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
 from synapse.api.urls import (
     CLIENT_API_PREFIX,
     FEDERATION_PREFIX,
@@ -43,8 +41,6 @@ from synapse.config.logger import setup_logging
 from synapse.config.server import ListenerConfig
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.server import JsonResource, OptionsResource
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
-from synapse.http.site import SynapseRequest
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
@@ -70,12 +66,12 @@ from synapse.rest.client import (
     versions,
     voip,
 )
-from synapse.rest.client._base import client_patterns
 from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet
 from synapse.rest.client.devices import DevicesRestServlet
 from synapse.rest.client.keys import (
     KeyChangesServlet,
     KeyQueryServlet,
+    KeyUploadServlet,
     OneTimeKeyServlet,
 )
 from synapse.rest.client.register import (
@@ -132,107 +128,12 @@ from synapse.storage.databases.main.transactions import TransactionWorkerStore
 from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
 from synapse.storage.databases.main.user_directory import UserDirectoryStore
 from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
-from synapse.types import JsonDict
 from synapse.util import SYNAPSE_VERSION
 from synapse.util.httpresourcetree import create_resource_tree
 
 logger = logging.getLogger("synapse.app.generic_worker")
 
 
-class KeyUploadServlet(RestServlet):
-    """An implementation of the `KeyUploadServlet` that responds to read only
-    requests, but otherwise proxies through to the master instance.
-    """
-
-    PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
-
-    def __init__(self, hs: HomeServer):
-        """
-        Args:
-            hs: server
-        """
-        super().__init__()
-        self.auth = hs.get_auth()
-        self.store = hs.get_datastores().main
-        self.http_client = hs.get_simple_http_client()
-        self.main_uri = hs.config.worker.worker_main_http_uri
-
-    async def on_POST(
-        self, request: SynapseRequest, device_id: Optional[str]
-    ) -> Tuple[int, JsonDict]:
-        requester = await self.auth.get_user_by_req(request, allow_guest=True)
-        user_id = requester.user.to_string()
-        body = parse_json_object_from_request(request)
-
-        if device_id is not None:
-            # passing the device_id here is deprecated; however, we allow it
-            # for now for compatibility with older clients.
-            if requester.device_id is not None and device_id != requester.device_id:
-                logger.warning(
-                    "Client uploading keys for a different device "
-                    "(logged in as %s, uploading for %s)",
-                    requester.device_id,
-                    device_id,
-                )
-        else:
-            device_id = requester.device_id
-
-        if device_id is None:
-            raise SynapseError(
-                400, "To upload keys, you must pass device_id when authenticating"
-            )
-
-        if body:
-            # They're actually trying to upload something, proxy to main synapse.
-
-            # Proxy headers from the original request, such as the auth headers
-            # (in case the access token is there) and the original IP /
-            # User-Agent of the request.
-            headers: Dict[bytes, List[bytes]] = {
-                header: list(request.requestHeaders.getRawHeaders(header, []))
-                for header in (b"Authorization", b"User-Agent")
-            }
-            # Add the previous hop to the X-Forwarded-For header.
-            x_forwarded_for = list(
-                request.requestHeaders.getRawHeaders(b"X-Forwarded-For", [])
-            )
-            # we use request.client here, since we want the previous hop, not the
-            # original client (as returned by request.getClientAddress()).
-            if isinstance(request.client, (address.IPv4Address, address.IPv6Address)):
-                previous_host = request.client.host.encode("ascii")
-                # If the header exists, add to the comma-separated list of the first
-                # instance of the header. Otherwise, generate a new header.
-                if x_forwarded_for:
-                    x_forwarded_for = [x_forwarded_for[0] + b", " + previous_host]
-                    x_forwarded_for.extend(x_forwarded_for[1:])
-                else:
-                    x_forwarded_for = [previous_host]
-            headers[b"X-Forwarded-For"] = x_forwarded_for
-
-            # Replicate the original X-Forwarded-Proto header. Note that
-            # XForwardedForRequest overrides isSecure() to give us the original protocol
-            # used by the client, as opposed to the protocol used by our upstream proxy
-            # - which is what we want here.
-            headers[b"X-Forwarded-Proto"] = [
-                b"https" if request.isSecure() else b"http"
-            ]
-
-            try:
-                result = await self.http_client.post_json_get_json(
-                    self.main_uri + request.uri.decode("ascii"), body, headers=headers
-                )
-            except HttpResponseException as e:
-                raise e.to_synapse_error() from e
-            except RequestSendFailed as e:
-                raise SynapseError(502, "Failed to talk to master") from e
-
-            return 200, result
-        else:
-            # Just interested in counts.
-            result = await self.store.count_e2e_one_time_keys(user_id, device_id)
-            return 200, {"one_time_key_counts": result}
-
-
 class GenericWorkerSlavedStore(
     # FIXME(#3714): We need to add UserDirectoryStore as we write directly
     # rather than going via the correct worker.
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 88b3168cbc..c4e2273a95 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -162,7 +162,13 @@ class WorkerConfig(Config):
         self.worker_name = config.get("worker_name", self.worker_app)
         self.instance_name = self.worker_name or "master"
 
+        # FIXME: Remove this check after a suitable amount of time.
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+        if self.worker_main_http_uri is not None:
+            logger.warning(
+                "The config option worker_main_http_uri is unused since Synapse 1.72. "
+                "It can be safely removed from your configuration."
+            )
 
         # This option is really only here to support `--manhole` command line
         # argument.
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 3d63645726..c21629def8 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Tuple
 from twisted.web.server import Request
 
 from synapse.http.server import HttpServer
+from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
 from synapse.types import JsonDict
 
@@ -78,5 +79,71 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
         return 200, user_devices
 
 
+class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
+    """Ask master to upload keys for the user and send them out over federation to
+    update other servers.
+
+    For now, only the master is permitted to handle key upload requests;
+    any worker can handle key query requests (since they're read-only).
+
+    Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on
+    the main process to accomplish this.
+
+    Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload
+    Request format(borrowed and expanded from KeyUploadServlet):
+
+        POST /_synapse/replication/upload_keys_for_user
+
+    {
+        "user_id": "<user_id>",
+        "device_id": "<device_id>",
+        "keys": {
+            ....this part can be found in KeyUploadServlet in rest/client/keys.py....
+        }
+    }
+
+    Response is equivalent to ` /_matrix/client/v3/keys/upload` found in KeyUploadServlet
+
+    """
+
+    NAME = "upload_keys_for_user"
+    PATH_ARGS = ()
+    CACHE = False
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self.e2e_keys_handler = hs.get_e2e_keys_handler()
+        self.store = hs.get_datastores().main
+        self.clock = hs.get_clock()
+
+    @staticmethod
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str, device_id: str, keys: JsonDict
+    ) -> JsonDict:
+
+        return {
+            "user_id": user_id,
+            "device_id": device_id,
+            "keys": keys,
+        }
+
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request
+    ) -> Tuple[int, JsonDict]:
+        content = parse_json_object_from_request(request)
+
+        user_id = content["user_id"]
+        device_id = content["device_id"]
+        keys = content["keys"]
+
+        results = await self.e2e_keys_handler.upload_keys_for_user(
+            user_id, device_id, keys
+        )
+
+        return 200, results
+
+
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
+    ReplicationUploadKeysForUserRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index f653d2a3e1..ee038c7192 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -27,6 +27,7 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import log_kv, set_tag
+from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet
 from synapse.rest.client._base import client_patterns, interactive_auth_handler
 from synapse.types import JsonDict, StreamToken
 from synapse.util.cancellation import cancellable
@@ -43,24 +44,48 @@ class KeyUploadServlet(RestServlet):
     Content-Type: application/json
 
     {
-      "device_keys": {
-        "user_id": "<user_id>",
-        "device_id": "<device_id>",
-        "valid_until_ts": <millisecond_timestamp>,
-        "algorithms": [
-          "m.olm.curve25519-aes-sha2",
-        ]
-        "keys": {
-          "<algorithm>:<device_id>": "<key_base64>",
+        "device_keys": {
+            "user_id": "<user_id>",
+            "device_id": "<device_id>",
+            "valid_until_ts": <millisecond_timestamp>,
+            "algorithms": [
+                "m.olm.curve25519-aes-sha2",
+            ]
+            "keys": {
+                "<algorithm>:<device_id>": "<key_base64>",
+            },
+            "signatures:" {
+                "<user_id>" {
+                    "<algorithm>:<device_id>": "<signature_base64>"
+                }
+            }
+        },
+        "fallback_keys": {
+            "<algorithm>:<device_id>": "<key_base64>",
+            "signed_<algorithm>:<device_id>": {
+                "fallback": true,
+                "key": "<key_base64>",
+                "signatures": {
+                    "<user_id>": {
+                        "<algorithm>:<device_id>": "<key_base64>"
+                    }
+                }
+            }
+        }
+        "one_time_keys": {
+            "<algorithm>:<key_id>": "<key_base64>"
         },
-        "signatures:" {
-          "<user_id>" {
-            "<algorithm>:<device_id>": "<signature_base64>"
-      } } },
-      "one_time_keys": {
-        "<algorithm>:<key_id>": "<key_base64>"
-      },
     }
+
+    response, e.g.:
+
+    {
+        "one_time_key_counts": {
+            "curve25519": 10,
+            "signed_curve25519": 20
+        }
+    }
+
     """
 
     PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -71,6 +96,13 @@ class KeyUploadServlet(RestServlet):
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
         self.device_handler = hs.get_device_handler()
 
+        if hs.config.worker.worker_app is None:
+            # if main process
+            self.key_uploader = self.e2e_keys_handler.upload_keys_for_user
+        else:
+            # then a worker
+            self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs)
+
     async def on_POST(
         self, request: SynapseRequest, device_id: Optional[str]
     ) -> Tuple[int, JsonDict]:
@@ -109,8 +141,8 @@ class KeyUploadServlet(RestServlet):
                 400, "To upload keys, you must pass device_id when authenticating"
             )
 
-        result = await self.e2e_keys_handler.upload_keys_for_user(
-            user_id, device_id, body
+        result = await self.key_uploader(
+            user_id=user_id, device_id=device_id, keys=body
         )
         return 200, result