summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorNicolas Werner <nicolas.werner@hotmail.de>2023-07-06 20:49:40 +0200
committerNicolas Werner <nicolas.werner@hotmail.de>2023-07-06 20:51:04 +0200
commit1abb52700af5ed13088df1e90bd0bb24577345e9 (patch)
tree7b50ba159a9c0d7f55d36c4888a198a285af4b27 /src
parentFix event expiration not always stopping properly and redacting some wrong st... (diff)
downloadnheko-1abb52700af5ed13088df1e90bd0bb24577345e9.tar.xz
Add an early out cache for event expiration
Diffstat (limited to 'src')
-rw-r--r--src/Cache.cpp39
-rw-r--r--src/Cache_p.h9
-rw-r--r--src/Utils.cpp49
3 files changed, 91 insertions, 6 deletions
diff --git a/src/Cache.cpp b/src/Cache.cpp
index 7a19cba4..3fe2892b 100644
--- a/src/Cache.cpp
+++ b/src/Cache.cpp
@@ -82,6 +82,8 @@ static constexpr auto DEVICES_DB("devices");
 static constexpr auto DEVICE_KEYS_DB("device_keys");
 //! room_ids that have encryption enabled.
 static constexpr auto ENCRYPTED_ROOMS_DB("encrypted_rooms");
+//! Expiration progress for each room
+static constexpr auto EVENT_EXPIRATION_BG_JOB_DB("event_expiration_bg_job");
 
 //! room_id -> pickled OlmInboundGroupSession
 static constexpr auto INBOUND_MEGOLM_SESSIONS_DB("inbound_megolm_sessions");
@@ -327,7 +329,9 @@ Cache::setup()
     megolmSessionDataDb_     = lmdb::dbi::open(txn, MEGOLM_SESSIONS_DATA_DB, MDB_CREATE);
 
     // What rooms are encrypted
-    encryptedRooms_                      = lmdb::dbi::open(txn, ENCRYPTED_ROOMS_DB, MDB_CREATE);
+    encryptedRooms_   = lmdb::dbi::open(txn, ENCRYPTED_ROOMS_DB, MDB_CREATE);
+    eventExpiryBgJob_ = lmdb::dbi::open(txn, EVENT_EXPIRATION_BG_JOB_DB, MDB_CREATE);
+
     [[maybe_unused]] auto verificationDb = getVerificationDb(txn);
     [[maybe_unused]] auto userKeysDb     = getUserKeysDb(txn);
 
@@ -585,6 +589,39 @@ Cache::pickleSecret()
 }
 
 void
