diff --git a/src/Cache.cpp b/src/Cache.cpp
index 2824960b..26291cfd 100644
--- a/src/Cache.cpp
+++ b/src/Cache.cpp
@@ -37,7 +37,7 @@
//! Should be changed when a breaking change occurs in the cache format.
//! This will reset client's data.
-static const std::string CURRENT_CACHE_FORMAT_VERSION("2020.05.01");
+static const std::string CURRENT_CACHE_FORMAT_VERSION("2020.07.05");
static const std::string SECRET("secret");
static lmdb::val NEXT_BATCH_KEY("next_batch");
@@ -46,8 +46,9 @@ static lmdb::val CACHE_FORMAT_VERSION_KEY("cache_format_version");
constexpr size_t MAX_RESTORED_MESSAGES = 30'000;
-constexpr auto DB_SIZE = 32ULL * 1024ULL * 1024ULL * 1024ULL; // 32 GB
-constexpr auto MAX_DBS = 8092UL;
+constexpr auto DB_SIZE = 32ULL * 1024ULL * 1024ULL * 1024ULL; // 32 GB
+constexpr auto MAX_DBS = 8092UL;
+constexpr auto BATCH_SIZE = 100;
//! Cache databases and their format.
//!
@@ -63,7 +64,6 @@ constexpr auto SYNC_STATE_DB("sync_state");
//! Read receipts per room/event.
constexpr auto READ_RECEIPTS_DB("read_receipts");
constexpr auto NOTIFICATIONS_DB("sent_notifications");
-//! TODO: delete pending_receipts database on old cache versions
//! Encryption related databases.
@@ -93,20 +93,6 @@ namespace {
std::unique_ptr<Cache> instance_ = nullptr;
}
-int
-numeric_key_comparison(const MDB_val *a, const MDB_val *b)
-{
- auto lhs = std::stoull(std::string((char *)a->mv_data, a->mv_size));
- auto rhs = std::stoull(std::string((char *)b->mv_data, b->mv_size));
-
- if (lhs < rhs)
- return 1;
- else if (lhs == rhs)
- return 0;
-
- return -1;
-}
-
Cache::Cache(const QString &userId, QObject *parent)
: QObject{parent}
, env_{nullptr}
@@ -700,6 +686,27 @@ Cache::runMigrations()
nhlog::db()->info("Successfully deleted pending receipts database.");
return true;
}},
+ {"2020.07.05",
+ [this]() {
+ try {
+ auto txn = lmdb::txn::begin(env_, nullptr);
+ auto room_ids = getRoomIds(txn);
+
+ for (const auto &room_id : room_ids) {
+ auto messagesDb = lmdb::dbi::open(
+ txn, std::string(room_id + "/messages").c_str(), MDB_CREATE);
+ lmdb::dbi_drop(txn, messagesDb, true);
+ }
+ txn.commit();
+ } catch (const lmdb::error &) {
+ nhlog::db()->critical(
+ "Failed to delete messages database in migration!");
+ return false;
+ }
+
+ nhlog::db()->info("Successfully deleted pending receipts database.");
+ return true;
+ }},
};
for (const auto &[target_version, migration] : migrations) {
@@ -1232,38 +1239,64 @@ Cache::getTimelineMentions()
return notifs;
}
-mtx::responses::Timeline
-Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id)
+Cache::Messages
+Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id, int64_t index, bool forward)
{
// TODO(nico): Limit the messages returned by this maybe?
- auto db = getMessagesDb(txn, room_id);
+ auto orderDb = getEventOrderDb(txn, room_id);
+ auto eventsDb = getEventsDb(txn, room_id);
- mtx::responses::Timeline timeline;
- std::string timestamp, msg;
+ Messages messages{};
- auto cursor = lmdb::cursor::open(txn, db);
+ lmdb::val indexVal, val;
+
+ auto cursor = lmdb::cursor::open(txn, orderDb);
+ if (index == std::numeric_limits<int64_t>::max()) {
+ if (cursor.get(indexVal, val, forward ? MDB_FIRST : MDB_LAST)) {
+ index = *indexVal.data<int64_t>();
+ } else {
+ messages.end_of_cache = true;
+ return messages;
+ }
+ } else {
+ if (cursor.get(indexVal, val, MDB_SET)) {
+ index = *indexVal.data<int64_t>();
+ } else {
+ messages.end_of_cache = true;
+ return messages;
+ }
+ }
- size_t index = 0;
+ int counter = 0;
- while (cursor.get(timestamp, msg, MDB_NEXT) && index < MAX_RESTORED_MESSAGES) {
- auto obj = json::parse(msg);
+ bool ret;
+ while ((ret = cursor.get(indexVal, val, forward ? MDB_NEXT : MDB_LAST)) &&
+ counter++ < BATCH_SIZE) {
+ auto obj = json::parse(std::string(val.data(), val.size()));
- if (obj.count("event") == 0 || obj.count("token") == 0)
- continue;
+ if (obj.count("event_id") == 0)
+ break;
- mtx::events::collections::TimelineEvent event;
- mtx::events::collections::from_json(obj.at("event"), event);
+ lmdb::val event;
+ bool success = lmdb::dbi_get(
+ txn, eventsDb, lmdb::val(obj["event_id"].get<std::string>()), event);
+ if (!success)
+ continue;
- index += 1;
+ mtx::events::collections::TimelineEvent te;
+ mtx::events::collections::from_json(
+ json::parse(std::string(event.data(), event.size())), te);
- timeline.events.push_back(event.data);
- timeline.prev_batch = obj.at("token").get<std::string>();
+ messages.timeline.events.push_back(std::move(te.data));
+ // timeline.prev_batch = obj.at("token").get<std::string>();
}
cursor.close();
- std::reverse(timeline.events.begin(), timeline.events.end());
+ // std::reverse(timeline.events.begin(), timeline.events.end());
+ messages.next_index = *indexVal.data<int64_t>();
+ messages.end_of_cache = !ret;
- return timeline;
+ return messages;
}
QMap<QString, RoomInfo>
@@ -1306,55 +1339,59 @@ Cache::roomInfo(bool withInvites)
std::string
Cache::getLastEventId(lmdb::txn &txn, const std::string &room_id)
{
- auto db = getMessagesDb(txn, room_id);
-
- if (db.size(txn) == 0)
- return {};
-
- std::string timestamp, msg;
-
- auto cursor = lmdb::cursor::open(txn, db);
- while (cursor.get(timestamp, msg, MDB_NEXT)) {
- auto obj = json::parse(msg);
+ auto orderDb = getEventOrderDb(txn, room_id);
- if (obj.count("event") == 0)
- continue;
+ lmdb::val indexVal, val;
- cursor.close();
- return obj["event"]["event_id"];
+ auto cursor = lmdb::cursor::open(txn, orderDb);
+ if (!cursor.get(indexVal, val, MDB_LAST)) {
+ return {};
}
- cursor.close();
- return {};
+ auto obj = json::parse(std::string(val.data(), val.size()));
+
+ if (obj.count("event_id") == 0)
+ return {};
+ else
+ return obj["event_id"];
}
DescInfo
Cache::getLastMessageInfo(lmdb::txn &txn, const std::string &room_id)
{
- auto db = getMessagesDb(txn, room_id);
-
- if (db.size(txn) == 0)
+ auto orderDb = getEventOrderDb(txn, room_id);
+ auto eventsDb = getEventsDb(txn, room_id);
+ if (orderDb.size(txn) == 0)
return DescInfo{};
- std::string timestamp, msg;
-
const auto local_user = utils::localUser();
DescInfo fallbackDesc{};
- auto cursor = lmdb::cursor::open(txn, db);
- while (cursor.get(timestamp, msg, MDB_NEXT)) {
- auto obj = json::parse(msg);
+ lmdb::val indexVal, val;
- if (obj.count("event") == 0)
+ auto cursor = lmdb::cursor::open(txn, orderDb);
+ cursor.get(indexVal, val, MDB_LAST);
+ while (cursor.get(indexVal, val, MDB_PREV)) {
+ auto temp = json::parse(std::string(val.data(), val.size()));
+
+ if (temp.count("event_id") == 0)
+ break;
+
+ lmdb::val event;
+ bool success = lmdb::dbi_get(
+ txn, eventsDb, lmdb::val(temp["event_id"].get<std::string>()), event);
+ if (!success)
continue;
- if (fallbackDesc.event_id.isEmpty() && obj["event"]["type"] == "m.room.member" &&
- obj["event"]["state_key"] == local_user.toStdString() &&
- obj["event"]["content"]["membership"] == "join") {
- uint64_t ts = obj["event"]["origin_server_ts"];
+ auto obj = json::parse(std::string(event.data(), event.size()));
+
+ if (fallbackDesc.event_id.isEmpty() && obj["type"] == "m.room.member" &&
+ obj["state_key"] == local_user.toStdString() &&
+ obj["content"]["membership"] == "join") {
+ uint64_t ts = obj["origin_server_ts"];
auto time = QDateTime::fromMSecsSinceEpoch(ts);
- fallbackDesc = DescInfo{QString::fromStdString(obj["event"]["event_id"]),
+ fallbackDesc = DescInfo{QString::fromStdString(obj["event_id"]),
local_user,
tr("You joined this room."),
utils::descriptiveTime(time),
@@ -1362,17 +1399,16 @@ Cache::getLastMessageInfo(lmdb::txn &txn, const std::string &room_id)
time};
}
- if (!(obj["event"]["type"] == "m.room.message" ||
- obj["event"]["type"] == "m.sticker" ||
- obj["event"]["type"] == "m.room.encrypted"))
+ if (!(obj["type"] == "m.room.message" || obj["type"] == "m.sticker" ||
+ obj["type"] == "m.room.encrypted"))
continue;
- mtx::events::collections::TimelineEvent event;
- mtx::events::collections::from_json(obj.at("event"), event);
+ mtx::events::collections::TimelineEvent te;
+ mtx::events::collections::from_json(obj, te);
cursor.close();
return utils::getMessageDescription(
- event.data, local_user, QString::fromStdString(room_id));
+ te.data, local_user, QString::fromStdString(room_id));
}
cursor.close();
@@ -1954,7 +1990,6 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
const std::string &room_id,
const mtx::responses::Timeline &res)
{
- auto db = getMessagesDb(txn, room_id);
auto eventsDb = getEventsDb(txn, room_id);
auto orderDb = getEventOrderDb(txn, room_id);
@@ -1966,7 +2001,7 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
lmdb::val indexVal, val;
int64_t index = 0;
- auto cursor = lmdb::cursor::open(txn, orderDb);
+ auto cursor = lmdb::cursor::open(txn, orderDb);
if (cursor.get(indexVal, val, MDB_LAST)) {
index = *indexVal.data<int64_t>();
}
@@ -1979,17 +2014,6 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
lmdb::dbi_put(
txn, eventsDb, lmdb::val(redaction->redacts), lmdb::val(event.dump()));
} else {
- json obj = json::object();
-
- obj["event"] = event;
- obj["token"] = res.prev_batch;
-
- lmdb::dbi_put(
- txn,
- db,
- lmdb::val(std::to_string(event["origin_server_ts"].get<uint64_t>())),
- lmdb::val(obj.dump()));
-
lmdb::dbi_put(txn,
eventsDb,
lmdb::val(event["event_id"].get<std::string>()),
@@ -1997,9 +2021,16 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
++index;
+ json orderEntry = json::object();
+ orderEntry["event_id"] = event["event_id"];
+ if (first)
+ orderEntry["prev_batch"] = res.prev_batch;
+
+ nhlog::db()->debug("saving '{}'", orderEntry.dump());
+
lmdb::cursor_put(cursor.handle(),
lmdb::val(&index, sizeof(index)),
- lmdb::val(first ? res.prev_batch : ""),
+ lmdb::val(orderEntry.dump()),
MDB_APPEND);
first = false;
}
@@ -2138,34 +2169,43 @@ Cache::getRoomIds(lmdb::txn &txn)
void
Cache::deleteOldMessages()
{
+ lmdb::val indexVal, val;
+
auto txn = lmdb::txn::begin(env_);
auto room_ids = getRoomIds(txn);
- for (const auto &id : room_ids) {
- auto msg_db = getMessagesDb(txn, id);
-
- std::string ts, event;
- uint64_t idx = 0;
+ for (const auto &room_id : room_ids) {
+ auto orderDb = getEventOrderDb(txn, room_id);
+ auto eventsDb = getEventsDb(txn, room_id);
+ auto cursor = lmdb::cursor::open(txn, orderDb);
- const auto db_size = msg_db.size(txn);
- if (db_size <= 3 * MAX_RESTORED_MESSAGES)
+ int64_t first, last;
+ if (cursor.get(indexVal, val, MDB_LAST)) {
+ last = *indexVal.data<int64_t>();
+ } else {
continue;
+ }
+ if (cursor.get(indexVal, val, MDB_FIRST)) {
+ first = *indexVal.data<int64_t>();
+ } else {
+ continue;
+ }
- nhlog::db()->info("[{}] message count: {}", id, db_size);
+ size_t message_count = static_cast<size_t>(last - first);
+ if (message_count < MAX_RESTORED_MESSAGES)
+ continue;
- auto cursor = lmdb::cursor::open(txn, msg_db);
- while (cursor.get(ts, event, MDB_NEXT)) {
- idx += 1;
+ while (cursor.get(indexVal, val, MDB_NEXT) &&
+ message_count-- < MAX_RESTORED_MESSAGES) {
+ auto obj = json::parse(std::string(val.data(), val.size()));
- if (idx > MAX_RESTORED_MESSAGES)
- lmdb::cursor_del(cursor);
+ if (obj.count("event_id") != 0)
+ lmdb::dbi_del(
+ txn, eventsDb, lmdb::val(obj["event_id"].get<std::string>()));
+ lmdb::cursor_del(cursor);
}
-
cursor.close();
-
- nhlog::db()->info("[{}] updated message count: {}", id, msg_db.size(txn));
}
-
txn.commit();
}
diff --git a/src/Cache_p.h b/src/Cache_p.h
index 5f01f736..37486ca0 100644
--- a/src/Cache_p.h
+++ b/src/Cache_p.h
@@ -18,6 +18,7 @@
#pragma once
+#include <limits>
#include <optional>
#include <QDateTime>
@@ -38,9 +39,6 @@
#include "CacheCryptoStructs.h"
#include "CacheStructs.h"
-int
-numeric_key_comparison(const MDB_val *a, const MDB_val *b);
-
class Cache : public QObject
{
Q_OBJECT
@@ -250,7 +248,16 @@ private:
const std::string &room_id,
const mtx::responses::Timeline &res);
- mtx::responses::Timeline getTimelineMessages(lmdb::txn &txn, const std::string &room_id);
+ struct Messages
+ {
+ mtx::responses::Timeline timeline;
+ uint64_t next_index;
+ bool end_of_cache = false;
+ };
+ Messages getTimelineMessages(lmdb::txn &txn,
+ const std::string &room_id,
+ int64_t index = std::numeric_limits<int64_t>::max(),
+ bool forward = false);
//! Remove a room from the cache.
// void removeLeftRoom(lmdb::txn &txn, const std::string &room_id);
@@ -402,15 +409,6 @@ private:
return lmdb::dbi::open(txn, "pending_receipts", MDB_CREATE);
}
- lmdb::dbi getMessagesDb(lmdb::txn &txn, const std::string &room_id)
- {
- auto db =
- lmdb::dbi::open(txn, std::string(room_id + "/messages").c_str(), MDB_CREATE);
- lmdb::dbi_set_compare(txn, db, numeric_key_comparison);
-
- return db;
- }
-
lmdb::dbi getEventsDb(lmdb::txn &txn, const std::string &room_id)
{
return lmdb::dbi::open(txn, std::string(room_id + "/events").c_str(), MDB_CREATE);
|