summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/event_push_actions.py203
-rw-r--r--synapse/storage/keys.py38
2 files changed, 188 insertions, 53 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 3d93285f84..df4000d0da 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -117,24 +117,42 @@ class EventPushActionsStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_unread_push_actions_for_user_in_range(self, user_id,
-                                                  min_stream_ordering,
-                                                  max_stream_ordering=None,
-                                                  limit=20):
+    def get_unread_push_actions_for_user_in_range_for_http(
+        self, user_id, min_stream_ordering, max_stream_ordering, limit=20
+    ):
+        """Get a list of the most recent unread push actions for a given user,
+        within the given stream ordering range. Called by the httppusher.
+
+        Args:
+            user_id (str): The user to fetch push actions for.
+            min_stream_ordering(int): The exclusive lower bound on the
+                stream ordering of event push actions to fetch.
+            max_stream_ordering(int): The inclusive upper bound on the
+                stream ordering of event push actions to fetch.
+            limit (int): The maximum number of rows to return.
+        Returns:
+            A promise which resolves to a list of dicts with the keys "event_id",
+            "room_id", "stream_ordering", "actions".
+            The list will be ordered by ascending stream_ordering.
+            The list will have between 0~limit entries.
+        """
+        # find rooms that have a read receipt in them and return the next
+        # push actions
         def get_after_receipt(txn):
+            # find rooms that have a read receipt in them and return the next
+            # push actions
             sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
-                "e.received_ts "
-                "FROM ("
-                "   SELECT room_id, user_id, "
-                "       max(topological_ordering) as topological_ordering, "
-                "       max(stream_ordering) as stream_ordering "
-                "       FROM events"
-                "   NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
-                "   GROUP BY room_id, user_id"
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions"
+                " FROM ("
+                "   SELECT room_id,"
+                "       MAX(topological_ordering) as topological_ordering,"
+                "       MAX(stream_ordering) as stream_ordering"
+                "   FROM events"
+                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
+                "   WHERE receipt_type = 'm.read' AND user_id = ?"
+                "   GROUP BY room_id"
                 ") AS rl,"
                 " event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
                 " WHERE"
                 "   ep.room_id = rl.room_id"
                 "   AND ("
@@ -144,44 +162,159 @@ class EventPushActionsStore(SQLBaseStore):
                 "           AND ep.stream_ordering > rl.stream_ordering"
                 "       )"
                 "   )"
-                "   AND ep.stream_ordering > ?"
                 "   AND ep.user_id = ?"
-                "   AND ep.user_id = rl.user_id"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering ASC LIMIT ?"
             )
-            args = [min_stream_ordering, user_id]
-            if max_stream_ordering is not None:
-                sql += " AND ep.stream_ordering <= ?"
-                args.append(max_stream_ordering)
-            sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            args.append(limit)
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
             txn.execute(sql, args)
             return txn.fetchall()
         after_read_receipt = yield self.runInteraction(
-            "get_unread_push_actions_for_user_in_range", get_after_receipt
+            "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt
         )
 
+        # There are rooms with push actions in them but you don't have a read receipt in
+        # them e.g. rooms you've been invited to, so get push actions for rooms which do
+        # not have read receipts in them too.
         def get_no_receipt(txn):
             sql = (
                 "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
                 " e.received_ts"
                 " FROM event_push_actions AS ep"
-                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
-                " WHERE ep.room_id not in ("
-                "   SELECT room_id FROM events NATURAL JOIN receipts_linearized"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
+                "   ep.room_id NOT IN ("
+                "     SELECT room_id FROM receipts_linearized"
+                "       WHERE receipt_type = 'm.read' AND user_id = ?"
+                "       GROUP BY room_id"
+                "   )"
+                "   AND ep.user_id = ?"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering ASC LIMIT ?"
+            )
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
+            txn.execute(sql, args)
+            return txn.fetchall()
+        no_read_receipt = yield self.runInteraction(
+            "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt
+        )
+
+        notifs = [
+            {
+                "event_id": row[0],
+                "room_id": row[1],
+                "stream_ordering": row[2],
+                "actions": json.loads(row[3]),
+            } for row in after_read_receipt + no_read_receipt
+        ]
+
+        # Now sort it so it's ordered correctly, since currently it will
+        # contain results from the first query, correctly ordered, followed
+        # by results from the second query, but we want them all ordered
+        # by stream_ordering, oldest first.
+        notifs.sort(key=lambda r: r['stream_ordering'])
+
+        # Take only up to the limit. We have to stop at the limit because
+        # one of the subqueries may have hit the limit.
+        defer.returnValue(notifs[:limit])
+
+    @defer.inlineCallbacks
+    def get_unread_push_actions_for_user_in_range_for_email(
+        self, user_id, min_stream_ordering, max_stream_ordering, limit=20
+    ):
+        """Get a list of the most recent unread push actions for a given user,
+        within the given stream ordering range. Called by the emailpusher
+
+        Args:
+            user_id (str): The user to fetch push actions for.
+            min_stream_ordering(int): The exclusive lower bound on the
+                stream ordering of event push actions to fetch.
+            max_stream_ordering(int): The inclusive upper bound on the
+                stream ordering of event push actions to fetch.
+            limit (int): The maximum number of rows to return.
+        Returns:
+            A promise which resolves to a list of dicts with the keys "event_id",
+            "room_id", "stream_ordering", "actions", "received_ts".
+            The list will be ordered by descending received_ts.
+            The list will have between 0~limit entries.
+        """
+        # find rooms that have a read receipt in them and return the most recent
+        # push actions
+        def get_after_receipt(txn):
+            sql = (
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+                "  e.received_ts"
+                " FROM ("
+                "   SELECT room_id,"
+                "       MAX(topological_ordering) as topological_ordering,"
+                "       MAX(stream_ordering) as stream_ordering"
+                "   FROM events"
+                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
                 "   WHERE receipt_type = 'm.read' AND user_id = ?"
                 "   GROUP BY room_id"
-                ") AND ep.user_id = ? AND ep.stream_ordering > ?"
+                ") AS rl,"
+                " event_push_actions AS ep"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
+                "   ep.room_id = rl.room_id"
+                "   AND ("
+                "       ep.topological_ordering > rl.topological_ordering"
+                "       OR ("
+                "           ep.topological_ordering = rl.topological_ordering"
+                "           AND ep.stream_ordering > rl.stream_ordering"
+                "       )"
+                "   )"
+                "   AND ep.user_id = ?"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering DESC LIMIT ?"
+            )
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
+            txn.execute(sql, args)
+            return txn.fetchall()
+        after_read_receipt = yield self.runInteraction(
+            "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt
+        )
+
+        # There are rooms with push actions in them but you don't have a read receipt in
+        # them e.g. rooms you've been invited to, so get push actions for rooms which do
+        # not have read receipts in them too.
+        def get_no_receipt(txn):
+            sql = (
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+                " e.received_ts"
+                " FROM event_push_actions AS ep"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
+                "   ep.room_id NOT IN ("
+                "     SELECT room_id FROM receipts_linearized"
+                "       WHERE receipt_type = 'm.read' AND user_id = ?"
+                "       GROUP BY room_id"
+                "   )"
+                "   AND ep.user_id = ?"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering DESC LIMIT ?"
             )
