summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py315
1 files changed, 83 insertions, 232 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 47a1f2c45a..cf4b1682b6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -13,267 +13,118 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
 
 from ._base import SQLBaseStore
-from .message import MessagesTable
-from .feedback import FeedbackTable
-from .roomdata import RoomDataTable
-from .roommember import RoomMemberTable
+
+from synapse.api.constants import Membership
 
 import json
 import logging
 
-logger = logging.getLogger(__name__)
-
-
-class StreamStore(SQLBaseStore):
-
-    def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0,
-                           with_feedback=False):
-        """Get all messages for this user between the given keys.
 
-        Args:
-            user_id (str): The user who is requesting messages.
-            from_key (int): The ID to start returning results from (exclusive).
-            to_key (int): The ID to stop returning results (exclusive).
-            room_id (str): Gets messages only for this room. Can be None, in
-            which case all room messages will be returned.
-        Returns:
-            A tuple of rows (list of namedtuples), new_id(int)
-        """
-        if with_feedback and room_id:  # with fb MUST specify a room ID
-            return self._db_pool.runInteraction(
-                self._get_message_rows_with_feedback,
-                user_id, from_key, to_key, room_id, limit
-            )
-        else:
-            return self._db_pool.runInteraction(
-                self._get_message_rows,
-                user_id, from_key, to_key, room_id, limit
-            )
+logger = logging.getLogger(__name__)
 
