summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-08-15 13:58:28 +0100
committerErik Johnston <erik@matrix.org>2014-08-15 13:58:28 +0100
commit114984a2361ee41005a769f6dc127c470ee08aee (patch)
treea295be465b71cbc048260b714e9fd042b4accb45
parentMerge branch 'master' of github.com:matrix-org/synapse into sql_refactor (diff)
downloadsynapse-114984a2361ee41005a769f6dc127c470ee08aee.tar.xz
Start chagning the events stream to work with the new DB schema
-rw-r--r--synapse/api/streams/event.py77
-rw-r--r--synapse/handlers/events.py8
-rw-r--r--synapse/handlers/room.py79
-rw-r--r--synapse/storage/__init__.py10
-rw-r--r--synapse/storage/_base.py2
-rw-r--r--synapse/storage/roommember.py8
-rw-r--r--synapse/storage/schema/im.sql5
-rw-r--r--synapse/storage/stream.py31
8 files changed, 102 insertions, 118 deletions
diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py
index 4b6d739e54..427363cad4 100644
--- a/synapse/api/streams/event.py
+++ b/synapse/api/streams/event.py
@@ -28,17 +28,17 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class MessagesStreamData(StreamData):
-    EVENT_TYPE = MessageEvent.TYPE
+class EventsStreamData(StreamData):
+    EVENT_TYPE = "EventsStream"
 
     def __init__(self, hs, room_id=None, feedback=False):
