summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/event_push_actions.py36
1 files changed, 22 insertions, 14 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 808c9d22fc..52d2bd3ffb 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -168,19 +168,13 @@ class EventPushActionsStore(SQLBaseStore):
         row = txn.fetchone()
         notify_count = row[0] if row else 0
 
-        summary_notif_count = self._simple_select_one_onecol_txn(
-            txn,
-            table="event_push_summary",
-            keyvalues={
-                "user_id": user_id,
-                "room_id": room_id,
-            },
-            retcol="notif_count",
-            allow_none=True,
-        )
-
-        if summary_notif_count:
-            notify_count += summary_notif_count
+        txn.execute("""
+            SELECT notif_count FROM event_push_summary
+            WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
+        """, (room_id, user_id, stream_ordering,))
+        rows = txn.fetchall()
+        if rows:
+            notify_count += rows[0][0]
 
         # Now get the number of highlights
         sql = (
@@ -645,12 +639,20 @@ class EventPushActionsStore(SQLBaseStore):
         # We want to make sure that we only ever do this one at a time
         self.database_engine.lock_table(txn, "event_push_summary")
 
+        old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary_stream_ordering",
+            keyvalues={},
+            retcol="stream_ordering",
+        )
+
         # We don't to try and rotate millions of rows at once, so we cap the
         # maximum stream ordering we'll rotate before.
         txn.execute("""
             SELECT stream_ordering FROM event_push_actions
+            WHERE stream_ordering > ?
             ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
-        """)
+        """, (old_rotate_stream_ordering,))
         stream_row = txn.fetchone()
         if stream_row:
             offset_stream_ordering, = stream_row
@@ -662,6 +664,8 @@ class EventPushActionsStore(SQLBaseStore):
             rotate_to_stream_ordering = self.stream_ordering_day_ago
             caught_up = True
 
+        logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering)
+
         self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
 
         # We have caught up iff we were limited by `stream_ordering_day_ago`
@@ -695,6 +699,8 @@ class EventPushActionsStore(SQLBaseStore):
         txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,))
         rows = txn.fetchall()
 
+        logger.info("Rotating notifications, handling %d rows", len(rows))
+
         # If the `old.user_id` above is NULL then we know there isn't already an
         # entry in the table, so we simply insert it. Otherwise we update the
         # existing table.
@@ -726,6 +732,8 @@ class EventPushActionsStore(SQLBaseStore):
             (old_rotate_stream_ordering, rotate_to_stream_ordering,)
         )
 
+        logger.info("Rotating notifications, deleted %s push actions", txn.rowcount)
+
         txn.execute(
             "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
             (rotate_to_stream_ordering,)