diff --git a/src/Cache.cpp b/src/Cache.cpp
index d435dc56..91cde9e7 100644
--- a/src/Cache.cpp
+++ b/src/Cache.cpp
@@ -33,11 +33,12 @@
#include "Cache_p.h"
#include "EventAccessors.h"
#include "Logging.h"
+#include "Olm.h"
#include "Utils.h"
//! Should be changed when a breaking change occurs in the cache format.
//! This will reset client's data.
-static const std::string CURRENT_CACHE_FORMAT_VERSION("2020.05.01");
+static const std::string CURRENT_CACHE_FORMAT_VERSION("2020.07.05");
static const std::string SECRET("secret");
static lmdb::val NEXT_BATCH_KEY("next_batch");
@@ -46,8 +47,9 @@ static lmdb::val CACHE_FORMAT_VERSION_KEY("cache_format_version");
constexpr size_t MAX_RESTORED_MESSAGES = 30'000;
-constexpr auto DB_SIZE = 32ULL * 1024ULL * 1024ULL * 1024ULL; // 32 GB
-constexpr auto MAX_DBS = 8092UL;
+constexpr auto DB_SIZE = 32ULL * 1024ULL * 1024ULL * 1024ULL; // 32 GB
+constexpr auto MAX_DBS = 8092UL;
+constexpr auto BATCH_SIZE = 100;
//! Cache databases and their format.
//!
@@ -63,7 +65,6 @@ constexpr auto SYNC_STATE_DB("sync_state");
//! Read receipts per room/event.
constexpr auto READ_RECEIPTS_DB("read_receipts");
constexpr auto NOTIFICATIONS_DB("sent_notifications");
-//! TODO: delete pending_receipts database on old cache versions
//! Encryption related databases.
@@ -93,18 +94,31 @@ namespace {
std::unique_ptr<Cache> instance_ = nullptr;
}
-int
-numeric_key_comparison(const MDB_val *a, const MDB_val *b)
+static bool
+isHiddenEvent(mtx::events::collections::TimelineEvents e, const std::string &room_id)
{
- auto lhs = std::stoull(std::string((char *)a->mv_data, a->mv_size));
- auto rhs = std::stoull(std::string((char *)b->mv_data, b->mv_size));
+ using namespace mtx::events;
+ if (auto encryptedEvent = std::get_if<EncryptedEvent<msg::Encrypted>>(&e)) {
+ MegolmSessionIndex index;
+ index.room_id = room_id;
+ index.session_id = encryptedEvent->content.session_id;
+ index.sender_key = encryptedEvent->content.sender_key;
- if (lhs < rhs)
- return 1;
- else if (lhs == rhs)
- return 0;
+ auto result = olm::decryptEvent(index, *encryptedEvent);
+ if (!result.error)
+ e = result.event.value();
+ }
- return -1;
+ static constexpr std::initializer_list<EventType> hiddenEvents = {
+ EventType::Reaction, EventType::CallCandidates, EventType::Unsupported};
+
+ return std::visit(
+ [](const auto &ev) {
+ return std::any_of(hiddenEvents.begin(),
+ hiddenEvents.end(),
+ [ev](EventType type) { return type == ev.type; });
+ },
+ e);
}
Cache::Cache(const QString &userId, QObject *parent)
@@ -154,7 +168,10 @@ Cache::setup()
}
try {
- env_.open(statePath.toStdString().c_str());
+ // NOTE(Nico): We may want to use (MDB_MAPASYNC | MDB_WRITEMAP) in the future, but
+ // it can really mess up our database, so we shouldn't. For now, hopefully
+ // NOMETASYNC is fast enough.
+ env_.open(statePath.toStdString().c_str(), MDB_NOMETASYNC);
} catch (const lmdb::error &e) {
if (e.code() != MDB_VERSION_MISMATCH && e.code() != MDB_INVALID) {
throw std::runtime_error("LMDB initialization failed" +
@@ -700,8 +717,77 @@ Cache::runMigrations()
nhlog::db()->info("Successfully deleted pending receipts database.");
return true;
}},
+ {"2020.07.05",
+ [this]() {
+ try {
+ auto txn = lmdb::txn::begin(env_, nullptr);
+ auto room_ids = getRoomIds(txn);
+
+ for (const auto &room_id : room_ids) {
+ try {
+ auto messagesDb = lmdb::dbi::open(
+ txn, std::string(room_id + "/messages").c_str());
+
+ // keep some old messages and batch token
+ {
+ auto roomsCursor =
+ lmdb::cursor::open(txn, messagesDb);
+ lmdb::val ts, stored_message;
+ bool start = true;
+ mtx::responses::Timeline oldMessages;
+ while (roomsCursor.get(ts,
+ stored_message,
+ start ? MDB_FIRST
+ : MDB_NEXT)) {
+ start = false;
+
+ auto j = json::parse(std::string_view(
+ stored_message.data(),
+ stored_message.size()));
+
+ if (oldMessages.prev_batch.empty())
+ oldMessages.prev_batch =
+ j["token"].get<std::string>();
+ else if (j["token"] !=
+ oldMessages.prev_batch)
+ break;
+
+ mtx::events::collections::TimelineEvent
+ te;
+ mtx::events::collections::from_json(
+ j["event"], te);
+ oldMessages.events.push_back(te.data);
+ }
+ // messages were stored in reverse order, so we
+ // need to reverse them
+ std::reverse(oldMessages.events.begin(),
+ oldMessages.events.end());
+ // save messages using the new method
+ saveTimelineMessages(txn, room_id, oldMessages);
+ }
+
+ // delete old messages db
+ lmdb::dbi_drop(txn, messagesDb, true);
+ } catch (std::exception &e) {
+ nhlog::db()->error(
+ "While migrating messages from {}, ignoring error {}",
+ room_id,
+ e.what());
+ }
+ }
+ txn.commit();
+ } catch (const lmdb::error &) {
+ nhlog::db()->critical(
+ "Failed to delete messages database in migration!");
+ return false;
+ }
+
+ nhlog::db()->info("Successfully deleted pending receipts database.");
+ return true;
+ }},
};
+ nhlog::db()->info("Running migrations, this may take a while!");
for (const auto &[target_version, migration] : migrations) {
if (target_version > stored_version)
if (!migration()) {
@@ -709,6 +795,7 @@ Cache::runMigrations()
return false;
}
}
+ nhlog::db()->info("Migrations finished.");
setCurrentFormat();
return true;
@@ -771,8 +858,9 @@ Cache::readReceipts(const QString &event_id, const QString &room_id)
txn.commit();
if (res) {
- auto json_response = json::parse(std::string(value.data(), value.size()));
- auto values = json_response.get<std::map<std::string, uint64_t>>();
+ auto json_response =
+ json::parse(std::string_view(value.data(), value.size()));
+ auto values = json_response.get<std::map<std::string, uint64_t>>();
for (const auto &v : values)
// timestamp, user_id
@@ -810,8 +898,8 @@ Cache::updateReadReceipt(lmdb::txn &txn, const std::string &room_id, const Recei
// If an entry for the event id already exists, we would
// merge the existing receipts with the new ones.
if (exists) {
- auto json_value =
- json::parse(std::string(prev_value.data(), prev_value.size()));
+ auto json_value = json::parse(
+ std::string_view(prev_value.data(), prev_value.size()));
// Retrieve the saved receipts.
saved_receipts = json_value.get<std::map<std::string, uint64_t>>();
@@ -930,7 +1018,7 @@ Cache::saveState(const mtx::responses::Sync &res)
if (lmdb::dbi_get(txn, roomsDb_, lmdb::val(room.first), data)) {
try {
RoomInfo tmp =
- json::parse(std::string(data.data(), data.size()));
+ json::parse(std::string_view(data.data(), data.size()));
updatedInfo.tags = tmp.tags;
} catch (const json::exception &e) {
nhlog::db()->warn(
@@ -1122,7 +1210,7 @@ Cache::singleRoomInfo(const std::string &room_id)
// Check if the room is joined.
if (lmdb::dbi_get(txn, roomsDb_, lmdb::val(room_id), data)) {
try {
- RoomInfo tmp = json::parse(std::string(data.data(), data.size()));
+ RoomInfo tmp = json::parse(std::string_view(data.data(), data.size()));
tmp.member_count = getMembersDb(txn, room_id).size(txn);
tmp.join_rule = getRoomJoinRule(txn, statesdb);
tmp.guest_access = getRoomGuestAccess(txn, statesdb);
@@ -1157,7 +1245,8 @@ Cache::getRoomInfo(const std::vector<std::string> &rooms)
// Check if the room is joined.
if (lmdb::dbi_get(txn, roomsDb_, lmdb::val(room), data)) {
try {
- RoomInfo tmp = json::parse(std::string(data.data(), data.size()));
+ RoomInfo tmp =
+ json::parse(std::string_view(data.data(), data.size()));
tmp.member_count = getMembersDb(txn, room).size(txn);
tmp.join_rule = getRoomJoinRule(txn, statesdb);
tmp.guest_access = getRoomGuestAccess(txn, statesdb);
@@ -1173,7 +1262,7 @@ Cache::getRoomInfo(const std::vector<std::string> &rooms)
if (lmdb::dbi_get(txn, invitesDb_, lmdb::val(room), data)) {
try {
RoomInfo tmp =
- json::parse(std::string(data.data(), data.size()));
+ json::parse(std::string_view(data.data(), data.size()));
tmp.member_count = getInviteMembersDb(txn, room).size(txn);
room_info.emplace(QString::fromStdString(room),
@@ -1232,38 +1321,147 @@ Cache::getTimelineMentions()
return notifs;
}
-mtx::responses::Timeline
-Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id)
+std::string
+Cache::previousBatchToken(const std::string &room_id)
+{
+ auto txn = lmdb::txn::begin(env_, nullptr);
+ auto orderDb = getEventOrderDb(txn, room_id);
+
+ auto cursor = lmdb::cursor::open(txn, orderDb);
+ lmdb::val indexVal, val;
+ if (!cursor.get(indexVal, val, MDB_FIRST)) {
+ return "";
+ }
+
+ auto j = json::parse(std::string_view(val.data(), val.size()));
+
+ return j.value("prev_batch", "");
+}
+
+Cache::Messages
+Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id, uint64_t index, bool forward)
{
// TODO(nico): Limit the messages returned by this maybe?
- auto db = getMessagesDb(txn, room_id);
+ auto orderDb = getOrderToMessageDb(txn, room_id);
+ auto eventsDb = getEventsDb(txn, room_id);
- mtx::responses::Timeline timeline;
- std::string timestamp, msg;
+ Messages messages{};
- auto cursor = lmdb::cursor::open(txn, db);
+ lmdb::val indexVal, event_id;
- size_t index = 0;
+ auto cursor = lmdb::cursor::open(txn, orderDb);
+ if (index == std::numeric_limits<uint64_t>::max()) {
+ if (cursor.get(indexVal, event_id, forward ? MDB_FIRST : MDB_LAST)) {
+ index = *indexVal.data<uint64_t>();
+ } else {
+ messages.end_of_cache = true;
+ return messages;
+ }
+ } else {
+ if (cursor.get(indexVal, event_id, MDB_SET)) {
+ index = *indexVal.data<uint64_t>();
+ } else {
+ messages.end_of_cache = true;
+ return messages;
+ }
+ }
- while (cursor.get(timestamp, msg, MDB_NEXT) && index < MAX_RESTORED_MESSAGES) {
- auto obj = json::parse(msg);
+ int counter = 0;
- if (obj.count("event") == 0 || obj.count("token") == 0)
+ bool ret;
+ while ((ret = cursor.get(indexVal,
+ event_id,
+ counter == 0 ? (forward ? MDB_FIRST : MDB_LAST)
+ : (forward ? MDB_NEXT : MDB_PREV))) &&
+ counter++ < BATCH_SIZE) {
+ lmdb::val event;
+ bool success = lmdb::dbi_get(txn, eventsDb, event_id, event);
+ if (!success)
continue;
- mtx::events::collections::TimelineEvent event;
- mtx::events::collections::from_json(obj.at("event"), event);
-
- index += 1;
+ mtx::events::collections::TimelineEvent te;
+ try {
+ mtx::events::collections::from_json(
+ json::parse(std::string_view(event.data(), event.size())), te);
+ } catch (std::exception &e) {
+ nhlog::db()->error("Failed to parse message from cache {}", e.what());
+ continue;
+ }
- timeline.events.push_back(event.data);
- timeline.prev_batch = obj.at("token").get<std::string>();
+ messages.timeline.events.push_back(std::move(te.data));
}
cursor.close();
- std::reverse(timeline.events.begin(), timeline.events.end());
+ // std::reverse(timeline.events.begin(), timeline.events.end());
+ messages.next_index = *indexVal.data<uint64_t>();
+ messages.end_of_cache = !ret;
+
+ return messages;
+}
+
+std::optional<mtx::events::collections::TimelineEvent>
+Cache::getEvent(const std::string &room_id, const std::string &event_id)
+{
+ auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY);
+ auto eventsDb = getEventsDb(txn, room_id);
+
+ lmdb::val event{};
+ bool success = lmdb::dbi_get(txn, eventsDb, lmdb::val(event_id), event);
+ if (!success)
+ return {};
- return timeline;
+ mtx::events::collections::TimelineEvent te;
+ try {
+ mtx::events::collections::from_json(
+ json::parse(std::string_view(event.data(), event.size())), te);
+ } catch (std::exception &e) {
+ nhlog::db()->error("Failed to parse message from cache {}", e.what());
+ return std::nullopt;
+ }
+
+ return te;
+}
+void
+Cache::storeEvent(const std::string &room_id,
+ const std::string &event_id,
+ const mtx::events::collections::TimelineEvent &event)
+{
+ auto txn = lmdb::txn::begin(env_);
+ auto eventsDb = getEventsDb(txn, room_id);
+ auto event_json = mtx::accessors::serialize_event(event.data);
+ lmdb::dbi_put(txn, eventsDb, lmdb::val(event_id), lmdb::val(event_json.dump()));
+ txn.commit();
+}
+
+std::vector<std::string>
+Cache::relatedEvents(const std::string &room_id, const std::string &event_id)
+{
+ auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY);
+ auto relationsDb = getRelationsDb(txn, room_id);
+
+ std::vector<std::string> related_ids;
+
+ auto related_cursor = lmdb::cursor::open(txn, relationsDb);
+ lmdb::val related_to = event_id, related_event;
+ bool first = true;
+
+ try {
+ if (!related_cursor.get(related_to, related_event, MDB_SET))
+ return {};
+
+ while (related_cursor.get(
+ related_to, related_event, first ? MDB_FIRST_DUP : MDB_NEXT_DUP)) {
+ first = false;
+ if (event_id != std::string_view(related_to.data(), related_to.size()))
+ break;
+
+ related_ids.emplace_back(related_event.data(), related_event.size());
+ }
+ } catch (const lmdb::error &e) {
+ nhlog::db()->error("related events error: {}", e.what());
+ }
+
+ return related_ids;
}
QMap<QString, RoomInfo>
@@ -1306,55 +1504,113 @@ Cache::roomInfo(bool withInvites)
std::string
Cache::getLastEventId(lmdb::txn &txn, const std::string &room_id)
{
- auto db = getMessagesDb(txn, room_id);
+ auto orderDb = getOrderToMessageDb(txn, room_id);
+
+ lmdb::val indexVal, val;
- if (db.size(txn) == 0)
+ auto cursor = lmdb::cursor::open(txn, orderDb);
+ if (!cursor.get(indexVal, val, MDB_LAST)) {
return {};
+ }
- std::string timestamp, msg;
+ return std::string(val.data(), val.size());
+}
- auto cursor = lmdb::cursor::open(txn, db);
- while (cursor.get(timestamp, msg, MDB_NEXT)) {
- auto obj = json::parse(msg);
+std::optional<Cache::TimelineRange>
+Cache::getTimelineRange(const std::string &room_id)
+{
+ auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY);
+ lmdb::dbi orderDb{0};
+ try {
+ orderDb = getOrderToMessageDb(txn, room_id);
+ } catch (lmdb::runtime_error &e) {
+ nhlog::db()->error("Can't open db for room '{}', probably doesn't exist yet. ({})",
+ room_id,
+ e.what());
+ return {};
+ }
- if (obj.count("event") == 0)
- continue;
+ lmdb::val indexVal, val;
- cursor.close();
- return obj["event"]["event_id"];
+ auto cursor = lmdb::cursor::open(txn, orderDb);
+ if (!cursor.get(indexVal, val, MDB_LAST)) {
+ return {};
}
- cursor.close();
- return {};
+ TimelineRange range{};
+ range.last = *indexVal.data<uint64_t>();
+
+ if (!cursor.get(indexVal, val, MDB_FIRST)) {
+ return {};
+ }
+ range.first = *indexVal.data<uint64_t>();
+
+ return range;
+}
+std::optional<uint64_t>
+Cache::getTimelineIndex(const std::string &room_id, std::string_view event_id)
+{
+ auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY);
+ auto orderDb = getMessageToOrderDb(txn, room_id);
+
+ lmdb::val indexVal{event_id.data(), event_id.size()}, val;
+
+ bool success = lmdb::dbi_get(txn, orderDb, indexVal, val);
+ if (!success) {
+ return {};
+ }
+
+ return *val.data<uint64_t>();
+}
+
+std::optional<std::string>
+Cache::getTimelineEventId(const std::string &room_id, uint64_t index)
+{
+ auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY);
+ auto orderDb = getOrderToMessageDb(txn, room_id);
+
+ lmdb::val indexVal{&index, sizeof(index)}, val;
+
+ bool success = lmdb::dbi_get(txn, orderDb, indexVal, val);
+ if (!success) {
+ return {};
+ }
+
+ return std::string(val.data(), val.size());
}
DescInfo
Cache::getLastMessageInfo(lmdb::txn &txn, const std::string &room_id)
{
- auto db = getMessagesDb(txn, room_id);
-
- if (db.size(txn) == 0)
+ auto orderDb = getOrderToMessageDb(txn, room_id);
+ auto eventsDb = getEventsDb(txn, room_id);
+ if (orderDb.size(txn) == 0)
return DescInfo{};
- std::string timestamp, msg;
-
const auto local_user = utils::localUser();
DescInfo fallbackDesc{};
- auto cursor = lmdb::cursor::open(txn, db);
- while (cursor.get(timestamp, msg, MDB_NEXT)) {
- auto obj = json::parse(msg);
+ lmdb::val indexVal, event_id;
- if (obj.count("event") == 0)
+ auto cursor = lmdb::cursor::open(txn, orderDb);
+ bool first = true;
+ while (cursor.get(indexVal, event_id, first ? MDB_LAST : MDB_PREV)) {
+ first = false;
+
+ lmdb::val event;
+ bool success = lmdb::dbi_get(txn, eventsDb, event_id, event);
+ if (!success)
continue;
- if (fallbackDesc.event_id.isEmpty() && obj["event"]["type"] == "m.room.member" &&
- obj["event"]["state_key"] == local_user.toStdString() &&
- obj["event"]["content"]["membership"] == "join") {
- uint64_t ts = obj["event"]["origin_server_ts"];
+ auto obj = json::parse(std::string_view(event.data(), event.size()));
+
+ if (fallbackDesc.event_id.isEmpty() && obj["type"] == "m.room.member" &&
+ obj["state_key"] == local_user.toStdString() &&
+ obj["content"]["membership"] == "join") {
+ uint64_t ts = obj["origin_server_ts"];
auto time = QDateTime::fromMSecsSinceEpoch(ts);
- fallbackDesc = DescInfo{QString::fromStdString(obj["event"]["event_id"]),
+ fallbackDesc = DescInfo{QString::fromStdString(obj["event_id"]),
local_user,
tr("You joined this room."),
utils::descriptiveTime(time),
@@ -1362,20 +1618,17 @@ Cache::getLastMessageInfo(lmdb::txn &txn, const std::string &room_id)
time};
}
- if (!(obj["event"]["type"] == "m.room.message" ||
- obj["event"]["type"] == "m.sticker" ||
- obj["event"]["type"] == "m.call.invite" ||
- obj["event"]["type"] == "m.call.answer" ||
- obj["event"]["type"] == "m.call.hangup" ||
- obj["event"]["type"] == "m.room.encrypted"))
+ if (!(obj["type"] == "m.room.message" || obj["type"] == "m.sticker" ||
+ obj["type"] == "m.call.invite" || obj["type"] == "m.call.answer" ||
+ obj["type"] == "m.call.hangup" || obj["type"] == "m.room.encrypted"))
continue;
- mtx::events::collections::TimelineEvent event;
- mtx::events::collections::from_json(obj.at("event"), event);
+ mtx::events::collections::TimelineEvent te;
+ mtx::events::collections::from_json(obj, te);
cursor.close();
return utils::getMessageDescription(
- event.data, local_user, QString::fromStdString(room_id));
+ te.data, local_user, QString::fromStdString(room_id));
}
cursor.close();
@@ -1417,7 +1670,7 @@ Cache::getRoomAvatarUrl(lmdb::txn &txn,
if (res) {
try {
StateEvent<Avatar> msg =
- json::parse(std::string(event.data(), event.size()));
+ json::parse(std::string_view(event.data(), event.size()));
if (!msg.content.url.empty())
return QString::fromStdString(msg.content.url);
@@ -1467,7 +1720,8 @@ Cache::getRoomName(lmdb::txn &txn, lmdb::dbi &statesdb, lmdb::dbi &membersdb)
if (res) {
try {
- StateEvent<Name> msg = json::parse(std::string(event.data(), event.size()));
+ StateEvent<Name> msg =
+ json::parse(std::string_view(event.data(), event.size()));
if (!msg.content.name.empty())
return QString::fromStdString(msg.content.name);
@@ -1482,7 +1736,7 @@ Cache::getRoomName(lmdb::txn &txn, lmdb::dbi &statesdb, lmdb::dbi &membersdb)
if (res) {
try {
StateEvent<CanonicalAlias> msg =
- json::parse(std::string(event.data(), event.size()));
+ json::parse(std::string_view(event.data(), event.size()));
if (!msg.content.alias.empty())
return QString::fromStdString(msg.content.alias);
@@ -1545,7 +1799,7 @@ Cache::getRoomJoinRule(lmdb::txn &txn, lmdb::dbi &statesdb)
if (res) {
try {
StateEvent<state::JoinRules> msg =
- json::parse(std::string(event.data(), event.size()));
+ json::parse(std::string_view(event.data(), event.size()));
return msg.content.join_rule;
} catch (const json::exception &e) {
nhlog::db()->warn("failed to parse m.room.join_rule event: {}", e.what());
@@ -1567,7 +1821,7 @@ Cache::getRoomGuestAccess(lmdb::txn &txn, lmdb::dbi &statesdb)
if (res) {
try {
StateEvent<GuestAccess> msg =
- json::parse(std::string(event.data(), event.size()));
+ json::parse(std::string_view(event.data(), event.size()));
return msg.content.guest_access == AccessState::CanJoin;
} catch (const json::exception &e) {
nhlog::db()->warn("failed to parse m.room.guest_access event: {}",
@@ -1590,7 +1844,7 @@ Cache::getRoomTopic(lmdb::txn &txn, lmdb::dbi &statesdb)
if (res) {
try {
StateEvent<Topic> msg =
- json::parse(std::string(event.data(), event.size()));
+ json::parse(std::string_view(event.data(), event.size()));
if (!msg.content.topic.empty())
return QString::fromStdString(msg.content.topic);
@@ -1615,7 +1869,7 @@ Cache::getRoomVersion(lmdb::txn &txn, lmdb::dbi &statesdb)
if (res) {
try {
StateEvent<Create> msg =
- json::parse(std::string(event.data(), event.size()));
+ json::parse(std::string_view(event.data(), event.size()));
if (!msg.content.room_version.empty())
return QString::fromStdString(msg.content.room_version);
@@ -1641,7 +1895,7 @@ Cache::getInviteRoomName(lmdb::txn &txn, lmdb::dbi &statesdb, lmdb::dbi &members
if (res) {
try {
StrippedEvent<state::Name> msg =
- json::parse(std::string(event.data(), event.size()));
+ json::parse(std::string_view(event.data(), event.size()));
return QString::fromStdString(msg.content.name);
} catch (const json::exception &e) {
nhlog::db()->warn("failed to parse m.room.name event: {}", e.what());
@@ -1683,7 +1937,7 @@ Cache::getInviteRoomAvatarUrl(lmdb::txn &txn, lmdb::dbi &statesdb, lmdb::dbi &me
if (res) {
try {
StrippedEvent<state::Avatar> msg =
- json::parse(std::string(event.data(), event.size()));
+ json::parse(std::string_view(event.data(), event.size()));
return QString::fromStdString(msg.content.url);
} catch (const json::exception &e) {
nhlog::db()->warn("failed to parse m.room.avatar event: {}", e.what());
@@ -1725,7 +1979,7 @@ Cache::getInviteRoomTopic(lmdb::txn &txn, lmdb::dbi &db)
if (res) {
try {
StrippedEvent<Topic> msg =
- json::parse(std::string(event.data(), event.size()));
+ json::parse(std::string_view(event.data(), event.size()));
return QString::fromStdString(msg.content.topic);
} catch (const json::exception &e) {
nhlog::db()->warn("failed to parse m.room.topic event: {}", e.what());
@@ -1756,7 +2010,7 @@ Cache::getRoomAvatar(const std::string &room_id)
std::string media_url;
try {
- RoomInfo info = json::parse(std::string(response.data(), response.size()));
+ RoomInfo info = json::parse(std::string_view(response.data(), response.size()));
media_url = std::move(info.avatar_url);
if (media_url.empty()) {
@@ -1953,30 +2207,438 @@ Cache::isRoomMember(const std::string &user_id, const std::string &room_id)
}
void
+Cache::savePendingMessage(const std::string &room_id,
+ const mtx::events::collections::TimelineEvent &message)
+{
+ auto txn = lmdb::txn::begin(env_);
+
+ mtx::responses::Timeline timeline;
+ timeline.events.push_back(message.data);
+ saveTimelineMessages(txn, room_id, timeline);
+
+ auto pending = getPendingMessagesDb(txn, room_id);
+
+ int64_t now = QDateTime::currentMSecsSinceEpoch();
+ lmdb::dbi_put(txn,
+ pending,
+ lmdb::val(&now, sizeof(now)),
+ lmdb::val(mtx::accessors::event_id(message.data)));
+
+ txn.commit();
+}
+
+std::optional<mtx::events::collections::TimelineEvent>
+Cache::firstPendingMessage(const std::string &room_id)
+{
+ auto txn = lmdb::txn::begin(env_);
+ auto pending = getPendingMessagesDb(txn, room_id);
+
+ {
+ auto pendingCursor = lmdb::cursor::open(txn, pending);
+ lmdb::val tsIgnored, pendingTxn;
+ while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) {
+ auto eventsDb = getEventsDb(txn, room_id);
+ lmdb::val event;
+ if (!lmdb::dbi_get(txn, eventsDb, pendingTxn, event)) {
+ lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn);
+ continue;
+ }
+
+ try {
+ mtx::events::collections::TimelineEvent te;
+ mtx::events::collections::from_json(
+ json::parse(std::string_view(event.data(), event.size())), te);
+
+ pendingCursor.close();
+ txn.commit();
+ return te;
+ } catch (std::exception &e) {
+ nhlog::db()->error("Failed to parse message from cache {}",
+ e.what());
+ lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn);
+ continue;
+ }
+ }
+ }
+
+ txn.commit();
+
+ return std::nullopt;
+}
+
+void
+Cache::removePendingStatus(const std::string &room_id, const std::string &txn_id)
+{
+ auto txn = lmdb::txn::begin(env_);
+ auto pending = getPendingMessagesDb(txn, room_id);
+
+ {
+ auto pendingCursor = lmdb::cursor::open(txn, pending);
+ lmdb::val tsIgnored, pendingTxn;
+ while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) {
+ if (std::string_view(pendingTxn.data(), pendingTxn.size()) == txn_id)
+ lmdb::cursor_del(pendingCursor);
+ }
+ }
+
+ txn.commit();
+}
+
+void
Cache::saveTimelineMessages(lmdb::txn &txn,
const std::string &room_id,
const mtx::responses::Timeline &res)
{
- auto db = getMessagesDb(txn, room_id);
+ if (res.events.empty())
+ return;
+
+ auto eventsDb = getEventsDb(txn, room_id);
+ auto relationsDb = getRelationsDb(txn, room_id);
+
+ auto orderDb = getEventOrderDb(txn, room_id);
+ auto evToOrderDb = getEventToOrderDb(txn, room_id);
+ auto msg2orderDb = getMessageToOrderDb(txn, room_id);
+ auto order2msgDb = getOrderToMessageDb(txn, room_id);
+ auto pending = getPendingMessagesDb(txn, room_id);
+
+ if (res.limited) {
+ lmdb::dbi_drop(txn, orderDb, false);
+ lmdb::dbi_drop(txn, evToOrderDb, false);
+ lmdb::dbi_drop(txn, msg2orderDb, false);
+ lmdb::dbi_drop(txn, order2msgDb, false);
+ lmdb::dbi_drop(txn, pending, true);
+ }
using namespace mtx::events;
using namespace mtx::events::state;
+ lmdb::val indexVal, val;
+ uint64_t index = std::numeric_limits<uint64_t>::max() / 2;
+ auto cursor = lmdb::cursor::open(txn, orderDb);
+ if (cursor.get(indexVal, val, MDB_LAST)) {
+ index = *indexVal.data<int64_t>();
+ }
+
+ uint64_t msgIndex = std::numeric_limits<uint64_t>::max() / 2;
+ auto msgCursor = lmdb::cursor::open(txn, order2msgDb);
+ if (msgCursor.get(indexVal, val, MDB_LAST)) {
+ msgIndex = *indexVal.data<uint64_t>();
+ }
+
+ bool first = true;
for (const auto &e : res.events) {
- if (std::holds_alternative<RedactionEvent<msg::Redaction>>(e))
+ auto event = mtx::accessors::serialize_event(e);
+ auto txn_id = mtx::accessors::transaction_id(e);
+
+ std::string event_id_val = event.value("event_id", "");
+ if (event_id_val.empty()) {
+ nhlog::db()->error("Event without id!");
continue;
+ }
+
+ lmdb::val event_id = event_id_val;
+
+ json orderEntry = json::object();
+ orderEntry["event_id"] = event_id_val;
+ if (first && !res.prev_batch.empty())
+ orderEntry["prev_batch"] = res.prev_batch;
+
+ lmdb::val txn_order;
+ if (!txn_id.empty() &&
+ lmdb::dbi_get(txn, evToOrderDb, lmdb::val(txn_id), txn_order)) {
+ lmdb::dbi_put(txn, eventsDb, event_id, lmdb::val(event.dump()));
+ lmdb::dbi_del(txn, eventsDb, lmdb::val(txn_id));
+
+ lmdb::val msg_txn_order;
+ if (lmdb::dbi_get(txn, msg2orderDb, lmdb::val(txn_id), msg_txn_order)) {
+ lmdb::dbi_put(txn, order2msgDb, msg_txn_order, event_id);
+ lmdb::dbi_put(txn, msg2orderDb, event_id, msg_txn_order);
+ lmdb::dbi_del(txn, msg2orderDb, lmdb::val(txn_id));
+ }
+
+ lmdb::dbi_put(txn, orderDb, txn_order, lmdb::val(orderEntry.dump()));
+ lmdb::dbi_put(txn, evToOrderDb, event_id, txn_order);
+ lmdb::dbi_del(txn, evToOrderDb, lmdb::val(txn_id));
+
+ if (event.contains("content") &&
+ event["content"].contains("m.relates_to")) {
+ auto temp = event["content"]["m.relates_to"];
+ std::string relates_to = temp.contains("m.in_reply_to")
+ ? temp["m.in_reply_to"]["event_id"]
+ : temp["event_id"];
+
+ if (!relates_to.empty()) {
+ lmdb::dbi_del(txn,
+ relationsDb,
+ lmdb::val(relates_to),
+ lmdb::val(txn_id));
+ lmdb::dbi_put(
+ txn, relationsDb, lmdb::val(relates_to), event_id);
+ }
+ }
+
+ auto pendingCursor = lmdb::cursor::open(txn, pending);
+ lmdb::val tsIgnored, pendingTxn;
+ while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) {
+ if (std::string_view(pendingTxn.data(), pendingTxn.size()) ==
+ txn_id)
+ lmdb::cursor_del(pendingCursor);
+ }
+ } else if (auto redaction =
+ std::get_if<mtx::events::RedactionEvent<mtx::events::msg::Redaction>>(
+ &e)) {
+ if (redaction->redacts.empty())
+ continue;
+
+ lmdb::val oldEvent;
+ bool success =
+ lmdb::dbi_get(txn, eventsDb, lmdb::val(redaction->redacts), oldEvent);
+ if (!success)
+ continue;
- json obj = json::object();
+ mtx::events::collections::TimelineEvent te;
+ try {
+ mtx::events::collections::from_json(
+ json::parse(std::string_view(oldEvent.data(), oldEvent.size())),
+ te);
+ // overwrite the content and add redation data
+ std::visit(
+ [redaction](auto &ev) {
+ ev.unsigned_data.redacted_because = *redaction;
+ ev.unsigned_data.redacted_by = redaction->event_id;
+ },
+ te.data);
+ event = mtx::accessors::serialize_event(te.data);
+ event["content"].clear();
+
+ } catch (std::exception &e) {
+ nhlog::db()->error("Failed to parse message from cache {}",
+ e.what());
+ continue;
+ }
+
+ lmdb::dbi_put(
+ txn, eventsDb, lmdb::val(redaction->redacts), lmdb::val(event.dump()));
+ lmdb::dbi_put(txn,
+ eventsDb,
+ lmdb::val(redaction->event_id),
+ lmdb::val(json(*redaction).dump()));
+ } else {
+ lmdb::dbi_put(txn, eventsDb, event_id, lmdb::val(event.dump()));
+
+ ++index;
+
+ first = false;
+
+ nhlog::db()->debug("saving '{}'", orderEntry.dump());
+
+ lmdb::cursor_put(cursor.handle(),
+ lmdb::val(&index, sizeof(index)),
+ lmdb::val(orderEntry.dump()),
+ MDB_APPEND);
+ lmdb::dbi_put(txn, evToOrderDb, event_id, lmdb::val(&index, sizeof(index)));
+
+ // TODO(Nico): Allow blacklisting more event types in UI
+ if (!isHiddenEvent(e, room_id)) {
+ ++msgIndex;
+ lmdb::cursor_put(msgCursor.handle(),
+ lmdb::val(&msgIndex, sizeof(msgIndex)),
+ event_id,
+ MDB_APPEND);
+
+ lmdb::dbi_put(txn,
+ msg2orderDb,
+ event_id,
+ lmdb::val(&msgIndex, sizeof(msgIndex)));
+ }
+
+ if (event.contains("content") &&
+ event["content"].contains("m.relates_to")) {
+ auto temp = event["content"]["m.relates_to"];
+ std::string relates_to = temp.contains("m.in_reply_to")
+ ? temp["m.in_reply_to"]["event_id"]
+ : temp["event_id"];
+
+ if (!relates_to.empty())
+ lmdb::dbi_put(
+ txn, relationsDb, lmdb::val(relates_to), event_id);
+ }
+ }
+ }
+}
+
+uint64_t
+Cache::saveOldMessages(const std::string &room_id, const mtx::responses::Messages &res)
+{
+ auto txn = lmdb::txn::begin(env_);
+ auto eventsDb = getEventsDb(txn, room_id);
+ auto relationsDb = getRelationsDb(txn, room_id);
+
+ auto orderDb = getEventOrderDb(txn, room_id);
+ auto evToOrderDb = getEventToOrderDb(txn, room_id);
+ auto msg2orderDb = getMessageToOrderDb(txn, room_id);
+ auto order2msgDb = getOrderToMessageDb(txn, room_id);
+
+ lmdb::val indexVal, val;
+ uint64_t index = std::numeric_limits<uint64_t>::max() / 2;
+ {
+ auto cursor = lmdb::cursor::open(txn, orderDb);
+ if (cursor.get(indexVal, val, MDB_FIRST)) {
+ index = *indexVal.data<uint64_t>();
+ }
+ }
+
+ uint64_t msgIndex = std::numeric_limits<uint64_t>::max() / 2;
+ {
+ auto msgCursor = lmdb::cursor::open(txn, order2msgDb);
+ if (msgCursor.get(indexVal, val, MDB_FIRST)) {
+ msgIndex = *indexVal.data<uint64_t>();
+ }
+ }
+
+ if (res.chunk.empty())
+ return index;
+
+ std::string event_id_val;
+ for (const auto &e : res.chunk) {
+ if (std::holds_alternative<
+ mtx::events::RedactionEvent<mtx::events::msg::Redaction>>(e))
+ continue;
- obj["event"] = mtx::accessors::serialize_event(e);
- obj["token"] = res.prev_batch;
+ auto event = mtx::accessors::serialize_event(e);
+ event_id_val = event["event_id"].get<std::string>();
+ lmdb::val event_id = event_id_val;
+ lmdb::dbi_put(txn, eventsDb, event_id, lmdb::val(event.dump()));
+
+ --index;
+
+ json orderEntry = json::object();
+ orderEntry["event_id"] = event_id_val;
+
+ nhlog::db()->debug("saving '{}'", orderEntry.dump());
lmdb::dbi_put(
- txn,
- db,
- lmdb::val(std::to_string(obj["event"]["origin_server_ts"].get<uint64_t>())),
- lmdb::val(obj.dump()));
+ txn, orderDb, lmdb::val(&index, sizeof(index)), lmdb::val(orderEntry.dump()));
+ lmdb::dbi_put(txn, evToOrderDb, event_id, lmdb::val(&index, sizeof(index)));
+
+ // TODO(Nico): Allow blacklisting more event types in UI
+ if (!isHiddenEvent(e, room_id)) {
+ --msgIndex;
+ lmdb::dbi_put(
+ txn, order2msgDb, lmdb::val(&msgIndex, sizeof(msgIndex)), event_id);
+
+ lmdb::dbi_put(
+ txn, msg2orderDb, event_id, lmdb::val(&msgIndex, sizeof(msgIndex)));
+ }
+
+ if (event.contains("content") && event["content"].contains("m.relates_to")) {
+ auto temp = event["content"]["m.relates_to"];
+ std::string relates_to = temp.contains("m.in_reply_to")
+ ? temp["m.in_reply_to"]["event_id"]
+ : temp["event_id"];
+
+ if (!relates_to.empty())
+ lmdb::dbi_put(txn, relationsDb, lmdb::val(relates_to), event_id);
+ }
}
+
+ json orderEntry = json::object();
+ orderEntry["event_id"] = event_id_val;
+ orderEntry["prev_batch"] = res.end;
+ lmdb::dbi_put(txn, orderDb, lmdb::val(&index, sizeof(index)), lmdb::val(orderEntry.dump()));
+ nhlog::db()->debug("saving '{}'", orderEntry.dump());
+
+ txn.commit();
+
+ return msgIndex;
+}
+
+void
+Cache::clearTimeline(const std::string &room_id)
+{
+ auto txn = lmdb::txn::begin(env_);
+ auto eventsDb = getEventsDb(txn, room_id);
+ auto relationsDb = getRelationsDb(txn, room_id);
+
+ auto orderDb = getEventOrderDb(txn, room_id);
+ auto evToOrderDb = getEventToOrderDb(txn, room_id);
+ auto msg2orderDb = getMessageToOrderDb(txn, room_id);
+ auto order2msgDb = getOrderToMessageDb(txn, room_id);
+
+ lmdb::val indexVal, val;
+ auto cursor = lmdb::cursor::open(txn, orderDb);
+
+ bool start = true;
+ bool passed_pagination_token = false;
+ while (cursor.get(indexVal, val, start ? MDB_LAST : MDB_PREV)) {
+ start = false;
+ json obj;
+
+ try {
+ obj = json::parse(std::string_view(val.data(), val.size()));
+ } catch (std::exception &) {
+ // workaround bug in the initial db format, where we sometimes didn't store
+ // json...
+ obj = {{"event_id", std::string(val.data(), val.size())}};
+ }
+
+ if (passed_pagination_token) {
+ if (obj.count("event_id") != 0) {
+ lmdb::val event_id = obj["event_id"].get<std::string>();
+ lmdb::dbi_del(txn, evToOrderDb, event_id);
+ lmdb::dbi_del(txn, eventsDb, event_id);
+
+ lmdb::dbi_del(txn, relationsDb, event_id);
+
+ lmdb::val order{};
+ bool exists = lmdb::dbi_get(txn, msg2orderDb, event_id, order);
+ if (exists) {
+ lmdb::dbi_del(txn, order2msgDb, order);
+ lmdb::dbi_del(txn, msg2orderDb, event_id);
+ }
+ }
+ lmdb::cursor_del(cursor);
+ } else {
+ if (obj.count("prev_batch") != 0)
+ passed_pagination_token = true;
+ }
+ }
+
+ auto msgCursor = lmdb::cursor::open(txn, order2msgDb);
+ start = true;
+ while (msgCursor.get(indexVal, val, start ? MDB_LAST : MDB_PREV)) {
+ start = false;
+
+ lmdb::val eventId;
+ bool innerStart = true;
+ bool found = false;
+ while (cursor.get(indexVal, eventId, innerStart ? MDB_LAST : MDB_PREV)) {
+ innerStart = false;
+
+ json obj;
+ try {
+ obj = json::parse(std::string_view(eventId.data(), eventId.size()));
+ } catch (std::exception &) {
+ obj = {{"event_id", std::string(eventId.data(), eventId.size())}};
+ }
+
+ if (obj["event_id"] == std::string(val.data(), val.size())) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ break;
+ }
+
+ do {
+ lmdb::cursor_del(msgCursor);
+ } while (msgCursor.get(indexVal, val, MDB_PREV));
+
+ cursor.close();
+ msgCursor.close();
+ txn.commit();
}
mtx::responses::Notifications
@@ -2111,34 +2773,60 @@ Cache::getRoomIds(lmdb::txn &txn)
void
Cache::deleteOldMessages()
{
+ lmdb::val indexVal, val;
+
auto txn = lmdb::txn::begin(env_);
auto room_ids = getRoomIds(txn);
- for (const auto &id : room_ids) {
- auto msg_db = getMessagesDb(txn, id);
+ for (const auto &room_id : room_ids) {
+ auto orderDb = getEventOrderDb(txn, room_id);
+ auto evToOrderDb = getEventToOrderDb(txn, room_id);
+ auto o2m = getOrderToMessageDb(txn, room_id);
+ auto m2o = getMessageToOrderDb(txn, room_id);
+ auto eventsDb = getEventsDb(txn, room_id);
+ auto relationsDb = getRelationsDb(txn, room_id);
+ auto cursor = lmdb::cursor::open(txn, orderDb);
- std::string ts, event;
- uint64_t idx = 0;
+ uint64_t first, last;
+ if (cursor.get(indexVal, val, MDB_LAST)) {
+ last = *indexVal.data<uint64_t>();
+ } else {
+ continue;
+ }
+ if (cursor.get(indexVal, val, MDB_FIRST)) {
+ first = *indexVal.data<uint64_t>();
+ } else {
+ continue;
+ }
- const auto db_size = msg_db.size(txn);
- if (db_size <= 3 * MAX_RESTORED_MESSAGES)
+ size_t message_count = static_cast<size_t>(last - first);
+ if (message_count < MAX_RESTORED_MESSAGES)
continue;
- nhlog::db()->info("[{}] message count: {}", id, db_size);
+ bool start = true;
+ while (cursor.get(indexVal, val, start ? MDB_FIRST : MDB_NEXT) &&
+ message_count-- > MAX_RESTORED_MESSAGES) {
+ start = false;
+ auto obj = json::parse(std::string_view(val.data(), val.size()));
- auto cursor = lmdb::cursor::open(txn, msg_db);
- while (cursor.get(ts, event, MDB_NEXT)) {
- idx += 1;
+ if (obj.count("event_id") != 0) {
+ lmdb::val event_id = obj["event_id"].get<std::string>();
+ lmdb::dbi_del(txn, evToOrderDb, event_id);
+ lmdb::dbi_del(txn, eventsDb, event_id);
- if (idx > MAX_RESTORED_MESSAGES)
- lmdb::cursor_del(cursor);
- }
+ lmdb::dbi_del(txn, relationsDb, event_id);
+ lmdb::val order{};
+ bool exists = lmdb::dbi_get(txn, m2o, event_id, order);
+ if (exists) {
+ lmdb::dbi_del(txn, o2m, order);
+ lmdb::dbi_del(txn, m2o, event_id);
+ }
+ }
+ lmdb::cursor_del(cursor);
+ }
cursor.close();
-
- nhlog::db()->info("[{}] updated message count: {}", id, msg_db.size(txn));
}
-
txn.commit();
}
@@ -2172,7 +2860,7 @@ Cache::hasEnoughPowerLevel(const std::vector<mtx::events::EventType> &eventTypes
if (res) {
try {
StateEvent<PowerLevels> msg =
- json::parse(std::string(event.data(), event.size()));
+ json::parse(std::string_view(event.data(), event.size()));
user_level = msg.content.user_level(user_id);
@@ -2277,6 +2965,9 @@ Cache::removeAvatarUrl(const QString &room_id, const QString &user_id)
mtx::presence::PresenceState
Cache::presenceState(const std::string &user_id)
{
+ if (user_id.empty())
+ return {};
+
lmdb::val presenceVal;
auto txn = lmdb::txn::begin(env_);
@@ -2287,7 +2978,7 @@ Cache::presenceState(const std::string &user_id)
if (res) {
mtx::events::presence::Presence presence =
- json::parse(std::string(presenceVal.data(), presenceVal.size()));
+ json::parse(std::string_view(presenceVal.data(), presenceVal.size()));
state = presence.presence;
}
@@ -2299,6 +2990,9 @@ Cache::presenceState(const std::string &user_id)
std::string
Cache::statusMessage(const std::string &user_id)
{
+ if (user_id.empty())
+ return {};
+
lmdb::val presenceVal;
auto txn = lmdb::txn::begin(env_);
@@ -2309,7 +3003,7 @@ Cache::statusMessage(const std::string &user_id)
if (res) {
mtx::events::presence::Presence presence =
- json::parse(std::string(presenceVal.data(), presenceVal.size()));
+ json::parse(std::string_view(presenceVal.data(), presenceVal.size()));
status_msg = presence.status_msg;
}
|