diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 3933b6e2c5..5f61743e34 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -100,6 +100,54 @@ class EventPushActionsStore(SQLBaseStore):
)
defer.returnValue(ret)
+ @defer.inlineCallbacks
+ def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
+ def f(txn):
+ sql = (
+ "SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
+ " stream_ordering >= ? AND stream_ordering >= ?"
+ )
+ txn.execute(sql, (min_stream_ordering, max_stream_ordering))
+ return [r[0] for r in txn.fetchall()]
+ ret = yield self.runInteraction("get_push_action_users_in_range", f)
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def get_unread_push_actions_for_user_in_range(self, user_id,
+ min_stream_ordering,
+ max_stream_ordering=None):
+ def f(txn):
+ sql = (
+ "SELECT event_id, stream_ordering, actions"
+ " FROM event_push_actions"
+ " WHERE user_id = ? AND stream_ordering > ?"
+ )
+ args = [user_id, min_stream_ordering]
+ if max_stream_ordering is not None:
+ sql += " AND stream_ordering <= ?"
+ args.append(max_stream_ordering)
+ sql += " ORDER BY stream_ordering ASC"
+ txn.execute(sql, args)
+ return txn.fetchall()
+ ret = yield self.runInteraction("get_unread_push_actions_for_user_in_range", f)
+ defer.returnValue([
+ {
+ "event_id": row[0],
+ "stream_ordering": row[1],
+ "actions": json.loads(row[2]),
+ } for row in ret
+ ])
+
+ @defer.inlineCallbacks
+ def get_latest_push_action_stream_ordering(self):
+ def f(txn):
+ txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
+ return txn.fetchone()
+ result = yield self.runInteraction(
+ "get_latest_push_action_stream_ordering", f
+ )
+ defer.returnValue(result[0] or 0)
+
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
|