diff --git a/src/Cache.cpp b/src/Cache.cpp
index 7a19cba4..3fe2892b 100644
--- a/src/Cache.cpp
+++ b/src/Cache.cpp
@@ -82,6 +82,8 @@ static constexpr auto DEVICES_DB("devices");
static constexpr auto DEVICE_KEYS_DB("device_keys");
//! room_ids that have encryption enabled.
static constexpr auto ENCRYPTED_ROOMS_DB("encrypted_rooms");
+//! Expiration progress for each room
+static constexpr auto EVENT_EXPIRATION_BG_JOB_DB("event_expiration_bg_job");
//! room_id -> pickled OlmInboundGroupSession
static constexpr auto INBOUND_MEGOLM_SESSIONS_DB("inbound_megolm_sessions");
@@ -327,7 +329,9 @@ Cache::setup()
megolmSessionDataDb_ = lmdb::dbi::open(txn, MEGOLM_SESSIONS_DATA_DB, MDB_CREATE);
// What rooms are encrypted
- encryptedRooms_ = lmdb::dbi::open(txn, ENCRYPTED_ROOMS_DB, MDB_CREATE);
+ encryptedRooms_ = lmdb::dbi::open(txn, ENCRYPTED_ROOMS_DB, MDB_CREATE);
+ eventExpiryBgJob_ = lmdb::dbi::open(txn, EVENT_EXPIRATION_BG_JOB_DB, MDB_CREATE);
+
[[maybe_unused]] auto verificationDb = getVerificationDb(txn);
[[maybe_unused]] auto userKeysDb = getUserKeysDb(txn);
@@ -585,6 +589,39 @@ Cache::pickleSecret()
}
void
+Cache::storeEventExpirationProgress(const std::string &room,
+ const std::string &expirationSettings,
+ const std::string &stopMarker)
+{
+ nlohmann::json j;
+ j["s"] = expirationSettings;
+ j["m"] = stopMarker;
+
+ auto txn = lmdb::txn::begin(env_);
+ eventExpiryBgJob_.put(txn, room, j.dump());
+ txn.commit();
+}
+
+std::string
+Cache::loadEventExpirationProgress(const std::string &room, const std::string &expirationSettings)
+
+{
+ try {
+ auto txn = ro_txn(env_);
+ std::string_view data;
+ if (!eventExpiryBgJob_.get(txn, room, data))
+ return "";
+
+ auto j = nlohmann::json::parse(data);
+ if (j.value("s", "") == expirationSettings)
+ return j.value("m", "");
+ } catch (...) {
+ return "";
+ }
+ return "";
+}
+
+void
Cache::setEncryptedRoom(lmdb::txn &txn, const std::string &room_id)
{
nhlog::db()->info("mark room {} as encrypted", room_id);
diff --git a/src/Cache_p.h b/src/Cache_p.h
index 121e7e66..8d51c7c4 100644
--- a/src/Cache_p.h
+++ b/src/Cache_p.h
@@ -87,6 +87,13 @@ public:
//! Retrieve if the room is tombstoned (closed or replaced by a different room)
bool getRoomIsTombstoned(lmdb::txn &txn, lmdb::dbi &statesdb);
+ // for the event expiry background job
+ void storeEventExpirationProgress(const std::string &room,
+ const std::string &expirationSettings,
+ const std::string &stopMarker);
+ std::string
+ loadEventExpirationProgress(const std::string &room, const std::string &expirationSettings);
+
//! Get a specific state event
template<typename T>
std::optional<mtx::events::StateEvent<T>>
@@ -714,6 +721,8 @@ private:
lmdb::dbi encryptedRooms_;
+ lmdb::dbi eventExpiryBgJob_;
+
QString localUserId_;
QString cacheDirectory_;
diff --git a/src/Utils.cpp b/src/Utils.cpp
index 20c6f7d1..0ea42a27 100644
--- a/src/Utils.cpp
+++ b/src/Utils.cpp
@@ -1467,7 +1467,7 @@ utils::updateSpaceVias()
ChatPage::instance()->callFunctionOnGuiThread(
[state = std::move(state),
interval = e->matrix_error.retry_after]() {
- QTimer::singleShot(interval,
+ QTimer::singleShot(interval * 3,
ChatPage::instance(),
[self = std::move(state)]() mutable {
next(std::move(self));
@@ -1502,7 +1502,7 @@ utils::updateSpaceVias()
ChatPage::instance()->callFunctionOnGuiThread(
[state = std::move(state),
interval = e->matrix_error.retry_after]() {
- QTimer::singleShot(interval,
+ QTimer::singleShot(interval * 3,
ChatPage::instance(),
[self = std::move(state)]() mutable {
next(std::move(self));
@@ -1644,9 +1644,19 @@ utils::removeExpiredEvents()
std::string currentRoom;
bool firstMessagesCall = true;
std::uint64_t currentRoomCount = 0;
+
+ // batch token for pagination
std::string currentRoomPrevToken;
+ // event id of an event redacted in a previous run
+ std::string currentRoomStopAt;
+ // event id of first event redacted in the current run, hoping that the order stays the
+ // same.
+ std::string currentRoomFirstRedactedEvent;
+ // (evtype,state_key) tuple to keep the latest state event of each.
std::set<std::pair<std::string, std::string>> currentRoomStateEvents;
+ // event ids pending redaction
std::vector<std::string> currentRoomRedactionQueue;
+
mtx::events::account_data::nheko_extensions::EventExpiry currentExpiry;
static void next(std::shared_ptr<ApplyEventExpiration> state)
@@ -1664,7 +1674,8 @@ utils::removeExpiredEvents()
ChatPage::instance()->callFunctionOnGuiThread(
[state = std::move(state),
interval = e->matrix_error.retry_after]() {
- QTimer::singleShot(interval,
+ // triple interval to allow other traffic as well
+ QTimer::singleShot(interval * 3,
ChatPage::instance(),
[self = std::move(state)]() mutable {
next(std::move(self));
@@ -1681,6 +1692,10 @@ utils::removeExpiredEvents()
}
} else {
nhlog::net()->info("Redacted event {} in {}", evid, state->currentRoom);
+
+ if (state->currentRoomFirstRedactedEvent.empty())
+ state->currentRoomFirstRedactedEvent = evid;
+
state->currentRoomRedactionQueue.pop_back();
next(std::move(state));
}
@@ -1688,6 +1703,13 @@ utils::removeExpiredEvents()
} else if (!state->currentRoom.empty()) {
if (state->currentRoomPrevToken.empty() && !state->firstMessagesCall) {
nhlog::net()->info("Finished room {}", state->currentRoom);
+
+ if (!state->currentRoomFirstRedactedEvent.empty())
+ cache::client()->storeEventExpirationProgress(
+ state->currentRoom,
+ nlohmann::json(state->currentExpiry).dump(),
+ state->currentRoomFirstRedactedEvent);
+
state->currentRoom.clear();
next(std::move(state));
return;
@@ -1708,7 +1730,7 @@ utils::removeExpiredEvents()
mtx::http::RequestErr error) mutable {
if (error) {
// skip success handler
- nhlog::net()->info(
+ nhlog::net()->warn(
"Finished room {} with error {}", state->currentRoom, *error);
state->currentRoom.clear();
} else if (msgs.chunk.empty()) {
@@ -1725,8 +1747,22 @@ utils::removeExpiredEvents()
continue;
if (std::holds_alternative<
- mtx::events::RoomEvent<mtx::events::msg::Redacted>>(e))
+ mtx::events::RoomEvent<mtx::events::msg::Redacted>>(e) ||
+ std::holds_alternative<
+ mtx::events::StateEvent<mtx::events::msg::Redacted>>(e)) {
+ if (!state->currentRoomStopAt.empty() &&
+ mtx::accessors::event_id(e) == state->currentRoomStopAt) {
+ // There is no filter to remove redacted events from
+ // pagination, so we try to stop early by caching what event
+ // we redacted last if we reached the end of a room.
+ nhlog::net()->info(
+ "Found previous redaction marker, stopping early: {}",
+ state->currentRoom);
+ state->currentRoomPrevToken.clear();
+ break;
+ }
continue;
+ }
if (std::holds_alternative<
mtx::events::StateEvent<mtx::events::msg::Redacted>>(e))
@@ -1806,6 +1842,9 @@ utils::removeExpiredEvents()
state->currentRoomRedactionQueue.clear();
state->currentRoomStateEvents.clear();
+ state->currentRoomStopAt = cache::client()->loadEventExpirationProgress(
+ state->currentRoom, nlohmann::json(state->currentExpiry).dump());
+
state->roomsToUpdate.pop_back();
next(std::move(state));
} else {
|