diff options
author | Quentin Gliech <quenting@element.io> | 2024-04-25 14:50:12 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-25 12:50:12 +0000 |
commit | 2e92b718d5ea063af4b2dc9412dcd2ce625b4987 (patch) | |
tree | 9d916997bcd61eaf055c622b2aa27919faeb9cc9 | |
parent | Add type annotation to `visited_chains` (#17125) (diff) | |
download | synapse-2e92b718d5ea063af4b2dc9412dcd2ce625b4987.tar.xz |
MSC4108 implementation (#17056)
Co-authored-by: Hugh Nimmo-Smith <hughns@element.io> Co-authored-by: Hugh Nimmo-Smith <hughns@users.noreply.github.com> Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
-rw-r--r-- | Cargo.lock | 164 | ||||
-rw-r--r-- | changelog.d/17056.feature | 1 | ||||
-rw-r--r-- | rust/Cargo.toml | 4 | ||||
-rw-r--r-- | rust/src/lib.rs | 2 | ||||
-rw-r--r-- | rust/src/rendezvous/mod.rs | 315 | ||||
-rw-r--r-- | rust/src/rendezvous/session.rs | 91 | ||||
-rw-r--r-- | synapse/config/experimental.py | 12 | ||||
-rw-r--r-- | synapse/http/server.py | 5 | ||||
-rw-r--r-- | synapse/rest/client/rendezvous.py | 16 | ||||
-rw-r--r-- | synapse/rest/client/versions.py | 9 | ||||
-rw-r--r-- | synapse/rest/synapse/client/__init__.py | 4 | ||||
-rw-r--r-- | synapse/rest/synapse/client/rendezvous.py | 58 | ||||
-rw-r--r-- | synapse/server.py | 5 | ||||
-rw-r--r-- | synapse/synapse_rust/rendezvous.pyi | 30 | ||||
-rw-r--r-- | tests/rest/client/test_rendezvous.py | 401 | ||||
-rw-r--r-- | tests/server.py | 7 | ||||
-rw-r--r-- | tests/unittest.py | 5 |
17 files changed, 1120 insertions, 9 deletions
diff --git a/Cargo.lock b/Cargo.lock index faac6b3c8a..4474dfb903 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,6 +60,12 @@ dependencies = [ ] [[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + +[[package]] name = "bytes" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -92,9 +98,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.10.5" +version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", @@ -118,6 +124,19 @@ dependencies = [ ] [[package]] +name = "getrandom" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" +dependencies = [ + "cfg-if", + "js-sys", + "libc", + "wasi", + "wasm-bindgen", +] + +[[package]] name = "headers" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -183,6 +202,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc" [[package]] +name = "js-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +dependencies = [ + "wasm-bindgen", +] + +[[package]] name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -267,6 +295,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" [[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + +[[package]] name = "proc-macro2" version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -370,6 +404,36 @@ dependencies = [ ] [[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] name = "redox_syscall" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -462,6 +526,17 @@ dependencies = [ ] [[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + +[[package]] name = "smallvec" version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -489,6 +564,7 @@ name = "synapse" version = "0.1.0" dependencies = [ "anyhow", + "base64", "blake2", "bytes", "headers", @@ -496,12 +572,15 @@ dependencies = [ "http", "lazy_static", "log", + "mime", "pyo3", "pyo3-log", "pythonize", "regex", "serde", "serde_json", + "sha2", + "ulid", ] [[package]] @@ -517,6 +596,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" [[package]] +name = "ulid" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259" +dependencies = [ + "getrandom", + "rand", + "web-time", +] + +[[package]] name = "unicode-ident" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -535,6 +625,76 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "wasm-bindgen" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" + +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] name = "windows-sys" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/changelog.d/17056.feature b/changelog.d/17056.feature new file mode 100644 index 0000000000..b4cbe849e4 --- /dev/null +++ b/changelog.d/17056.feature @@ -0,0 +1 @@ +Implement the rendezvous mechanism described by MSC4108. diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 9ac766182b..d41a216d1c 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -23,11 +23,13 @@ name = "synapse.synapse_rust" [dependencies] anyhow = "1.0.63" +base64 = "0.21.7" bytes = "1.6.0" headers = "0.4.0" http = "1.1.0" lazy_static = "1.4.0" log = "0.4.17" +mime = "0.3.17" pyo3 = { version = "0.20.0", features = [ "macros", "anyhow", @@ -37,8 +39,10 @@ pyo3 = { version = "0.20.0", features = [ pyo3-log = "0.9.0" pythonize = "0.20.0" regex = "1.6.0" +sha2 = "0.10.8" serde = { version = "1.0.144", features = ["derive"] } serde_json = "1.0.85" +ulid = "1.1.2" [features] extension-module = ["pyo3/extension-module"] diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 36a3d64528..9bd1f17ad9 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -7,6 +7,7 @@ pub mod errors; pub mod events; pub mod http; pub mod push; +pub mod rendezvous; lazy_static! { static ref LOGGING_HANDLE: ResetHandle = pyo3_log::init(); @@ -45,6 +46,7 @@ fn synapse_rust(py: Python<'_>, m: &PyModule) -> PyResult<()> { acl::register_module(py, m)?; push::register_module(py, m)?; events::register_module(py, m)?; + rendezvous::register_module(py, m)?; Ok(()) } diff --git a/rust/src/rendezvous/mod.rs b/rust/src/rendezvous/mod.rs new file mode 100644 index 0000000000..c0f5d8b600 --- /dev/null +++ b/rust/src/rendezvous/mod.rs @@ -0,0 +1,315 @@ +/* + * This file is licensed under the Affero General Public License (AGPL) version 3. + * + * Copyright (C) 2024 New Vector, Ltd + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * See the GNU Affero General Public License for more details: + * <https://www.gnu.org/licenses/agpl-3.0.html>. + * + */ + +use std::{ + collections::{BTreeMap, HashMap}, + time::{Duration, SystemTime}, +}; + +use bytes::Bytes; +use headers::{ + AccessControlAllowOrigin, AccessControlExposeHeaders, CacheControl, ContentLength, ContentType, + HeaderMapExt, IfMatch, IfNoneMatch, Pragma, +}; +use http::{header::ETAG, HeaderMap, Response, StatusCode, Uri}; +use mime::Mime; +use pyo3::{ + exceptions::PyValueError, pyclass, pymethods, types::PyModule, Py, PyAny, PyObject, PyResult, + Python, ToPyObject, +}; +use ulid::Ulid; + +use self::session::Session; +use crate::{ + errors::{NotFoundError, SynapseError}, + http::{http_request_from_twisted, http_response_to_twisted, HeaderMapPyExt}, +}; + +mod session; + +// n.b. Because OPTIONS requests are handled by the Python code, we don't need to set Access-Control-Allow-Headers. +fn prepare_headers(headers: &mut HeaderMap, session: &Session) { + headers.typed_insert(AccessControlAllowOrigin::ANY); + headers.typed_insert(AccessControlExposeHeaders::from_iter([ETAG])); + headers.typed_insert(Pragma::no_cache()); + headers.typed_insert(CacheControl::new().with_no_store()); + headers.typed_insert(session.etag()); + headers.typed_insert(session.expires()); + headers.typed_insert(session.last_modified()); +} + +#[pyclass] +struct RendezvousHandler { + base: Uri, + clock: PyObject, + sessions: BTreeMap<Ulid, Session>, + capacity: usize, + max_content_length: u64, + ttl: Duration, +} + +impl RendezvousHandler { + /// Check the input headers of a request which sets data for a session, and return the content type. + fn check_input_headers(&self, headers: &HeaderMap) -> PyResult<Mime> { + let ContentLength(content_length) = headers.typed_get_required()?; + + if content_length > self.max_content_length { + return Err(SynapseError::new( + StatusCode::PAYLOAD_TOO_LARGE, + "Payload too large".to_owned(), + "M_TOO_LARGE", + None, + None, + )); + } + + let content_type: ContentType = headers.typed_get_required()?; + + // Content-Type must be text/plain + if content_type != ContentType::text() { + return Err(SynapseError::new( + StatusCode::BAD_REQUEST, + "Content-Type must be text/plain".to_owned(), + "M_INVALID_PARAM", + None, + None, + )); + } + + Ok(content_type.into()) + } + + /// Evict expired sessions and remove the oldest sessions until we're under the capacity. + fn evict(&mut self, now: SystemTime) { + // First remove all the entries which expired + self.sessions.retain(|_, session| !session.expired(now)); + + // Then we remove the oldest entires until we're under the limit + while self.sessions.len() > self.capacity { + self.sessions.pop_first(); + } + } +} + +#[pymethods] +impl RendezvousHandler { + #[new] + #[pyo3(signature = (homeserver, /, capacity=100, max_content_length=4*1024, eviction_interval=60*1000, ttl=60*1000))] + fn new( + py: Python<'_>, + homeserver: &PyAny, + capacity: usize, + max_content_length: u64, + eviction_interval: u64, + ttl: u64, + ) -> PyResult<Py<Self>> { + let base: String = homeserver + .getattr("config")? + .getattr("server")? + .getattr("public_baseurl")? + .extract()?; + let base = Uri::try_from(format!("{base}_synapse/client/rendezvous")) + .map_err(|_| PyValueError::new_err("Invalid base URI"))?; + + let clock = homeserver.call_method0("get_clock")?.to_object(py); + + // Construct a Python object so that we can get a reference to the + // evict method and schedule it to run. + let self_ = Py::new( + py, + Self { + base, + clock, + sessions: BTreeMap::new(), + capacity, + max_content_length, + ttl: Duration::from_millis(ttl), + }, + )?; + + let evict = self_.getattr(py, "_evict")?; + homeserver.call_method0("get_clock")?.call_method( + "looping_call", + (evict, eviction_interval), + None, + )?; + + Ok(self_) + } + + fn _evict(&mut self, py: Python<'_>) -> PyResult<()> { + let clock = self.clock.as_ref(py); + let now: u64 = clock.call_method0("time_msec")?.extract()?; + let now = SystemTime::UNIX_EPOCH + Duration::from_millis(now); + self.evict(now); + + Ok(()) + } + + fn handle_post(&mut self, py: Python<'_>, twisted_request: &PyAny) -> PyResult<()> { + let request = http_request_from_twisted(twisted_request)?; + + let content_type = self.check_input_headers(request.headers())?; + + let clock = self.clock.as_ref(py); + let now: u64 = clock.call_method0("time_msec")?.extract()?; + let now = SystemTime::UNIX_EPOCH + Duration::from_millis(now); + + // We trigger an immediate eviction if we're at 2x the capacity + if self.sessions.len() >= self.capacity * 2 { + self.evict(now); + } + + // Generate a new ULID for the session from the current time. + let id = Ulid::from_datetime(now); + + let uri = format!("{base}/{id}", base = self.base); + + let body = request.into_body(); + + let session = Session::new(body, content_type, now, self.ttl); + + let response = serde_json::json!({ + "url": uri, + }) + .to_string(); + + let mut response = Response::new(response.as_bytes()); + *response.status_mut() = StatusCode::CREATED; + response.headers_mut().typed_insert(ContentType::json()); + prepare_headers(response.headers_mut(), &session); + http_response_to_twisted(twisted_request, response)?; + + self.sessions.insert(id, session); + + Ok(()) + } + + fn handle_get(&mut self, py: Python<'_>, twisted_request: &PyAny, id: &str) -> PyResult<()> { + let request = http_request_from_twisted(twisted_request)?; + + let if_none_match: Option<IfNoneMatch> = request.headers().typed_get_optional()?; + + let now: u64 = self.clock.call_method0(py, "time_msec")?.extract(py)?; + let now = SystemTime::UNIX_EPOCH + Duration::from_millis(now); + + let id: Ulid = id.parse().map_err(|_| NotFoundError::new())?; + let session = self + .sessions + .get(&id) + .filter(|s| !s.expired(now)) + .ok_or_else(NotFoundError::new)?; + + if let Some(if_none_match) = if_none_match { + if !if_none_match.precondition_passes(&session.etag()) { + let mut response = Response::new(Bytes::new()); + *response.status_mut() = StatusCode::NOT_MODIFIED; + prepare_headers(response.headers_mut(), session); + http_response_to_twisted(twisted_request, response)?; + return Ok(()); + } + } + + let mut response = Response::new(session.data()); + *response.status_mut() = StatusCode::OK; + let headers = response.headers_mut(); + prepare_headers(headers, session); + headers.typed_insert(session.content_type()); + headers.typed_insert(session.content_length()); + http_response_to_twisted(twisted_request, response)?; + + Ok(()) + } + + fn handle_put(&mut self, py: Python<'_>, twisted_request: &PyAny, id: &str) -> PyResult<()> { + let request = http_request_from_twisted(twisted_request)?; + + let content_type = self.check_input_headers(request.headers())?; + + let if_match: IfMatch = request.headers().typed_get_required()?; + + let data = request.into_body(); + + let now: u64 = self.clock.call_method0(py, "time_msec")?.extract(py)?; + let now = SystemTime::UNIX_EPOCH + Duration::from_millis(now); + + let id: Ulid = id.parse().map_err(|_| NotFoundError::new())?; + let session = self + .sessions + .get_mut(&id) + .filter(|s| !s.expired(now)) + .ok_or_else(NotFoundError::new)?; + + if !if_match.precondition_passes(&session.etag()) { + let mut headers = HeaderMap::new(); + prepare_headers(&mut headers, session); + + let mut additional_fields = HashMap::with_capacity(1); + additional_fields.insert( + String::from("org.matrix.msc4108.errcode"), + String::from("M_CONCURRENT_WRITE"), + ); + + return Err(SynapseError::new( + StatusCode::PRECONDITION_FAILED, + "ETag does not match".to_owned(), + "M_UNKNOWN", // Would be M_CONCURRENT_WRITE + Some(additional_fields), + Some(headers), + )); + } + + session.update(data, content_type, now); + + let mut response = Response::new(Bytes::new()); + *response.status_mut() = StatusCode::ACCEPTED; + prepare_headers(response.headers_mut(), session); + http_response_to_twisted(twisted_request, response)?; + + Ok(()) + } + + fn handle_delete(&mut self, twisted_request: &PyAny, id: &str) -> PyResult<()> { + let _request = http_request_from_twisted(twisted_request)?; + + let id: Ulid = id.parse().map_err(|_| NotFoundError::new())?; + let _session = self.sessions.remove(&id).ok_or_else(NotFoundError::new)?; + + let mut response = Response::new(Bytes::new()); + *response.status_mut() = StatusCode::NO_CONTENT; + response + .headers_mut() + .typed_insert(AccessControlAllowOrigin::ANY); + http_response_to_twisted(twisted_request, response)?; + + Ok(()) + } +} + +pub fn register_module(py: Python<'_>, m: &PyModule) -> PyResult<()> { + let child_module = PyModule::new(py, "rendezvous")?; + + child_module.add_class::<RendezvousHandler>()?; + + m.add_submodule(child_module)?; + + // We need to manually add the module to sys.modules to make `from + // synapse.synapse_rust import rendezvous` work. + py.import("sys")? + .getattr("modules")? + .set_item("synapse.synapse_rust.rendezvous", child_module)?; + + Ok(()) +} diff --git a/rust/src/rendezvous/session.rs b/rust/src/rendezvous/session.rs new file mode 100644 index 0000000000..179304edfe --- /dev/null +++ b/rust/src/rendezvous/session.rs @@ -0,0 +1,91 @@ +/* + * This file is licensed under the Affero General Public License (AGPL) version 3. + * + * Copyright (C) 2024 New Vector, Ltd + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * See the GNU Affero General Public License for more details: + * <https://www.gnu.org/licenses/agpl-3.0.html>. + */ + +use std::time::{Duration, SystemTime}; + +use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; +use bytes::Bytes; +use headers::{ContentLength, ContentType, ETag, Expires, LastModified}; +use mime::Mime; +use sha2::{Digest, Sha256}; + +/// A single session, containing data, metadata, and expiry information. +pub struct Session { + hash: [u8; 32], + data: Bytes, + content_type: Mime, + last_modified: SystemTime, + expires: SystemTime, +} + +impl Session { + /// Create a new session with the given data, content type, and time-to-live. + pub fn new(data: Bytes, content_type: Mime, now: SystemTime, ttl: Duration) -> Self { + let hash = Sha256::digest(&data).into(); + Self { + hash, + data, + content_type, + expires: now + ttl, + last_modified: now, + } + } + + /// Returns true if the session has expired at the given time. + pub fn expired(&self, now: SystemTime) -> bool { + self.expires <= now + } + + /// Update the session with new data, content type, and last modified time. + pub fn update(&mut self, data: Bytes, content_type: Mime, now: SystemTime) { + self.hash = Sha256::digest(&data).into(); + self.data = data; + self.content_type = content_type; + self.last_modified = now; + } + + /// Returns the Content-Type header of the session. + pub fn content_type(&self) -> ContentType { + self.content_type.clone().into() + } + + /// Returns the Content-Length header of the session. + pub fn content_length(&self) -> ContentLength { + ContentLength(self.data.len() as _) + } + + /// Returns the ETag header of the session. + pub fn etag(&self) -> ETag { + let encoded = URL_SAFE_NO_PAD.encode(self.hash); + // SAFETY: Base64 encoding is URL-safe, so ETag-safe + format!("\"{encoded}\"") + .parse() + .expect("base64-encoded hash should be URL-safe") + } + + /// Returns the Last-Modified header of the session. + pub fn last_modified(&self) -> LastModified { + self.last_modified.into() + } + + /// Returns the Expires header of the session. + pub fn expires(&self) -> Expires { + self.expires.into() + } + + /// Returns the current data stored in the session. + pub fn data(&self) -> Bytes { + self.data.clone() + } +} diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 353ae23f91..baa3580f29 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -413,12 +413,22 @@ class ExperimentalConfig(Config): ) # MSC4108: Mechanism to allow OIDC sign in and E2EE set up via QR code + self.msc4108_enabled = experimental.get("msc4108_enabled", False) + self.msc4108_delegation_endpoint: Optional[str] = experimental.get( "msc4108_delegation_endpoint", None ) - if self.msc4108_delegation_endpoint is not None and not self.msc3861.enabled: + if ( + self.msc4108_enabled or self.msc4108_delegation_endpoint is not None + ) and not self.msc3861.enabled: raise ConfigError( "MSC4108 requires MSC3861 to be enabled", ("experimental", "msc4108_delegation_endpoint"), ) + + if self.msc4108_delegation_endpoint is not None and self.msc4108_enabled: + raise ConfigError( + "You cannot have MSC4108 both enabled and delegated at the same time", + ("experimental", "msc4108_delegation_endpoint"), + ) diff --git a/synapse/http/server.py b/synapse/http/server.py index 45b2cbffcd..211795dc39 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -909,8 +909,9 @@ def set_cors_headers(request: "SynapseRequest") -> None: request.setHeader( b"Access-Control-Allow-Methods", b"GET, HEAD, POST, PUT, DELETE, OPTIONS" ) - if request.path is not None and request.path.startswith( - b"/_matrix/client/unstable/org.matrix.msc4108/rendezvous" + if request.path is not None and ( + request.path == b"/_matrix/client/unstable/org.matrix.msc4108/rendezvous" + or request.path.startswith(b"/_synapse/client/rendezvous") ): request.setHeader( b"Access-Control-Allow-Headers", diff --git a/synapse/rest/client/rendezvous.py b/synapse/rest/client/rendezvous.py index ed06a29987..143f057651 100644 --- a/synapse/rest/client/rendezvous.py +++ b/synapse/rest/client/rendezvous.py @@ -97,9 +97,25 @@ class MSC4108DelegationRendezvousServlet(RestServlet): ) +class MSC4108RendezvousServlet(RestServlet): + PATTERNS = client_patterns( + "/org.matrix.msc4108/rendezvous$", releases=[], v1=False, unstable=True + ) + + def __init__(self, hs: "HomeServer") -> None: + super().__init__() + self._handler = hs.get_rendezvous_handler() + + def on_POST(self, request: SynapseRequest) -> None: + self._handler.handle_post(request) + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: if hs.config.experimental.msc3886_endpoint is not None: MSC3886RendezvousServlet(hs).register(http_server) + if hs.config.experimental.msc4108_enabled: + MSC4108RendezvousServlet(hs).register(http_server) + if hs.config.experimental.msc4108_delegation_endpoint is not None: MSC4108DelegationRendezvousServlet(hs).register(http_server) diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 638d4c45ae..fa453a3b02 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -141,8 +141,13 @@ class VersionsRestServlet(RestServlet): # Allows clients to handle push for encrypted events. "org.matrix.msc4028": self.config.experimental.msc4028_push_encrypted_events, # MSC4108: Mechanism to allow OIDC sign in and E2EE set up via QR code - "org.matrix.msc4108": self.config.experimental.msc4108_delegation_endpoint - is not None, + "org.matrix.msc4108": ( + self.config.experimental.msc4108_enabled + or ( + self.config.experimental.msc4108_delegation_endpoint + is not None + ) + ), }, }, ) diff --git a/synapse/rest/synapse/client/__init__.py b/synapse/rest/synapse/client/__init__.py index 31544867d4..ba6576d4db 100644 --- a/synapse/rest/synapse/client/__init__.py +++ b/synapse/rest/synapse/client/__init__.py @@ -26,6 +26,7 @@ from twisted.web.resource import Resource from synapse.rest.synapse.client.new_user_consent import NewUserConsentResource from synapse.rest.synapse.client.pick_idp import PickIdpResource from synapse.rest.synapse.client.pick_username import pick_username_resource +from synapse.rest.synapse.client.rendezvous import MSC4108RendezvousSessionResource from synapse.rest.synapse.client.sso_register import SsoRegisterResource from synapse.rest.synapse.client.unsubscribe import UnsubscribeResource @@ -76,6 +77,9 @@ def build_synapse_client_resource_tree(hs: "HomeServer") -> Mapping[str, Resourc # To be removed in Synapse v1.32.0. resources["/_matrix/saml2"] = res + if hs.config.experimental.msc4108_enabled: + resources["/_synapse/client/rendezvous"] = MSC4108RendezvousSessionResource(hs) + return resources diff --git a/synapse/rest/synapse/client/rendezvous.py b/synapse/rest/synapse/client/rendezvous.py new file mode 100644 index 0000000000..5216d30d1f --- /dev/null +++ b/synapse/rest/synapse/client/rendezvous.py @@ -0,0 +1,58 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. +# +# + +import logging +from typing import TYPE_CHECKING, List + +from synapse.api.errors import UnrecognizedRequestError +from synapse.http.server import DirectServeJsonResource +from synapse.http.site import SynapseRequest + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class MSC4108RendezvousSessionResource(DirectServeJsonResource): + isLeaf = True + + def __init__(self, hs: "HomeServer") -> None: + super().__init__() + self._handler = hs.get_rendezvous_handler() + + async def _async_render_GET(self, request: SynapseRequest) -> None: + postpath: List[bytes] = request.postpath # type: ignore + if len(postpath) != 1: + raise UnrecognizedRequestError() + session_id = postpath[0].decode("ascii") + + self._handler.handle_get(request, session_id) + + def _async_render_PUT(self, request: SynapseRequest) -> None: + postpath: List[bytes] = request.postpath # type: ignore + if len(postpath) != 1: + raise UnrecognizedRequestError() + session_id = postpath[0].decode("ascii") + + self._handler.handle_put(request, session_id) + + def _async_render_DELETE(self, request: SynapseRequest) -> None: + postpath: List[bytes] = request.postpath # type: ignore + if len(postpath) != 1: + raise UnrecognizedRequestError() + session_id = postpath[0].decode("ascii") + + self._handler.handle_delete(request, session_id) diff --git a/synapse/server.py b/synapse/server.py index 6d5a18fb1d..95e319d2e6 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -143,6 +143,7 @@ from synapse.state import StateHandler, StateResolutionHandler from synapse.storage import Databases from synapse.storage.controllers import StorageControllers from synapse.streams.events import EventSources +from synapse.synapse_rust.rendezvous import RendezvousHandler from synapse.types import DomainSpecificString, ISynapseReactor from synapse.util import Clock from synapse.util.distributor import Distributor @@ -860,6 +861,10 @@ class HomeServer(metaclass=abc.ABCMeta): return RoomForgetterHandler(self) @cache_in_self + def get_rendezvous_handler(self) -> RendezvousHandler: + return RendezvousHandler(self) + + @cache_in_self def get_outbound_redis_connection(self) -> "ConnectionHandler": """ The Redis connection used for replication. diff --git a/synapse/synapse_rust/rendezvous.pyi b/synapse/synapse_rust/rendezvous.pyi new file mode 100644 index 0000000000..03eae3a196 --- /dev/null +++ b/synapse/synapse_rust/rendezvous.pyi @@ -0,0 +1,30 @@ +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. + +from twisted.web.iweb import IRequest + +from synapse.server import HomeServer + +class RendezvousHandler: + def __init__( + self, + homeserver: HomeServer, + /, + capacity: int = 100, + max_content_length: int = 4 * 1024, # MSC4108 specifies 4KB + eviction_interval: int = 60 * 1000, + ttl: int = 60 * 1000, + ) -> None: ... + def handle_post(self, request: IRequest) -> None: ... + def handle_get(self, request: IRequest, session_id: str) -> None: ... + def handle_put(self, request: IRequest, session_id: str) -> None: ... + def handle_delete(self, request: IRequest, session_id: str) -> None: ... diff --git a/tests/rest/client/test_rendezvous.py b/tests/rest/client/test_rendezvous.py index c84704c090..0ab754a11a 100644 --- a/tests/rest/client/test_rendezvous.py +++ b/tests/rest/client/test_rendezvous.py @@ -2,7 +2,7 @@ # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2022 The Matrix.org Foundation C.I.C. -# Copyright (C) 2023 New Vector, Ltd +# Copyright (C) 2023-2024 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as @@ -19,9 +19,14 @@ # # +from typing import Dict +from urllib.parse import urlparse + from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource from synapse.rest.client import rendezvous +from synapse.rest.synapse.client.rendezvous import MSC4108RendezvousSessionResource from synapse.server import HomeServer from synapse.util import Clock @@ -42,6 +47,12 @@ class RendezvousServletTestCase(unittest.HomeserverTestCase): self.hs = self.setup_test_homeserver() return self.hs + def create_resource_dict(self) -> Dict[str, Resource]: + return { + **super().create_resource_dict(), + "/_synapse/client/rendezvous": MSC4108RendezvousSessionResource(self.hs), + } + def test_disabled(self) -> None: channel = self.make_request("POST", msc3886_endpoint, {}, access_token=None) self.assertEqual(channel.code, 404) @@ -75,3 +86,391 @@ class RendezvousServletTestCase(unittest.HomeserverTestCase): channel = self.make_request("POST", msc4108_endpoint, {}, access_token=None) self.assertEqual(channel.code, 307) self.assertEqual(channel.headers.getRawHeaders("Location"), ["https://asd"]) + + @unittest.skip_unless(HAS_AUTHLIB, "requires authlib") + @override_config( + { + "disable_registration": True, + "experimental_features": { + "msc4108_enabled": True, + "msc3861": { + "enabled": True, + "issuer": "https://issuer", + "client_id": "client_id", + "client_auth_method": "client_secret_post", + "client_secret": "client_secret", + "admin_token": "admin_token_value", + }, + }, + } + ) + def test_msc4108(self) -> None: + """ + Test the MSC4108 rendezvous endpoint, including: + - Creating a session + - Getting the data back + - Updating the data + - Deleting the data + - ETag handling + """ + # We can post arbitrary data to the endpoint + channel = self.make_request( + "POST", + msc4108_endpoint, + "foo=bar", + content_type=b"text/plain", + access_token=None, + ) + self.assertEqual(channel.code, 201) + self.assertSubstring("/_synapse/client/rendezvous/", channel.json_body["url"]) + headers = dict(channel.headers.getAllRawHeaders()) + self.assertIn(b"ETag", headers) + self.assertIn(b"Expires", headers) + self.assertEqual(headers[b"Content-Type"], [b"application/json"]) + self.assertEqual(headers[b"Access-Control-Allow-Origin"], [b"*"]) + self.assertEqual(headers[b"Access-Control-Expose-Headers"], [b"etag"]) + self.assertEqual(headers[b"Cache-Control"], [b"no-store"]) + self.assertEqual(headers[b"Pragma"], [b"no-cache"]) + self.assertIn("url", channel.json_body) + self.assertTrue(channel.json_body["url"].startswith("https://")) + + url = urlparse(channel.json_body["url"]) + session_endpoint = url.path + etag = headers[b"ETag"][0] + + # We can get the data back + channel = self.make_request( + "GET", + session_endpoint, + access_token=None, + ) + + self.assertEqual(channel.code, 200) + headers = dict(channel.headers.getAllRawHeaders()) + self.assertEqual(headers[b"ETag"], [etag]) + self.assertIn(b"Expires", headers) + self.assertEqual(headers[b"Content-Type"], [b"text/plain"]) + self.assertEqual(headers[b"Access-Control-Allow-Origin"], [b"*"]) + self.assertEqual(headers[b"Access-Control-Expose-Headers"], [b"etag"]) + self.assertEqual(headers[b"Cache-Control"], [b"no-store"]) + self.assertEqual(headers[b"Pragma"], [b"no-cache"]) + self.assertEqual(channel.text_body, "foo=bar") + + # We can make sure the data hasn't changed + channel = self.make_request( + "GET", + session_endpoint, + access_token=None, + custom_headers=[("If-None-Match", etag)], + ) + + self.assertEqual(channel.code, 304) + + # We can update the data + channel = self.make_request( + "PUT", + session_endpoint, + "foo=baz", + content_type=b"text/plain", + access_token=None, + custom_headers=[("If-Match", etag)], + ) + + self.assertEqual(channel.code, 202) + headers = dict(channel.headers.getAllRawHeaders()) + old_etag = etag + new_etag = headers[b"ETag"][0] + + # If we try to update it again with the old etag, it should fail + channel = self.make_request( + "PUT", + session_endpoint, + "bar=baz", + content_type=b"text/plain", + access_token=None, + custom_headers=[("If-Match", old_etag)], + ) + + self.assertEqual(channel.code, 412) + self.assertEqual(channel.json_body["errcode"], "M_UNKNOWN") + self.assertEqual( + channel.json_body["org.matrix.msc4108.errcode"], "M_CONCURRENT_WRITE" + ) + + # If we try to get with the old etag, we should get the updated data + channel = self.make_request( + "GET", + session_endpoint, + access_token=None, + custom_headers=[("If-None-Match", old_etag)], + ) + + self.assertEqual(channel.code, 200) + headers = dict(channel.headers.getAllRawHeaders()) + self.assertEqual(headers[b"ETag"], [new_etag]) + self.assertEqual(channel.text_body, "foo=baz") + + # We can delete the data + channel = self.make_request( + "DELETE", + session_endpoint, + access_token=None, + ) + + self.assertEqual(channel.code, 204) + + # If we try to get the data again, it should fail + channel = self.make_request( + "GET", + session_endpoint, + access_token=None, + ) + + self.assertEqual(channel.code, 404) + self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND") + + @unittest.skip_unless(HAS_AUTHLIB, "requires authlib") + @override_config( + { + "disable_registration": True, + "experimental_features": { + "msc4108_enabled": True, + "msc3861": { + "enabled": True, + "issuer": "https://issuer", + "client_id": "client_id", + "client_auth_method": "client_secret_post", + "client_secret": "client_secret", + "admin_token": "admin_token_value", + }, + }, + } + ) + def test_msc4108_expiration(self) -> None: + """ + Test that entries are evicted after a TTL. + """ + # Start a new session + channel = self.make_request( + "POST", + msc4108_endpoint, + "foo=bar", + content_type=b"text/plain", + access_token=None, + ) + self.assertEqual(channel.code, 201) + session_endpoint = urlparse(channel.json_body["url"]).path + + # Sanity check that we can get the data back + channel = self.make_request( + "GET", + session_endpoint, + access_token=None, + ) + self.assertEqual(channel.code, 200) + self.assertEqual(channel.text_body, "foo=bar") + + # Advance the clock, TTL of entries is 1 minute + self.reactor.advance(60) + + # Get the data back, it should be gone + channel = self.make_request( + "GET", + session_endpoint, + access_token=None, + ) + self.assertEqual(channel.code, 404) + + @unittest.skip_unless(HAS_AUTHLIB, "requires authlib") + @override_config( + { + "disable_registration": True, + "experimental_features": { + "msc4108_enabled": True, + "msc3861": { + "enabled": True, + "issuer": "https://issuer", + "client_id": "client_id", + "client_auth_method": "client_secret_post", + "client_secret": "client_secret", + "admin_token": "admin_token_value", + }, + }, + } + ) + def test_msc4108_capacity(self) -> None: + """ + Test that a capacity limit is enforced on the rendezvous sessions, as old + entries are evicted at an interval when the limit is reached. + """ + # Start a new session + channel = self.make_request( + "POST", + msc4108_endpoint, + "foo=bar", + content_type=b"text/plain", + access_token=None, + ) + self.assertEqual(channel.code, 201) + session_endpoint = urlparse(channel.json_body["url"]).path + + # Sanity check that we can get the data back + channel = self.make_request( + "GET", + session_endpoint, + access_token=None, + ) + self.assertEqual(channel.code, 200) + self.assertEqual(channel.text_body, "foo=bar") + + # Start a lot of new sessions + for _ in range(100): + channel = self.make_request( + "POST", + msc4108_endpoint, + "foo=bar", + content_type=b"text/plain", + access_token=None, + ) + self.assertEqual(channel.code, 201) + + # Get the data back, it should still be there, as the eviction hasn't run yet + channel = self.make_request( + "GET", + session_endpoint, + access_token=None, + ) + + self.assertEqual(channel.code, 200) + + # Advance the clock, as it will trigger the eviction + self.reactor.advance(1) + + # Get the data back, it should be gone + channel = self.make_request( + "GET", + session_endpoint, + access_token=None, + ) + + @unittest.skip_unless(HAS_AUTHLIB, "requires authlib") + @override_config( + { + "disable_registration": True, + "experimental_features": { + "msc4108_enabled": True, + "msc3861": { + "enabled": True, + "issuer": "https://issuer", + "client_id": "client_id", + "client_auth_method": "client_secret_post", + "client_secret": "client_secret", + "admin_token": "admin_token_value", + }, + }, + } + ) + def test_msc4108_hard_capacity(self) -> None: + """ + Test that a hard capacity limit is enforced on the rendezvous sessions, as old + entries are evicted immediately when the limit is reached. + """ + # Start a new session + channel = self.make_request( + "POST", + msc4108_endpoint, + "foo=bar", + content_type=b"text/plain", + access_token=None, + ) + self.assertEqual(channel.code, 201) + session_endpoint = urlparse(channel.json_body["url"]).path + # We advance the clock to make sure that this entry is the "lowest" in the session list + self.reactor.advance(1) + + # Sanity check that we can get the data back + channel = self.make_request( + "GET", + session_endpoint, + access_token=None, + ) + self.assertEqual(channel.code, 200) + self.assertEqual(channel.text_body, "foo=bar") + + # Start a lot of new sessions + for _ in range(200): + channel = self.make_request( + "POST", + msc4108_endpoint, + "foo=bar", + content_type=b"text/plain", + access_token=None, + ) + self.assertEqual(channel.code, 201) + + # Get the data back, it should already be gone as we hit the hard limit + channel = self.make_request( + "GET", + session_endpoint, + access_token=None, + ) + + self.assertEqual(channel.code, 404) + + @unittest.skip_unless(HAS_AUTHLIB, "requires authlib") + @override_config( + { + "disable_registration": True, + "experimental_features": { + "msc4108_enabled": True, + "msc3861": { + "enabled": True, + "issuer": "https://issuer", + "client_id": "client_id", + "client_auth_method": "client_secret_post", + "client_secret": "client_secret", + "admin_token": "admin_token_value", + }, + }, + } + ) + def test_msc4108_content_type(self) -> None: + """ + Test that the content-type is restricted to text/plain. + """ + # We cannot post invalid content-type arbitrary data to the endpoint + channel = self.make_request( + "POST", + msc4108_endpoint, + "foo=bar", + content_is_form=True, + access_token=None, + ) + self.assertEqual(channel.code, 400) + self.assertEqual(channel.json_body["errcode"], "M_INVALID_PARAM") + + # Make a valid request + channel = self.make_request( + "POST", + msc4108_endpoint, + "foo=bar", + content_type=b"text/plain", + access_token=None, + ) + self.assertEqual(channel.code, 201) + url = urlparse(channel.json_body["url"]) + session_endpoint = url.path + headers = dict(channel.headers.getAllRawHeaders()) + etag = headers[b"ETag"][0] + + # We can't update the data with invalid content-type + channel = self.make_request( + "PUT", + session_endpoint, + "foo=baz", + content_is_form=True, + access_token=None, + custom_headers=[("If-Match", etag)], + ) + self.assertEqual(channel.code, 400) + self.assertEqual(channel.json_body["errcode"], "M_INVALID_PARAM") diff --git a/tests/server.py b/tests/server.py index 4aaa91e956..434be3d22c 100644 --- a/tests/server.py +++ b/tests/server.py @@ -351,6 +351,7 @@ def make_request( request: Type[Request] = SynapseRequest, shorthand: bool = True, federation_auth_origin: Optional[bytes] = None, + content_type: Optional[bytes] = None, content_is_form: bool = False, await_result: bool = True, custom_headers: Optional[Iterable[CustomHeaderType]] = None, @@ -373,6 +374,8 @@ def make_request( with the usual REST API path, if it doesn't contain it. federation_auth_origin: if set to not-None, we will add a fake Authorization header pretenting to be the given server name. + content_type: The content-type to use for the request. If not set then will default to + application/json unless content_is_form is true. content_is_form: Whether the content is URL encoded form data. Adds the 'Content-Type': 'application/x-www-form-urlencoded' header. await_result: whether to wait for the request to complete rendering. If true, @@ -436,7 +439,9 @@ def make_request( ) if content: - if content_is_form: + if content_type is not None: + req.requestHeaders.addRawHeader(b"Content-Type", content_type) + elif content_is_form: req.requestHeaders.addRawHeader( b"Content-Type", b"application/x-www-form-urlencoded" ) diff --git a/tests/unittest.py b/tests/unittest.py index 6fe0cd4a2d..e6aad9ed40 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -523,6 +523,7 @@ class HomeserverTestCase(TestCase): request: Type[Request] = SynapseRequest, shorthand: bool = True, federation_auth_origin: Optional[bytes] = None, + content_type: Optional[bytes] = None, content_is_form: bool = False, await_result: bool = True, custom_headers: Optional[Iterable[CustomHeaderType]] = None, @@ -541,6 +542,9 @@ class HomeserverTestCase(TestCase): with the usual REST API path, if it doesn't contain it. federation_auth_origin: if set to not-None, we will add a fake Authorization header pretenting to be the given server name. + + content_type: The content-type to use for the request. If not set then will default to + application/json unless content_is_form is true. content_is_form: Whether the content is URL encoded form data. Adds the 'Content-Type': 'application/x-www-form-urlencoded' header. @@ -566,6 +570,7 @@ class HomeserverTestCase(TestCase): request, shorthand, federation_auth_origin, + content_type, content_is_form, await_result, custom_headers, |