diff options
Diffstat (limited to 'synapse/storage/event_push_actions.py')
-rw-r--r-- | synapse/storage/event_push_actions.py | 57 |
1 files changed, 50 insertions, 7 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 86a98b6f11..ad512b2f07 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -118,15 +118,17 @@ class EventPushActionsStore(SQLBaseStore): max_stream_ordering=None): def get_after_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " + "e.received_ts " "FROM event_push_actions AS ep, (" - " SELECT room_id, user_id," - " max(topological_ordering) as topological_ordering," - " max(stream_ordering) as stream_ordering" + " SELECT room_id, user_id, " + " max(topological_ordering) as topological_ordering, " + " max(stream_ordering) as stream_ordering " " FROM events" " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" " GROUP BY room_id, user_id" ") AS rl " + "NATURAL JOIN events e " "WHERE" " ep.room_id = rl.room_id" " AND (" @@ -153,8 +155,10 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " + "e.received_ts " "FROM event_push_actions AS ep " + "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " "WHERE ep.room_id not in (" " SELECT room_id FROM events NATURAL JOIN receipts_linearized" " WHERE receipt_type = 'm.read' AND user_id = ? " @@ -175,12 +179,31 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue([ { "event_id": row[0], - "stream_ordering": row[1], - "actions": json.loads(row[2]), + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + "received_ts": row[4], } for row in after_read_receipt + no_read_receipt ]) @defer.inlineCallbacks + def get_time_of_last_push_action_before(self, stream_ordering): + def f(txn): + sql = ( + "SELECT e.received_ts " + "FROM event_push_actions AS ep " + "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " + "WHERE ep.stream_ordering > ? " + "ORDER BY ep.stream_ordering ASC " + "LIMIT 1" + ) + txn.execute(sql, (stream_ordering,)) + return txn.fetchone() + result = yield self.runInteraction("get_time_of_last_push_action_before", f) + defer.returnValue(result[0] if result is not None else None) + + + @defer.inlineCallbacks def get_latest_push_action_stream_ordering(self): def f(txn): txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") @@ -190,6 +213,26 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(result[0] or 0) + @defer.inlineCallbacks + def get_time_of_latest_push_action_by_room_for_user(self, user_id): + """ + Returns only the received_ts of the last notification in each of the + user's rooms, in a dict by room_id + """ + def f(txn): + txn.execute( + "SELECT ep.room_id, MAX(e.received_ts) " + "FROM event_push_actions AS ep " + "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " + "GROUP BY ep.room_id" + ) + return txn.fetchall() + result = yield self.runInteraction( + "get_time_of_latest_push_action_by_room_for_user", f + ) + + defer.returnValue({row[0]: row[1] for row in result}) + def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): # Sad that we have to blow away the cache for the whole room here txn.call_after( |