diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 86a98b6f11..9705db5c47 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -118,16 +118,19 @@ class EventPushActionsStore(SQLBaseStore):
max_stream_ordering=None):
def get_after_receipt(txn):
sql = (
- "SELECT ep.event_id, ep.stream_ordering, ep.actions "
- "FROM event_push_actions AS ep, ("
- " SELECT room_id, user_id,"
- " max(topological_ordering) as topological_ordering,"
- " max(stream_ordering) as stream_ordering"
+ "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
+ "e.received_ts "
+ "FROM ("
+ " SELECT room_id, user_id, "
+ " max(topological_ordering) as topological_ordering, "
+ " max(stream_ordering) as stream_ordering "
" FROM events"
" NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
" GROUP BY room_id, user_id"
- ") AS rl "
- "WHERE"
+ ") AS rl,"
+ " event_push_actions AS ep"
+ " INNER JOIN events AS e USING (room_id, event_id)"
+ " WHERE"
" ep.room_id = rl.room_id"
" AND ("
" ep.topological_ordering > rl.topological_ordering"
@@ -153,11 +156,13 @@ class EventPushActionsStore(SQLBaseStore):
def get_no_receipt(txn):
sql = (
- "SELECT ep.event_id, ep.stream_ordering, ep.actions "
- "FROM event_push_actions AS ep "
- "WHERE ep.room_id not in ("
+ "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+ " e.received_ts"
+ " FROM event_push_actions AS ep"
+ " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+ " WHERE ep.room_id not in ("
" SELECT room_id FROM events NATURAL JOIN receipts_linearized"
- " WHERE receipt_type = 'm.read' AND user_id = ? "
+ " WHERE receipt_type = 'm.read' AND user_id = ?"
" GROUP BY room_id"
") AND ep.user_id = ? AND ep.stream_ordering > ?"
)
@@ -175,12 +180,30 @@ class EventPushActionsStore(SQLBaseStore):
defer.returnValue([
{
"event_id": row[0],
- "stream_ordering": row[1],
- "actions": json.loads(row[2]),
+ "room_id": row[1],
+ "stream_ordering": row[2],
+ "actions": json.loads(row[3]),
+ "received_ts": row[4],
} for row in after_read_receipt + no_read_receipt
])
@defer.inlineCallbacks
+ def get_time_of_last_push_action_before(self, stream_ordering):
+ def f(txn):
+ sql = (
+ "SELECT e.received_ts"
+ " FROM event_push_actions AS ep"
+ " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+ " WHERE ep.stream_ordering > ?"
+ " ORDER BY ep.stream_ordering ASC"
+ " LIMIT 1"
+ )
+ txn.execute(sql, (stream_ordering,))
+ return txn.fetchone()
+ result = yield self.runInteraction("get_time_of_last_push_action_before", f)
+ defer.returnValue(result[0] if result else None)
+
+ @defer.inlineCallbacks
def get_latest_push_action_stream_ordering(self):
def f(txn):
txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
@@ -201,6 +224,18 @@ class EventPushActionsStore(SQLBaseStore):
(room_id, event_id)
)
+ def _remove_push_actions_before_txn(self, txn, room_id, user_id,
+ topological_ordering):
+ txn.call_after(
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (room_id, user_id, )
+ )
+ txn.execute(
+ "DELETE FROM event_push_actions"
+ " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
+ (room_id, user_id, topological_ordering,)
+ )
+
def _action_has_highlight(actions):
for action in actions:
|