summary refs log tree commit diff
path: root/src/timeline
diff options
context:
space:
mode:
authorNicolas Werner <nicolas.werner@hotmail.de>2020-07-18 17:43:49 +0200
committerNicolas Werner <nicolas.werner@hotmail.de>2020-07-18 17:43:49 +0200
commit56ea89aa1133f01e356b1e7dce4322b883600e53 (patch)
tree4d9a1ec309453887b223cceea2212c431b731de6 /src/timeline
parentReadd pagination and fix redactions (diff)
downloadnheko-56ea89aa1133f01e356b1e7dce4322b883600e53.tar.xz
Reenable sending messages
Diffstat (limited to 'src/timeline')
-rw-r--r--src/timeline/EventStore.cpp109
-rw-r--r--src/timeline/EventStore.h10
-rw-r--r--src/timeline/TimelineModel.cpp360
-rw-r--r--src/timeline/TimelineModel.h20
4 files changed, 174 insertions, 325 deletions
diff --git a/src/timeline/EventStore.cpp b/src/timeline/EventStore.cpp

index 7f21e1ed..b7cf4f96 100644 --- a/src/timeline/EventStore.cpp +++ b/src/timeline/EventStore.cpp
@@ -1,6 +1,7 @@ #include "EventStore.h" #include <QThread> +#include <QTimer> #include "Cache_p.h" #include "EventAccessors.h" @@ -59,6 +60,104 @@ EventStore::EventStore(std::string room_id, QObject *) } }, Qt::QueuedConnection); + + connect(this, &EventStore::processPending, this, [this]() { + if (!current_txn.empty()) { + nhlog::ui()->debug("Already processing {}", current_txn); + return; + } + + auto event = cache::client()->firstPendingMessage(room_id_); + + if (!event) { + nhlog::ui()->debug("No event to send"); + return; + } + + std::visit( + [this](auto e) { + auto txn_id = e.event_id; + this->current_txn = txn_id; + + if (txn_id.empty() || txn_id[0] != 'm') { + nhlog::ui()->debug("Invalid txn id '{}'", txn_id); + cache::client()->removePendingStatus(room_id_, txn_id); + return; + } + + if constexpr (mtx::events::message_content_to_type<decltype(e.content)> != + mtx::events::EventType::Unsupported) + http::client()->send_room_message( + room_id_, + txn_id, + e.content, + [this, txn_id](const mtx::responses::EventId &, + mtx::http::RequestErr err) { + if (err) { + const int status_code = + static_cast<int>(err->status_code); + nhlog::net()->warn( + "[{}] failed to send message: {} {}", + txn_id, + err->matrix_error.error, + status_code); + emit messageFailed(txn_id); + return; + } + emit messageSent(txn_id); + }); + }, + event->data); + }); + + connect( + this, + &EventStore::messageFailed, + this, + [this](std::string txn_id) { + if (current_txn == txn_id) { + current_txn_error_count++; + if (current_txn_error_count > 10) { + nhlog::ui()->debug("failing txn id '{}'", txn_id); + cache::client()->removePendingStatus(room_id_, txn_id); + current_txn_error_count = 0; + } + } + QTimer::singleShot(1000, this, [this]() { + nhlog::ui()->debug("timeout"); + this->current_txn = ""; + emit processPending(); + }); + }, + Qt::QueuedConnection); + + connect( + this, + &EventStore::messageSent, + this, + [this](std::string txn_id) { + nhlog::ui()->debug("sent {}", txn_id); + cache::client()->removePendingStatus(room_id_, txn_id); + this->current_txn = ""; + this->current_txn_error_count = 0; + emit processPending(); + }, + Qt::QueuedConnection); +} + +void +EventStore::addPending(mtx::events::collections::TimelineEvents event) +{ + if (this->thread() != QThread::currentThread()) + nhlog::db()->warn("{} called from a different thread!", __func__); + + cache::client()->savePendingMessage(this->room_id_, {event}); + mtx::responses::Timeline events; + events.limited = false; + events.events.emplace_back(event); + handleSync(events); + + emit processPending(); } void @@ -102,6 +201,16 @@ EventStore::handleSync(const mtx::responses::Timeline &events) if (idx) emit dataChanged(toExternalIdx(*idx), toExternalIdx(*idx)); } + + if (auto txn_id = mtx::accessors::transaction_id(event); !txn_id.empty()) { + auto idx = cache::client()->getTimelineIndex( + room_id_, mtx::accessors::event_id(event)); + if (idx) { + Index index{room_id_, *idx}; + events_.remove(index); + emit dataChanged(toExternalIdx(*idx), toExternalIdx(*idx)); + } + } } } diff --git a/src/timeline/EventStore.h b/src/timeline/EventStore.h
index f2997245..b4d5bb23 100644 --- a/src/timeline/EventStore.h +++ b/src/timeline/EventStore.h
@@ -90,6 +90,13 @@ signals: void oldMessagesRetrieved(const mtx::responses::Messages &); void fetchedMore(); + void processPending(); + void messageSent(std::string txn_id); + void messageFailed(std::string txn_id); + +public slots: + void addPending(mtx::events::collections::TimelineEvents event); + private: mtx::events::collections::TimelineEvents *decryptEvent( const IdIndex &idx, @@ -103,4 +110,7 @@ private: static QCache<IdIndex, mtx::events::collections::TimelineEvents> decryptedEvents_; static QCache<Index, mtx::events::collections::TimelineEvents> events_; static QCache<IdIndex, mtx::events::collections::TimelineEvents> events_by_id_; + + std::string current_txn; + int current_txn_error_count = 0; }; diff --git a/src/timeline/TimelineModel.cpp b/src/timeline/TimelineModel.cpp
index 60264e86..aa6cea4f 100644 --- a/src/timeline/TimelineModel.cpp +++ b/src/timeline/TimelineModel.cpp
@@ -145,67 +145,6 @@ TimelineModel::TimelineModel(TimelineViewManager *manager, QString room_id, QObj , room_id_(room_id) , manager_(manager) { - connect(this, - &TimelineModel::oldMessagesRetrieved, - this, - &TimelineModel::addBackwardsEvents, - Qt::QueuedConnection); - connect( - this, - &TimelineModel::messageFailed, - this, - [this](QString txn_id) { - nhlog::ui()->error("Failed to send {}, retrying", txn_id.toStdString()); - - QTimer::singleShot(5000, this, [this]() { emit nextPendingMessage(); }); - }, - Qt::QueuedConnection); - connect( - this, - &TimelineModel::messageSent, - this, - [this](QString txn_id, QString event_id) { - pending.removeOne(txn_id); - (void)event_id; - // auto ev = events.value(txn_id); - - // if (auto reaction = - // std::get_if<mtx::events::RoomEvent<mtx::events::msg::Reaction>>(&ev)) { - // QString reactedTo = - // QString::fromStdString(reaction->content.relates_to.event_id); - // auto &rModel = reactions[reactedTo]; - // rModel.removeReaction(*reaction); - // auto rCopy = *reaction; - // rCopy.event_id = event_id.toStdString(); - // rModel.addReaction(room_id_.toStdString(), rCopy); - //} - - // int idx = idToIndex(txn_id); - // if (idx < 0) { - // // transaction already received via sync - // return; - //} - // eventOrder[idx] = event_id; - // ev = std::visit( - // [event_id](const auto &e) -> mtx::events::collections::TimelineEvents { - // auto eventCopy = e; - // eventCopy.event_id = event_id.toStdString(); - // return eventCopy; - // }, - // ev); - - // events.remove(txn_id); - // events.insert(event_id, ev); - - //// mark our messages as read - // readEvent(event_id.toStdString()); - - // emit dataChanged(index(idx, 0), index(idx, 0)); - - if (pending.size() > 0) - emit nextPendingMessage(); - }, - Qt::QueuedConnection); connect( this, &TimelineModel::redactionFailed, @@ -214,15 +153,11 @@ TimelineModel::TimelineModel(TimelineViewManager *manager, QString room_id, QObj Qt::QueuedConnection); connect(this, - &TimelineModel::nextPendingMessage, - this, - &TimelineModel::processOnePendingMessage, - Qt::QueuedConnection); - connect(this, &TimelineModel::newMessageToSend, this, &TimelineModel::addPendingMessage, Qt::QueuedConnection); + connect(this, &TimelineModel::addPendingMessageToStore, &events, &EventStore::addPending); connect( &events, @@ -296,7 +231,7 @@ int TimelineModel::rowCount(const QModelIndex &parent) const { Q_UNUSED(parent); - return this->events.size() + static_cast<int>(pending.size()); + return this->events.size(); } QVariantMap @@ -410,7 +345,7 @@ TimelineModel::data(const mtx::events::collections::TimelineEvents &event, int r // only show read receipts for messages not from us if (acc::sender(event) != http::client()->user_id().to_string()) return qml_mtx_events::Empty; - else if (pending.contains(id)) + else if (!id.isEmpty() && id[0] == "m") return qml_mtx_events::Sent; else if (read.contains(id) || containsOthers(cache::readReceipts(id, room_id_))) return qml_mtx_events::Read; @@ -428,11 +363,7 @@ TimelineModel::data(const mtx::events::collections::TimelineEvents &event, int r case ReplyTo: return QVariant(QString::fromStdString(in_reply_to_event(event))); case Reactions: { - auto id = QString::fromStdString(event_id(event)); - if (reactions.count(id)) - return QVariant::fromValue((QObject *)&reactions.at(id)); - else - return {}; + return {}; } case RoomId: return QVariant(room_id_); @@ -561,16 +492,9 @@ TimelineModel::fetchMore(const QModelIndex &) void TimelineModel::addEvents(const mtx::responses::Timeline &timeline) { - if (isInitialSync) { - prev_batch_token_ = QString::fromStdString(timeline.prev_batch); - isInitialSync = false; - } - if (timeline.events.empty()) return; - internalAddEvents(timeline.events); - events.handleSync(timeline); if (!timeline.events.empty()) @@ -645,63 +569,13 @@ TimelineModel::updateLastMessage() } void -TimelineModel::internalAddEvents( - const std::vector<mtx::events::collections::TimelineEvents> &timeline) -{ - for (auto e : timeline) { - QString id = QString::fromStdString(mtx::accessors::event_id(e)); - - if (auto redaction = - std::get_if<mtx::events::RedactionEvent<mtx::events::msg::Redaction>>(&e)) { - QString redacts = QString::fromStdString(redaction->redacts); - - auto event = events.event(redaction->redacts, redaction->event_id); - if (!event) - continue; - - if (auto reaction = - std::get_if<mtx::events::RoomEvent<mtx::events::msg::Reaction>>( - event)) { - QString reactedTo = - QString::fromStdString(reaction->content.relates_to.event_id); - reactions[reactedTo].removeReaction(*reaction); - int idx = idToIndex(reactedTo); - if (idx >= 0) - emit dataChanged(index(idx, 0), index(idx, 0)); - } - - continue; // don't insert redaction into timeline - } - - if (auto reaction = - std::get_if<mtx::events::RoomEvent<mtx::events::msg::Reaction>>(&e)) { - QString reactedTo = - QString::fromStdString(reaction->content.relates_to.event_id); - - // // remove local echo - // if (!txid.isEmpty()) { - // auto rCopy = *reaction; - // rCopy.event_id = txid.toStdString(); - // reactions[reactedTo].removeReaction(rCopy); - // } - - reactions[reactedTo].addReaction(room_id_.toStdString(), *reaction); - int idx = idToIndex(reactedTo); - if (idx >= 0) - emit dataChanged(index(idx, 0), index(idx, 0)); - continue; // don't insert reaction into timeline - } - } -} - -void TimelineModel::setCurrentIndex(int index) { auto oldIndex = idToIndex(currentId); currentId = indexToId(index); emit currentIndexChanged(index); - if ((oldIndex > index || oldIndex == -1) && !pending.contains(currentId) && + if ((oldIndex > index || oldIndex == -1) && !currentId.startsWith("m") && ChatPage::instance()->isActiveWindow()) { readEvent(currentId.toStdString()); } @@ -719,28 +593,6 @@ TimelineModel::readEvent(const std::string &id) }); } -void -TimelineModel::addBackwardsEvents(const mtx::responses::Messages &msgs) -{ - (void)msgs; - // std::vector<QString> ids = internalAddEvents(msgs.chunk); - - // if (!ids.empty()) { - // beginInsertRows(QModelIndex(), - // static_cast<int>(this->eventOrder.size()), - // static_cast<int>(this->eventOrder.size() + ids.size() - 1)); - // this->eventOrder.insert(this->eventOrder.end(), ids.begin(), ids.end()); - // endInsertRows(); - //} - - // prev_batch_token_ = QString::fromStdString(msgs.end); - - // if (ids.empty() && !msgs.chunk.empty()) { - // // no visible events fetched, prevent loading from stopping - // fetchMore(QModelIndex()); - //} -} - QString TimelineModel::displayName(QString id) const { @@ -902,7 +754,7 @@ TimelineModel::markEventsAsRead(const std::vector<QString> &event_ids) } void -TimelineModel::sendEncryptedMessage(const std::string &txn_id, nlohmann::json content) +TimelineModel::sendEncryptedMessage(const std::string txn_id, nlohmann::json content) { const auto room_id = room_id_.toStdString(); @@ -914,28 +766,15 @@ TimelineModel::sendEncryptedMessage(const std::string &txn_id, nlohmann::json co try { // Check if we have already an outbound megolm session then we can use. if (cache::outboundMegolmSessionExists(room_id)) { - auto data = + mtx::events::EncryptedEvent<mtx::events::msg::Encrypted> event; + event.content = olm::encrypt_group_message(room_id, http::client()->device_id(), doc); + event.event_id = txn_id; + event.room_id = room_id; + event.sender = http::client()->user_id().to_string(); + event.type = mtx::events::EventType::RoomEncrypted; - http::client()->send_room_message<msg::Encrypted, EventType::RoomEncrypted>( - room_id, - txn_id, - data, - [this, txn_id](const mtx::responses::EventId &res, - mtx::http::RequestErr err) { - if (err) { - const int status_code = - static_cast<int>(err->status_code); - nhlog::net()->warn("[{}] failed to send message: {} {}", - txn_id, - err->matrix_error.error, - status_code); - emit messageFailed(QString::fromStdString(txn_id)); - } - emit messageSent( - QString::fromStdString(txn_id), - QString::fromStdString(res.event_id.to_string())); - }); + emit this->addPendingMessageToStore(event); return; } @@ -964,40 +803,24 @@ TimelineModel::sendEncryptedMessage(const std::string &txn_id, nlohmann::json co const auto members = cache::roomMembers(room_id); nhlog::ui()->info("retrieved {} members for {}", members.size(), room_id); - auto keeper = - std::make_shared<StateKeeper>([megolm_payload, room_id, doc, txn_id, this]() { - try { - auto data = olm::encrypt_group_message( - room_id, http::client()->device_id(), doc); + auto keeper = std::make_shared<StateKeeper>([room_id, doc, txn_id, this]() { + try { + mtx::events::EncryptedEvent<mtx::events::msg::Encrypted> event; + event.content = olm::encrypt_group_message( + room_id, http::client()->device_id(), doc); + event.event_id = txn_id; + event.room_id = room_id; + event.sender = http::client()->user_id().to_string(); + event.type = mtx::events::EventType::RoomEncrypted; - http::client() - ->send_room_message<msg::Encrypted, EventType::RoomEncrypted>( - room_id, - txn_id, - data, - [this, txn_id](const mtx::responses::EventId &res, - mtx::http::RequestErr err) { - if (err) { - const int status_code = - static_cast<int>(err->status_code); - nhlog::net()->warn( - "[{}] failed to send message: {} {}", - txn_id, - err->matrix_error.error, - status_code); - emit messageFailed( - QString::fromStdString(txn_id)); - } - emit messageSent( - QString::fromStdString(txn_id), - QString::fromStdString(res.event_id.to_string())); - }); - } catch (const lmdb::error &e) { - nhlog::db()->critical( - "failed to save megolm outbound session: {}", e.what()); - emit messageFailed(QString::fromStdString(txn_id)); - } - }); + emit this->addPendingMessageToStore(event); + } catch (const lmdb::error &e) { + nhlog::db()->critical("failed to save megolm outbound session: {}", + e.what()); + emit ChatPage::instance()->showNotification( + tr("Failed to encrypt event, sending aborted!")); + } + }); mtx::requests::QueryKeys req; for (const auto &member : members) @@ -1011,8 +834,8 @@ TimelineModel::sendEncryptedMessage(const std::string &txn_id, nlohmann::json co nhlog::net()->warn("failed to query device keys: {} {}", err->matrix_error.error, static_cast<int>(err->status_code)); - // TODO: Mark the event as failed. Communicate with the UI. - emit messageFailed(QString::fromStdString(txn_id)); + emit ChatPage::instance()->showNotification( + tr("Failed to encrypt event, sending aborted!")); return; } @@ -1112,11 +935,13 @@ TimelineModel::sendEncryptedMessage(const std::string &txn_id, nlohmann::json co } catch (const lmdb::error &e) { nhlog::db()->critical( "failed to open outbound megolm session ({}): {}", room_id, e.what()); - emit messageFailed(QString::fromStdString(txn_id)); + emit ChatPage::instance()->showNotification( + tr("Failed to encrypt event, sending aborted!")); } catch (const mtx::crypto::olm_exception &e) { nhlog::crypto()->critical( "failed to open outbound megolm session ({}): {}", room_id, e.what()); - emit messageFailed(QString::fromStdString(txn_id)); + emit ChatPage::instance()->showNotification( + tr("Failed to encrypt event, sending aborted!")); } } @@ -1208,9 +1033,8 @@ TimelineModel::handleClaimedKeys(std::shared_ptr<StateKeeper> keeper, struct SendMessageVisitor { - SendMessageVisitor(const QString &txn_id, TimelineModel *model) - : txn_id_qstr_(txn_id) - , model_(model) + explicit SendMessageVisitor(TimelineModel *model) + : model_(model) {} // Do-nothing operator for all unhandled events @@ -1228,29 +1052,9 @@ struct SendMessageVisitor if (encInfo) emit model_->newEncryptedImage(encInfo.value()); - model_->sendEncryptedMessage(txn_id_qstr_.toStdString(), - nlohmann::json(msg.content)); + model_->sendEncryptedMessage(msg.event_id, nlohmann::json(msg.content)); } else { - QString txn_id_qstr = txn_id_qstr_; - TimelineModel *model = model_; - http::client()->send_room_message<T, mtx::events::EventType::RoomMessage>( - model->room_id_.toStdString(), - txn_id_qstr.toStdString(), - msg.content, - [txn_id_qstr, model](const mtx::responses::EventId &res, - mtx::http::RequestErr err) { - if (err) { - const int status_code = - static_cast<int>(err->status_code); - nhlog::net()->warn("[{}] failed to send message: {} {}", - txn_id_qstr.toStdString(), - err->matrix_error.error, - status_code); - emit model->messageFailed(txn_id_qstr); - } - emit model->messageSent( - txn_id_qstr, QString::fromStdString(res.event_id.to_string())); - }); + emit model_->addPendingMessageToStore(msg); } } @@ -1260,71 +1064,26 @@ struct SendMessageVisitor // cannot handle it correctly. See the MSC for more details: // https://github.com/matrix-org/matrix-doc/blob/matthew/msc1849/proposals/1849-aggregations.md#end-to-end-encryption void operator()(const mtx::events::RoomEvent<mtx::events::msg::Reaction> &msg) - { - QString txn_id_qstr = txn_id_qstr_; - TimelineModel *model = model_; - http::client() - ->send_room_message<mtx::events::msg::Reaction, mtx::events::EventType::Reaction>( - model->room_id_.toStdString(), - txn_id_qstr.toStdString(), - msg.content, - [txn_id_qstr, model](const mtx::responses::EventId &res, - mtx::http::RequestErr err) { - if (err) { - const int status_code = static_cast<int>(err->status_code); - nhlog::net()->warn("[{}] failed to send message: {} {}", - txn_id_qstr.toStdString(), - err->matrix_error.error, - status_code); - emit model->messageFailed(txn_id_qstr); - } - emit model->messageSent( - txn_id_qstr, QString::fromStdString(res.event_id.to_string())); - }); + emit model_->addPendingMessageToStore(msg); } - QString txn_id_qstr_; TimelineModel *model_; }; void -TimelineModel::processOnePendingMessage() -{ - // if (pending.isEmpty()) - // return; - - // QString txn_id_qstr = pending.first(); - - // auto event = events.value(txn_id_qstr); - // std::visit(SendMessageVisitor{txn_id_qstr, this}, event); -} - -void TimelineModel::addPendingMessage(mtx::events::collections::TimelineEvents event) { - (void)event; - // std::visit( - // [](auto &msg) { - // msg.type = mtx::events::EventType::RoomMessage; - // msg.event_id = http::client()->generate_txn_id(); - // msg.sender = http::client()->user_id().to_string(); - // msg.origin_server_ts = QDateTime::currentMSecsSinceEpoch(); - // }, - // event); - - // internalAddEvents({event}); - - // QString txn_id_qstr = QString::fromStdString(mtx::accessors::event_id(event)); - // pending.push_back(txn_id_qstr); - // if (!std::get_if<mtx::events::RoomEvent<mtx::events::msg::Reaction>>(&event)) { - // beginInsertRows(QModelIndex(), 0, 0); - // this->eventOrder.insert(this->eventOrder.begin(), txn_id_qstr); - // endInsertRows(); - //} - // updateLastMessage(); + std::visit( + [](auto &msg) { + msg.type = mtx::events::EventType::RoomMessage; + msg.event_id = "m" + http::client()->generate_txn_id(); + msg.sender = http::client()->user_id().to_string(); + msg.origin_server_ts = QDateTime::currentMSecsSinceEpoch(); + }, + event); - // emit nextPendingMessage(); + std::visit(SendMessageVisitor{this}, event); } bool @@ -1647,24 +1406,7 @@ TimelineModel::formatMemberEvent(QString id) if (!event->unsigned_data.replaces_state.empty()) { auto tempPrevEvent = events.event(event->unsigned_data.replaces_state, event->event_id); - if (!tempPrevEvent) { - http::client()->get_event( - this->room_id_.toStdString(), - event->unsigned_data.replaces_state, - [this, id, prevEventId = event->unsigned_data.replaces_state]( - const mtx::events::collections::TimelineEvents &timeline, - mtx::http::RequestErr err) { - if (err) { - nhlog::net()->error( - "Failed to retrieve event with id {}, which was " - "requested to show the membership for event {}", - prevEventId, - id.toStdString()); - return; - } - emit eventFetched(id, timeline); - }); - } else { + if (tempPrevEvent) { prevEvent = std::get_if<mtx::events::StateEvent<mtx::events::state::Member>>( tempPrevEvent); diff --git a/src/timeline/TimelineModel.h b/src/timeline/TimelineModel.h
index f322b482..9f9717df 100644 --- a/src/timeline/TimelineModel.h +++ b/src/timeline/TimelineModel.h
@@ -236,31 +236,23 @@ public slots: void setDecryptDescription(bool decrypt) { decryptDescription = decrypt; } private slots: - // Add old events at the top of the timeline. - void addBackwardsEvents(const mtx::responses::Messages &msgs); - void processOnePendingMessage(); void addPendingMessage(mtx::events::collections::TimelineEvents event); signals: - void oldMessagesRetrieved(const mtx::responses::Messages &res); - void messageFailed(QString txn_id); - void messageSent(QString txn_id, QString event_id); void currentIndexChanged(int index); void redactionFailed(QString id); void eventRedacted(QString id); - void nextPendingMessage(); - void newMessageToSend(mtx::events::collections::TimelineEvents event); void mediaCached(QString mxcUrl, QString cacheUrl); void newEncryptedImage(mtx::crypto::EncryptedFile encryptionInfo); - void eventFetched(QString requestingEvent, mtx::events::collections::TimelineEvents event); void typingUsersChanged(std::vector<QString> users); void replyChanged(QString reply); void paginationInProgressChanged(const bool); + void newMessageToSend(mtx::events::collections::TimelineEvents event); + void addPendingMessageToStore(mtx::events::collections::TimelineEvents event); + private: - void internalAddEvents( - const std::vector<mtx::events::collections::TimelineEvents> &timeline); - void sendEncryptedMessage(const std::string &txn_id, nlohmann::json content); + void sendEncryptedMessage(const std::string txn_id, nlohmann::json content); void handleClaimedKeys(std::shared_ptr<StateKeeper> keeper, const std::map<std::string, std::string> &room_key, const std::map<std::string, DevicePublicKeys> &pks, @@ -272,15 +264,11 @@ private: void setPaginationInProgress(const bool paginationInProgress); QSet<QString> read; - QList<QString> pending; - std::map<QString, ReactionsModel> reactions; mutable EventStore events; QString room_id_; - QString prev_batch_token_; - bool isInitialSync = true; bool decryptDescription = true; bool m_paginationInProgress = false;