summary refs log tree commit diff
path: root/synapse/storage/event_push_actions.py
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2016-04-19 14:24:36 +0100
committerDavid Baker <dave@matrix.org>2016-04-19 14:24:36 +0100
commit07d765209dea12229e70a09784e647611acabcda (patch)
tree08a6e675544322049e0e9ed014afed8a77aa4a78 /synapse/storage/event_push_actions.py
parentMerge pull request #735 from matrix-org/erikj/media_resource_cleanup (diff)
downloadsynapse-07d765209dea12229e70a09784e647611acabcda.tar.xz
First bits of emailpusher
Mostly logic of when to send an email
Diffstat (limited to 'synapse/storage/event_push_actions.py')
-rw-r--r--synapse/storage/event_push_actions.py57
1 files changed, 50 insertions, 7 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 86a98b6f11..ad512b2f07 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -118,15 +118,17 @@ class EventPushActionsStore(SQLBaseStore):
                                                   max_stream_ordering=None):
         def get_after_receipt(txn):
             sql = (
-                "SELECT ep.event_id, ep.stream_ordering, ep.actions "
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
+                "e.received_ts "
                 "FROM event_push_actions AS ep, ("
-                "   SELECT room_id, user_id,"
-                "       max(topological_ordering) as topological_ordering,"
-                "       max(stream_ordering) as stream_ordering"
+                "   SELECT room_id, user_id, "
+                "       max(topological_ordering) as topological_ordering, "
+                "       max(stream_ordering) as stream_ordering "
                 "       FROM events"
                 "   NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
                 "   GROUP BY room_id, user_id"
                 ") AS rl "
+                "NATURAL JOIN events e "
                 "WHERE"
                 "   ep.room_id = rl.room_id"
                 "   AND ("
@@ -153,8 +155,10 @@ class EventPushActionsStore(SQLBaseStore):
 
         def get_no_receipt(txn):
             sql = (
-                "SELECT ep.event_id, ep.stream_ordering, ep.actions "
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
+                "e.received_ts "
                 "FROM event_push_actions AS ep "
+                "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id "
                 "WHERE ep.room_id not in ("
                 "   SELECT room_id FROM events NATURAL JOIN receipts_linearized"
                 "   WHERE receipt_type = 'm.read' AND user_id = ? "
@@ -175,12 +179,31 @@ class EventPushActionsStore(SQLBaseStore):
         defer.returnValue([
             {
                 "event_id": row[0],
-                "stream_ordering": row[1],
-                "actions": json.loads(row[2]),
+                "room_id": row[1],
+                "stream_ordering": row[2],
+                "actions": json.loads(row[3]),
+                "received_ts": row[4],
             } for row in after_read_receipt + no_read_receipt
         ])
 
     @defer.inlineCallbacks
+    def get_time_of_last_push_action_before(self, stream_ordering):
+        def f(txn):
+            sql = (
+                "SELECT e.received_ts "
+                "FROM event_push_actions AS ep "
+                "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id "
+                "WHERE ep.stream_ordering > ? "
+                "ORDER BY ep.stream_ordering ASC "
+                "LIMIT 1"
+            )
+            txn.execute(sql, (stream_ordering,))
+            return txn.fetchone()
+        result = yield self.runInteraction("get_time_of_last_push_action_before", f)
+        defer.returnValue(result[0] if result is not None else None)
+
+
+    @defer.inlineCallbacks
     def get_latest_push_action_stream_ordering(self):
         def f(txn):
             txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
@@ -190,6 +213,26 @@ class EventPushActionsStore(SQLBaseStore):
         )
         defer.returnValue(result[0] or 0)
 
+    @defer.inlineCallbacks
+    def get_time_of_latest_push_action_by_room_for_user(self, user_id):
+        """
+        Returns only the received_ts of the last notification in each of the
+        user's rooms, in a dict by room_id
+        """
+        def f(txn):
+            txn.execute(
+                "SELECT ep.room_id, MAX(e.received_ts) "
+                "FROM event_push_actions AS ep "
+                "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id "
+                "GROUP BY ep.room_id"
+            )
+            return txn.fetchall()
+        result = yield self.runInteraction(
+            "get_time_of_latest_push_action_by_room_for_user", f
+        )
+
+        defer.returnValue({row[0]: row[1] for row in result})
+
     def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
         # Sad that we have to blow away the cache for the whole room here
         txn.call_after(