diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 47a1f2c45a..87ae961ccd 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -13,267 +13,287 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+""" This module is responsible for getting events from the DB for pagination
+and event streaming.
+
+The order it returns events in depends on whether we are streaming forwards or
+are paginating backwards. We do this because we want to handle out of order
+messages nicely, while still returning them in the correct order when we
+paginate backwards.
+
+This is implemented by keeping two ordering columns: stream_ordering and
+topological_ordering. Stream ordering is basically insertion/received order
+(except for events from backfill requests). The topological_ordering is a
+weak ordering of events based on the pdu graph.
+
+This means that we have to have two different types of tokens, depending on
+what sort order was used:
+ - stream tokens are of the form: "s%d", which maps directly to the column
+ - topological tokens: "t%d-%d", where the integers map to the topological
+ and stream ordering columns respectively.
+"""
+
+from twisted.internet import defer
from ._base import SQLBaseStore
-from .message import MessagesTable
-from .feedback import FeedbackTable
-from .roomdata import RoomDataTable
-from .roommember import RoomMemberTable
+from synapse.api.errors import SynapseError
+from synapse.api.constants import Membership
+from synapse.util.logutils import log_function
import json
import logging
+
logger = logging.getLogger(__name__)
+MAX_STREAM_SIZE = 1000
+
+
+_STREAM_TOKEN = "stream"
+_TOPOLOGICAL_TOKEN = "topological"
+
+
+def _parse_stream_token(string):
+    """Parse a stream token of the form "s%d".
+
+    Args:
+        string (str): The token to parse.
+    Returns:
+        int: The stream ordering encoded in the token.
+    Raises:
+        SynapseError: 400 if `string` is not a well-formed stream token.
+    """
+    try:
+        if string[0] == 's':
+            return int(string[1:])
+    except (TypeError, ValueError, IndexError, KeyError):
+        # Wrong type, empty string or a non-integer payload: fall through to
+        # the single "Invalid token" error below.
+        pass
+    raise SynapseError(400, "Invalid token")
+
+
+def _parse_topological_token(string):
+    """Parse a topological token of the form "t%d-%d".
+
+    Args:
+        string (str): The token to parse.
+    Returns:
+        tuple: (topological_ordering, stream_ordering), both ints.
+    Raises:
+        SynapseError: 400 if `string` is not a well-formed topological token.
+    """
+    try:
+        if string[0] == 't':
+            # Split on the first '-' only; unpacking fails with ValueError
+            # if the separator is missing.
+            topological, stream = string[1:].split('-', 1)
+            return (int(topological), int(stream))
+    except (TypeError, ValueError, IndexError, KeyError, AttributeError):
+        # Wrong type, empty string, missing separator or non-integer parts:
+        # fall through to the single "Invalid token" error below.
+        pass
+    raise SynapseError(400, "Invalid token")
+
+
+def is_stream_token(string):
+    """Return True if `string` parses as a stream token ("s%d").
+
+    Args:
+        string (str): The candidate token.
+    Returns:
+        bool: Whether `_parse_stream_token` accepts `string`.
+    """
+    try:
+        _parse_stream_token(string)
+    except SynapseError:
+        # The parser reports every malformed token as a SynapseError.
+        return False
+    return True
+
+
+def is_topological_token(string):
+    """Return True if `string` parses as a topological token ("t%d-%d").
+
+    Args:
+        string (str): The candidate token.
+    Returns:
+        bool: Whether `_parse_topological_token` accepts `string`.
+    """
+    try:
+        _parse_topological_token(string)
+    except SynapseError:
+        # The parser reports every malformed token as a SynapseError.
+        return False
+    return True
+
+
+def _get_token_bound(token, comparison):
+    """Build the SQL predicate that bounds a query by `token`.
+
+    Args:
+        token (str): Either a stream token ("s%d") or a topological token
+            ("t%d-%d").
+        comparison (str): The strict SQL comparison operator to apply,
+            i.e. "<" or ">".
+    Returns:
+        str: A SQL boolean expression over the `stream_ordering` column
+        and, for topological tokens, the `topological_ordering` column.
+    Raises:
+        SynapseError: 400 if `token` is neither kind of token.
+    """
+    try:
+        stream = _parse_stream_token(token)
+    except SynapseError:
+        pass
+    else:
+        return "%s %s %d" % ("stream_ordering", comparison, stream)
+
+    # Not a stream token, so it has to be a topological one. The parser
+    # raises the "Invalid token" SynapseError itself if it is not.
+    top, stream = _parse_topological_token(token)
+
+    # Events are ordered by (topological_ordering, stream_ordering), so the
+    # bound must be a lexicographic comparison on that pair: comparing each
+    # column independently would drop events that share the token's
+    # topological ordering. The outer parentheses keep the OR from leaking
+    # into whatever the caller ANDs this with.
+    return "(%s %s %d OR (%s = %d AND %s %s %d))" % (
+        "topological_ordering", comparison, top,
+        "topological_ordering", top,
+        "stream_ordering", comparison, stream,
+    )
+
+
class StreamStore(SQLBaseStore):
+    @log_function
+    def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
+                        direction='f', with_feedback=False):
+        """Get events, either as an event stream or as a page of a room.
+
+        A forwards request from a user where both keys are stream tokens is
+        served by `get_room_events_stream`; anything else is served by
+        `paginate_room_events`.
+
+        Args:
+            user_id (str): The user who is requesting events.
+            from_key (str): The token to start returning results from.
+            to_key (str): The token to stop returning results at. May be
+                falsy for pagination requests.
+            room_id (str): The room to get events for.
+            limit (int): The maximum number of events to return.
+            direction (str): 'f' for forwards, 'b' for backwards.
+            with_feedback (bool): Passed through to the chosen method.
+        Returns:
+            Whatever the chosen method returns: a deferred tuple of
+            (events, token).
+        """
+        # We deal with event requests in two different ways depending on
+        # whether this looks like an /events request or a pagination request.
+        is_events = (
+            direction == 'f'
+            and user_id
+            and is_stream_token(from_key)
+            and to_key and is_stream_token(to_key)
+        )
- def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0,
- with_feedback=False):
- """Get all messages for this user between the given keys.
-
- Args:
- user_id (str): The user who is requesting messages.
- from_key (int): The ID to start returning results from (exclusive).
- to_key (int): The ID to stop returning results (exclusive).
- room_id (str): Gets messages only for this room. Can be None, in
- which case all room messages will be returned.
- Returns:
- A tuple of rows (list of namedtuples), new_id(int)
- """
- if with_feedback and room_id: # with fb MUST specify a room ID
- return self._db_pool.runInteraction(
- self._get_message_rows_with_feedback,
- user_id, from_key, to_key, room_id, limit
+        if is_events:
+            return self.get_room_events_stream(
+                user_id=user_id,
+                from_key=from_key,
+                to_key=to_key,
+                room_id=room_id,
+                limit=limit,
+                with_feedback=with_feedback,
)
else:
- return self._db_pool.runInteraction(
- self._get_message_rows,
- user_id, from_key, to_key, room_id, limit
+            return self.paginate_room_events(
+                from_key=from_key,
+                to_key=to_key,
+                room_id=room_id,
+                # Forward the requested direction; otherwise the paginator
+                # falls back to its own default of 'b'.
+                direction=direction,
+                limit=limit,
+                with_feedback=with_feedback,
)
- def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the messages table, bounded by the specified pkeys
+    @defer.inlineCallbacks
+    @log_function
+    def get_room_events_stream(self, user_id, from_key, to_key, room_id,
+                               limit=0, with_feedback=False):
+        """Get the events a user should see between two stream tokens.
+
+        Selects non-outlier events whose stream ordering lies strictly
+        between `from_key` and `to_key`, and which are either in a room the
+        user has a current membership event in, or are themselves one of the
+        user's current membership events.
+
+        Args:
+            user_id (str): The user who is requesting events.
+            from_key (str): Stream token ("s%d") to start from (exclusive).
+            to_key (str): Stream token ("s%d") to stop at (exclusive).
+            room_id (str): Currently unused by this method.
+            limit (int): Maximum number of events to return. Capped at
+                MAX_STREAM_SIZE, which is also used when no positive limit
+                is given.
+            with_feedback (bool): Currently unused by this method.
+        Returns:
+            A deferred tuple of (events, key), where `key` is the stream
+            token for the highest stream ordering returned, or `to_key` if
+            no events were found.
+        Raises:
+            SynapseError: 400 if either key is not a stream token.
+        """
+        # TODO (erikj): Handle compressed feedback
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = ("SELECT messages.* FROM messages WHERE ? IN"
- + " (SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
+        # NOTE(review): this matches rooms where the user has any current
+        # membership event, not only joins - confirm that is intended.
+        current_room_membership_sql = (
+            "SELECT m.room_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ?"
+        )
- if room_id:
- query += " AND messages.room_id=?"
- query_args.append(room_id)
+        # We also want to get any membership events about that user, e.g.
+        # invites or leave notifications.
+        membership_sql = (
+            "SELECT m.event_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ? "
+        )
- (query, query_args) = self._append_stream_operations(
- "messages", query, query_args, from_pkey, to_pkey, limit=limit
+        # MAX_STREAM_SIZE is an upper bound on the page size: clamp the
+        # requested limit to it, and fall back to it when no positive limit
+        # was requested.
+        if limit and limit > 0:
+            limit = min(limit, MAX_STREAM_SIZE)
+        else:
+            limit = MAX_STREAM_SIZE
+
+        # From and to keys should be integers from ordering.
+        from_id = _parse_stream_token(from_key)
+        to_id = _parse_stream_token(to_key)
+
+        if from_id == to_id:
+            # Empty range: nothing can lie strictly between the two keys.
+            # defer.returnValue raises, so execution stops here.
+            defer.returnValue(([], to_key))
+
+        sql = (
+            "SELECT * FROM events as e WHERE "
+            "((room_id IN (%(current)s)) OR "
+            "(event_id IN (%(invites)s))) "
+            "AND e.stream_ordering > ? AND e.stream_ordering < ? "
+            "AND e.outlier = 0 "
+            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+        ) % {
+            "current": current_room_membership_sql,
+            "invites": membership_sql,
+            "limit": limit
+        }
+
+        # user_id appears twice: once for each membership sub-select.
+        rows = yield self._execute_and_decode(
+            sql,
+            user_id, user_id, from_id, to_id
)
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, MessagesTable, from_pkey)
+        ret = [self._parse_event_from_row(r) for r in rows]
- def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey,
- room_id, limit):
- # this col represents the compressed feedback JSON as per spec
- compressed_feedback_col = (
- "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id"
- + " || '\",\"feedback_type\":\"' || f.feedback_type"
- + " || '\",\"content\":' || f.content || '}') || ']'"
- )
+        if rows:
+            key = "s%d" % max(r["stream_ordering"] for r in rows)
+        else:
+            # Assume we didn't get anything because there was nothing to get.
+            key = to_key
+
+        defer.returnValue((ret, key))
- global_msg_id_join = ("f.room_id = messages.room_id"
- + " and f.msg_id = messages.msg_id"
- + " and messages.user_id = f.msg_sender_id")
+    @defer.inlineCallbacks
+    @log_function
+    def paginate_room_events(self, room_id, from_key, to_key=None,
+                             direction='b', limit=-1,
+                             with_feedback=False):
+        """Get a page of non-outlier events for a room.
+
+        Events are ordered by topological ordering and then stream ordering,
+        descending when `direction` is 'b' and ascending otherwise.
+
+        Args:
+            room_id (str): The room to paginate.
+            from_key (str): Stream or topological token to start from.
+            to_key (str): Optional stream or topological token to stop at.
+            direction (str): 'b' to go backwards; anything else goes
+                forwards.
+            limit (int): Maximum number of events to return. Values that are
+                not positive mean no limit.
+            with_feedback (bool): Currently unused by this method.
+        Returns:
+            A deferred tuple of (events, next_token). `next_token` is the
+            topological token of the last row returned or, when there are no
+            rows, `to_key` if it is set and `from_key` otherwise.
+        Raises:
+            SynapseError: 400 if a supplied key is not a valid token.
+        """
+        # TODO (erikj): Handle compressed feedback
- select_query = (
- "SELECT messages.*, f.content AS fb_content, f.fb_sender_id"
- + ", " + compressed_feedback_col + " AS compressed_fb"
- + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join)
+        # Going backwards means everything before `from_key` and after
+        # `to_key`, newest first; going forwards is the mirror image.
+        if direction == 'b':
+            from_comp, to_comp, order = '<', '>', "DESC"
+        else:
+            from_comp, to_comp, order = '>', '<', "ASC"
- current_membership_sub_query = (
- "(SELECT membership from room_memberships rm"
- + " WHERE user_id=? AND room_id = rm.room_id"
- + " ORDER BY id DESC LIMIT 1)")
+        args = [room_id]
- where = (" WHERE ? IN " + current_membership_sub_query
- + " AND messages.room_id=?")
+        bounds = _get_token_bound(from_key, from_comp)
+        if to_key:
+            to_bound = _get_token_bound(to_key, to_comp)
+            bounds = "%s AND %s" % (bounds, to_bound)
+
+        # Only a positive limit is turned into a LIMIT clause.
+        max_rows = int(limit)
+        limit_str = ""
+        if max_rows > 0:
+            args.append(max_rows)
+            limit_str = " LIMIT ?"
- query = select_query + where
- query_args = ["join", user_id, room_id]
+        sql = (
+            "SELECT * FROM events "
+            "WHERE outlier = 0 AND room_id = ? AND %(bounds)s "
+            "ORDER BY topological_ordering %(order)s, "
+            "stream_ordering %(order)s %(limit)s "
+        ) % {"bounds": bounds, "order": order, "limit": limit_str}
- (query, query_args) = self._append_stream_operations(
- "messages", query, query_args, from_pkey, to_pkey,
- limit=limit, group_by=" GROUP BY messages.id "
+        rows = yield self._execute_and_decode(
+            sql,
+            *args
)
- cursor = txn.execute(query, query_args)
-
- # convert the result set into events
- entries = self.cursor_to_dict(cursor)
- events = []
- for entry in entries:
- # TODO we should spec the cursor > event mapping somewhere else.
- event = {}
- straight_mappings = ["msg_id", "user_id", "room_id"]
- for key in straight_mappings:
- event[key] = entry[key]
- event["content"] = json.loads(entry["content"])
- if entry["compressed_fb"]:
- event["feedback"] = json.loads(entry["compressed_fb"])
- events.append(event)
-
- latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"]
-
- return (events, latest_pkey)
-
- def get_room_member_stream(self, user_id, from_key, to_key):
- """Get all room membership events for this user between the given keys.
-
- Args:
- user_id (str): The user who is requesting membership events.
- from_key (int): The ID to start returning results from (exclusive).
- to_key (int): The ID to stop returning results (exclusive).
- Returns:
- A tuple of rows (list of namedtuples), new_id(int)
- """
- return self._db_pool.runInteraction(
- self._get_room_member_rows, user_id, from_key, to_key
+        if rows:
+            # The next page starts from the last row of this one.
+            last_row = rows[-1]
+            next_token = "t%s-%s" % (
+                last_row["topological_ordering"],
+                last_row["stream_ordering"],
+            )
+        else:
+            # TODO (erikj): We should work out what to do here instead.
+            next_token = to_key or from_key
+
+        events = [self._parse_event_from_row(r) for r in rows]
+        defer.returnValue(
+            (events, next_token)
)
- def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey):
- # get all room membership events for rooms which the user is
- # *currently* joined in on, or all invite events for this user.
- current_membership_sub_query = (
- "(SELECT membership FROM room_memberships"
- + " WHERE user_id=? AND room_id = rm.room_id"
- + " ORDER BY id DESC LIMIT 1)")
-
- query = ("SELECT rm.* FROM room_memberships rm "
- # all membership events for rooms you've currently joined.
- + " WHERE (? IN " + current_membership_sub_query
- # all invite membership events for this user
- + " OR rm.membership=? AND user_id=?)"
- + " AND rm.id > ?")
- query_args = ["join", user_id, "invite", user_id, from_pkey]
-
- if to_pkey != -1:
- query += " AND rm.id < ?"
- query_args.append(to_pkey)
-
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, RoomMemberTable, from_pkey)
-
- def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0):
- return self._db_pool.runInteraction(
- self._get_feedback_rows,
- user_id, from_key, to_key, room_id, limit
+    @defer.inlineCallbacks
+    def get_recent_events_for_room(self, room_id, limit, end_token,
+                                   with_feedback=False):
+        """Get the most recent events in a room, up to `end_token`.
+
+        Args:
+            room_id (str): The room to get events for.
+            limit (int): The maximum number of events to return.
+            end_token: Inclusive upper bound on stream ordering. A stream
+                token ("s%d") is parsed to the integer it carries; any other
+                value is passed to the query unchanged.
+            with_feedback (bool): Currently unused by this method.
+        Returns:
+            A deferred tuple of (events, (start_token, end_token)). Events
+            are in ascending order; `start_token` is the topological token
+            of the first one, or `end_token` if there are no events.
+        """
+        # TODO (erikj): Handle compressed feedback
+
+        # Stream tokens have to be turned into the integer they carry before
+        # being compared with the stream_ordering column.
+        if is_stream_token(end_token):
+            max_stream_ordering = _parse_stream_token(end_token)
+        else:
+            max_stream_ordering = end_token
+
+        # Both ordering columns must be descending so that LIMIT keeps the
+        # newest rows; a bare "topological_ordering" sorts ascending.
+        sql = (
+            "SELECT * FROM events "
+            "WHERE room_id = ? AND stream_ordering <= ? "
+            "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
)
- def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the feedback table, bounded by the specified pkeys
-
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = (
- "SELECT feedback.* FROM feedback WHERE ? IN "
- + "(SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
-
- if room_id:
- query += " AND feedback.room_id=?"
- query_args.append(room_id)
-
- (query, query_args) = self._append_stream_operations(
- "feedback", query, query_args, from_pkey, to_pkey, limit=limit
+        rows = yield self._execute_and_decode(
+            sql,
+            room_id, max_stream_ordering, limit
)
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, FeedbackTable, from_pkey)
+        rows.reverse()  # As we selected with reverse ordering
+
+        if rows:
+            topo = rows[0]["topological_ordering"]
+            toke = rows[0]["stream_ordering"]
+            start_token = "t%s-%s" % (topo, toke)
- def get_room_data_stream(self, user_id, from_key, to_key, room_id,
- limit=0):
- return self._db_pool.runInteraction(
- self._get_room_data_rows,
- user_id, from_key, to_key, room_id, limit
+            token = (start_token, end_token)
+        else:
+            token = (end_token, end_token)
+
+        defer.returnValue(
+            (
+                [self._parse_event_from_row(r) for r in rows],
+                token
+            )
)
- def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the feedback table, bounded by the specified pkeys
-
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = (
- "SELECT room_data.* FROM room_data WHERE ? IN "
- + "(SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
-
- if room_id:
- query += " AND room_data.room_id=?"
- query_args.append(room_id)
-
- (query, query_args) = self._append_stream_operations(
- "room_data", query, query_args, from_pkey, to_pkey, limit=limit
+    @defer.inlineCallbacks
+    def get_room_events_max_id(self):
+        """Get a stream token one past the highest stream ordering stored.
+
+        Returns:
+            A deferred str: "s%d" for MAX(stream_ordering) + 1, or "s1" when
+            the query yields no usable maximum.
+        """
+        result = yield self._execute_and_decode(
+            "SELECT MAX(stream_ordering) as m FROM events"
)
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, RoomDataTable, from_pkey)
-
- def _append_stream_operations(self, table_name, query, query_args,
- from_pkey, to_pkey, limit=None,
- group_by=""):
- LATEST_ROW = -1
- order_by = ""
- if to_pkey > from_pkey:
- if from_pkey != LATEST_ROW:
- # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9
- query += (" AND %s.id > ? AND %s.id < ?" %
- (table_name, table_name))
- query_args.append(from_pkey)
- query_args.append(to_pkey)
- else:
- # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC
- query += " AND %s.id > ? " % table_name
- order_by = "ORDER BY id DESC"
- query_args.append(to_pkey)
- elif from_pkey > to_pkey:
- if to_pkey != LATEST_ROW:
- # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC
- query += (" AND %s.id > ? AND %s.id < ? " %
- (table_name, table_name))
- order_by = "ORDER BY id DESC"
- query_args.append(to_pkey)
- query_args.append(from_pkey)
- else:
- # from=5 to=-1 >> from 5 to now >> id>5
- query += " AND %s.id > ?" % table_name
- query_args.append(from_pkey)
-
- query += group_by + order_by
-
- if limit and limit > 0:
- query += " LIMIT ?"
- query_args.append(str(limit))
-
- return (query, query_args)
-
- def _as_events(self, cursor, table, from_pkey):
- data_entries = table.decode_results(cursor)
- last_pkey = from_pkey
- if data_entries:
- last_pkey = data_entries[-1].id
-
- events = [
- entry.as_event(self.event_factory).get_dict()
- for entry in data_entries
- ]
-
- return (events, last_pkey)
+        logger.debug("get_room_events_max_id: %s", result)
+
+        # No rows, an empty first row or a falsy maximum all mean there is
+        # nothing to count on from, so start at "s1".
+        highest = result[0]["m"] if result and result[0] else None
+
+        if highest:
+            token = "s%d" % (highest + 1,)
+        else:
+            token = "s1"
+
+        defer.returnValue(token)
|