diff --git a/src/timeline/EventStore.cpp b/src/timeline/EventStore.cpp
index 7f21e1ed..b7cf4f96 100644
--- a/src/timeline/EventStore.cpp
+++ b/src/timeline/EventStore.cpp
@@ -1,6 +1,7 @@
#include "EventStore.h"
#include <QThread>
+#include <QTimer>
#include "Cache_p.h"
#include "EventAccessors.h"
@@ -59,6 +60,104 @@ EventStore::EventStore(std::string room_id, QObject *)
}
},
Qt::QueuedConnection);
+
+ connect(this, &EventStore::processPending, this, [this]() {
+ if (!current_txn.empty()) {
+ nhlog::ui()->debug("Already processing {}", current_txn);
+ return;
+ }
+
+ auto event = cache::client()->firstPendingMessage(room_id_);
+
+ if (!event) {
+ nhlog::ui()->debug("No event to send");
+ return;
+ }
+
+ std::visit(
+ [this](auto e) {
+ auto txn_id = e.event_id;
+ this->current_txn = txn_id;
+
+ if (txn_id.empty() || txn_id[0] != 'm') {
+ nhlog::ui()->debug("Invalid txn id '{}'", txn_id);
+ cache::client()->removePendingStatus(room_id_, txn_id);
+ return;
+ }
+
+ if constexpr (mtx::events::message_content_to_type<decltype(e.content)> !=
+ mtx::events::EventType::Unsupported)
+ http::client()->send_room_message(
+ room_id_,
+ txn_id,
+ e.content,
+ [this, txn_id](const mtx::responses::EventId &,
+ mtx::http::RequestErr err) {
+ if (err) {
+ const int status_code =
+ static_cast<int>(err->status_code);
+ nhlog::net()->warn(
+ "[{}] failed to send message: {} {}",
+ txn_id,
+ err->matrix_error.error,
+ status_code);
+ emit messageFailed(txn_id);
+ return;
+ }
+ emit messageSent(txn_id);
+ });
+ },
+ event->data);
+ });
+
+ connect(
+ this,
+ &EventStore::messageFailed,
+ this,
+ [this](std::string txn_id) {
+ if (current_txn == txn_id) {
+ current_txn_error_count++;
+ if (current_txn_error_count > 10) {
+ nhlog::ui()->debug("failing txn id '{}'", txn_id);
+ cache::client()->removePendingStatus(room_id_, txn_id);
+ current_txn_error_count = 0;
+ }
+ }
+ QTimer::singleShot(1000, this, [this]() {
+ nhlog::ui()->debug("timeout");
+ this->current_txn = "";
+ emit processPending();
+ });
+ },
+ Qt::QueuedConnection);
+
+ connect(
+ this,
+ &EventStore::messageSent,
+ this,
+ [this](std::string txn_id) {
+ nhlog::ui()->debug("sent {}", txn_id);
+ cache::client()->removePendingStatus(room_id_, txn_id);
+ this->current_txn = "";
+ this->current_txn_error_count = 0;
+ emit processPending();
+ },
+ Qt::QueuedConnection);
+}
+
+void
+EventStore::addPending(mtx::events::collections::TimelineEvents event)
+{
+ if (this->thread() != QThread::currentThread())
+ nhlog::db()->warn("{} called from a different thread!", __func__);
+
+ cache::client()->savePendingMessage(this->room_id_, {event});
+ mtx::responses::Timeline events;
+ events.limited = false;
+ events.events.emplace_back(event);
+ handleSync(events);
+
+ emit processPending();
}
void
@@ -102,6 +201,16 @@ EventStore::handleSync(const mtx::responses::Timeline &events)
if (idx)
emit dataChanged(toExternalIdx(*idx), toExternalIdx(*idx));
}
+
+ if (auto txn_id = mtx::accessors::transaction_id(event); !txn_id.empty()) {
+ auto idx = cache::client()->getTimelineIndex(
+ room_id_, mtx::accessors::event_id(event));
+ if (idx) {
+ Index index{room_id_, *idx};
+ events_.remove(index);
+ emit dataChanged(toExternalIdx(*idx), toExternalIdx(*idx));
+ }
+ }
}
}
|