-    def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                          limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the messages table, bounded by the specified pkeys
 
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = ("SELECT messages.* FROM messages WHERE ? IN"
-                 + " (SELECT membership from room_memberships WHERE user_id=?"
-                 + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
+MAX_STREAM_SIZE = 1000
 
-        if room_id:
-            query += " AND messages.room_id=?"
-            query_args.append(room_id)
 
-        (query, query_args) = self._append_stream_operations(
-            "messages", query, query_args, from_pkey, to_pkey, limit=limit
-        )
+class StreamStore(SQLBaseStore):
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, MessagesTable, from_pkey)
+    @defer.inlineCallbacks
+    def get_room_events_stream(self, user_id, from_key, to_key, room_id,
+                               limit=0, with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
 
-    def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey,
-                                        room_id, limit):
-        # this col represents the compressed feedback JSON as per spec
-        compressed_feedback_col = (
-            "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id"
-            + " || '\",\"feedback_type\":\"' || f.feedback_type"
-            + " || '\",\"content\":' || f.content || '}') || ']'"
+        current_room_membership_sql = (
+            "SELECT m.room_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ?"
         )
 
-        global_msg_id_join = ("f.room_id = messages.room_id"
-                              + " and f.msg_id = messages.msg_id"
-                              + " and messages.user_id = f.msg_sender_id")
-
-        select_query = (
-            "SELECT messages.*, f.content AS fb_content, f.fb_sender_id"
-            + ", " + compressed_feedback_col + " AS compressed_fb"
-            + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join)
-
-        current_membership_sub_query = (
-            "(SELECT membership from room_memberships rm"
-            + " WHERE user_id=? AND room_id = rm.room_id"
-            + " ORDER BY id DESC LIMIT 1)")
-
-        where = (" WHERE ? IN " + current_membership_sub_query
-                 + " AND messages.room_id=?")
-
-        query = select_query + where
-        query_args = ["join", user_id, room_id]
-
-        (query, query_args) = self._append_stream_operations(
-            "messages", query, query_args, from_pkey, to_pkey,
-            limit=limit, group_by=" GROUP BY messages.id "
+        invites_sql = (
+            "SELECT m.event_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ? AND m.membership = ?"
         )
 
-        cursor = txn.execute(query, query_args)
-
-        # convert the result set into events
-        entries = self.cursor_to_dict(cursor)
-        events = []
-        for entry in entries:
-            # TODO we should spec the cursor > event mapping somewhere else.
-            event = {}
-            straight_mappings = ["msg_id", "user_id", "room_id"]
-            for key in straight_mappings:
-                event[key] = entry[key]
-            event["content"] = json.loads(entry["content"])
-            if entry["compressed_fb"]:
-                event["feedback"] = json.loads(entry["compressed_fb"])
-            events.append(event)
-
-        latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"]
-
-        return (events, latest_pkey)
-
-    def get_room_member_stream(self, user_id, from_key, to_key):
-        """Get all room membership events for this user between the given keys.
-
-        Args:
-            user_id (str): The user who is requesting membership events.
-            from_key (int): The ID to start returning results from (exclusive).
-            to_key (int): The ID to stop returning results (exclusive).
-        Returns:
-            A tuple of rows (list of namedtuples), new_id(int)
-        """
-        return self._db_pool.runInteraction(
-            self._get_room_member_rows, user_id, from_key, to_key
+        if limit:
+            limit = max(limit, MAX_STREAM_SIZE)
+        else:
+            limit = MAX_STREAM_SIZE
+
+        # From and to keys should be integers from ordering.
+        from_key = int(from_key)
+        to_key = int(to_key)
+
+        if from_key == to_key:
+            defer.returnValue(([], to_key))
+            return
+
+        sql = (
+            "SELECT * FROM events as e WHERE "
+            "((room_id IN (%(current)s)) OR "
+            "(event_id IN (%(invites)s))) "
+        ) % {
+            "current": current_room_membership_sql,
+            "invites": invites_sql,
+        }
+
+        # Constraints and ordering depend on direction.
+        if from_key < to_key:
+            sql += (
+                "AND e.ordering > ? AND e.ordering < ? "
+                "ORDER BY ordering ASC LIMIT %(limit)d "
+            ) % {"limit": limit}
+        else:
+            sql += (
+                "AND e.ordering < ? AND e.ordering > ? "
+                "ORDER BY ordering DESC LIMIT %(limit)d "
+            ) % {"limit": int(limit)}
+
+        rows = yield self._execute_and_decode(
+            sql,
+            user_id, user_id, Membership.INVITE, from_key, to_key
         )
 
-    def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey):
-        # get all room membership events for rooms which the user is
-        # *currently* joined in on, or all invite events for this user.
-        current_membership_sub_query = (
-            "(SELECT membership FROM room_memberships"
-            + " WHERE user_id=? AND room_id = rm.room_id"
-            + " ORDER BY id DESC LIMIT 1)")
-
-        query = ("SELECT rm.* FROM room_memberships rm "
-                 # all membership events for rooms you've currently joined.
-                 + " WHERE (? IN " + current_membership_sub_query
-                 # all invite membership events for this user
-                 + " OR rm.membership=? AND user_id=?)"
-                 + " AND rm.id > ?")
-        query_args = ["join", user_id, "invite", user_id, from_pkey]
-
-        if to_pkey != -1:
-            query += " AND rm.id < ?"
-            query_args.append(to_pkey)
-
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, RoomMemberTable, from_pkey)
+        ret = [self._parse_event_from_row(r) for r in rows]
 
-    def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0):
-        return self._db_pool.runInteraction(
-            self._get_feedback_rows,
-            user_id, from_key, to_key, room_id, limit
-        )
-
-    def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                           limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the feedback table, bounded by the specified pkeys
+        if rows:
+            if from_key < to_key:
+                key = max([r["ordering"] for r in rows])
+            else:
+                key = min([r["ordering"] for r in rows])
+        else:
+            key = to_key
 
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = (
-            "SELECT feedback.* FROM feedback WHERE ? IN "
-            + "(SELECT membership from room_memberships WHERE user_id=?"
-            + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
+        defer.returnValue((ret, key))
 
-        if room_id:
-            query += " AND feedback.room_id=?"
-            query_args.append(room_id)
+    @defer.inlineCallbacks
+    def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
 
-        (query, query_args) = self._append_stream_operations(
-            "feedback", query, query_args, from_pkey, to_pkey, limit=limit
+        sql = (
+            "SELECT * FROM events WHERE room_id = ? "
+            "ORDER BY ordering DESC LIMIT ? "
         )
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, FeedbackTable, from_pkey)
-
-    def get_room_data_stream(self, user_id, from_key, to_key, room_id,
-                             limit=0):
-        return self._db_pool.runInteraction(
-            self._get_room_data_rows,
-            user_id, from_key, to_key, room_id, limit
+        rows = yield self._execute_and_decode(
+            sql,
+            room_id, limit
         )
 
-    def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                            limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the feedback table, bounded by the specified pkeys
+        rows.reverse()  # As we selected with reverse ordering
 
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = (
-            "SELECT room_data.* FROM room_data WHERE ? IN "
-            + "(SELECT membership from room_memberships WHERE user_id=?"
-            + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
+        defer.returnValue([self._parse_event_from_row(r) for r in rows])
 
-        if room_id:
-            query += " AND room_data.room_id=?"
-            query_args.append(room_id)
-
-        (query, query_args) = self._append_stream_operations(
-            "room_data", query, query_args, from_pkey, to_pkey, limit=limit
+    @defer.inlineCallbacks
+    def get_room_events_max_id(self):
+        res = yield self._execute_and_decode(
+            "SELECT MAX(ordering) as m FROM events"
         )
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, RoomDataTable, from_pkey)
-
-    def _append_stream_operations(self, table_name, query, query_args,
-                                  from_pkey, to_pkey, limit=None,
-                                  group_by=""):
-        LATEST_ROW = -1
-        order_by = ""
-        if to_pkey > from_pkey:
-            if from_pkey != LATEST_ROW:
-                # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9
-                query += (" AND %s.id > ? AND %s.id < ?" %
-                         (table_name, table_name))
-                query_args.append(from_pkey)
-                query_args.append(to_pkey)
-            else:
-                # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC
-                query += " AND %s.id > ? " % table_name
-                order_by = "ORDER BY id DESC"
-                query_args.append(to_pkey)
-        elif from_pkey > to_pkey:
-            if to_pkey != LATEST_ROW:
-                # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC
-                query += (" AND %s.id > ? AND %s.id < ? " %
-                          (table_name, table_name))
-                order_by = "ORDER BY id DESC"
-                query_args.append(to_pkey)
-                query_args.append(from_pkey)
-            else:
-                # from=5 to=-1 >> from 5 to now >> id>5
-                query += " AND %s.id > ?" % table_name
-                query_args.append(from_pkey)
-
-        query += group_by + order_by
-
-        if limit and limit > 0:
-            query += " LIMIT ?"
-            query_args.append(str(limit))
-
-        return (query, query_args)
-
-    def _as_events(self, cursor, table, from_pkey):
-        data_entries = table.decode_results(cursor)
-        last_pkey = from_pkey
-        if data_entries:
-            last_pkey = data_entries[-1].id
-
-        events = [
-            entry.as_event(self.event_factory).get_dict()
-            for entry in data_entries
-        ]
+        if not res:
+            defer.returnValue(0)
+            return
 
-        return (events, last_pkey)
+        defer.returnValue(res[0]["m"])