summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/event_push_actions.py59
1 files changed, 50 insertions, 9 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 355478957d..3269834e2f 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -116,26 +116,67 @@ class EventPushActionsStore(SQLBaseStore):
     def get_unread_push_actions_for_user_in_range(self, user_id,
                                                   min_stream_ordering,
                                                   max_stream_ordering=None):
-        def f(txn):
+        def get_after_receipt(txn):
             sql = (
-                "SELECT event_id, stream_ordering, actions"
-                " FROM event_push_actions"
-                " WHERE user_id = ? AND stream_ordering > ?"
+                "SELECT ep.event_id, ep.stream_ordering, ep.actions "
+                "FROM event_push_actions AS ep, ("
+                "   SELECT room_id, user_id,"
+                "       max(topological_ordering) as topological_ordering,"
+                "       max(stream_ordering) as stream_ordering"
+                "       FROM events"
+                "   NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
+                "   GROUP BY room_id, user_id"
+                ") AS rl "
+                "WHERE"
+                "   ep.room_id = rl.room_id"
+                "   AND ("
+                "       ep.topological_ordering > rl.topological_ordering"
+                "       OR ("
+                "           ep.topological_ordering = rl.topological_ordering"
+                "           AND ep.stream_ordering > ?"
+                "       )"
+                "   )"
+                "   AND ep.user_id = ?"
+                "   AND ep.user_id = rl.user_id"
             )
-            args = [user_id, min_stream_ordering]
+            args = [min_stream_ordering, user_id]
             if max_stream_ordering is not None:
-                sql += " AND stream_ordering <= ?"
+                sql += " AND ep.stream_ordering <= ?"
                 args.append(max_stream_ordering)
-            sql += " ORDER BY stream_ordering ASC"
+            sql += " ORDER BY ep.stream_ordering ASC"
             txn.execute(sql, args)
             return txn.fetchall()
-        ret = yield self.runInteraction("get_unread_push_actions_for_user_in_range", f)
+        after_read_receipt = yield self.runInteraction(
+            "get_unread_push_actions_for_user_in_range", get_after_receipt
+        )
+
+        def get_no_receipt(txn):
+            sql = (
+                "SELECT ep.event_id, ep.stream_ordering, ep.actions "
+                "FROM event_push_actions AS ep "
+                "WHERE ep.room_id not in ("
+                "   SELECT room_id FROM events NATURAL JOIN receipts_linearized"
+                "   WHERE receipt_type = 'm.read' AND user_id = ? "
+                "   GROUP BY room_id"
+                ") AND ep.user_id = ? AND ep.stream_ordering > ?"
+            )
+            args = [user_id, user_id, min_stream_ordering]
+            if max_stream_ordering is not None:
+                sql += " AND ep.stream_ordering <= ?"
+                args.append(max_stream_ordering)
+            sql += " ORDER BY ep.stream_ordering ASC"
+            txn.execute(sql, args)
+            return txn.fetchall()
+        no_read_receipt = yield self.runInteraction(
+            "get_unread_push_actions_for_user_in_range", get_no_receipt
+        )
+
         defer.returnValue([
             {
                 "event_id": row[0],
                 "stream_ordering": row[1],
                 "actions": json.loads(row[2]),
-            } for row in ret
+            } for row in after_read_receipt+no_read_receipt
         ])
 
     @defer.inlineCallbacks