summary refs log tree commit diff
path: root/src/Cache.cpp
diff options
context:
space:
mode:
authorWeblate <noreply@weblate.org>2020-09-13 17:20:51 -0400
committerWeblate <noreply@weblate.org>2020-09-13 17:20:51 -0400
commitb5669310e5d272418480def6d0a78ad26eda6c72 (patch)
tree62e3681090e7d5d9ecc502223cd63037bc86d5ba /src/Cache.cpp
parentAdd simpified chinese translations (diff)
parentMerge pull request #275 from Chethan2k1/master (diff)
downloadnheko-b5669310e5d272418480def6d0a78ad26eda6c72.tar.xz
Merge branch 'master' of github.com:Nheko-Reborn/nheko
Diffstat (limited to 'src/Cache.cpp')
-rw-r--r--src/Cache.cpp922
1 files changed, 808 insertions, 114 deletions
diff --git a/src/Cache.cpp b/src/Cache.cpp

index d435dc56..91cde9e7 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp
@@ -33,11 +33,12 @@ #include "Cache_p.h" #include "EventAccessors.h" #include "Logging.h" +#include "Olm.h" #include "Utils.h" //! Should be changed when a breaking change occurs in the cache format. //! This will reset client's data. -static const std::string CURRENT_CACHE_FORMAT_VERSION("2020.05.01"); +static const std::string CURRENT_CACHE_FORMAT_VERSION("2020.07.05"); static const std::string SECRET("secret"); static lmdb::val NEXT_BATCH_KEY("next_batch"); @@ -46,8 +47,9 @@ static lmdb::val CACHE_FORMAT_VERSION_KEY("cache_format_version"); constexpr size_t MAX_RESTORED_MESSAGES = 30'000; -constexpr auto DB_SIZE = 32ULL * 1024ULL * 1024ULL * 1024ULL; // 32 GB -constexpr auto MAX_DBS = 8092UL; +constexpr auto DB_SIZE = 32ULL * 1024ULL * 1024ULL * 1024ULL; // 32 GB +constexpr auto MAX_DBS = 8092UL; +constexpr auto BATCH_SIZE = 100; //! Cache databases and their format. //! @@ -63,7 +65,6 @@ constexpr auto SYNC_STATE_DB("sync_state"); //! Read receipts per room/event. constexpr auto READ_RECEIPTS_DB("read_receipts"); constexpr auto NOTIFICATIONS_DB("sent_notifications"); -//! TODO: delete pending_receipts database on old cache versions //! Encryption related databases. @@ -93,18 +94,31 @@ namespace { std::unique_ptr<Cache> instance_ = nullptr; } -int -numeric_key_comparison(const MDB_val *a, const MDB_val *b) +static bool +isHiddenEvent(mtx::events::collections::TimelineEvents e, const std::string &room_id) { - auto lhs = std::stoull(std::string((char *)a->mv_data, a->mv_size)); - auto rhs = std::stoull(std::string((char *)b->mv_data, b->mv_size)); + using namespace mtx::events; + if (auto encryptedEvent = std::get_if<EncryptedEvent<msg::Encrypted>>(&e)) { + MegolmSessionIndex index; + index.room_id = room_id; + index.session_id = encryptedEvent->content.session_id; + index.sender_key = encryptedEvent->content.sender_key; - if (lhs < rhs) - return 1; - else if (lhs == rhs) - return 0; + auto result = olm::decryptEvent(index, *encryptedEvent); + if (!result.error) + e = result.event.value(); + } - return -1; + static constexpr std::initializer_list<EventType> hiddenEvents = { + EventType::Reaction, EventType::CallCandidates, EventType::Unsupported}; + + return std::visit( + [](const auto &ev) { + return std::any_of(hiddenEvents.begin(), + hiddenEvents.end(), + [ev](EventType type) { return type == ev.type; }); + }, + e); } Cache::Cache(const QString &userId, QObject *parent) @@ -154,7 +168,10 @@ Cache::setup() } try { - env_.open(statePath.toStdString().c_str()); + // NOTE(Nico): We may want to use (MDB_MAPASYNC | MDB_WRITEMAP) in the future, but + // it can really mess up our database, so we shouldn't. For now, hopefully + // NOMETASYNC is fast enough. + env_.open(statePath.toStdString().c_str(), MDB_NOMETASYNC); } catch (const lmdb::error &e) { if (e.code() != MDB_VERSION_MISMATCH && e.code() != MDB_INVALID) { throw std::runtime_error("LMDB initialization failed" + @@ -700,8 +717,77 @@ Cache::runMigrations() nhlog::db()->info("Successfully deleted pending receipts database."); return true; }}, + {"2020.07.05", + [this]() { + try { + auto txn = lmdb::txn::begin(env_, nullptr); + auto room_ids = getRoomIds(txn); + + for (const auto &room_id : room_ids) { + try { + auto messagesDb = lmdb::dbi::open( + txn, std::string(room_id + "/messages").c_str()); + + // keep some old messages and batch token + { + auto roomsCursor = + lmdb::cursor::open(txn, messagesDb); + lmdb::val ts, stored_message; + bool start = true; + mtx::responses::Timeline oldMessages; + while (roomsCursor.get(ts, + stored_message, + start ? MDB_FIRST + : MDB_NEXT)) { + start = false; + + auto j = json::parse(std::string_view( + stored_message.data(), + stored_message.size())); + + if (oldMessages.prev_batch.empty()) + oldMessages.prev_batch = + j["token"].get<std::string>(); + else if (j["token"] != + oldMessages.prev_batch) + break; + + mtx::events::collections::TimelineEvent + te; + mtx::events::collections::from_json( + j["event"], te); + oldMessages.events.push_back(te.data); + } + // messages were stored in reverse order, so we + // need to reverse them + std::reverse(oldMessages.events.begin(), + oldMessages.events.end()); + // save messages using the new method + saveTimelineMessages(txn, room_id, oldMessages); + } + + // delete old messages db + lmdb::dbi_drop(txn, messagesDb, true); + } catch (std::exception &e) { + nhlog::db()->error( + "While migrating messages from {}, ignoring error {}", + room_id, + e.what()); + } + } + txn.commit(); + } catch (const lmdb::error &) { + nhlog::db()->critical( + "Failed to delete messages database in migration!"); + return false; + } + + nhlog::db()->info("Successfully deleted pending receipts database."); + return true; + }}, }; + nhlog::db()->info("Running migrations, this may take a while!"); for (const auto &[target_version, migration] : migrations) { if (target_version > stored_version) if (!migration()) { @@ -709,6 +795,7 @@ Cache::runMigrations() return false; } } + nhlog::db()->info("Migrations finished."); setCurrentFormat(); return true; @@ -771,8 +858,9 @@ Cache::readReceipts(const QString &event_id, const QString &room_id) txn.commit(); if (res) { - auto json_response = json::parse(std::string(value.data(), value.size())); - auto values = json_response.get<std::map<std::string, uint64_t>>(); + auto json_response = + json::parse(std::string_view(value.data(), value.size())); + auto values = json_response.get<std::map<std::string, uint64_t>>(); for (const auto &v : values) // timestamp, user_id @@ -810,8 +898,8 @@ Cache::updateReadReceipt(lmdb::txn &txn, const std::string &room_id, const Recei // If an entry for the event id already exists, we would // merge the existing receipts with the new ones. if (exists) { - auto json_value = - json::parse(std::string(prev_value.data(), prev_value.size())); + auto json_value = json::parse( + std::string_view(prev_value.data(), prev_value.size())); // Retrieve the saved receipts. saved_receipts = json_value.get<std::map<std::string, uint64_t>>(); @@ -930,7 +1018,7 @@ Cache::saveState(const mtx::responses::Sync &res) if (lmdb::dbi_get(txn, roomsDb_, lmdb::val(room.first), data)) { try { RoomInfo tmp = - json::parse(std::string(data.data(), data.size())); + json::parse(std::string_view(data.data(), data.size())); updatedInfo.tags = tmp.tags; } catch (const json::exception &e) { nhlog::db()->warn( @@ -1122,7 +1210,7 @@ Cache::singleRoomInfo(const std::string &room_id) // Check if the room is joined. if (lmdb::dbi_get(txn, roomsDb_, lmdb::val(room_id), data)) { try { - RoomInfo tmp = json::parse(std::string(data.data(), data.size())); + RoomInfo tmp = json::parse(std::string_view(data.data(), data.size())); tmp.member_count = getMembersDb(txn, room_id).size(txn); tmp.join_rule = getRoomJoinRule(txn, statesdb); tmp.guest_access = getRoomGuestAccess(txn, statesdb); @@ -1157,7 +1245,8 @@ Cache::getRoomInfo(const std::vector<std::string> &rooms) // Check if the room is joined. if (lmdb::dbi_get(txn, roomsDb_, lmdb::val(room), data)) { try { - RoomInfo tmp = json::parse(std::string(data.data(), data.size())); + RoomInfo tmp = + json::parse(std::string_view(data.data(), data.size())); tmp.member_count = getMembersDb(txn, room).size(txn); tmp.join_rule = getRoomJoinRule(txn, statesdb); tmp.guest_access = getRoomGuestAccess(txn, statesdb); @@ -1173,7 +1262,7 @@ Cache::getRoomInfo(const std::vector<std::string> &rooms) if (lmdb::dbi_get(txn, invitesDb_, lmdb::val(room), data)) { try { RoomInfo tmp = - json::parse(std::string(data.data(), data.size())); + json::parse(std::string_view(data.data(), data.size())); tmp.member_count = getInviteMembersDb(txn, room).size(txn); room_info.emplace(QString::fromStdString(room), @@ -1232,38 +1321,147 @@ Cache::getTimelineMentions() return notifs; } -mtx::responses::Timeline -Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id) +std::string +Cache::previousBatchToken(const std::string &room_id) +{ + auto txn = lmdb::txn::begin(env_, nullptr); + auto orderDb = getEventOrderDb(txn, room_id); + + auto cursor = lmdb::cursor::open(txn, orderDb); + lmdb::val indexVal, val; + if (!cursor.get(indexVal, val, MDB_FIRST)) { + return ""; + } + + auto j = json::parse(std::string_view(val.data(), val.size())); + + return j.value("prev_batch", ""); +} + +Cache::Messages +Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id, uint64_t index, bool forward) { // TODO(nico): Limit the messages returned by this maybe? - auto db = getMessagesDb(txn, room_id); + auto orderDb = getOrderToMessageDb(txn, room_id); + auto eventsDb = getEventsDb(txn, room_id); - mtx::responses::Timeline timeline; - std::string timestamp, msg; + Messages messages{}; - auto cursor = lmdb::cursor::open(txn, db); + lmdb::val indexVal, event_id; - size_t index = 0; + auto cursor = lmdb::cursor::open(txn, orderDb); + if (index == std::numeric_limits<uint64_t>::max()) { + if (cursor.get(indexVal, event_id, forward ? MDB_FIRST : MDB_LAST)) { + index = *indexVal.data<uint64_t>(); + } else { + messages.end_of_cache = true; + return messages; + } + } else { + if (cursor.get(indexVal, event_id, MDB_SET)) { + index = *indexVal.data<uint64_t>(); + } else { + messages.end_of_cache = true; + return messages; + } + } - while (cursor.get(timestamp, msg, MDB_NEXT) && index < MAX_RESTORED_MESSAGES) { - auto obj = json::parse(msg); + int counter = 0; - if (obj.count("event") == 0 || obj.count("token") == 0) + bool ret; + while ((ret = cursor.get(indexVal, + event_id, + counter == 0 ? (forward ? MDB_FIRST : MDB_LAST) + : (forward ? MDB_NEXT : MDB_PREV))) && + counter++ < BATCH_SIZE) { + lmdb::val event; + bool success = lmdb::dbi_get(txn, eventsDb, event_id, event); + if (!success) continue; - mtx::events::collections::TimelineEvent event; - mtx::events::collections::from_json(obj.at("event"), event); - - index += 1; + mtx::events::collections::TimelineEvent te; + try { + mtx::events::collections::from_json( + json::parse(std::string_view(event.data(), event.size())), te); + } catch (std::exception &e) { + nhlog::db()->error("Failed to parse message from cache {}", e.what()); + continue; + } - timeline.events.push_back(event.data); - timeline.prev_batch = obj.at("token").get<std::string>(); + messages.timeline.events.push_back(std::move(te.data)); } cursor.close(); - std::reverse(timeline.events.begin(), timeline.events.end()); + // std::reverse(timeline.events.begin(), timeline.events.end()); + messages.next_index = *indexVal.data<uint64_t>(); + messages.end_of_cache = !ret; + + return messages; +} + +std::optional<mtx::events::collections::TimelineEvent> +Cache::getEvent(const std::string &room_id, const std::string &event_id) +{ + auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY); + auto eventsDb = getEventsDb(txn, room_id); + + lmdb::val event{}; + bool success = lmdb::dbi_get(txn, eventsDb, lmdb::val(event_id), event); + if (!success) + return {}; - return timeline; + mtx::events::collections::TimelineEvent te; + try { + mtx::events::collections::from_json( + json::parse(std::string_view(event.data(), event.size())), te); + } catch (std::exception &e) { + nhlog::db()->error("Failed to parse message from cache {}", e.what()); + return std::nullopt; + } + + return te; +} +void +Cache::storeEvent(const std::string &room_id, + const std::string &event_id, + const mtx::events::collections::TimelineEvent &event) +{ + auto txn = lmdb::txn::begin(env_); + auto eventsDb = getEventsDb(txn, room_id); + auto event_json = mtx::accessors::serialize_event(event.data); + lmdb::dbi_put(txn, eventsDb, lmdb::val(event_id), lmdb::val(event_json.dump())); + txn.commit(); +} + +std::vector<std::string> +Cache::relatedEvents(const std::string &room_id, const std::string &event_id) +{ + auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY); + auto relationsDb = getRelationsDb(txn, room_id); + + std::vector<std::string> related_ids; + + auto related_cursor = lmdb::cursor::open(txn, relationsDb); + lmdb::val related_to = event_id, related_event; + bool first = true; + + try { + if (!related_cursor.get(related_to, related_event, MDB_SET)) + return {}; + + while (related_cursor.get( + related_to, related_event, first ? MDB_FIRST_DUP : MDB_NEXT_DUP)) { + first = false; + if (event_id != std::string_view(related_to.data(), related_to.size())) + break; + + related_ids.emplace_back(related_event.data(), related_event.size()); + } + } catch (const lmdb::error &e) { + nhlog::db()->error("related events error: {}", e.what()); + } + + return related_ids; } QMap<QString, RoomInfo> @@ -1306,55 +1504,113 @@ Cache::roomInfo(bool withInvites) std::string Cache::getLastEventId(lmdb::txn &txn, const std::string &room_id) { - auto db = getMessagesDb(txn, room_id); + auto orderDb = getOrderToMessageDb(txn, room_id); + + lmdb::val indexVal, val; - if (db.size(txn) == 0) + auto cursor = lmdb::cursor::open(txn, orderDb); + if (!cursor.get(indexVal, val, MDB_LAST)) { return {}; + } - std::string timestamp, msg; + return std::string(val.data(), val.size()); +} - auto cursor = lmdb::cursor::open(txn, db); - while (cursor.get(timestamp, msg, MDB_NEXT)) { - auto obj = json::parse(msg); +std::optional<Cache::TimelineRange> +Cache::getTimelineRange(const std::string &room_id) +{ + auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY); + lmdb::dbi orderDb{0}; + try { + orderDb = getOrderToMessageDb(txn, room_id); + } catch (lmdb::runtime_error &e) { + nhlog::db()->error("Can't open db for room '{}', probably doesn't exist yet. ({})", + room_id, + e.what()); + return {}; + } - if (obj.count("event") == 0) - continue; + lmdb::val indexVal, val; - cursor.close(); - return obj["event"]["event_id"]; + auto cursor = lmdb::cursor::open(txn, orderDb); + if (!cursor.get(indexVal, val, MDB_LAST)) { + return {}; } - cursor.close(); - return {}; + TimelineRange range{}; + range.last = *indexVal.data<uint64_t>(); + + if (!cursor.get(indexVal, val, MDB_FIRST)) { + return {}; + } + range.first = *indexVal.data<uint64_t>(); + + return range; +} +std::optional<uint64_t> +Cache::getTimelineIndex(const std::string &room_id, std::string_view event_id) +{ + auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY); + auto orderDb = getMessageToOrderDb(txn, room_id); + + lmdb::val indexVal{event_id.data(), event_id.size()}, val; + + bool success = lmdb::dbi_get(txn, orderDb, indexVal, val); + if (!success) { + return {}; + } + + return *val.data<uint64_t>(); +} + +std::optional<std::string> +Cache::getTimelineEventId(const std::string &room_id, uint64_t index) +{ + auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY); + auto orderDb = getOrderToMessageDb(txn, room_id); + + lmdb::val indexVal{&index, sizeof(index)}, val; + + bool success = lmdb::dbi_get(txn, orderDb, indexVal, val); + if (!success) { + return {}; + } + + return std::string(val.data(), val.size()); } DescInfo Cache::getLastMessageInfo(lmdb::txn &txn, const std::string &room_id) { - auto db = getMessagesDb(txn, room_id); - - if (db.size(txn) == 0) + auto orderDb = getOrderToMessageDb(txn, room_id); + auto eventsDb = getEventsDb(txn, room_id); + if (orderDb.size(txn) == 0) return DescInfo{}; - std::string timestamp, msg; - const auto local_user = utils::localUser(); DescInfo fallbackDesc{}; - auto cursor = lmdb::cursor::open(txn, db); - while (cursor.get(timestamp, msg, MDB_NEXT)) { - auto obj = json::parse(msg); + lmdb::val indexVal, event_id; - if (obj.count("event") == 0) + auto cursor = lmdb::cursor::open(txn, orderDb); + bool first = true; + while (cursor.get(indexVal, event_id, first ? MDB_LAST : MDB_PREV)) { + first = false; + + lmdb::val event; + bool success = lmdb::dbi_get(txn, eventsDb, event_id, event); + if (!success) continue; - if (fallbackDesc.event_id.isEmpty() && obj["event"]["type"] == "m.room.member" && - obj["event"]["state_key"] == local_user.toStdString() && - obj["event"]["content"]["membership"] == "join") { - uint64_t ts = obj["event"]["origin_server_ts"]; + auto obj = json::parse(std::string_view(event.data(), event.size())); + + if (fallbackDesc.event_id.isEmpty() && obj["type"] == "m.room.member" && + obj["state_key"] == local_user.toStdString() && + obj["content"]["membership"] == "join") { + uint64_t ts = obj["origin_server_ts"]; auto time = QDateTime::fromMSecsSinceEpoch(ts); - fallbackDesc = DescInfo{QString::fromStdString(obj["event"]["event_id"]), + fallbackDesc = DescInfo{QString::fromStdString(obj["event_id"]), local_user, tr("You joined this room."), utils::descriptiveTime(time), @@ -1362,20 +1618,17 @@ Cache::getLastMessageInfo(lmdb::txn &txn, const std::string &room_id) time}; } - if (!(obj["event"]["type"] == "m.room.message" || - obj["event"]["type"] == "m.sticker" || - obj["event"]["type"] == "m.call.invite" || - obj["event"]["type"] == "m.call.answer" || - obj["event"]["type"] == "m.call.hangup" || - obj["event"]["type"] == "m.room.encrypted")) + if (!(obj["type"] == "m.room.message" || obj["type"] == "m.sticker" || + obj["type"] == "m.call.invite" || obj["type"] == "m.call.answer" || + obj["type"] == "m.call.hangup" || obj["type"] == "m.room.encrypted")) continue; - mtx::events::collections::TimelineEvent event; - mtx::events::collections::from_json(obj.at("event"), event); + mtx::events::collections::TimelineEvent te; + mtx::events::collections::from_json(obj, te); cursor.close(); return utils::getMessageDescription( - event.data, local_user, QString::fromStdString(room_id)); + te.data, local_user, QString::fromStdString(room_id)); } cursor.close(); @@ -1417,7 +1670,7 @@ Cache::getRoomAvatarUrl(lmdb::txn &txn, if (res) { try { StateEvent<Avatar> msg = - json::parse(std::string(event.data(), event.size())); + json::parse(std::string_view(event.data(), event.size())); if (!msg.content.url.empty()) return QString::fromStdString(msg.content.url); @@ -1467,7 +1720,8 @@ Cache::getRoomName(lmdb::txn &txn, lmdb::dbi &statesdb, lmdb::dbi &membersdb) if (res) { try { - StateEvent<Name> msg = json::parse(std::string(event.data(), event.size())); + StateEvent<Name> msg = + json::parse(std::string_view(event.data(), event.size())); if (!msg.content.name.empty()) return QString::fromStdString(msg.content.name); @@ -1482,7 +1736,7 @@ Cache::getRoomName(lmdb::txn &txn, lmdb::dbi &statesdb, lmdb::dbi &membersdb) if (res) { try { StateEvent<CanonicalAlias> msg = - json::parse(std::string(event.data(), event.size())); + json::parse(std::string_view(event.data(), event.size())); if (!msg.content.alias.empty()) return QString::fromStdString(msg.content.alias); @@ -1545,7 +1799,7 @@ Cache::getRoomJoinRule(lmdb::txn &txn, lmdb::dbi &statesdb) if (res) { try { StateEvent<state::JoinRules> msg = - json::parse(std::string(event.data(), event.size())); + json::parse(std::string_view(event.data(), event.size())); return msg.content.join_rule; } catch (const json::exception &e) { nhlog::db()->warn("failed to parse m.room.join_rule event: {}", e.what()); @@ -1567,7 +1821,7 @@ Cache::getRoomGuestAccess(lmdb::txn &txn, lmdb::dbi &statesdb) if (res) { try { StateEvent<GuestAccess> msg = - json::parse(std::string(event.data(), event.size())); + json::parse(std::string_view(event.data(), event.size())); return msg.content.guest_access == AccessState::CanJoin; } catch (const json::exception &e) { nhlog::db()->warn("failed to parse m.room.guest_access event: {}", @@ -1590,7 +1844,7 @@ Cache::getRoomTopic(lmdb::txn &txn, lmdb::dbi &statesdb) if (res) { try { StateEvent<Topic> msg = - json::parse(std::string(event.data(), event.size())); + json::parse(std::string_view(event.data(), event.size())); if (!msg.content.topic.empty()) return QString::fromStdString(msg.content.topic); @@ -1615,7 +1869,7 @@ Cache::getRoomVersion(lmdb::txn &txn, lmdb::dbi &statesdb) if (res) { try { StateEvent<Create> msg = - json::parse(std::string(event.data(), event.size())); + json::parse(std::string_view(event.data(), event.size())); if (!msg.content.room_version.empty()) return QString::fromStdString(msg.content.room_version); @@ -1641,7 +1895,7 @@ Cache::getInviteRoomName(lmdb::txn &txn, lmdb::dbi &statesdb, lmdb::dbi &members if (res) { try { StrippedEvent<state::Name> msg = - json::parse(std::string(event.data(), event.size())); + json::parse(std::string_view(event.data(), event.size())); return QString::fromStdString(msg.content.name); } catch (const json::exception &e) { nhlog::db()->warn("failed to parse m.room.name event: {}", e.what()); @@ -1683,7 +1937,7 @@ Cache::getInviteRoomAvatarUrl(lmdb::txn &txn, lmdb::dbi &statesdb, lmdb::dbi &me if (res) { try { StrippedEvent<state::Avatar> msg = - json::parse(std::string(event.data(), event.size())); + json::parse(std::string_view(event.data(), event.size())); return QString::fromStdString(msg.content.url); } catch (const json::exception &e) { nhlog::db()->warn("failed to parse m.room.avatar event: {}", e.what()); @@ -1725,7 +1979,7 @@ Cache::getInviteRoomTopic(lmdb::txn &txn, lmdb::dbi &db) if (res) { try { StrippedEvent<Topic> msg = - json::parse(std::string(event.data(), event.size())); + json::parse(std::string_view(event.data(), event.size())); return QString::fromStdString(msg.content.topic); } catch (const json::exception &e) { nhlog::db()->warn("failed to parse m.room.topic event: {}", e.what()); @@ -1756,7 +2010,7 @@ Cache::getRoomAvatar(const std::string &room_id) std::string media_url; try { - RoomInfo info = json::parse(std::string(response.data(), response.size())); + RoomInfo info = json::parse(std::string_view(response.data(), response.size())); media_url = std::move(info.avatar_url); if (media_url.empty()) { @@ -1953,30 +2207,438 @@ Cache::isRoomMember(const std::string &user_id, const std::string &room_id) } void +Cache::savePendingMessage(const std::string &room_id, + const mtx::events::collections::TimelineEvent &message) +{ + auto txn = lmdb::txn::begin(env_); + + mtx::responses::Timeline timeline; + timeline.events.push_back(message.data); + saveTimelineMessages(txn, room_id, timeline); + + auto pending = getPendingMessagesDb(txn, room_id); + + int64_t now = QDateTime::currentMSecsSinceEpoch(); + lmdb::dbi_put(txn, + pending, + lmdb::val(&now, sizeof(now)), + lmdb::val(mtx::accessors::event_id(message.data))); + + txn.commit(); +} + +std::optional<mtx::events::collections::TimelineEvent> +Cache::firstPendingMessage(const std::string &room_id) +{ + auto txn = lmdb::txn::begin(env_); + auto pending = getPendingMessagesDb(txn, room_id); + + { + auto pendingCursor = lmdb::cursor::open(txn, pending); + lmdb::val tsIgnored, pendingTxn; + while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) { + auto eventsDb = getEventsDb(txn, room_id); + lmdb::val event; + if (!lmdb::dbi_get(txn, eventsDb, pendingTxn, event)) { + lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn); + continue; + } + + try { + mtx::events::collections::TimelineEvent te; + mtx::events::collections::from_json( + json::parse(std::string_view(event.data(), event.size())), te); + + pendingCursor.close(); + txn.commit(); + return te; + } catch (std::exception &e) { + nhlog::db()->error("Failed to parse message from cache {}", + e.what()); + lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn); + continue; + } + } + } + + txn.commit(); + + return std::nullopt; +} + +void +Cache::removePendingStatus(const std::string &room_id, const std::string &txn_id) +{ + auto txn = lmdb::txn::begin(env_); + auto pending = getPendingMessagesDb(txn, room_id); + + { + auto pendingCursor = lmdb::cursor::open(txn, pending); + lmdb::val tsIgnored, pendingTxn; + while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) { + if (std::string_view(pendingTxn.data(), pendingTxn.size()) == txn_id) + lmdb::cursor_del(pendingCursor); + } + } + + txn.commit(); +} + +void Cache::saveTimelineMessages(lmdb::txn &txn, const std::string &room_id, const mtx::responses::Timeline &res) { - auto db = getMessagesDb(txn, room_id); + if (res.events.empty()) + return; + + auto eventsDb = getEventsDb(txn, room_id); + auto relationsDb = getRelationsDb(txn, room_id); + + auto orderDb = getEventOrderDb(txn, room_id); + auto evToOrderDb = getEventToOrderDb(txn, room_id); + auto msg2orderDb = getMessageToOrderDb(txn, room_id); + auto order2msgDb = getOrderToMessageDb(txn, room_id); + auto pending = getPendingMessagesDb(txn, room_id); + + if (res.limited) { + lmdb::dbi_drop(txn, orderDb, false); + lmdb::dbi_drop(txn, evToOrderDb, false); + lmdb::dbi_drop(txn, msg2orderDb, false); + lmdb::dbi_drop(txn, order2msgDb, false); + lmdb::dbi_drop(txn, pending, true); + } using namespace mtx::events; using namespace mtx::events::state; + lmdb::val indexVal, val; + uint64_t index = std::numeric_limits<uint64_t>::max() / 2; + auto cursor = lmdb::cursor::open(txn, orderDb); + if (cursor.get(indexVal, val, MDB_LAST)) { + index = *indexVal.data<int64_t>(); + } + + uint64_t msgIndex = std::numeric_limits<uint64_t>::max() / 2; + auto msgCursor = lmdb::cursor::open(txn, order2msgDb); + if (msgCursor.get(indexVal, val, MDB_LAST)) { + msgIndex = *indexVal.data<uint64_t>(); + } + + bool first = true; for (const auto &e : res.events) { - if (std::holds_alternative<RedactionEvent<msg::Redaction>>(e)) + auto event = mtx::accessors::serialize_event(e); + auto txn_id = mtx::accessors::transaction_id(e); + + std::string event_id_val = event.value("event_id", ""); + if (event_id_val.empty()) { + nhlog::db()->error("Event without id!"); continue; + } + + lmdb::val event_id = event_id_val; + + json orderEntry = json::object(); + orderEntry["event_id"] = event_id_val; + if (first && !res.prev_batch.empty()) + orderEntry["prev_batch"] = res.prev_batch; + + lmdb::val txn_order; + if (!txn_id.empty() && + lmdb::dbi_get(txn, evToOrderDb, lmdb::val(txn_id), txn_order)) { + lmdb::dbi_put(txn, eventsDb, event_id, lmdb::val(event.dump())); + lmdb::dbi_del(txn, eventsDb, lmdb::val(txn_id)); + + lmdb::val msg_txn_order; + if (lmdb::dbi_get(txn, msg2orderDb, lmdb::val(txn_id), msg_txn_order)) { + lmdb::dbi_put(txn, order2msgDb, msg_txn_order, event_id); + lmdb::dbi_put(txn, msg2orderDb, event_id, msg_txn_order); + lmdb::dbi_del(txn, msg2orderDb, lmdb::val(txn_id)); + } + + lmdb::dbi_put(txn, orderDb, txn_order, lmdb::val(orderEntry.dump())); + lmdb::dbi_put(txn, evToOrderDb, event_id, txn_order); + lmdb::dbi_del(txn, evToOrderDb, lmdb::val(txn_id)); + + if (event.contains("content") && + event["content"].contains("m.relates_to")) { + auto temp = event["content"]["m.relates_to"]; + std::string relates_to = temp.contains("m.in_reply_to") + ? temp["m.in_reply_to"]["event_id"] + : temp["event_id"]; + + if (!relates_to.empty()) { + lmdb::dbi_del(txn, + relationsDb, + lmdb::val(relates_to), + lmdb::val(txn_id)); + lmdb::dbi_put( + txn, relationsDb, lmdb::val(relates_to), event_id); + } + } + + auto pendingCursor = lmdb::cursor::open(txn, pending); + lmdb::val tsIgnored, pendingTxn; + while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) { + if (std::string_view(pendingTxn.data(), pendingTxn.size()) == + txn_id) + lmdb::cursor_del(pendingCursor); + } + } else if (auto redaction = + std::get_if<mtx::events::RedactionEvent<mtx::events::msg::Redaction>>( + &e)) { + if (redaction->redacts.empty()) + continue; + + lmdb::val oldEvent; + bool success = + lmdb::dbi_get(txn, eventsDb, lmdb::val(redaction->redacts), oldEvent); + if (!success) + continue; - json obj = json::object(); + mtx::events::collections::TimelineEvent te; + try { + mtx::events::collections::from_json( + json::parse(std::string_view(oldEvent.data(), oldEvent.size())), + te); + // overwrite the content and add redation data + std::visit( + [redaction](auto &ev) { + ev.unsigned_data.redacted_because = *redaction; + ev.unsigned_data.redacted_by = redaction->event_id; + }, + te.data); + event = mtx::accessors::serialize_event(te.data); + event["content"].clear(); + + } catch (std::exception &e) { + nhlog::db()->error("Failed to parse message from cache {}", + e.what()); + continue; + } + + lmdb::dbi_put( + txn, eventsDb, lmdb::val(redaction->redacts), lmdb::val(event.dump())); + lmdb::dbi_put(txn, + eventsDb, + lmdb::val(redaction->event_id), + lmdb::val(json(*redaction).dump())); + } else { + lmdb::dbi_put(txn, eventsDb, event_id, lmdb::val(event.dump())); + + ++index; + + first = false; + + nhlog::db()->debug("saving '{}'", orderEntry.dump()); + + lmdb::cursor_put(cursor.handle(), + lmdb::val(&index, sizeof(index)), + lmdb::val(orderEntry.dump()), + MDB_APPEND); + lmdb::dbi_put(txn, evToOrderDb, event_id, lmdb::val(&index, sizeof(index))); + + // TODO(Nico): Allow blacklisting more event types in UI + if (!isHiddenEvent(e, room_id)) { + ++msgIndex; + lmdb::cursor_put(msgCursor.handle(), + lmdb::val(&msgIndex, sizeof(msgIndex)), + event_id, + MDB_APPEND); + + lmdb::dbi_put(txn, + msg2orderDb, + event_id, + lmdb::val(&msgIndex, sizeof(msgIndex))); + } + + if (event.contains("content") && + event["content"].contains("m.relates_to")) { + auto temp = event["content"]["m.relates_to"]; + std::string relates_to = temp.contains("m.in_reply_to") + ? temp["m.in_reply_to"]["event_id"] + : temp["event_id"]; + + if (!relates_to.empty()) + lmdb::dbi_put( + txn, relationsDb, lmdb::val(relates_to), event_id); + } + } + } +} + +uint64_t +Cache::saveOldMessages(const std::string &room_id, const mtx::responses::Messages &res) +{ + auto txn = lmdb::txn::begin(env_); + auto eventsDb = getEventsDb(txn, room_id); + auto relationsDb = getRelationsDb(txn, room_id); + + auto orderDb = getEventOrderDb(txn, room_id); + auto evToOrderDb = getEventToOrderDb(txn, room_id); + auto msg2orderDb = getMessageToOrderDb(txn, room_id); + auto order2msgDb = getOrderToMessageDb(txn, room_id); + + lmdb::val indexVal, val; + uint64_t index = std::numeric_limits<uint64_t>::max() / 2; + { + auto cursor = lmdb::cursor::open(txn, orderDb); + if (cursor.get(indexVal, val, MDB_FIRST)) { + index = *indexVal.data<uint64_t>(); + } + } + + uint64_t msgIndex = std::numeric_limits<uint64_t>::max() / 2; + { + auto msgCursor = lmdb::cursor::open(txn, order2msgDb); + if (msgCursor.get(indexVal, val, MDB_FIRST)) { + msgIndex = *indexVal.data<uint64_t>(); + } + } + + if (res.chunk.empty()) + return index; + + std::string event_id_val; + for (const auto &e : res.chunk) { + if (std::holds_alternative< + mtx::events::RedactionEvent<mtx::events::msg::Redaction>>(e)) + continue; - obj["event"] = mtx::accessors::serialize_event(e); - obj["token"] = res.prev_batch; + auto event = mtx::accessors::serialize_event(e); + event_id_val = event["event_id"].get<std::string>(); + lmdb::val event_id = event_id_val; + lmdb::dbi_put(txn, eventsDb, event_id, lmdb::val(event.dump())); + + --index; + + json orderEntry = json::object(); + orderEntry["event_id"] = event_id_val; + + nhlog::db()->debug("saving '{}'", orderEntry.dump()); lmdb::dbi_put( - txn, - db, - lmdb::val(std::to_string(obj["event"]["origin_server_ts"].get<uint64_t>())), - lmdb::val(obj.dump())); + txn, orderDb, lmdb::val(&index, sizeof(index)), lmdb::val(orderEntry.dump())); + lmdb::dbi_put(txn, evToOrderDb, event_id, lmdb::val(&index, sizeof(index))); + + // TODO(Nico): Allow blacklisting more event types in UI + if (!isHiddenEvent(e, room_id)) { + --msgIndex; + lmdb::dbi_put( + txn, order2msgDb, lmdb::val(&msgIndex, sizeof(msgIndex)), event_id); + + lmdb::dbi_put( + txn, msg2orderDb, event_id, lmdb::val(&msgIndex, sizeof(msgIndex))); + } + + if (event.contains("content") && event["content"].contains("m.relates_to")) { + auto temp = event["content"]["m.relates_to"]; + std::string relates_to = temp.contains("m.in_reply_to") + ? temp["m.in_reply_to"]["event_id"] + : temp["event_id"]; + + if (!relates_to.empty()) + lmdb::dbi_put(txn, relationsDb, lmdb::val(relates_to), event_id); + } } + + json orderEntry = json::object(); + orderEntry["event_id"] = event_id_val; + orderEntry["prev_batch"] = res.end; + lmdb::dbi_put(txn, orderDb, lmdb::val(&index, sizeof(index)), lmdb::val(orderEntry.dump())); + nhlog::db()->debug("saving '{}'", orderEntry.dump()); + + txn.commit(); + + return msgIndex; +} + +void +Cache::clearTimeline(const std::string &room_id) +{ + auto txn = lmdb::txn::begin(env_); + auto eventsDb = getEventsDb(txn, room_id); + auto relationsDb = getRelationsDb(txn, room_id); + + auto orderDb = getEventOrderDb(txn, room_id); + auto evToOrderDb = getEventToOrderDb(txn, room_id); + auto msg2orderDb = getMessageToOrderDb(txn, room_id); + auto order2msgDb = getOrderToMessageDb(txn, room_id); + + lmdb::val indexVal, val; + auto cursor = lmdb::cursor::open(txn, orderDb); + + bool start = true; + bool passed_pagination_token = false; + while (cursor.get(indexVal, val, start ? MDB_LAST : MDB_PREV)) { + start = false; + json obj; + + try { + obj = json::parse(std::string_view(val.data(), val.size())); + } catch (std::exception &) { + // workaround bug in the initial db format, where we sometimes didn't store + // json... + obj = {{"event_id", std::string(val.data(), val.size())}}; + } + + if (passed_pagination_token) { + if (obj.count("event_id") != 0) { + lmdb::val event_id = obj["event_id"].get<std::string>(); + lmdb::dbi_del(txn, evToOrderDb, event_id); + lmdb::dbi_del(txn, eventsDb, event_id); + + lmdb::dbi_del(txn, relationsDb, event_id); + + lmdb::val order{}; + bool exists = lmdb::dbi_get(txn, msg2orderDb, event_id, order); + if (exists) { + lmdb::dbi_del(txn, order2msgDb, order); + lmdb::dbi_del(txn, msg2orderDb, event_id); + } + } + lmdb::cursor_del(cursor); + } else { + if (obj.count("prev_batch") != 0) + passed_pagination_token = true; + } + } + + auto msgCursor = lmdb::cursor::open(txn, order2msgDb); + start = true; + while (msgCursor.get(indexVal, val, start ? MDB_LAST : MDB_PREV)) { + start = false; + + lmdb::val eventId; + bool innerStart = true; + bool found = false; + while (cursor.get(indexVal, eventId, innerStart ? MDB_LAST : MDB_PREV)) { + innerStart = false; + + json obj; + try { + obj = json::parse(std::string_view(eventId.data(), eventId.size())); + } catch (std::exception &) { + obj = {{"event_id", std::string(eventId.data(), eventId.size())}}; + } + + if (obj["event_id"] == std::string(val.data(), val.size())) { + found = true; + break; + } + } + + if (!found) + break; + } + + do { + lmdb::cursor_del(msgCursor); + } while (msgCursor.get(indexVal, val, MDB_PREV)); + + cursor.close(); + msgCursor.close(); + txn.commit(); } mtx::responses::Notifications @@ -2111,34 +2773,60 @@ Cache::getRoomIds(lmdb::txn &txn) void Cache::deleteOldMessages() { + lmdb::val indexVal, val; + auto txn = lmdb::txn::begin(env_); auto room_ids = getRoomIds(txn); - for (const auto &id : room_ids) { - auto msg_db = getMessagesDb(txn, id); + for (const auto &room_id : room_ids) { + auto orderDb = getEventOrderDb(txn, room_id); + auto evToOrderDb = getEventToOrderDb(txn, room_id); + auto o2m = getOrderToMessageDb(txn, room_id); + auto m2o = getMessageToOrderDb(txn, room_id); + auto eventsDb = getEventsDb(txn, room_id); + auto relationsDb = getRelationsDb(txn, room_id); + auto cursor = lmdb::cursor::open(txn, orderDb); - std::string ts, event; - uint64_t idx = 0; + uint64_t first, last; + if (cursor.get(indexVal, val, MDB_LAST)) { + last = *indexVal.data<uint64_t>(); + } else { + continue; + } + if (cursor.get(indexVal, val, MDB_FIRST)) { + first = *indexVal.data<uint64_t>(); + } else { + continue; + } - const auto db_size = msg_db.size(txn); - if (db_size <= 3 * MAX_RESTORED_MESSAGES) + size_t message_count = static_cast<size_t>(last - first); + if (message_count < MAX_RESTORED_MESSAGES) continue; - nhlog::db()->info("[{}] message count: {}", id, db_size); + bool start = true; + while (cursor.get(indexVal, val, start ? MDB_FIRST : MDB_NEXT) && + message_count-- > MAX_RESTORED_MESSAGES) { + start = false; + auto obj = json::parse(std::string_view(val.data(), val.size())); - auto cursor = lmdb::cursor::open(txn, msg_db); - while (cursor.get(ts, event, MDB_NEXT)) { - idx += 1; + if (obj.count("event_id") != 0) { + lmdb::val event_id = obj["event_id"].get<std::string>(); + lmdb::dbi_del(txn, evToOrderDb, event_id); + lmdb::dbi_del(txn, eventsDb, event_id); - if (idx > MAX_RESTORED_MESSAGES) - lmdb::cursor_del(cursor); - } + lmdb::dbi_del(txn, relationsDb, event_id); + lmdb::val order{}; + bool exists = lmdb::dbi_get(txn, m2o, event_id, order); + if (exists) { + lmdb::dbi_del(txn, o2m, order); + lmdb::dbi_del(txn, m2o, event_id); + } + } + lmdb::cursor_del(cursor); + } cursor.close(); - - nhlog::db()->info("[{}] updated message count: {}", id, msg_db.size(txn)); } - txn.commit(); } @@ -2172,7 +2860,7 @@ Cache::hasEnoughPowerLevel(const std::vector<mtx::events::EventType> &eventTypes if (res) { try { StateEvent<PowerLevels> msg = - json::parse(std::string(event.data(), event.size())); + json::parse(std::string_view(event.data(), event.size())); user_level = msg.content.user_level(user_id); @@ -2277,6 +2965,9 @@ Cache::removeAvatarUrl(const QString &room_id, const QString &user_id) mtx::presence::PresenceState Cache::presenceState(const std::string &user_id) { + if (user_id.empty()) + return {}; + lmdb::val presenceVal; auto txn = lmdb::txn::begin(env_); @@ -2287,7 +2978,7 @@ Cache::presenceState(const std::string &user_id) if (res) { mtx::events::presence::Presence presence = - json::parse(std::string(presenceVal.data(), presenceVal.size())); + json::parse(std::string_view(presenceVal.data(), presenceVal.size())); state = presence.presence; } @@ -2299,6 +2990,9 @@ Cache::presenceState(const std::string &user_id) std::string Cache::statusMessage(const std::string &user_id) { + if (user_id.empty()) + return {}; + lmdb::val presenceVal; auto txn = lmdb::txn::begin(env_); @@ -2309,7 +3003,7 @@ Cache::statusMessage(const std::string &user_id) if (res) { mtx::events::presence::Presence presence = - json::parse(std::string(presenceVal.data(), presenceVal.size())); + json::parse(std::string_view(presenceVal.data(), presenceVal.size())); status_msg = presence.status_msg; }