diff --git a/src/Cache.cpp b/src/Cache.cpp
index 233ef2b4..8fa94d1e 100644
--- a/src/Cache.cpp
+++ b/src/Cache.cpp
@@ -2087,6 +2087,77 @@ Cache::isRoomMember(const std::string &user_id, const std::string &room_id)
}
void
+Cache::savePendingMessage(const std::string &room_id,
+ const mtx::events::collections::TimelineEvent &message)
+{
+ auto txn = lmdb::txn::begin(env_);
+
+ mtx::responses::Timeline timeline;
+ timeline.events.push_back(message.data);
+ saveTimelineMessages(txn, room_id, timeline);
+
+ auto pending = getPendingMessagesDb(txn, room_id);
+
+ int64_t now = QDateTime::currentMSecsSinceEpoch();
+ lmdb::dbi_put(txn,
+ pending,
+ lmdb::val(&now, sizeof(now)),
+ lmdb::val(mtx::accessors::event_id(message.data)));
+
+ txn.commit();
+}
+
+std::optional<mtx::events::collections::TimelineEvent>
+Cache::firstPendingMessage(const std::string &room_id)
+{
+ auto txn = lmdb::txn::begin(env_);
+ auto pending = getPendingMessagesDb(txn, room_id);
+
+ auto pendingCursor = lmdb::cursor::open(txn, pending);
+ lmdb::val tsIgnored, pendingTxn;
+ while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) {
+ auto eventsDb = getEventsDb(txn, room_id);
+ lmdb::val event;
+ if (!lmdb::dbi_get(txn, eventsDb, pendingTxn, event)) {
+ lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn);
+ continue;
+ }
+
+ try {
+ mtx::events::collections::TimelineEvent te;
+ mtx::events::collections::from_json(
+ json::parse(std::string_view(event.data(), event.size())), te);
+
+ txn.commit();
+ return te;
+ } catch (std::exception &e) {
+ nhlog::db()->error("Failed to parse message from cache {}", e.what());
+ lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn);
+ continue;
+ }
+ }
+
+ txn.commit();
+
+ return std::nullopt;
+}
+
+void
+Cache::removePendingStatus(const std::string &room_id, const std::string &txn_id)
+{
+ auto txn = lmdb::txn::begin(env_);
+ auto pending = getPendingMessagesDb(txn, room_id);
+ auto pendingCursor = lmdb::cursor::open(txn, pending);
+ lmdb::val tsIgnored, pendingTxn;
+ while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) {
+ if (std::string_view(pendingTxn.data(), pendingTxn.size()) == txn_id)
+ lmdb::cursor_del(pendingCursor);
+ }
+
+ txn.commit();
+}
+
+void
Cache::saveTimelineMessages(lmdb::txn &txn,
const std::string &room_id,
const mtx::responses::Timeline &res)
@@ -2098,12 +2169,17 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
auto relationsDb = getRelationsDb(txn, room_id);
auto orderDb = getEventOrderDb(txn, room_id);
+ auto evToOrderDb = getEventToOrderDb(txn, room_id);
auto msg2orderDb = getMessageToOrderDb(txn, room_id);
auto order2msgDb = getOrderToMessageDb(txn, room_id);
+ auto pending = getPendingMessagesDb(txn, room_id);
+
if (res.limited) {
lmdb::dbi_drop(txn, orderDb, false);
+ lmdb::dbi_drop(txn, evToOrderDb, false);
lmdb::dbi_drop(txn, msg2orderDb, false);
lmdb::dbi_drop(txn, order2msgDb, false);
+ lmdb::dbi_drop(txn, pending, true);
}
using namespace mtx::events;
@@ -2124,9 +2200,55 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
bool first = true;
for (const auto &e : res.events) {
- auto event = mtx::accessors::serialize_event(e);
- if (auto redaction =
- std::get_if<mtx::events::RedactionEvent<mtx::events::msg::Redaction>>(&e)) {
+ auto event = mtx::accessors::serialize_event(e);
+ auto txn_id = mtx::accessors::transaction_id(e);
+
+ lmdb::val txn_order;
+ if (!txn_id.empty() &&
+ lmdb::dbi_get(txn, evToOrderDb, lmdb::val(txn_id), txn_order)) {
+ std::string event_id_val = event["event_id"].get<std::string>();
+ lmdb::val event_id = event_id_val;
+ lmdb::dbi_put(txn, eventsDb, event_id, lmdb::val(event.dump()));
+ lmdb::dbi_del(txn, eventsDb, lmdb::val(txn_id));
+
+ lmdb::val msg_txn_order;
+ if (lmdb::dbi_get(txn, msg2orderDb, lmdb::val(txn_id), msg_txn_order)) {
+ lmdb::dbi_put(txn, order2msgDb, msg_txn_order, event_id);
+ lmdb::dbi_put(txn, msg2orderDb, event_id, msg_txn_order);
+ lmdb::dbi_del(txn, msg2orderDb, lmdb::val(txn_id));
+ }
+
+ lmdb::dbi_put(txn, orderDb, txn_order, event_id);
+ lmdb::dbi_put(txn, evToOrderDb, event_id, txn_order);
+ lmdb::dbi_del(txn, evToOrderDb, lmdb::val(txn_id));
+
+ if (event.contains("content") &&
+ event["content"].contains("m.relates_to")) {
+ auto temp = event["content"]["m.relates_to"];
+ std::string relates_to = temp.contains("m.in_reply_to")
+ ? temp["m.in_reply_to"]["event_id"]
+ : temp["event_id"];
+
+ if (!relates_to.empty()) {
+ lmdb::dbi_del(txn,
+ relationsDb,
+ lmdb::val(relates_to),
+ lmdb::val(txn_id));
+ lmdb::dbi_put(
+ txn, relationsDb, lmdb::val(relates_to), event_id);
+ }
+ }
+
+ auto pendingCursor = lmdb::cursor::open(txn, pending);
+ lmdb::val tsIgnored, pendingTxn;
+ while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) {
+ if (std::string_view(pendingTxn.data(), pendingTxn.size()) ==
+ txn_id)
+ lmdb::cursor_del(pendingCursor);
+ }
+ } else if (auto redaction =
+ std::get_if<mtx::events::RedactionEvent<mtx::events::msg::Redaction>>(
+ &e)) {
if (redaction->redacts.empty())
continue;
@@ -2145,15 +2267,20 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
txn, msg2orderDb, lmdb::val(redaction->event_id), oldIndex);
}
} else {
- std::string event_id_val = event["event_id"].get<std::string>();
- lmdb::val event_id = event_id_val;
+ std::string event_id_val = event.value("event_id", "");
+ if (event_id_val.empty()) {
+ nhlog::db()->error("Event without id!");
+ continue;
+ }
+
+ lmdb::val event_id = event_id_val;
lmdb::dbi_put(txn, eventsDb, event_id, lmdb::val(event.dump()));
++index;
json orderEntry = json::object();
orderEntry["event_id"] = event_id_val;
- if (first)
+ if (first && !res.prev_batch.empty())
orderEntry["prev_batch"] = res.prev_batch;
first = false;
@@ -2163,6 +2290,7 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
lmdb::val(&index, sizeof(index)),
lmdb::val(orderEntry.dump()),
MDB_APPEND);
+ lmdb::dbi_put(txn, evToOrderDb, event_id, lmdb::val(&index, sizeof(index)));
// TODO(Nico): Allow blacklisting more event types in UI
if (event["type"] != "m.reaction" && event["type"] != "m.dummy") {
|