-        super(MessagesStreamData, self).__init__(hs)
+        super(EventsStreamData, self).__init__(hs)
         self.room_id = room_id
         self.with_feedback = feedback
 
     @defer.inlineCallbacks
     def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_message_stream(
+        data, latest_ver = yield self.store.get_room_events_stream(
             user_id=user_id,
             from_key=from_key,
             to_key=to_key,
@@ -50,74 +50,7 @@ class MessagesStreamData(StreamData):
 
     @defer.inlineCallbacks
     def max_token(self):
-        val = yield self.store.get_max_message_id()
-        defer.returnValue(val)
-
-
-class RoomMemberStreamData(StreamData):
-    EVENT_TYPE = RoomMemberEvent.TYPE
-
-    @defer.inlineCallbacks
-    def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_room_member_stream(
-            user_id=user_id,
-            from_key=from_key,
-            to_key=to_key
-        )
-
-        defer.returnValue((data, latest_ver))
-
-    @defer.inlineCallbacks
-    def max_token(self):
-        val = yield self.store.get_max_room_member_id()
-        defer.returnValue(val)
-
-
-class FeedbackStreamData(StreamData):
-    EVENT_TYPE = FeedbackEvent.TYPE
-
-    def __init__(self, hs, room_id=None):
-        super(FeedbackStreamData, self).__init__(hs)
-        self.room_id = room_id
-
-    @defer.inlineCallbacks
-    def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_feedback_stream(
-            user_id=user_id,
-            from_key=from_key,
-            to_key=to_key,
-            limit=limit,
-            room_id=self.room_id
-        )
-        defer.returnValue((data, latest_ver))
-
-    @defer.inlineCallbacks
-    def max_token(self):
-        val = yield self.store.get_max_feedback_id()
-        defer.returnValue(val)
-
-
-class RoomDataStreamData(StreamData):
-    EVENT_TYPE = RoomTopicEvent.TYPE  # TODO need multiple event types
-
-    def __init__(self, hs, room_id=None):
-        super(RoomDataStreamData, self).__init__(hs)
-        self.room_id = room_id
-
-    @defer.inlineCallbacks
-    def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_room_data_stream(
-            user_id=user_id,
-            from_key=from_key,
-            to_key=to_key,
-            limit=limit,
-            room_id=self.room_id
-        )
-        defer.returnValue((data, latest_ver))
-
-    @defer.inlineCallbacks
-    def max_token(self):
-        val = yield self.store.get_max_room_data_id()
+        val = yield self.store.get_room_events_max_id()
         defer.returnValue(val)
 
 
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 3af7d824a2..6bb797caf2 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,8 +17,7 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 from synapse.api.streams.event import (
-    EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
-    RoomDataStreamData
+    EventStream, EventsStreamData
 )
 from synapse.handlers.presence import PresenceStreamData
 
@@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData
 class EventStreamHandler(BaseHandler):
 
     stream_data_classes = [
-        MessagesStreamData,
-        RoomMemberStreamData,
-        FeedbackStreamData,
-        RoomDataStreamData,
+        EventsStreamData,
         PresenceStreamData,
     ]
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 432d13982a..3451250008 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,7 @@ from synapse.api.events.room import (
     RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
     RoomConfigEvent
 )
-from synapse.api.streams.event import EventStream, MessagesStreamData
+from synapse.api.streams.event import EventStream, EventsStreamData
 from synapse.util import stringutils
 from ._base import BaseHandler
 
@@ -97,30 +97,30 @@ class MessageHandler(BaseHandler):
             self.notifier.on_new_room_event(event, store_id)
 
         yield self.hs.get_federation().handle_new_event(event)
-
-    @defer.inlineCallbacks
-    def get_messages(self, user_id=None, room_id=None, pagin_config=None,
-                     feedback=False):
-        """Get messages in a room.
-
-        Args:
-            user_id (str): The user requesting messages.
-            room_id (str): The room they want messages from.
-            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-            config rules to apply, if any.
-            feedback (bool): True to get compressed feedback with the messages
-        Returns:
-            dict: Pagination API results
-        """
-        yield self.auth.check_joined_room(room_id, user_id)
-
-        data_source = [MessagesStreamData(self.hs, room_id=room_id,
-                                          feedback=feedback)]
-        event_stream = EventStream(user_id, data_source)
-        pagin_config = yield event_stream.fix_tokens(pagin_config)
-        data_chunk = yield event_stream.get_chunk(config=pagin_config)
-        defer.returnValue(data_chunk)
-
+#
+#    @defer.inlineCallbacks
+#    def get_messages(self, user_id=None, room_id=None, pagin_config=None,
+#                     feedback=False):
+#        """Get messages in a room.
+#
+#        Args:
+#            user_id (str): The user requesting messages.
+#            room_id (str): The room they want messages from.
+#            pagin_config (synapse.api.streams.PaginationConfig): The pagination
+#            config rules to apply, if any.
+#            feedback (bool): True to get compressed feedback with the messages
+#        Returns:
+#            dict: Pagination API results
+#        """
+#        yield self.auth.check_joined_room(room_id, user_id)
+#
+#        data_source = [MessagesStreamData(self.hs, room_id=room_id,
+#                                          feedback=feedback)]
+#        event_stream = EventStream(user_id, data_source)
+#        pagin_config = yield event_stream.fix_tokens(pagin_config)
+#        data_chunk = yield event_stream.get_chunk(config=pagin_config)
+#        defer.returnValue(data_chunk)
+#
     @defer.inlineCallbacks
     def store_room_data(self, event=None, stamp_event=True):
         """ Stores data for a room.
@@ -251,20 +251,27 @@ class MessageHandler(BaseHandler):
             user_id=user_id,
             membership_list=[Membership.INVITE, Membership.JOIN]
         )
-        for room_info in room_list:
-            if room_info["membership"] != Membership.JOIN:
+
+        ret = []
+
+        for event in room_list:
+            d = event.get_dict()
+            ret.append(d)
+
+            if event.membership != Membership.JOIN:
                 continue
             try:
-                event_chunk = yield self.get_messages(
-                    user_id=user_id,
-                    pagin_config=pagin_config,
-                    feedback=feedback,
-                    room_id=room_info["room_id"]
+                messages = yield self.store.get_recent_events_for_room(
+                    event.room_id,
+                    limit=50,
                 )
-                room_info["messages"] = event_chunk
+                d["messages"] = [m.get_dict() for m in messages]
             except:
                 pass
-        defer.returnValue(room_list)
+
+        logger.debug("snapshot_all_rooms returning: %s", ret)
+
+        defer.returnValue(ret)
 
 
 class RoomCreationHandler(BaseHandler):
@@ -442,7 +449,7 @@ class RoomMemberHandler(BaseHandler):
 
         member_list = yield self.store.get_room_members(room_id=room_id)
         event_list = [
-            entry.as_event(self.event_factory).get_dict()
+            entry.get_dict()
             for entry in member_list
         ]
         chunk_data = {
@@ -685,7 +692,7 @@ class RoomMemberHandler(BaseHandler):
             user_id=user.to_string(), membership_list=membership_list
         )
 
-        defer.returnValue([r["room_id"] for r in rooms])
+        defer.returnValue([r.room_id for r in rooms])
 
     @defer.inlineCallbacks
     def _do_local_membership_update(self, event, membership, broadcast_msg):
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f62cee3c39..46b9dbcbbf 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -104,7 +104,15 @@ class DataStore(RoomMemberStore, RoomStore,
 
             yield self._simple_insert("state_events", vals)
 
-            # TODO (erikj): We also need to update the current state table?
+            yield self._simple_insert(
+                "current_state_events",
+                {
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "type": event.type,
+                    "state_key": event.state_key,
+                }
+            )
 
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index c26e9a0f98..413838f798 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -293,7 +293,7 @@ class SQLBaseStore(object):
 
     def _parse_event_from_row(self, row_dict):
         d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
-        d.update(json.loads(json.loads(row_dict["unrecognized_keys"])))
+        d.update(json.loads(row_dict["unrecognized_keys"]))
         d["content"] = json.loads(d["content"])
         del d["unrecognized_keys"]
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8c4b04f190..a0620677b9 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -63,6 +63,7 @@ class RoomMemberStore(SQLBaseStore):
             yield self._execute(None, sql, event.room_id, domain)
 
 
+    @defer.inlineCallbacks
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
 
@@ -72,11 +73,13 @@ class RoomMemberStore(SQLBaseStore):
         Returns:
             Deferred: Results in a MembershipEvent or None.
         """
-        return self._get_members_by_dict({
+        rows = yield self._get_members_by_dict({
             "e.room_id": room_id,
             "m.user_id": user_id,
         })
 
+        defer.returnValue(rows[0] if rows else None)
+
     def get_room_members(self, room_id, membership=None):
         """Retrieve the current room member list for a room.
 
@@ -142,5 +145,8 @@ class RoomMemberStore(SQLBaseStore):
         ) % (where_clause,)
 
         rows = yield self._execute_and_decode(sql, *where_values)
+
+        logger.debug("_get_members_query Got rows %s", rows)
+
         results = [self._parse_event_from_row(r) for r in rows]
         defer.returnValue(results)
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 9a0f2145d5..2452890ea4 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -32,7 +32,10 @@ CREATE TABLE IF NOT EXISTS state_events(
 
 CREATE TABLE IF NOT EXISTS current_state_events(
     event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
 );
 
 CREATE TABLE IF NOT EXISTS room_memberships(
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 9937239c22..c5c3770a40 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -34,6 +34,7 @@ class StreamStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,
                                limit=0, with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
 
         current_room_membership_sql = (
             "SELECT m.room_id FROM room_memberships as m "
@@ -69,3 +70,33 @@ class StreamStore(SQLBaseStore):
         )
 
         defer.returnValue([self._parse_event_from_row(r) for r in rows])
+
+    @defer.inlineCallbacks
+    def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
+
+        sql = (
+            "SELECT * FROM events WHERE room_id = ? "
+            "ORDER BY ordering DESC LIMIT ? "
+        )
+
+        rows = yield self._execute_and_decode(
+            sql,
+            room_id, limit
+        )
+
+        rows.reverse()  # As we selected with reverse ordering
+
+        defer.returnValue([self._parse_event_from_row(r) for r in rows])
+
+    @defer.inlineCallbacks
+    def get_room_events_max_id(self):
+        res = yield self._execute_and_decode(
+            "SELECT MAX(ordering) as m FROM events"
+        )
+
+        if not res:
+            defer.returnValue(0)
+            return
+
+        defer.returnValue(res[0]["m"])