diff options
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r-- | synapse/storage/stream.py | 468 |
1 files changed, 244 insertions, 224 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 47a1f2c45a..87ae961ccd 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -13,267 +13,287 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" This module is responsible for getting events from the DB for pagination +and event streaming. + +The order it returns events in depend on whether we are streaming forwards or +are paginating backwards. We do this because we want to handle out of order +messages nicely, while still returning them in the correct order when we +paginate bacwards. + +This is implemented by keeping two ordering columns: stream_ordering and +topological_ordering. Stream ordering is basically insertion/received order +(except for events from backfill requests). The topolgical_ordering is a +weak ordering of events based on the pdu graph. + +This means that we have to have two different types of tokens, depending on +what sort order was used: + - stream tokens are of the form: "s%d", which maps directly to the column + - topological tokems: "t%d-%d", where the integers map to the topological + and stream ordering columns respectively. +""" + +from twisted.internet import defer from ._base import SQLBaseStore -from .message import MessagesTable -from .feedback import FeedbackTable -from .roomdata import RoomDataTable -from .roommember import RoomMemberTable +from synapse.api.errors import SynapseError +from synapse.api.constants import Membership +from synapse.util.logutils import log_function import json import logging + logger = logging.getLogger(__name__) +MAX_STREAM_SIZE = 1000 + + +_STREAM_TOKEN = "stream" +_TOPOLOGICAL_TOKEN = "topological" + + +def _parse_stream_token(string): + try: + if string[0] != 's': + raise + return int(string[1:]) + except: + raise SynapseError(400, "Invalid token") + + +def _parse_topological_token(string): + try: + if string[0] != 't': + raise + parts = string[1:].split('-', 1) + return (int(parts[0]), int(parts[1])) + except: + raise SynapseError(400, "Invalid token") + + +def is_stream_token(string): + try: + _parse_stream_token(string) + return True + except: + return False + + +def is_topological_token(string): + try: + _parse_topological_token(string) + return True + except: + return False + + +def _get_token_bound(token, comparison): + try: + s = _parse_stream_token(token) + return "%s %s %d" % ("stream_ordering", comparison, s) + except: + pass + + try: + top, stream = _parse_topological_token(token) + return "%s %s %d AND %s %s %d" % ( + "topological_ordering", comparison, top, + "stream_ordering", comparison, stream, + ) + except: + pass + + raise SynapseError(400, "Invalid token") + + class StreamStore(SQLBaseStore): + @log_function + def get_room_events(self, user_id, from_key, to_key, room_id, limit=0, + direction='f', with_feedback=False): + # We deal with events request in two different ways depending on if + # this looks like an /events request or a pagination request. + is_events = ( + direction == 'f' + and user_id + and is_stream_token(from_key) + and to_key and is_stream_token(to_key) + ) - def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0, - with_feedback=False): - """Get all messages for this user between the given keys. - - Args: - user_id (str): The user who is requesting messages. - from_key (int): The ID to start returning results from (exclusive). - to_key (int): The ID to stop returning results (exclusive). - room_id (str): Gets messages only for this room. Can be None, in - which case all room messages will be returned. - Returns: - A tuple of rows (list of namedtuples), new_id(int) - """ - if with_feedback and room_id: # with fb MUST specify a room ID - return self._db_pool.runInteraction( - self._get_message_rows_with_feedback, - user_id, from_key, to_key, room_id, limit + if is_events: + return self.get_room_events_stream( + user_id=user_id, + from_key=from_key, + to_key=to_key, + room_id=room_id, + limit=limit, + with_feedback=with_feedback, ) else: - return self._db_pool.runInteraction( - self._get_message_rows, - user_id, from_key, to_key, room_id, limit + return self.paginate_room_events( + from_key=from_key, + to_key=to_key, + room_id=room_id, + limit=limit, + with_feedback=with_feedback, ) - def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the messages table, bounded by the specified pkeys + @defer.inlineCallbacks + @log_function + def get_room_events_stream(self, user_id, from_key, to_key, room_id, + limit=0, with_feedback=False): + # TODO (erikj): Handle compressed feedback - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ("SELECT messages.* FROM messages WHERE ? IN" - + " (SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] + current_room_membership_sql = ( + "SELECT m.room_id FROM room_memberships as m " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " + "WHERE m.user_id = ?" + ) - if room_id: - query += " AND messages.room_id=?" - query_args.append(room_id) + # We also want to get any membership events about that user, e.g. + # invites or leave notifications. + membership_sql = ( + "SELECT m.event_id FROM room_memberships as m " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " + "WHERE m.user_id = ? " + ) - (query, query_args) = self._append_stream_operations( - "messages", query, query_args, from_pkey, to_pkey, limit=limit + if limit: + limit = max(limit, MAX_STREAM_SIZE) + else: + limit = MAX_STREAM_SIZE + + # From and to keys should be integers from ordering. + from_id = _parse_stream_token(from_key) + to_id = _parse_stream_token(to_key) + + if from_key == to_key: + defer.returnValue(([], to_key)) + return + + sql = ( + "SELECT * FROM events as e WHERE " + "((room_id IN (%(current)s)) OR " + "(event_id IN (%(invites)s))) " + "AND e.stream_ordering > ? AND e.stream_ordering < ? " + "AND e.outlier = 0 " + "ORDER BY stream_ordering ASC LIMIT %(limit)d " + ) % { + "current": current_room_membership_sql, + "invites": membership_sql, + "limit": limit + } + + rows = yield self._execute_and_decode( + sql, + user_id, user_id, from_id, to_id ) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, MessagesTable, from_pkey) + ret = [self._parse_event_from_row(r) for r in rows] - def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey, - room_id, limit): - # this col represents the compressed feedback JSON as per spec - compressed_feedback_col = ( - "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id" - + " || '\",\"feedback_type\":\"' || f.feedback_type" - + " || '\",\"content\":' || f.content || '}') || ']'" - ) + if rows: + key = "s%d" % max([r["stream_ordering"] for r in rows]) + else: + # Assume we didn't get anything because there was nothing to get. + key = to_key + + defer.returnValue((ret, key)) - global_msg_id_join = ("f.room_id = messages.room_id" - + " and f.msg_id = messages.msg_id" - + " and messages.user_id = f.msg_sender_id") + @defer.inlineCallbacks + @log_function + def paginate_room_events(self, room_id, from_key, to_key=None, + direction='b', limit=-1, + with_feedback=False): + # TODO (erikj): Handle compressed feedback - select_query = ( - "SELECT messages.*, f.content AS fb_content, f.fb_sender_id" - + ", " + compressed_feedback_col + " AS compressed_fb" - + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join) + from_comp = '<' if direction =='b' else '>' + to_comp = '>' if direction =='b' else '<' + order = "DESC" if direction == 'b' else "ASC" - current_membership_sub_query = ( - "(SELECT membership from room_memberships rm" - + " WHERE user_id=? AND room_id = rm.room_id" - + " ORDER BY id DESC LIMIT 1)") + args = [room_id] - where = (" WHERE ? IN " + current_membership_sub_query - + " AND messages.room_id=?") + bounds = _get_token_bound(from_key, from_comp) + if to_key: + bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp)) + + if int(limit) > 0: + args.append(int(limit)) + limit_str = " LIMIT ?" + else: + limit_str = "" - query = select_query + where - query_args = ["join", user_id, room_id] + sql = ( + "SELECT * FROM events " + "WHERE outlier = 0 AND room_id = ? AND %(bounds)s " + "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s " + ) % {"bounds": bounds, "order": order, "limit": limit_str} - (query, query_args) = self._append_stream_operations( - "messages", query, query_args, from_pkey, to_pkey, - limit=limit, group_by=" GROUP BY messages.id " + rows = yield self._execute_and_decode( + sql, + *args ) - cursor = txn.execute(query, query_args) - - # convert the result set into events - entries = self.cursor_to_dict(cursor) - events = [] - for entry in entries: - # TODO we should spec the cursor > event mapping somewhere else. - event = {} - straight_mappings = ["msg_id", "user_id", "room_id"] - for key in straight_mappings: - event[key] = entry[key] - event["content"] = json.loads(entry["content"]) - if entry["compressed_fb"]: - event["feedback"] = json.loads(entry["compressed_fb"]) - events.append(event) - - latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"] - - return (events, latest_pkey) - - def get_room_member_stream(self, user_id, from_key, to_key): - """Get all room membership events for this user between the given keys. - - Args: - user_id (str): The user who is requesting membership events. - from_key (int): The ID to start returning results from (exclusive). - to_key (int): The ID to stop returning results (exclusive). - Returns: - A tuple of rows (list of namedtuples), new_id(int) - """ - return self._db_pool.runInteraction( - self._get_room_member_rows, user_id, from_key, to_key + if rows: + topo = rows[-1]["topological_ordering"] + toke = rows[-1]["stream_ordering"] + next_token = "t%s-%s" % (topo, toke) + else: + # TODO (erikj): We should work out what to do here instead. + next_token = to_key if to_key else from_key + + defer.returnValue( + ( + [self._parse_event_from_row(r) for r in rows], + next_token + ) ) - def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey): - # get all room membership events for rooms which the user is - # *currently* joined in on, or all invite events for this user. - current_membership_sub_query = ( - "(SELECT membership FROM room_memberships" - + " WHERE user_id=? AND room_id = rm.room_id" - + " ORDER BY id DESC LIMIT 1)") - - query = ("SELECT rm.* FROM room_memberships rm " - # all membership events for rooms you've currently joined. - + " WHERE (? IN " + current_membership_sub_query - # all invite membership events for this user - + " OR rm.membership=? AND user_id=?)" - + " AND rm.id > ?") - query_args = ["join", user_id, "invite", user_id, from_pkey] - - if to_pkey != -1: - query += " AND rm.id < ?" - query_args.append(to_pkey) - - cursor = txn.execute(query, query_args) - return self._as_events(cursor, RoomMemberTable, from_pkey) - - def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0): - return self._db_pool.runInteraction( - self._get_feedback_rows, - user_id, from_key, to_key, room_id, limit + @defer.inlineCallbacks + def get_recent_events_for_room(self, room_id, limit, end_token, + with_feedback=False): + # TODO (erikj): Handle compressed feedback + + sql = ( + "SELECT * FROM events " + "WHERE room_id = ? AND stream_ordering <= ? " + "ORDER BY topological_ordering, stream_ordering DESC LIMIT ? " ) - def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the feedback table, bounded by the specified pkeys - - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ( - "SELECT feedback.* FROM feedback WHERE ? IN " - + "(SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] - - if room_id: - query += " AND feedback.room_id=?" - query_args.append(room_id) - - (query, query_args) = self._append_stream_operations( - "feedback", query, query_args, from_pkey, to_pkey, limit=limit + rows = yield self._execute_and_decode( + sql, + room_id, end_token, limit ) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, FeedbackTable, from_pkey) + rows.reverse() # As we selected with reverse ordering + + if rows: + topo = rows[0]["topological_ordering"] + toke = rows[0]["stream_ordering"] + start_token = "t%s-%s" % (topo, toke) - def get_room_data_stream(self, user_id, from_key, to_key, room_id, - limit=0): - return self._db_pool.runInteraction( - self._get_room_data_rows, - user_id, from_key, to_key, room_id, limit + token = (start_token, end_token) + else: + token = (end_token, end_token) + + defer.returnValue( + ( + [self._parse_event_from_row(r) for r in rows], + token + ) ) - def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the feedback table, bounded by the specified pkeys - - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ( - "SELECT room_data.* FROM room_data WHERE ? IN " - + "(SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] - - if room_id: - query += " AND room_data.room_id=?" - query_args.append(room_id) - - (query, query_args) = self._append_stream_operations( - "room_data", query, query_args, from_pkey, to_pkey, limit=limit + @defer.inlineCallbacks + def get_room_events_max_id(self): + res = yield self._execute_and_decode( + "SELECT MAX(stream_ordering) as m FROM events" ) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, RoomDataTable, from_pkey) - - def _append_stream_operations(self, table_name, query, query_args, - from_pkey, to_pkey, limit=None, - group_by=""): - LATEST_ROW = -1 - order_by = "" - if to_pkey > from_pkey: - if from_pkey != LATEST_ROW: - # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9 - query += (" AND %s.id > ? AND %s.id < ?" % - (table_name, table_name)) - query_args.append(from_pkey) - query_args.append(to_pkey) - else: - # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC - query += " AND %s.id > ? " % table_name - order_by = "ORDER BY id DESC" - query_args.append(to_pkey) - elif from_pkey > to_pkey: - if to_pkey != LATEST_ROW: - # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC - query += (" AND %s.id > ? AND %s.id < ? " % - (table_name, table_name)) - order_by = "ORDER BY id DESC" - query_args.append(to_pkey) - query_args.append(from_pkey) - else: - # from=5 to=-1 >> from 5 to now >> id>5 - query += " AND %s.id > ?" % table_name - query_args.append(from_pkey) - - query += group_by + order_by - - if limit and limit > 0: - query += " LIMIT ?" - query_args.append(str(limit)) - - return (query, query_args) - - def _as_events(self, cursor, table, from_pkey): - data_entries = table.decode_results(cursor) - last_pkey = from_pkey - if data_entries: - last_pkey = data_entries[-1].id - - events = [ - entry.as_event(self.event_factory).get_dict() - for entry in data_entries - ] - - return (events, last_pkey) + logger.debug("get_room_events_max_id: %s", res) + + if not res or not res[0] or not res[0]["m"]: + defer.returnValue("s1") + return + + key = res[0]["m"] + 1 + defer.returnValue("s%d" % (key,)) |