From 6512474c24ff99b997f7d5f018fc0b10662bb138 Mon Sep 17 00:00:00 2001 From: Rory& Date: Sat, 26 Jul 2025 16:59:18 +0200 Subject: Add support for bulk event sending, back off on server failure --- LibMatrix/Extensions/MatrixHttpClient.Single.cs | 125 ++++++++++++++++++------ 1 file changed, 93 insertions(+), 32 deletions(-) (limited to 'LibMatrix/Extensions/MatrixHttpClient.Single.cs') diff --git a/LibMatrix/Extensions/MatrixHttpClient.Single.cs b/LibMatrix/Extensions/MatrixHttpClient.Single.cs index 671566f..d14e481 100644 --- a/LibMatrix/Extensions/MatrixHttpClient.Single.cs +++ b/LibMatrix/Extensions/MatrixHttpClient.Single.cs @@ -4,13 +4,12 @@ using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Net; using System.Net.Http.Headers; +using System.Net.Http.Json; using System.Reflection; using System.Text; using System.Text.Json; using System.Text.Json.Serialization; -using ArcaneLibs; using ArcaneLibs.Extensions; -using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Requests; namespace LibMatrix.Extensions; @@ -169,31 +168,38 @@ public class MatrixHttpClient { }; // if (!content.StartsWith('{')) throw new InvalidDataException("Encountered invalid data:\n" + content); - if (!content.TrimStart().StartsWith('{')) { - responseMessage.EnsureSuccessStatusCode(); - throw new InvalidDataException("Encountered invalid data:\n" + content); + if (content.TrimStart().StartsWith('{')) { + //we have a matrix error, most likely + MatrixException? ex; + try { + ex = JsonSerializer.Deserialize(content); + } + catch (JsonException e) { + throw new LibMatrixException() { + ErrorCode = "M_INVALID_JSON", + Error = e.Message + "\nBody:\n" + await responseMessage.Content.ReadAsStringAsync(cancellationToken) + }; + } + + Debug.Assert(ex != null, nameof(ex) + " != null"); + ex.RawContent = content; + // Console.WriteLine($"Failed to send request: {ex}"); + if (ex.RetryAfterMs is null) throw ex!; + //we have a ratelimit error + await Task.Delay(ex.RetryAfterMs.Value, cancellationToken); + request.ResetSendStatus(); + return await SendAsync(request, cancellationToken); } - //we have a matrix error - MatrixException? ex; - try { - ex = JsonSerializer.Deserialize(content); - } - catch (JsonException e) { - throw new LibMatrixException() { - ErrorCode = "M_INVALID_JSON", - Error = e.Message + "\nBody:\n" + await responseMessage.Content.ReadAsStringAsync(cancellationToken) - }; + if (responseMessage.StatusCode == HttpStatusCode.BadGateway) { + // spread out retries + await Task.Delay(Random.Shared.Next(1000, 2000), cancellationToken); + request.ResetSendStatus(); + return await SendAsync(request, cancellationToken); } - Debug.Assert(ex != null, nameof(ex) + " != null"); - ex.RawContent = content; - // Console.WriteLine($"Failed to send request: {ex}"); - if (ex.RetryAfterMs is null) throw ex!; - //we have a ratelimit error - await Task.Delay(ex.RetryAfterMs.Value, cancellationToken); - request.ResetSendStatus(); - return await SendAsync(request, cancellationToken); + responseMessage.EnsureSuccessStatusCode(); + throw new InvalidDataException("Encountered invalid data:\n" + content); } // GetAsync @@ -241,22 +247,16 @@ public class MatrixHttpClient { options = GetJsonSerializerOptions(options); var request = new HttpRequestMessage(HttpMethod.Put, requestUri); request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); - request.Content = new StringContent(JsonSerializer.Serialize(value, value.GetType(), options), - Encoding.UTF8, "application/json"); + request.Content = JsonContent.Create(value, value.GetType(), new MediaTypeHeaderValue("application/json", "utf-8"), options); return await SendAsync(request, cancellationToken); } public async Task PostAsJsonAsync([StringSyntax(StringSyntaxAttribute.Uri)] string? requestUri, T value, JsonSerializerOptions? options = null, CancellationToken cancellationToken = default) where T : notnull { - options ??= new JsonSerializerOptions(); - options.Converters.Add(new JsonFloatStringConverter()); - options.Converters.Add(new JsonDoubleStringConverter()); - options.Converters.Add(new JsonDecimalStringConverter()); - options.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull; + options = GetJsonSerializerOptions(options); var request = new HttpRequestMessage(HttpMethod.Post, requestUri); request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); - request.Content = new StringContent(JsonSerializer.Serialize(value, value.GetType(), options), - Encoding.UTF8, "application/json"); + request.Content = JsonContent.Create(value, value.GetType(), new MediaTypeHeaderValue("application/json", "utf-8"), options); return await SendAsync(request, cancellationToken); } @@ -297,5 +297,66 @@ public class MatrixHttpClient { }; return await SendAsync(request); } + + public async Task PostAsyncEnumerableAsJsonAsync(string url, IAsyncEnumerable payload, JsonSerializerOptions? options = null, + CancellationToken cancellationToken = default) { + options = GetJsonSerializerOptions(options); + var request = new HttpRequestMessage(HttpMethod.Post, url); + request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); + request.Content = new AsyncEnumerableJsonContent(payload, options); + return await SendAsync(request, cancellationToken); + } + + public async Task PutAsyncEnumerableAsJsonAsync(string url, IAsyncEnumerable payload, JsonSerializerOptions? options = null, + CancellationToken cancellationToken = default) { + options = GetJsonSerializerOptions(options); + var request = new HttpRequestMessage(HttpMethod.Put, url); + request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); + request.Content = new AsyncEnumerableJsonContent(payload, options); + return await SendAsync(request, cancellationToken); + } +} + +public class AsyncEnumerableJsonContent(IAsyncEnumerable payload, JsonSerializerOptions? options) : HttpContent { + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context) { + await using var writer = new Utf8JsonWriter(stream, new JsonWriterOptions { + Indented = options?.WriteIndented ?? true, + Encoder = options?.Encoder, + IndentCharacter = options?.IndentCharacter ?? ' ', + IndentSize = options?.IndentSize ?? 4, + MaxDepth = options?.MaxDepth ?? 0, + NewLine = options?.NewLine ?? Environment.NewLine + }); + + writer.WriteStartArray(); + await writer.FlushAsync(); + await stream.FlushAsync(); + + await foreach (var item in payload) { + if (item is null) { + writer.WriteNullValue(); + } + else { + using var memoryStream = new MemoryStream(); + await JsonSerializer.SerializeAsync(memoryStream, item, item.GetType(), options); + + memoryStream.Position = 0; + var jsonBytes = memoryStream.ToArray(); + writer.WriteRawValue(jsonBytes, skipInputValidation: true); + } + + await writer.FlushAsync(); + await stream.FlushAsync(); + } + + writer.WriteEndArray(); + await writer.FlushAsync(); + await stream.FlushAsync(); + } + + protected override bool TryComputeLength(out long length) { + length = 0; + return false; + } } #endif \ No newline at end of file -- cgit 1.5.1