diff --git a/changelog.d/13455.misc b/changelog.d/13455.misc
new file mode 100644
index 0000000000..17462c56f3
--- /dev/null
+++ b/changelog.d/13455.misc
@@ -0,0 +1 @@
+Add some comments about how event push actions are stored.
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index dd2627037c..5ddddb1cf3 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -265,7 +265,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
counts.notify_count += row[1]
counts.unread_count += row[2]
- # Next we need to count highlights, which aren't summarized
+ # Next we need to count highlights, which aren't summarised
sql = """
SELECT COUNT(*) FROM event_push_actions
WHERE user_id = ?
@@ -280,7 +280,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# Finally we need to count push actions that aren't included in the
# summary returned above, e.g. recent events that haven't been
- # summarized yet, or the summary is empty due to a recent read receipt.
+ # summarised yet, or the summary is empty due to a recent read receipt.
stream_ordering = max(stream_ordering, summary_stream_ordering)
notify_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering
@@ -304,6 +304,17 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
Does not consult `event_push_summary` table, which may include push
actions that have been deleted from `event_push_actions` table.
+
+ Args:
+ txn: The database transaction.
+ room_id: The room ID to get unread counts for.
+ user_id: The user ID to get unread counts for.
+ stream_ordering: The (exclusive) minimum stream ordering to consider.
+ max_stream_ordering: The (inclusive) maximum stream ordering to consider.
+ If this is not given, then no maximum is applied.
+
+ Returns:
+ A tuple of the notif count and unread count in the given range.
"""
# If there have been no events in the room since the stream ordering,
@@ -383,27 +394,27 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
) -> List[Tuple[str, str, int, str, bool]]:
# find rooms that have a read receipt in them and return the next
# push actions
- sql = (
- "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
- " ep.highlight "
- " FROM ("
- " SELECT room_id,"
- " MAX(stream_ordering) as stream_ordering"
- " FROM events"
- " INNER JOIN receipts_linearized USING (room_id, event_id)"
- " WHERE receipt_type = 'm.read' AND user_id = ?"
- " GROUP BY room_id"
- ") AS rl,"
- " event_push_actions AS ep"
- " WHERE"
- " ep.room_id = rl.room_id"
- " AND ep.stream_ordering > rl.stream_ordering"
- " AND ep.user_id = ?"
- " AND ep.stream_ordering > ?"
- " AND ep.stream_ordering <= ?"
- " AND ep.notif = 1"
- " ORDER BY ep.stream_ordering ASC LIMIT ?"
- )
+ sql = """
+ SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+ ep.highlight
+ FROM (
+ SELECT room_id,
+ MAX(stream_ordering) as stream_ordering
+ FROM events
+ INNER JOIN receipts_linearized USING (room_id, event_id)
+ WHERE receipt_type = 'm.read' AND user_id = ?
+ GROUP BY room_id
+ ) AS rl,
+ event_push_actions AS ep
+ WHERE
+ ep.room_id = rl.room_id
+ AND ep.stream_ordering > rl.stream_ordering
+ AND ep.user_id = ?
+ AND ep.stream_ordering > ?
+ AND ep.stream_ordering <= ?
+ AND ep.notif = 1
+ ORDER BY ep.stream_ordering ASC LIMIT ?
+ """
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
@@ -418,23 +429,23 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
def get_no_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool]]:
- sql = (
- "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
- " ep.highlight "
- " FROM event_push_actions AS ep"
- " INNER JOIN events AS e USING (room_id, event_id)"
- " WHERE"
- " ep.room_id NOT IN ("
- " SELECT room_id FROM receipts_linearized"
- " WHERE receipt_type = 'm.read' AND user_id = ?"
- " GROUP BY room_id"
- " )"
- " AND ep.user_id = ?"
- " AND ep.stream_ordering > ?"
- " AND ep.stream_ordering <= ?"
- " AND ep.notif = 1"
- " ORDER BY ep.stream_ordering ASC LIMIT ?"
- )
+ sql = """
+ SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+ ep.highlight
+ FROM event_push_actions AS ep
+ INNER JOIN events AS e USING (room_id, event_id)
+ WHERE
+ ep.room_id NOT IN (
+ SELECT room_id FROM receipts_linearized
+ WHERE receipt_type = 'm.read' AND user_id = ?
+ GROUP BY room_id
+ )
+ AND ep.user_id = ?
+ AND ep.stream_ordering > ?
+ AND ep.stream_ordering <= ?
+ AND ep.notif = 1
+ ORDER BY ep.stream_ordering ASC LIMIT ?
+ """
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
@@ -490,28 +501,28 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
def get_after_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool, int]]:
- sql = (
- "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
- " ep.highlight, e.received_ts"
- " FROM ("
- " SELECT room_id,"
- " MAX(stream_ordering) as stream_ordering"
- " FROM events"
- " INNER JOIN receipts_linearized USING (room_id, event_id)"
- " WHERE receipt_type = 'm.read' AND user_id = ?"
- " GROUP BY room_id"
- ") AS rl,"
- " event_push_actions AS ep"
- " INNER JOIN events AS e USING (room_id, event_id)"
- " WHERE"
- " ep.room_id = rl.room_id"
- " AND ep.stream_ordering > rl.stream_ordering"
- " AND ep.user_id = ?"
- " AND ep.stream_ordering > ?"
- " AND ep.stream_ordering <= ?"
- " AND ep.notif = 1"
- " ORDER BY ep.stream_ordering DESC LIMIT ?"
- )
+ sql = """
+ SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+ ep.highlight, e.received_ts
+ FROM (
+ SELECT room_id,
+ MAX(stream_ordering) as stream_ordering
+ FROM events
+ INNER JOIN receipts_linearized USING (room_id, event_id)
+ WHERE receipt_type = 'm.read' AND user_id = ?
+ GROUP BY room_id
+ ) AS rl,
+ event_push_actions AS ep
+ INNER JOIN events AS e USING (room_id, event_id)
+ WHERE
+ ep.room_id = rl.room_id
+ AND ep.stream_ordering > rl.stream_ordering
+ AND ep.user_id = ?
+ AND ep.stream_ordering > ?
+ AND ep.stream_ordering <= ?
+ AND ep.notif = 1
+ ORDER BY ep.stream_ordering DESC LIMIT ?
+ """
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
@@ -526,23 +537,23 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
def get_no_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool, int]]:
- sql = (
- "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
- " ep.highlight, e.received_ts"
- " FROM event_push_actions AS ep"
- " INNER JOIN events AS e USING (room_id, event_id)"
- " WHERE"
- " ep.room_id NOT IN ("
- " SELECT room_id FROM receipts_linearized"
- " WHERE receipt_type = 'm.read' AND user_id = ?"
- " GROUP BY room_id"
- " )"
- " AND ep.user_id = ?"
- " AND ep.stream_ordering > ?"
- " AND ep.stream_ordering <= ?"
- " AND ep.notif = 1"
- " ORDER BY ep.stream_ordering DESC LIMIT ?"
- )
+ sql = """
+ SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+ ep.highlight, e.received_ts
+ FROM event_push_actions AS ep
+ INNER JOIN events AS e USING (room_id, event_id)
+ WHERE
+ ep.room_id NOT IN (
+ SELECT room_id FROM receipts_linearized
+ WHERE receipt_type = 'm.read' AND user_id = ?
+ GROUP BY room_id
+ )
+ AND ep.user_id = ?
+ AND ep.stream_ordering > ?
+ AND ep.stream_ordering <= ?
+ AND ep.notif = 1
+ ORDER BY ep.stream_ordering DESC LIMIT ?
+ """
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
@@ -769,12 +780,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# [10, <none>, 20], we should treat this as being equivalent to
# [10, 10, 20].
#
- sql = (
- "SELECT received_ts FROM events"
- " WHERE stream_ordering <= ?"
- " ORDER BY stream_ordering DESC"
- " LIMIT 1"
- )
+ sql = """
+ SELECT received_ts FROM events
+ WHERE stream_ordering <= ?
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ """
while range_end - range_start > 0:
middle = (range_end + range_start) // 2
@@ -802,14 +813,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
self, stream_ordering: int
) -> Optional[int]:
def f(txn: LoggingTransaction) -> Optional[Tuple[int]]:
- sql = (
- "SELECT e.received_ts"
- " FROM event_push_actions AS ep"
- " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
- " WHERE ep.stream_ordering > ? AND notif = 1"
- " ORDER BY ep.stream_ordering ASC"
- " LIMIT 1"
- )
+ sql = """
+ SELECT e.received_ts
+ FROM event_push_actions AS ep
+ JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id
+ WHERE ep.stream_ordering > ? AND notif = 1
+ ORDER BY ep.stream_ordering ASC
+ LIMIT 1
+ """
txn.execute(sql, (stream_ordering,))
return cast(Optional[Tuple[int]], txn.fetchone())
@@ -858,10 +869,13 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
Any push actions which predate the user's most recent read receipt are
now redundant, so we can remove them from `event_push_actions` and
update `event_push_summary`.
+
+ Returns true if all new receipts have been processed.
"""
limit = 100
+ # The (inclusive) receipt stream ID that was previously processed.
min_receipts_stream_id = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_last_receipt_stream_id",
@@ -871,6 +885,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
max_receipts_stream_id = self._receipts_id_gen.get_current_token()
+ # The (inclusive) event stream ordering that was previously summarised.
+ old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="event_push_summary_stream_ordering",
+ keyvalues={},
+ retcol="stream_ordering",
+ )
+
sql = """
SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
FROM receipts_linearized AS r
@@ -895,13 +917,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
)
rows = txn.fetchall()
- old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
- txn,
- table="event_push_summary_stream_ordering",
- keyvalues={},
- retcol="stream_ordering",
- )
-
# For each new read receipt we delete push actions from before it and
# recalculate the summary.
for _, room_id, user_id, stream_ordering in rows:
@@ -920,10 +935,13 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
(room_id, user_id, stream_ordering),
)
+ # Fetch the notification counts between the stream ordering of the
+ # latest receipt and what was previously summarised.
notif_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
)
+ # Replace the previous summary with the new counts.
self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
@@ -956,10 +974,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
return len(rows) < limit
def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool:
- """Archives older notifications into event_push_summary. Returns whether
- the archiving process has caught up or not.
+ """Archives older notifications (from event_push_actions) into event_push_summary.
+
+ Returns whether the archiving process has caught up or not.
"""
+ # The (inclusive) event stream ordering that was previously summarised.
old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
@@ -974,7 +994,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering > ?
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
- """,
+ """,
(old_rotate_stream_ordering, self._rotate_count),
)
stream_row = txn.fetchone()
@@ -993,19 +1013,31 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering)
- self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
+ self._rotate_notifs_before_txn(
+ txn, old_rotate_stream_ordering, rotate_to_stream_ordering
+ )
return caught_up
def _rotate_notifs_before_txn(
- self, txn: LoggingTransaction, rotate_to_stream_ordering: int
+ self,
+ txn: LoggingTransaction,
+ old_rotate_stream_ordering: int,
+ rotate_to_stream_ordering: int,
) -> None:
- old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
- txn,
- table="event_push_summary_stream_ordering",
- keyvalues={},
- retcol="stream_ordering",
- )
+ """Archives older notifications (from event_push_actions) into event_push_summary.
+
+ Any event_push_actions between old_rotate_stream_ordering (exclusive) and
+ rotate_to_stream_ordering (inclusive) will be added to the event_push_summary
+ table.
+
+ Args:
+ txn: The database transaction.
+ old_rotate_stream_ordering: The previous maximum event stream ordering.
+ rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
+
+ Returns nothing: whether archiving has caught up is determined by the caller.
+ """
# Calculate the new counts that should be upserted into event_push_summary
sql = """
@@ -1093,9 +1125,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
async def _remove_old_push_actions_that_have_rotated(
self,
) -> None:
- """Clear out old push actions that have been summarized."""
+ """Clear out old push actions that have been summarised."""
- # We want to clear out anything that older than a day that *has* already
+ # We want to clear out anything that is older than a day that *has* already
# been rotated.
rotated_upto_stream_ordering = await self.db_pool.simple_select_one_onecol(
table="event_push_summary_stream_ordering",
@@ -1119,7 +1151,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering <= ? AND highlight = 0
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
- """,
+ """,
(
max_stream_ordering_to_delete,
batch_size,
@@ -1215,16 +1247,18 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
# NB. This assumes event_ids are globally unique since
# it makes the query easier to index
- sql = (
- "SELECT epa.event_id, epa.room_id,"
- " epa.stream_ordering, epa.topological_ordering,"
- " epa.actions, epa.highlight, epa.profile_tag, e.received_ts"
- " FROM event_push_actions epa, events e"
- " WHERE epa.event_id = e.event_id"
- " AND epa.user_id = ? %s"
- " AND epa.notif = 1"
- " ORDER BY epa.stream_ordering DESC"
- " LIMIT ?" % (before_clause,)
+ sql = """
+ SELECT epa.event_id, epa.room_id,
+ epa.stream_ordering, epa.topological_ordering,
+ epa.actions, epa.highlight, epa.profile_tag, e.received_ts
+ FROM event_push_actions epa, events e
+ WHERE epa.event_id = e.event_id
+ AND epa.user_id = ? %s
+ AND epa.notif = 1
+ ORDER BY epa.stream_ordering DESC
+ LIMIT ?
+ """ % (
+ before_clause,
)
txn.execute(sql, args)
return cast(
|