diff options
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r-- | synapse/storage/stream.py | 315 |
1 files changed, 83 insertions, 232 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 47a1f2c45a..cf4b1682b6 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -13,267 +13,118 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from ._base import SQLBaseStore -from .message import MessagesTable -from .feedback import FeedbackTable -from .roomdata import RoomDataTable -from .roommember import RoomMemberTable + +from synapse.api.constants import Membership import json import logging -logger = logging.getLogger(__name__) - - -class StreamStore(SQLBaseStore): - - def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0, - with_feedback=False): - """Get all messages for this user between the given keys. - Args: - user_id (str): The user who is requesting messages. - from_key (int): The ID to start returning results from (exclusive). - to_key (int): The ID to stop returning results (exclusive). - room_id (str): Gets messages only for this room. Can be None, in - which case all room messages will be returned. - Returns: - A tuple of rows (list of namedtuples), new_id(int) - """ - if with_feedback and room_id: # with fb MUST specify a room ID - return self._db_pool.runInteraction( - self._get_message_rows_with_feedback, - user_id, from_key, to_key, room_id, limit - ) - else: - return self._db_pool.runInteraction( - self._get_message_rows, - user_id, from_key, to_key, room_id, limit - ) +logger = logging.getLogger(__name__) - def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the messages table, bounded by the specified pkeys - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ("SELECT messages.* FROM messages WHERE ? IN" - + " (SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] +MAX_STREAM_SIZE = 1000 - if room_id: - query += " AND messages.room_id=?" - query_args.append(room_id) - (query, query_args) = self._append_stream_operations( - "messages", query, query_args, from_pkey, to_pkey, limit=limit - ) +class StreamStore(SQLBaseStore): - cursor = txn.execute(query, query_args) - return self._as_events(cursor, MessagesTable, from_pkey) + @defer.inlineCallbacks + def get_room_events_stream(self, user_id, from_key, to_key, room_id, + limit=0, with_feedback=False): + # TODO (erikj): Handle compressed feedback - def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey, - room_id, limit): - # this col represents the compressed feedback JSON as per spec - compressed_feedback_col = ( - "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id" - + " || '\",\"feedback_type\":\"' || f.feedback_type" - + " || '\",\"content\":' || f.content || '}') || ']'" + current_room_membership_sql = ( + "SELECT m.room_id FROM room_memberships as m " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " + "WHERE m.user_id = ?" ) - global_msg_id_join = ("f.room_id = messages.room_id" - + " and f.msg_id = messages.msg_id" - + " and messages.user_id = f.msg_sender_id") - - select_query = ( - "SELECT messages.*, f.content AS fb_content, f.fb_sender_id" - + ", " + compressed_feedback_col + " AS compressed_fb" - + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join) - - current_membership_sub_query = ( - "(SELECT membership from room_memberships rm" - + " WHERE user_id=? AND room_id = rm.room_id" - + " ORDER BY id DESC LIMIT 1)") - - where = (" WHERE ? IN " + current_membership_sub_query - + " AND messages.room_id=?") - - query = select_query + where - query_args = ["join", user_id, room_id] - - (query, query_args) = self._append_stream_operations( - "messages", query, query_args, from_pkey, to_pkey, - limit=limit, group_by=" GROUP BY messages.id " + invites_sql = ( + "SELECT m.event_id FROM room_memberships as m " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " + "WHERE m.user_id = ? AND m.membership = ?" ) - cursor = txn.execute(query, query_args) - - # convert the result set into events - entries = self.cursor_to_dict(cursor) - events = [] - for entry in entries: - # TODO we should spec the cursor > event mapping somewhere else. - event = {} - straight_mappings = ["msg_id", "user_id", "room_id"] - for key in straight_mappings: - event[key] = entry[key] - event["content"] = json.loads(entry["content"]) - if entry["compressed_fb"]: - event["feedback"] = json.loads(entry["compressed_fb"]) - events.append(event) - - latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"] - - return (events, latest_pkey) - - def get_room_member_stream(self, user_id, from_key, to_key): - """Get all room membership events for this user between the given keys. - - Args: - user_id (str): The user who is requesting membership events. - from_key (int): The ID to start returning results from (exclusive). - to_key (int): The ID to stop returning results (exclusive). - Returns: - A tuple of rows (list of namedtuples), new_id(int) - """ - return self._db_pool.runInteraction( - self._get_room_member_rows, user_id, from_key, to_key + if limit: + limit = max(limit, MAX_STREAM_SIZE) + else: + limit = MAX_STREAM_SIZE + + # From and to keys should be integers from ordering. + from_key = int(from_key) + to_key = int(to_key) + + if from_key == to_key: + defer.returnValue(([], to_key)) + return + + sql = ( + "SELECT * FROM events as e WHERE " + "((room_id IN (%(current)s)) OR " + "(event_id IN (%(invites)s))) " + ) % { + "current": current_room_membership_sql, + "invites": invites_sql, + } + + # Constraints and ordering depend on direction. + if from_key < to_key: + sql += ( + "AND e.ordering > ? AND e.ordering < ? " + "ORDER BY ordering ASC LIMIT %(limit)d " + ) % {"limit": limit} + else: + sql += ( + "AND e.ordering < ? AND e.ordering > ? " + "ORDER BY ordering DESC LIMIT %(limit)d " + ) % {"limit": int(limit)} + + rows = yield self._execute_and_decode( + sql, + user_id, user_id, Membership.INVITE, from_key, to_key ) - def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey): - # get all room membership events for rooms which the user is - # *currently* joined in on, or all invite events for this user. - current_membership_sub_query = ( - "(SELECT membership FROM room_memberships" - + " WHERE user_id=? AND room_id = rm.room_id" - + " ORDER BY id DESC LIMIT 1)") - - query = ("SELECT rm.* FROM room_memberships rm " - # all membership events for rooms you've currently joined. - + " WHERE (? IN " + current_membership_sub_query - # all invite membership events for this user - + " OR rm.membership=? AND user_id=?)" - + " AND rm.id > ?") - query_args = ["join", user_id, "invite", user_id, from_pkey] - - if to_pkey != -1: - query += " AND rm.id < ?" - query_args.append(to_pkey) - - cursor = txn.execute(query, query_args) - return self._as_events(cursor, RoomMemberTable, from_pkey) + ret = [self._parse_event_from_row(r) for r in rows] - def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0): - return self._db_pool.runInteraction( - self._get_feedback_rows, - user_id, from_key, to_key, room_id, limit - ) - - def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the feedback table, bounded by the specified pkeys + if rows: + if from_key < to_key: + key = max([r["ordering"] for r in rows]) + else: + key = min([r["ordering"] for r in rows]) + else: + key = to_key - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ( - "SELECT feedback.* FROM feedback WHERE ? IN " - + "(SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] + defer.returnValue((ret, key)) - if room_id: - query += " AND feedback.room_id=?" - query_args.append(room_id) + @defer.inlineCallbacks + def get_recent_events_for_room(self, room_id, limit, with_feedback=False): + # TODO (erikj): Handle compressed feedback - (query, query_args) = self._append_stream_operations( - "feedback", query, query_args, from_pkey, to_pkey, limit=limit + sql = ( + "SELECT * FROM events WHERE room_id = ? " + "ORDER BY ordering DESC LIMIT ? " ) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, FeedbackTable, from_pkey) - - def get_room_data_stream(self, user_id, from_key, to_key, room_id, - limit=0): - return self._db_pool.runInteraction( - self._get_room_data_rows, - user_id, from_key, to_key, room_id, limit + rows = yield self._execute_and_decode( + sql, + room_id, limit ) - def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the feedback table, bounded by the specified pkeys + rows.reverse() # As we selected with reverse ordering - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ( - "SELECT room_data.* FROM room_data WHERE ? IN " - + "(SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] + defer.returnValue([self._parse_event_from_row(r) for r in rows]) - if room_id: - query += " AND room_data.room_id=?" - query_args.append(room_id) - - (query, query_args) = self._append_stream_operations( - "room_data", query, query_args, from_pkey, to_pkey, limit=limit + @defer.inlineCallbacks + def get_room_events_max_id(self): + res = yield self._execute_and_decode( + "SELECT MAX(ordering) as m FROM events" ) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, RoomDataTable, from_pkey) - - def _append_stream_operations(self, table_name, query, query_args, - from_pkey, to_pkey, limit=None, - group_by=""): - LATEST_ROW = -1 - order_by = "" - if to_pkey > from_pkey: - if from_pkey != LATEST_ROW: - # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9 - query += (" AND %s.id > ? AND %s.id < ?" % - (table_name, table_name)) - query_args.append(from_pkey) - query_args.append(to_pkey) - else: - # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC - query += " AND %s.id > ? " % table_name - order_by = "ORDER BY id DESC" - query_args.append(to_pkey) - elif from_pkey > to_pkey: - if to_pkey != LATEST_ROW: - # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC - query += (" AND %s.id > ? AND %s.id < ? " % - (table_name, table_name)) - order_by = "ORDER BY id DESC" - query_args.append(to_pkey) - query_args.append(from_pkey) - else: - # from=5 to=-1 >> from 5 to now >> id>5 - query += " AND %s.id > ?" % table_name - query_args.append(from_pkey) - - query += group_by + order_by - - if limit and limit > 0: - query += " LIMIT ?" - query_args.append(str(limit)) - - return (query, query_args) - - def _as_events(self, cursor, table, from_pkey): - data_entries = table.decode_results(cursor) - last_pkey = from_pkey - if data_entries: - last_pkey = data_entries[-1].id - - events = [ - entry.as_event(self.event_factory).get_dict() - for entry in data_entries - ] + if not res: + defer.returnValue(0) + return - return (events, last_pkey) + defer.returnValue(res[0]["m"]) |