+Cache::storeEventExpirationProgress(const std::string &room,
+                                    const std::string &expirationSettings,
+                                    const std::string &stopMarker)
+{
+    nlohmann::json j;
+    j["s"] = expirationSettings;
+    j["m"] = stopMarker;
+
+    auto txn = lmdb::txn::begin(env_);
+    eventExpiryBgJob_.put(txn, room, j.dump());
+    txn.commit();
+}
+
+std::string
+Cache::loadEventExpirationProgress(const std::string &room, const std::string &expirationSettings)
+
+{
+    try {
+        auto txn = ro_txn(env_);
+        std::string_view data;
+        if (!eventExpiryBgJob_.get(txn, room, data))
+            return "";
+
+        auto j = nlohmann::json::parse(data);
+        if (j.value("s", "") == expirationSettings)
+            return j.value("m", "");
+    } catch (...) {
+        return "";
+    }
+    return "";
+}
+
+void
 Cache::setEncryptedRoom(lmdb::txn &txn, const std::string &room_id)
 {
     nhlog::db()->info("mark room {} as encrypted", room_id);
diff --git a/src/Cache_p.h b/src/Cache_p.h
index 121e7e66..8d51c7c4 100644
--- a/src/Cache_p.h
+++ b/src/Cache_p.h
@@ -87,6 +87,13 @@ public:
     //! Retrieve if the room is tombstoned (closed or replaced by a different room)
     bool getRoomIsTombstoned(lmdb::txn &txn, lmdb::dbi &statesdb);
 
+    // for the event expiry background job
+    void storeEventExpirationProgress(const std::string &room,
+                                      const std::string &expirationSettings,
+                                      const std::string &stopMarker);
+    std::string
+    loadEventExpirationProgress(const std::string &room, const std::string &expirationSettings);
+
     //! Get a specific state event
     template<typename T>
     std::optional<mtx::events::StateEvent<T>>
@@ -714,6 +721,8 @@ private:
 
     lmdb::dbi encryptedRooms_;
 
+    lmdb::dbi eventExpiryBgJob_;
+
     QString localUserId_;
     QString cacheDirectory_;
 
diff --git a/src/Utils.cpp b/src/Utils.cpp
index 20c6f7d1..0ea42a27 100644
--- a/src/Utils.cpp
+++ b/src/Utils.cpp
@@ -1467,7 +1467,7 @@ utils::updateSpaceVias()
                               ChatPage::instance()->callFunctionOnGuiThread(
                                 [state    = std::move(state),
                                  interval = e->matrix_error.retry_after]() {
-                                    QTimer::singleShot(interval,
+                                    QTimer::singleShot(interval * 3,
                                                        ChatPage::instance(),
                                                        [self = std::move(state)]() mutable {
                                                            next(std::move(self));
@@ -1502,7 +1502,7 @@ utils::updateSpaceVias()
                               ChatPage::instance()->callFunctionOnGuiThread(
                                 [state    = std::move(state),
                                  interval = e->matrix_error.retry_after]() {
-                                    QTimer::singleShot(interval,
+                                    QTimer::singleShot(interval * 3,
                                                        ChatPage::instance(),
                                                        [self = std::move(state)]() mutable {
                                                            next(std::move(self));
@@ -1644,9 +1644,19 @@ utils::removeExpiredEvents()
         std::string currentRoom;
         bool firstMessagesCall         = true;
         std::uint64_t currentRoomCount = 0;
+
+        // batch token for pagination
         std::string currentRoomPrevToken;
+        // event id of an event redacted in a previous run
+        std::string currentRoomStopAt;
+        // event id of first event redacted in the current run, hoping that the order stays the
+        // same.
+        std::string currentRoomFirstRedactedEvent;
+        // (evtype,state_key) tuple to keep the latest state event of each.
         std::set<std::pair<std::string, std::string>> currentRoomStateEvents;
+        // event ids pending redaction
         std::vector<std::string> currentRoomRedactionQueue;
+
         mtx::events::account_data::nheko_extensions::EventExpiry currentExpiry;
 
         static void next(std::shared_ptr<ApplyEventExpiration> state)
@@ -1664,7 +1674,8 @@ utils::removeExpiredEvents()
                               ChatPage::instance()->callFunctionOnGuiThread(
                                 [state    = std::move(state),
                                  interval = e->matrix_error.retry_after]() {
-                                    QTimer::singleShot(interval,
+                                    // triple interval to allow other traffic as well
+                                    QTimer::singleShot(interval * 3,
                                                        ChatPage::instance(),
                                                        [self = std::move(state)]() mutable {
                                                            next(std::move(self));
@@ -1681,6 +1692,10 @@ utils::removeExpiredEvents()
                           }
                       } else {
                           nhlog::net()->info("Redacted event {} in {}", evid, state->currentRoom);
+
+                          if (state->currentRoomFirstRedactedEvent.empty())
+                              state->currentRoomFirstRedactedEvent = evid;
+
                           state->currentRoomRedactionQueue.pop_back();
                           next(std::move(state));
                       }
@@ -1688,6 +1703,13 @@ utils::removeExpiredEvents()
             } else if (!state->currentRoom.empty()) {
                 if (state->currentRoomPrevToken.empty() && !state->firstMessagesCall) {
                     nhlog::net()->info("Finished room {}", state->currentRoom);
+
+                    if (!state->currentRoomFirstRedactedEvent.empty())
+                        cache::client()->storeEventExpirationProgress(
+                          state->currentRoom,
+                          nlohmann::json(state->currentExpiry).dump(),
+                          state->currentRoomFirstRedactedEvent);
+
                     state->currentRoom.clear();
                     next(std::move(state));
                     return;
@@ -1708,7 +1730,7 @@ utils::removeExpiredEvents()
                                              mtx::http::RequestErr error) mutable {
                       if (error) {
                           // skip success handler
-                          nhlog::net()->info(
+                          nhlog::net()->warn(
                             "Finished room {} with error {}", state->currentRoom, *error);
                           state->currentRoom.clear();
                       } else if (msgs.chunk.empty()) {
@@ -1725,8 +1747,22 @@ utils::removeExpiredEvents()
                                   continue;
 
                               if (std::holds_alternative<
-                                    mtx::events::RoomEvent<mtx::events::msg::Redacted>>(e))
+                                    mtx::events::RoomEvent<mtx::events::msg::Redacted>>(e) ||
+                                  std::holds_alternative<
+                                    mtx::events::StateEvent<mtx::events::msg::Redacted>>(e)) {
+                                  if (!state->currentRoomStopAt.empty() &&
+                                      mtx::accessors::event_id(e) == state->currentRoomStopAt) {
+                                      // There is no filter to remove redacted events from
+                                      // pagination, so we try to stop early by caching what event
+                                      // we redacted last if we reached the end of a room.
+                                      nhlog::net()->info(
+                                        "Found previous redaction marker, stopping early: {}",
+                                        state->currentRoom);
+                                      state->currentRoomPrevToken.clear();
+                                      break;
+                                  }
                                   continue;
+                              }
 
                               if (std::holds_alternative<
                                     mtx::events::StateEvent<mtx::events::msg::Redacted>>(e))
@@ -1806,6 +1842,9 @@ utils::removeExpiredEvents()
                 state->currentRoomRedactionQueue.clear();
                 state->currentRoomStateEvents.clear();
 
+                state->currentRoomStopAt = cache::client()->loadEventExpirationProgress(
+                  state->currentRoom, nlohmann::json(state->currentExpiry).dump());
+
                 state->roomsToUpdate.pop_back();
                 next(std::move(state));
             } else {