diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/event_push_actions.py | 203 | ||||
-rw-r--r-- | synapse/storage/keys.py | 38 |
2 files changed, 188 insertions, 53 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3d93285f84..df4000d0da 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -117,24 +117,42 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue(ret) @defer.inlineCallbacks - def get_unread_push_actions_for_user_in_range(self, user_id, - min_stream_ordering, - max_stream_ordering=None, - limit=20): + def get_unread_push_actions_for_user_in_range_for_http( + self, user_id, min_stream_ordering, max_stream_ordering, limit=20 + ): + """Get a list of the most recent unread push actions for a given user, + within the given stream ordering range. Called by the httppusher. + + Args: + user_id (str): The user to fetch push actions for. + min_stream_ordering(int): The exclusive lower bound on the + stream ordering of event push actions to fetch. + max_stream_ordering(int): The inclusive upper bound on the + stream ordering of event push actions to fetch. + limit (int): The maximum number of rows to return. + Returns: + A promise which resolves to a list of dicts with the keys "event_id", + "room_id", "stream_ordering", "actions". + The list will be ordered by ascending stream_ordering. + The list will have between 0~limit entries. + """ + # find rooms that have a read receipt in them and return the next + # push actions def get_after_receipt(txn): + # find rooms that have a read receipt in them and return the next + # push actions sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " - "e.received_ts " - "FROM (" - " SELECT room_id, user_id, " - " max(topological_ordering) as topological_ordering, " - " max(stream_ordering) as stream_ordering " - " FROM events" - " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" - " GROUP BY room_id, user_id" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions" + " FROM (" + " SELECT room_id," + " MAX(topological_ordering) as topological_ordering," + " MAX(stream_ordering) as stream_ordering" + " FROM events" + " INNER JOIN receipts_linearized USING (room_id, event_id)" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" ") AS rl," " event_push_actions AS ep" - " INNER JOIN events AS e USING (room_id, event_id)" " WHERE" " ep.room_id = rl.room_id" " AND (" @@ -144,44 +162,159 @@ class EventPushActionsStore(SQLBaseStore): " AND ep.stream_ordering > rl.stream_ordering" " )" " )" - " AND ep.stream_ordering > ?" " AND ep.user_id = ?" - " AND ep.user_id = rl.user_id" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering ASC LIMIT ?" ) - args = [min_stream_ordering, user_id] - if max_stream_ordering is not None: - sql += " AND ep.stream_ordering <= ?" - args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering DESC LIMIT ?" - args.append(limit) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] txn.execute(sql, args) return txn.fetchall() after_read_receipt = yield self.runInteraction( - "get_unread_push_actions_for_user_in_range", get_after_receipt + "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt ) + # There are rooms with push actions in them but you don't have a read receipt in + # them e.g. rooms you've been invited to, so get push actions for rooms which do + # not have read receipts in them too. def get_no_receipt(txn): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," " e.received_ts" " FROM event_push_actions AS ep" - " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" - " WHERE ep.room_id not in (" - " SELECT room_id FROM events NATURAL JOIN receipts_linearized" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id NOT IN (" + " SELECT room_id FROM receipts_linearized" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering ASC LIMIT ?" + ) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] + txn.execute(sql, args) + return txn.fetchall() + no_read_receipt = yield self.runInteraction( + "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt + ) + + notifs = [ + { + "event_id": row[0], + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + } for row in after_read_receipt + no_read_receipt + ] + + # Now sort it so it's ordered correctly, since currently it will + # contain results from the first query, correctly ordered, followed + # by results from the second query, but we want them all ordered + # by stream_ordering, oldest first. + notifs.sort(key=lambda r: r['stream_ordering']) + + # Take only up to the limit. We have to stop at the limit because + # one of the subqueries may have hit the limit. + defer.returnValue(notifs[:limit]) + + @defer.inlineCallbacks + def get_unread_push_actions_for_user_in_range_for_email( + self, user_id, min_stream_ordering, max_stream_ordering, limit=20 + ): + """Get a list of the most recent unread push actions for a given user, + within the given stream ordering range. Called by the emailpusher + + Args: + user_id (str): The user to fetch push actions for. + min_stream_ordering(int): The exclusive lower bound on the + stream ordering of event push actions to fetch. + max_stream_ordering(int): The inclusive upper bound on the + stream ordering of event push actions to fetch. + limit (int): The maximum number of rows to return. + Returns: + A promise which resolves to a list of dicts with the keys "event_id", + "room_id", "stream_ordering", "actions", "received_ts". + The list will be ordered by descending received_ts. + The list will have between 0~limit entries. + """ + # find rooms that have a read receipt in them and return the most recent + # push actions + def get_after_receipt(txn): + sql = ( + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM (" + " SELECT room_id," + " MAX(topological_ordering) as topological_ordering," + " MAX(stream_ordering) as stream_ordering" + " FROM events" + " INNER JOIN receipts_linearized USING (room_id, event_id)" " WHERE receipt_type = 'm.read' AND user_id = ?" " GROUP BY room_id" - ") AND ep.user_id = ? AND ep.stream_ordering > ?" + ") AS rl," + " event_push_actions AS ep" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id = rl.room_id" + " AND (" + " ep.topological_ordering > rl.topological_ordering" + " OR (" + " ep.topological_ordering = rl.topological_ordering" + " AND ep.stream_ordering > rl.stream_ordering" + " )" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering DESC LIMIT ?" + ) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] + txn.execute(sql, args) + return txn.fetchall() + after_read_receipt = yield self.runInteraction( + "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt + ) + + # There are rooms with push actions in them but you don't have a read receipt in + # them e.g. rooms you've been invited to, so get push actions for rooms which do + # not have read receipts in them too. + def get_no_receipt(txn): + sql = ( + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM event_push_actions AS ep" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id NOT IN (" + " SELECT room_id FROM receipts_linearized" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering DESC LIMIT ?" ) - args = [user_id, user_id, min_stream_ordering] - if max_stream_ordering is not None: - sql += " AND ep.stream_ordering <= ?" - args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering DESC LIMIT ?" - args.append(limit) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] txn.execute(sql, args) return txn.fetchall() no_read_receipt = yield self.runInteraction( - "get_unread_push_actions_for_user_in_range", get_no_receipt + "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt ) # Make a list of dicts from the two sets of results. @@ -198,7 +331,7 @@ class EventPushActionsStore(SQLBaseStore): # Now sort it so it's ordered correctly, since currently it will # contain results from the first query, correctly ordered, followed # by results from the second query, but we want them all ordered - # by received_ts + # by received_ts (most recent first) notifs.sort(key=lambda r: -(r['received_ts'] or 0)) # Now return the first `limit` diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index a495a8a7d9..86b37b9ddd 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -22,6 +22,10 @@ import OpenSSL from signedjson.key import decode_verify_key_bytes import hashlib +import logging + +logger = logging.getLogger(__name__) + class KeyStore(SQLBaseStore): """Persistence for signature verification keys and tls X.509 certificates @@ -74,22 +78,22 @@ class KeyStore(SQLBaseStore): ) @cachedInlineCallbacks() - def get_all_server_verify_keys(self, server_name): - rows = yield self._simple_select_list( + def _get_server_verify_key(self, server_name, key_id): + verify_key_bytes = yield self._simple_select_one_onecol( table="server_signature_keys", keyvalues={ "server_name": server_name, + "key_id": key_id, }, - retcols=["key_id", "verify_key"], - desc="get_all_server_verify_keys", + retcol="verify_key", + desc="_get_server_verify_key", + allow_none=True, ) - defer.returnValue({ - row["key_id"]: decode_verify_key_bytes( - row["key_id"], str(row["verify_key"]) - ) - for row in rows - }) + if verify_key_bytes: + defer.returnValue(decode_verify_key_bytes( + key_id, str(verify_key_bytes) + )) @defer.inlineCallbacks def get_server_verify_keys(self, server_name, key_ids): @@ -101,12 +105,12 @@ class KeyStore(SQLBaseStore): Returns: (list of VerifyKey): The verification keys. """ - keys = yield self.get_all_server_verify_keys(server_name) - defer.returnValue({ - k: keys[k] - for k in key_ids - if k in keys and keys[k] - }) + keys = {} + for key_id in key_ids: + key = yield self._get_server_verify_key(server_name, key_id) + if key: + keys[key_id] = key + defer.returnValue(keys) @defer.inlineCallbacks def store_server_verify_key(self, server_name, from_server, time_now_ms, @@ -133,8 +137,6 @@ class KeyStore(SQLBaseStore): desc="store_server_verify_key", ) - self.get_all_server_verify_keys.invalidate((server_name,)) - def store_server_keys_json(self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes): """Stores the JSON bytes for a set of keys from a server |