-            args = [user_id, user_id, min_stream_ordering]
-            if max_stream_ordering is not None:
-                sql += " AND ep.stream_ordering <= ?"
-                args.append(max_stream_ordering)
-            sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            args.append(limit)
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
             txn.execute(sql, args)
             return txn.fetchall()
         no_read_receipt = yield self.runInteraction(
-            "get_unread_push_actions_for_user_in_range", get_no_receipt
+            "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt
         )
 
         # Make a list of dicts from the two sets of results.
@@ -198,7 +331,7 @@ class EventPushActionsStore(SQLBaseStore):
         # Now sort it so it's ordered correctly, since currently it will
         # contain results from the first query, correctly ordered, followed
         # by results from the second query, but we want them all ordered
-        # by received_ts
+        # by received_ts (most recent first)
         notifs.sort(key=lambda r: -(r['received_ts'] or 0))
 
         # Now return the first `limit`
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index a495a8a7d9..86b37b9ddd 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -22,6 +22,10 @@ import OpenSSL
 from signedjson.key import decode_verify_key_bytes
 import hashlib
 
+import logging
+
+logger = logging.getLogger(__name__)
+
 
 class KeyStore(SQLBaseStore):
     """Persistence for signature verification keys and tls X.509 certificates
@@ -74,22 +78,22 @@ class KeyStore(SQLBaseStore):
         )
 
     @cachedInlineCallbacks()
-    def get_all_server_verify_keys(self, server_name):
-        rows = yield self._simple_select_list(
+    def _get_server_verify_key(self, server_name, key_id):
+        verify_key_bytes = yield self._simple_select_one_onecol(
             table="server_signature_keys",
             keyvalues={
                 "server_name": server_name,
+                "key_id": key_id,
             },
-            retcols=["key_id", "verify_key"],
-            desc="get_all_server_verify_keys",
+            retcol="verify_key",
+            desc="_get_server_verify_key",
+            allow_none=True,
         )
 
-        defer.returnValue({
-            row["key_id"]: decode_verify_key_bytes(
-                row["key_id"], str(row["verify_key"])
-            )
-            for row in rows
-        })
+        if verify_key_bytes:
+            defer.returnValue(decode_verify_key_bytes(
+                key_id, str(verify_key_bytes)
+            ))
 
     @defer.inlineCallbacks
     def get_server_verify_keys(self, server_name, key_ids):
@@ -101,12 +105,12 @@ class KeyStore(SQLBaseStore):
         Returns:
             (list of VerifyKey): The verification keys.
         """
-        keys = yield self.get_all_server_verify_keys(server_name)
-        defer.returnValue({
-            k: keys[k]
-            for k in key_ids
-            if k in keys and keys[k]
-        })
+        keys = {}
+        for key_id in key_ids:
+            key = yield self._get_server_verify_key(server_name, key_id)
+            if key:
+                keys[key_id] = key
+        defer.returnValue(keys)
 
     @defer.inlineCallbacks
     def store_server_verify_key(self, server_name, from_server, time_now_ms,
@@ -133,8 +137,6 @@ class KeyStore(SQLBaseStore):
             desc="store_server_verify_key",
         )
 
-        self.get_all_server_verify_keys.invalidate((server_name,))
-
     def store_server_keys_json(self, server_name, key_id, from_server,
                                ts_now_ms, ts_expires_ms, key_json_bytes):
         """Stores the JSON bytes for a set of keys from a server