diff options
author | Nicolas Werner <nicolas.werner@hotmail.de> | 2023-07-06 20:49:40 +0200 |
---|---|---|
committer | Nicolas Werner <nicolas.werner@hotmail.de> | 2023-07-06 20:51:04 +0200 |
commit | 1abb52700af5ed13088df1e90bd0bb24577345e9 (patch) | |
tree | 7b50ba159a9c0d7f55d36c4888a198a285af4b27 /src | |
parent | Fix event expiration not always stopping properly and redacting some wrong st... (diff) | |
download | nheko-1abb52700af5ed13088df1e90bd0bb24577345e9.tar.xz |
Add an early out cache for event expiration
Diffstat (limited to 'src')
-rw-r--r-- | src/Cache.cpp | 39 | ||||
-rw-r--r-- | src/Cache_p.h | 9 | ||||
-rw-r--r-- | src/Utils.cpp | 49 |
3 files changed, 91 insertions, 6 deletions
diff --git a/src/Cache.cpp b/src/Cache.cpp index 7a19cba4..3fe2892b 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -82,6 +82,8 @@ static constexpr auto DEVICES_DB("devices"); static constexpr auto DEVICE_KEYS_DB("device_keys"); //! room_ids that have encryption enabled. static constexpr auto ENCRYPTED_ROOMS_DB("encrypted_rooms"); +//! Expiration progress for each room +static constexpr auto EVENT_EXPIRATION_BG_JOB_DB("event_expiration_bg_job"); //! room_id -> pickled OlmInboundGroupSession static constexpr auto INBOUND_MEGOLM_SESSIONS_DB("inbound_megolm_sessions"); @@ -327,7 +329,9 @@ Cache::setup() megolmSessionDataDb_ = lmdb::dbi::open(txn, MEGOLM_SESSIONS_DATA_DB, MDB_CREATE); // What rooms are encrypted - encryptedRooms_ = lmdb::dbi::open(txn, ENCRYPTED_ROOMS_DB, MDB_CREATE); + encryptedRooms_ = lmdb::dbi::open(txn, ENCRYPTED_ROOMS_DB, MDB_CREATE); + eventExpiryBgJob_ = lmdb::dbi::open(txn, EVENT_EXPIRATION_BG_JOB_DB, MDB_CREATE); + [[maybe_unused]] auto verificationDb = getVerificationDb(txn); [[maybe_unused]] auto userKeysDb = getUserKeysDb(txn); @@ -585,6 +589,39 @@ Cache::pickleSecret() } void +Cache::storeEventExpirationProgress(const std::string &room, + const std::string &expirationSettings, + const std::string &stopMarker) +{ + nlohmann::json j; + j["s"] = expirationSettings; + j["m"] = stopMarker; + + auto txn = lmdb::txn::begin(env_); + eventExpiryBgJob_.put(txn, room, j.dump()); + txn.commit(); +} + +std::string +Cache::loadEventExpirationProgress(const std::string &room, const std::string &expirationSettings) + +{ + try { + auto txn = ro_txn(env_); + std::string_view data; + if (!eventExpiryBgJob_.get(txn, room, data)) + return ""; + + auto j = nlohmann::json::parse(data); + if (j.value("s", "") == expirationSettings) + return j.value("m", ""); + } catch (...) { + return ""; + } + return ""; +} + +void Cache::setEncryptedRoom(lmdb::txn &txn, const std::string &room_id) { nhlog::db()->info("mark room {} as encrypted", room_id); diff --git a/src/Cache_p.h b/src/Cache_p.h index 121e7e66..8d51c7c4 100644 --- a/src/Cache_p.h +++ b/src/Cache_p.h @@ -87,6 +87,13 @@ public: //! Retrieve if the room is tombstoned (closed or replaced by a different room) bool getRoomIsTombstoned(lmdb::txn &txn, lmdb::dbi &statesdb); + // for the event expiry background job + void storeEventExpirationProgress(const std::string &room, + const std::string &expirationSettings, + const std::string &stopMarker); + std::string + loadEventExpirationProgress(const std::string &room, const std::string &expirationSettings); + //! Get a specific state event template<typename T> std::optional<mtx::events::StateEvent<T>> @@ -714,6 +721,8 @@ private: lmdb::dbi encryptedRooms_; + lmdb::dbi eventExpiryBgJob_; + QString localUserId_; QString cacheDirectory_; diff --git a/src/Utils.cpp b/src/Utils.cpp index 20c6f7d1..0ea42a27 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -1467,7 +1467,7 @@ utils::updateSpaceVias() ChatPage::instance()->callFunctionOnGuiThread( [state = std::move(state), interval = e->matrix_error.retry_after]() { - QTimer::singleShot(interval, + QTimer::singleShot(interval * 3, ChatPage::instance(), [self = std::move(state)]() mutable { next(std::move(self)); @@ -1502,7 +1502,7 @@ utils::updateSpaceVias() ChatPage::instance()->callFunctionOnGuiThread( [state = std::move(state), interval = e->matrix_error.retry_after]() { - QTimer::singleShot(interval, + QTimer::singleShot(interval * 3, ChatPage::instance(), [self = std::move(state)]() mutable { next(std::move(self)); @@ -1644,9 +1644,19 @@ utils::removeExpiredEvents() std::string currentRoom; bool firstMessagesCall = true; std::uint64_t currentRoomCount = 0; + + // batch token for pagination std::string currentRoomPrevToken; + // event id of an event redacted in a previous run + std::string currentRoomStopAt; + // event id of first event redacted in the current run, hoping that the order stays the + // same. + std::string currentRoomFirstRedactedEvent; + // (evtype,state_key) tuple to keep the latest state event of each. std::set<std::pair<std::string, std::string>> currentRoomStateEvents; + // event ids pending redaction std::vector<std::string> currentRoomRedactionQueue; + mtx::events::account_data::nheko_extensions::EventExpiry currentExpiry; static void next(std::shared_ptr<ApplyEventExpiration> state) @@ -1664,7 +1674,8 @@ utils::removeExpiredEvents() ChatPage::instance()->callFunctionOnGuiThread( [state = std::move(state), interval = e->matrix_error.retry_after]() { - QTimer::singleShot(interval, + // triple interval to allow other traffic as well + QTimer::singleShot(interval * 3, ChatPage::instance(), [self = std::move(state)]() mutable { next(std::move(self)); @@ -1681,6 +1692,10 @@ utils::removeExpiredEvents() } } else { nhlog::net()->info("Redacted event {} in {}", evid, state->currentRoom); + + if (state->currentRoomFirstRedactedEvent.empty()) + state->currentRoomFirstRedactedEvent = evid; + state->currentRoomRedactionQueue.pop_back(); next(std::move(state)); } @@ -1688,6 +1703,13 @@ utils::removeExpiredEvents() } else if (!state->currentRoom.empty()) { if (state->currentRoomPrevToken.empty() && !state->firstMessagesCall) { nhlog::net()->info("Finished room {}", state->currentRoom); + + if (!state->currentRoomFirstRedactedEvent.empty()) + cache::client()->storeEventExpirationProgress( + state->currentRoom, + nlohmann::json(state->currentExpiry).dump(), + state->currentRoomFirstRedactedEvent); + state->currentRoom.clear(); next(std::move(state)); return; @@ -1708,7 +1730,7 @@ utils::removeExpiredEvents() mtx::http::RequestErr error) mutable { if (error) { // skip success handler - nhlog::net()->info( + nhlog::net()->warn( "Finished room {} with error {}", state->currentRoom, *error); state->currentRoom.clear(); } else if (msgs.chunk.empty()) { @@ -1725,8 +1747,22 @@ utils::removeExpiredEvents() continue; if (std::holds_alternative< - mtx::events::RoomEvent<mtx::events::msg::Redacted>>(e)) + mtx::events::RoomEvent<mtx::events::msg::Redacted>>(e) || + std::holds_alternative< + mtx::events::StateEvent<mtx::events::msg::Redacted>>(e)) { + if (!state->currentRoomStopAt.empty() && + mtx::accessors::event_id(e) == state->currentRoomStopAt) { + // There is no filter to remove redacted events from + // pagination, so we try to stop early by caching what event + // we redacted last if we reached the end of a room. + nhlog::net()->info( + "Found previous redaction marker, stopping early: {}", + state->currentRoom); + state->currentRoomPrevToken.clear(); + break; + } continue; + } if (std::holds_alternative< mtx::events::StateEvent<mtx::events::msg::Redacted>>(e)) @@ -1806,6 +1842,9 @@ utils::removeExpiredEvents() state->currentRoomRedactionQueue.clear(); state->currentRoomStateEvents.clear(); + state->currentRoomStopAt = cache::client()->loadEventExpirationProgress( + state->currentRoom, nlohmann::json(state->currentExpiry).dump()); + state->roomsToUpdate.pop_back(); next(std::move(state)); } else { |