summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/events/factory.py10
-rw-r--r--synapse/api/events/room.py28
-rw-r--r--synapse/api/notifier.py3
-rw-r--r--synapse/api/streams/event.py79
-rw-r--r--synapse/handlers/events.py8
-rw-r--r--synapse/handlers/room.py80
-rw-r--r--synapse/rest/room.py39
-rw-r--r--synapse/state.py2
-rw-r--r--synapse/storage/__init__.py135
-rw-r--r--synapse/storage/_base.py28
-rw-r--r--synapse/storage/feedback.py72
-rw-r--r--synapse/storage/message.py81
-rw-r--r--synapse/storage/pdu.py6
-rw-r--r--synapse/storage/room.py97
-rw-r--r--synapse/storage/roomdata.py85
-rw-r--r--synapse/storage/roommember.py172
-rw-r--r--synapse/storage/schema/im.sql78
-rw-r--r--synapse/storage/stream.py315
18 files changed, 521 insertions, 797 deletions
diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py
index 12aa04fc6e..b61dac7acd 100644
--- a/synapse/api/events/factory.py
+++ b/synapse/api/events/factory.py
@@ -15,7 +15,7 @@
 
 from synapse.api.events.room import (
     RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent,
-    InviteJoinEvent, RoomConfigEvent
+    InviteJoinEvent, RoomConfigEvent, RoomNameEvent, GenericEvent,
 )
 
 from synapse.util.stringutils import random_string
@@ -25,6 +25,7 @@ class EventFactory(object):
 
     _event_classes = [
         RoomTopicEvent,
+        RoomNameEvent,
         MessageEvent,
         RoomMemberEvent,
         FeedbackEvent,
@@ -42,10 +43,9 @@ class EventFactory(object):
         if "event_id" not in kwargs:
             kwargs["event_id"] = random_string(10)
 
-        try:
+        if etype in self._event_list:
             handler = self._event_list[etype]
-        except KeyError:  # unknown event type
-            # TODO allow custom event types.
-            raise NotImplementedError("Unknown etype=%s" % etype)
+        else:
+            handler = GenericEvent
 
         return handler(**kwargs)
diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py
index f3df849af2..dbf537fb88 100644
--- a/synapse/api/events/room.py
+++ b/synapse/api/events/room.py
@@ -16,17 +16,45 @@
 from . import SynapseEvent
 
 
+class GenericEvent(SynapseEvent):
+    def get_content_template(self):
+        return {}
+
+
 class RoomTopicEvent(SynapseEvent):
     TYPE = "m.room.topic"
 
+    internal_keys = SynapseEvent.internal_keys + [
+        "topic",
+    ]
+
     def __init__(self, **kwargs):
         kwargs["state_key"] = ""
+        if "topic" in kwargs["content"]:
+            kwargs["topic"] = kwargs["content"]["topic"]
         super(RoomTopicEvent, self).__init__(**kwargs)
 
     def get_content_template(self):
         return {"topic": u"string"}
 
 
+class RoomNameEvent(SynapseEvent):
+    TYPE = "m.room.name"
+
+    internal_keys = SynapseEvent.internal_keys + [
+        "name",
+    ]
+
+    def __init__(self, **kwargs):
+        kwargs["state_key"] = ""
+        if "name" in kwargs["content"]:
+            kwargs["name"] = kwargs["content"]["name"]
+        super(RoomNameEvent, self).__init__(**kwargs)
+
+    def get_content_template(self):
+        return {"name": u"string"}
+
+
 class RoomMemberEvent(SynapseEvent):
     TYPE = "m.room.member"
 
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py
index 65b5a4ebb3..9f622df6bb 100644
--- a/synapse/api/notifier.py
+++ b/synapse/api/notifier.py
@@ -15,6 +15,7 @@
 
 from synapse.api.constants import Membership
 from synapse.api.events.room import RoomMemberEvent
+from synapse.api.streams.event import EventsStreamData
 
 from twisted.internet import defer
 from twisted.internet import reactor
@@ -66,7 +67,7 @@ class Notifier(object):
                 self._notify_and_callback(
                     user_id=user_id,
                     event_data=event.get_dict(),
-                    stream_type=event.type,
+                    stream_type=EventsStreamData.EVENT_TYPE,
                     store_id=store_id)
 
     def on_new_user_event(self, user_id, event_data, stream_type, store_id):
diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py
index 4b6d739e54..895a96b5b9 100644
--- a/synapse/api/streams/event.py
+++ b/synapse/api/streams/event.py
@@ -28,17 +28,17 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class MessagesStreamData(StreamData):
-    EVENT_TYPE = MessageEvent.TYPE
+class EventsStreamData(StreamData):
+    EVENT_TYPE = "EventsStream"
 
     def __init__(self, hs, room_id=None, feedback=False):
-        super(MessagesStreamData, self).__init__(hs)
+        super(EventsStreamData, self).__init__(hs)
         self.room_id = room_id
         self.with_feedback = feedback
 
     @defer.inlineCallbacks
     def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_message_stream(
+        data, latest_ver = yield self.store.get_room_events_stream(
             user_id=user_id,
             from_key=from_key,
             to_key=to_key,
@@ -50,74 +50,7 @@ class MessagesStreamData(StreamData):
 
     @defer.inlineCallbacks
     def max_token(self):
-        val = yield self.store.get_max_message_id()
-        defer.returnValue(val)
-
-
-class RoomMemberStreamData(StreamData):
-    EVENT_TYPE = RoomMemberEvent.TYPE
-
-    @defer.inlineCallbacks
-    def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_room_member_stream(
-            user_id=user_id,
-            from_key=from_key,
-            to_key=to_key
-        )
-
-        defer.returnValue((data, latest_ver))
-
-    @defer.inlineCallbacks
-    def max_token(self):
-        val = yield self.store.get_max_room_member_id()
-        defer.returnValue(val)
-
-
-class FeedbackStreamData(StreamData):
-    EVENT_TYPE = FeedbackEvent.TYPE
-
-    def __init__(self, hs, room_id=None):
-        super(FeedbackStreamData, self).__init__(hs)
-        self.room_id = room_id
-
-    @defer.inlineCallbacks
-    def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_feedback_stream(
-            user_id=user_id,
-            from_key=from_key,
-            to_key=to_key,
-            limit=limit,
-            room_id=self.room_id
-        )
-        defer.returnValue((data, latest_ver))
-
-    @defer.inlineCallbacks
-    def max_token(self):
-        val = yield self.store.get_max_feedback_id()
-        defer.returnValue(val)
-
-
-class RoomDataStreamData(StreamData):
-    EVENT_TYPE = RoomTopicEvent.TYPE  # TODO need multiple event types
-
-    def __init__(self, hs, room_id=None):
-        super(RoomDataStreamData, self).__init__(hs)
-        self.room_id = room_id
-
-    @defer.inlineCallbacks
-    def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_room_data_stream(
-            user_id=user_id,
-            from_key=from_key,
-            to_key=to_key,
-            limit=limit,
-            room_id=self.room_id
-        )
-        defer.returnValue((data, latest_ver))
-
-    @defer.inlineCallbacks
-    def max_token(self):
-        val = yield self.store.get_max_room_data_id()
+        val = yield self.store.get_room_events_max_id()
         defer.returnValue(val)
 
 
@@ -227,7 +160,7 @@ class EventStream(PaginationStream):
                 self.user_id, from_pkey, to_pkey, limit
             )
 
-            chunk += event_chunk
+            chunk += [e.get_dict() for e in event_chunk]
             next_ver.append(str(max_pkey))
 
         defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 3af7d824a2..6bb797caf2 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,8 +17,7 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 from synapse.api.streams.event import (
-    EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
-    RoomDataStreamData
+    EventStream, EventsStreamData
 )
 from synapse.handlers.presence import PresenceStreamData
 
@@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData
 class EventStreamHandler(BaseHandler):
 
     stream_data_classes = [
-        MessagesStreamData,
-        RoomMemberStreamData,
-        FeedbackStreamData,
-        RoomDataStreamData,
+        EventsStreamData,
         PresenceStreamData,
     ]
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5d0379254b..14ffddc630 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,7 @@ from synapse.api.events.room import (
     RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
     RoomConfigEvent
 )
-from synapse.api.streams.event import EventStream, MessagesStreamData
+from synapse.api.streams.event import EventStream, EventsStreamData
 from synapse.util import stringutils
 from ._base import BaseHandler
 
@@ -114,8 +114,9 @@ class MessageHandler(BaseHandler):
         """
         yield self.auth.check_joined_room(room_id, user_id)
 
-        data_source = [MessagesStreamData(self.hs, room_id=room_id,
-                                          feedback=feedback)]
+        data_source = [
+            EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
+        ]
         event_stream = EventStream(user_id, data_source)
         pagin_config = yield event_stream.fix_tokens(pagin_config)
         data_chunk = yield event_stream.get_chunk(config=pagin_config)
@@ -141,12 +142,7 @@ class MessageHandler(BaseHandler):
             yield self.state_handler.handle_new_event(event)
 
             # store in db
-            store_id = yield self.store.store_room_data(
-                room_id=event.room_id,
-                etype=event.type,
-                state_key=event.state_key,
-                content=json.dumps(event.content)
-            )
+            store_id = yield self.store.persist_event(event)
 
             event.destinations = yield self.store.get_joined_hosts_for_room(
                 event.room_id
@@ -201,19 +197,17 @@ class MessageHandler(BaseHandler):
                 raise RoomError(
                     403, "Member does not meet private room rules.")
 
-        data = yield self.store.get_room_data(room_id, event_type, state_key)
+        data = yield self.store.get_current_state(
+            room_id, event_type, state_key
+        )
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None,
-                     user_id=None, fb_sender_id=None, fb_type=None):
-        yield self.auth.check_joined_room(room_id, user_id)
+    def get_feedback(self, event_id):
+        # yield self.auth.check_joined_room(room_id, user_id)
 
         # Pull out the feedback from the db
-        fb = yield self.store.get_feedback(
-            room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id,
-            fb_sender_id=fb_sender_id, fb_type=fb_type
-        )
+        fb = yield self.store.get_feedback(event_id)
 
         if fb:
             defer.returnValue(fb)
@@ -260,20 +254,30 @@ class MessageHandler(BaseHandler):
             user_id=user_id,
             membership_list=[Membership.INVITE, Membership.JOIN]
         )
-        for room_info in room_list:
-            if room_info["membership"] != Membership.JOIN:
+
+        ret = []
+
+        for event in room_list:
+            d = {
+                "room_id": event.room_id,
+                "membership": event.membership,
+            }
+            ret.append(d)
+
+            if event.membership != Membership.JOIN:
                 continue
             try:
-                event_chunk = yield self.get_messages(
-                    user_id=user_id,
-                    pagin_config=pagin_config,
-                    feedback=feedback,
-                    room_id=room_info["room_id"]
+                messages = yield self.store.get_recent_events_for_room(
+                    event.room_id,
+                    limit=50,
                 )
-                room_info["messages"] = event_chunk
+                d["messages"] = [m.get_dict() for m in messages]
             except:
                 pass
-        defer.returnValue(room_list)
+
+        logger.debug("snapshot_all_rooms returning: %s", ret)
+
+        defer.returnValue(ret)
 
 
 class RoomCreationHandler(BaseHandler):
@@ -451,7 +455,7 @@ class RoomMemberHandler(BaseHandler):
 
         member_list = yield self.store.get_room_members(room_id=room_id)
         event_list = [
-            entry.as_event(self.event_factory).get_dict()
+            entry.get_dict()
             for entry in member_list
         ]
         chunk_data = {
@@ -495,7 +499,7 @@ class RoomMemberHandler(BaseHandler):
             SynapseError if there was a problem changing the membership.
         """
 
-        #broadcast_msg = False
+        # broadcast_msg = False
 
         prev_state = yield self.store.get_room_member(
             event.target_user_id, event.room_id
@@ -569,7 +573,8 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue({"room_id": room_id})
 
     @defer.inlineCallbacks
-    def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True):
+    def _do_join(self, event, room_host=None, do_auth=True,
+                 broadcast_msg=True):
         joinee = self.hs.parse_userid(event.target_user_id)
         # room_id = RoomID.from_string(event.room_id, self.hs)
         room_id = event.room_id
@@ -598,7 +603,7 @@ class RoomMemberHandler(BaseHandler):
             if prev_state and prev_state.membership == Membership.INVITE:
                 room = yield self.store.get_room(room_id)
                 inviter = UserID.from_string(
-                    prev_state.sender, self.hs
+                    prev_state.user_id, self.hs
                 )
 
                 should_do_dance = not inviter.is_mine and not room
@@ -620,7 +625,6 @@ class RoomMemberHandler(BaseHandler):
                 broadcast_msg=broadcast_msg,
             )
 
-
         if should_do_dance:
             yield self._do_invite_join_dance(
                 room_id=room_id,
@@ -694,18 +698,12 @@ class RoomMemberHandler(BaseHandler):
             user_id=user.to_string(), membership_list=membership_list
         )
 
-        defer.returnValue([r["room_id"] for r in rooms])
+        defer.returnValue([r.room_id for r in rooms])
 
     @defer.inlineCallbacks
     def _do_local_membership_update(self, event, membership, broadcast_msg):
         # store membership
-        store_id = yield self.store.store_room_member(
-            user_id=event.target_user_id,
-            sender=event.user_id,
-            room_id=event.room_id,
-            content=event.content,
-            membership=membership
-        )
+        store_id = yield self.store.persist_event(event)
 
         # Send a PDU to all hosts who have joined the room.
         destinations = yield self.store.get_joined_hosts_for_room(
@@ -760,7 +758,7 @@ class RoomMemberHandler(BaseHandler):
             room_id, "", is_public=False
         )
 
-        #yield self.state_handler.handle_new_event(event)
+        # yield self.state_handler.handle_new_event(event)
         yield federation.handle_new_event(new_event)
         yield federation.get_state_for_room(
             target_host, room_id
@@ -805,5 +803,5 @@ class RoomListHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_public_room_list(self):
-        chunk = yield self.store.get_rooms(is_public=True, with_topics=True)
+        chunk = yield self.store.get_rooms(is_public=True)
         defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index 799dd58b2d..303759b718 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -285,25 +285,28 @@ class FeedbackRestServlet(RestServlet):
                feedback_type):
         user = yield (self.auth.get_user_by_req(request))
 
-        if feedback_type not in Feedback.LIST:
-            raise SynapseError(400, "Bad feedback type.",
-                               errcode=Codes.BAD_JSON)
-
-        msg_handler = self.handlers.message_handler
-        feedback = yield msg_handler.get_feedback(
-            room_id=urllib.unquote(room_id),
-            msg_sender_id=msg_sender_id,
-            msg_id=msg_id,
-            user_id=user.to_string(),
-            fb_sender_id=fb_sender_id,
-            fb_type=feedback_type
-        )
-
-        if not feedback:
-            raise SynapseError(404, "Feedback not found.",
-                               errcode=Codes.NOT_FOUND)
+        # TODO (erikj): Implement this?
+        raise NotImplementedError("Getting feedback is not supported")
 
-        defer.returnValue((200, json.loads(feedback.content)))
+#        if feedback_type not in Feedback.LIST:
+#            raise SynapseError(400, "Bad feedback type.",
+#                               errcode=Codes.BAD_JSON)
+#
+#        msg_handler = self.handlers.message_handler
+#        feedback = yield msg_handler.get_feedback(
+#            room_id=urllib.unquote(room_id),
+#            msg_sender_id=msg_sender_id,
+#            msg_id=msg_id,
+#            user_id=user.to_string(),
+#            fb_sender_id=fb_sender_id,
+#            fb_type=feedback_type
+#        )
+#
+#        if not feedback:
+#            raise SynapseError(404, "Feedback not found.",
+#                               errcode=Codes.NOT_FOUND)
+#
+#        defer.returnValue((200, json.loads(feedback.content)))
 
     @defer.inlineCallbacks
     def on_PUT(self, request, room_id, sender_id, msg_id, fb_sender_id,
diff --git a/synapse/state.py b/synapse/state.py
index 4f8b4d9760..ca8e1ca630 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -86,7 +86,7 @@ class StateHandler(object):
         else:
             event.depth = 0
 
-        current_state = yield self.store.get_current_state(
+        current_state = yield self.store.get_current_state_pdu(
             key.context, key.type, key.state_key
         )
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 5d5b5f7c44..841ad8f132 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -13,21 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
 
 from synapse.api.events.room import (
     RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent,
-    RoomConfigEvent
+    RoomConfigEvent, RoomNameEvent,
 )
 
 from .directory import DirectoryStore
 from .feedback import FeedbackStore
-from .message import MessageStore
 from .presence import PresenceStore
 from .profile import ProfileStore
 from .registration import RegistrationStore
 from .room import RoomStore
 from .roommember import RoomMemberStore
-from .roomdata import RoomDataStore
 from .stream import StreamStore
 from .pdu import StatePduStore, PduStore
 from .transactions import TransactionStore
@@ -36,7 +35,7 @@ import json
 import os
 
 
-class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
+class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
                 PresenceStore, PduStore, StatePduStore, TransactionStore,
                 DirectoryStore):
@@ -45,50 +44,102 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
         super(DataStore, self).__init__(hs)
         self.event_factory = hs.get_event_factory()
 
+    @defer.inlineCallbacks
     def persist_event(self, event):
-        if event.type == MessageEvent.TYPE:
-            return self.store_message(
-                user_id=event.user_id,
-                room_id=event.room_id,
-                msg_id=event.msg_id,
-                content=json.dumps(event.content)
-            )
-        elif event.type == RoomMemberEvent.TYPE:
-            return self.store_room_member(
-                user_id=event.target_user_id,
-                sender=event.user_id,
-                room_id=event.room_id,
-                content=event.content,
-                membership=event.content["membership"]
-            )
+        if event.type == RoomMemberEvent.TYPE:
+            yield self._store_room_member(event)
         elif event.type == FeedbackEvent.TYPE:
-            return self.store_feedback(
-                room_id=event.room_id,
-                msg_id=event.msg_id,
-                msg_sender_id=event.msg_sender_id,
-                fb_sender_id=event.user_id,
-                fb_type=event.feedback_type,
-                content=json.dumps(event.content)
-            )
+            yield self._store_feedback(event)
+#        elif event.type == RoomConfigEvent.TYPE:
+#            yield self._store_room_config(event)
+        elif event.type == RoomNameEvent.TYPE:
+            yield self._store_room_name(event)
         elif event.type == RoomTopicEvent.TYPE:
-            return self.store_room_data(
-                room_id=event.room_id,
-                etype=event.type,
-                state_key=event.state_key,
-                content=json.dumps(event.content)
+            yield self._store_room_topic(event)
+
+        ret = yield self._store_event(event)
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def get_event(self, event_id):
+        events_dict = yield self._simple_select_one(
+            "events",
+            {"event_id": event_id},
+            [
+                "event_id",
+                "type",
+                "sender",
+                "room_id",
+                "content",
+                "unrecognized_keys"
+            ],
+        )
+
+        event = self._parse_event_from_row(events_dict)
+        defer.returnValue(event)
+
+    @defer.inlineCallbacks
+    def _store_event(self, event):
+        vals = {
+            "event_id": event.event_id,
+            "type": event.type,
+            "room_id": event.room_id,
+            "content": json.dumps(event.content),
+        }
+
+        unrec = {
+            k: v
+            for k, v in event.get_full_dict().items()
+            if k not in vals.keys()
+        }
+        vals["unrecognized_keys"] = json.dumps(unrec)
+
+        yield self._simple_insert("events", vals)
+
+        if hasattr(event, "state_key"):
+            vals = {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "type": event.type,
+                "state_key": event.state_key,
+            }
+
+            if hasattr(event, "prev_state"):
+                vals["prev_state"] = event.prev_state
+
+            yield self._simple_insert("state_events", vals)
+
+            yield self._simple_insert(
+                "current_state_events",
+                {
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "type": event.type,
+                    "state_key": event.state_key,
+                }
             )
-        elif event.type == RoomConfigEvent.TYPE:
-            if "visibility" in event.content:
-                visibility = event.content["visibility"]
-                return self.store_room_config(
-                    room_id=event.room_id,
-                    visibility=visibility
-                )
 
+        latest = yield self.get_room_events_max_id()
+        defer.returnValue(latest)
+
+    @defer.inlineCallbacks
+    def get_current_state(self, room_id, event_type=None, state_key=""):
+        sql = (
+            "SELECT e.* FROM events as e "
+            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+            "INNER JOIN state_events as s ON e.event_id = s.event_id "
+            "WHERE c.room_id = ? "
+        )
+
+        if event_type:
+            sql += " AND s.type = ? AND s.state_key = ? "
+            args = (room_id, event_type, state_key)
         else:
-            raise NotImplementedError(
-                "Don't know how to persist type=%s" % event.type
-            )
+            args = (room_id, )
+
+        results = yield self._execute_and_decode(sql, *args)
+
+        defer.returnValue([self._parse_event_from_row(r) for r in results])
 
 
 def schema_path(schema):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index bf1800f4bf..36cc57c1b8 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 
 from twisted.internet import defer
@@ -20,6 +19,9 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 
 import collections
+import copy
+import json
+
 
 logger = logging.getLogger(__name__)
 
@@ -29,6 +31,7 @@ class SQLBaseStore(object):
     def __init__(self, hs):
         self.hs = hs
         self._db_pool = hs.get_db_pool()
+        self.event_factory = hs.get_event_factory()
         self._clock = hs.get_clock()
 
     def cursor_to_dict(self, cursor):
@@ -57,14 +60,22 @@ class SQLBaseStore(object):
             The result of decoder(results)
         """
         logger.debug(
-            "[SQL] %s  Args=%s Func=%s", query, args, decoder.__name__
+            "[SQL] %s  Args=%s Func=%s",
+            query, args, decoder.__name__ if decoder else None
         )
 
         def interaction(txn):
             cursor = txn.execute(query, args)
-            return decoder(cursor)
+            if decoder:
+                return decoder(cursor)
+            else:
+                return cursor.fetchall()
+
         return self._db_pool.runInteraction(interaction)
 
+    def _execute_and_decode(self, query, *args):
+        return self._execute(self.cursor_to_dict, query, *args)
+
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
 
@@ -281,6 +292,17 @@ class SQLBaseStore(object):
 
         return self._db_pool.runInteraction(func)
 
+    def _parse_event_from_row(self, row_dict):
+        d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
+        d.update(json.loads(row_dict["unrecognized_keys"]))
+        d["content"] = json.loads(d["content"])
+        del d["unrecognized_keys"]
+
+        return self.event_factory.create_event(
+            etype=d["type"],
+            **d
+        )
+
 
 class Table(object):
     """ A base class used to store information about a particular table.
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
index 9bd562c762..e60f98d1e1 100644
--- a/synapse/storage/feedback.py
+++ b/synapse/storage/feedback.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from ._base import SQLBaseStore, Table
 from synapse.api.events.room import FeedbackEvent
 
@@ -22,54 +24,28 @@ import json
 
 class FeedbackStore(SQLBaseStore):
 
-    def store_feedback(self, room_id, msg_id, msg_sender_id,
-                       fb_sender_id, fb_type, content):
-        return self._simple_insert(FeedbackTable.table_name, dict(
-            room_id=room_id,
-            msg_id=msg_id,
-            msg_sender_id=msg_sender_id,
-            fb_sender_id=fb_sender_id,
-            fb_type=fb_type,
-            content=content,
-        ))
-
-    def get_feedback(self, room_id=None, msg_id=None, msg_sender_id=None,
-                     fb_sender_id=None, fb_type=None):
-        query = FeedbackTable.select_statement(
-            "msg_sender_id = ? AND room_id = ? AND msg_id = ? " +
-            "AND fb_sender_id = ? AND feedback_type = ? " +
-            "ORDER BY id DESC LIMIT 1")
-        return self._execute(
-            FeedbackTable.decode_single_result,
-            query, msg_sender_id, room_id, msg_id, fb_sender_id, fb_type,
+    def _store_feedback(self, event):
+        return self._simple_insert("feedback", {
+            "event_id": event.event_id,
+            "feedback_type": event.feedback_type,
+            "room_id": event.room_id,
+            "target_event_id": event.target_event,
+            "sender": event.user_id,
+        })
+
+    @defer.inlineCallbacks
+    def get_feedback_for_event(self, event_id):
+        sql = (
+            "SELECT events.* FROM events INNER JOIN feedback "
+            "ON events.event_id = feedback.event_id "
+            "WHERE feedback.target_event_id = ? "
         )
 
-    def get_max_feedback_id(self):
-        return self._simple_max_id(FeedbackTable.table_name)
-
-
-class FeedbackTable(Table):
-    table_name = "feedback"
+        rows = yield self._execute_and_decode(sql, event_id)
 
-    fields = [
-        "id",
-        "content",
-        "feedback_type",
-        "fb_sender_id",
-        "msg_id",
-        "room_id",
-        "msg_sender_id"
-    ]
-
-    class EntryType(collections.namedtuple("FeedbackEntry", fields)):
-
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=FeedbackEvent.TYPE,
-                room_id=self.room_id,
-                msg_id=self.msg_id,
-                msg_sender_id=self.msg_sender_id,
-                user_id=self.fb_sender_id,
-                feedback_type=self.feedback_type,
-                content=json.loads(self.content),
-            )
+        defer.returnValue(
+            [
+                self._parse_event_from_row(r)
+                for r in rows
+            ]
+        )
diff --git a/synapse/storage/message.py b/synapse/storage/message.py
deleted file mode 100644
index 7bb69c1384..0000000000
--- a/synapse/storage/message.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-from synapse.api.events.room import MessageEvent
-
-import collections
-import json
-
-
-class MessageStore(SQLBaseStore):
-
-    def get_message(self, user_id, room_id, msg_id):
-        """Get a message from the store.
-
-        Args:
-            user_id (str): The ID of the user who sent the message.
-            room_id (str): The room the message was sent in.
-            msg_id (str): The unique ID for this user/room combo.
-        """
-        query = MessagesTable.select_statement(
-            "user_id = ? AND room_id = ? AND msg_id = ? " +
-            "ORDER BY id DESC LIMIT 1")
-        return self._execute(
-            MessagesTable.decode_single_result,
-            query, user_id, room_id, msg_id,
-        )
-
-    def store_message(self, user_id, room_id, msg_id, content):
-        """Store a message in the store.
-
-        Args:
-            user_id (str): The ID of the user who sent the message.
-            room_id (str): The room the message was sent in.
-            msg_id (str): The unique ID for this user/room combo.
-            content (str): The content of the message (JSON)
-        """
-        return self._simple_insert(MessagesTable.table_name, dict(
-            user_id=user_id,
-            room_id=room_id,
-            msg_id=msg_id,
-            content=content,
-        ))
-
-    def get_max_message_id(self):
-        return self._simple_max_id(MessagesTable.table_name)
-
-
-class MessagesTable(Table):
-    table_name = "messages"
-
-    fields = [
-        "id",
-        "user_id",
-        "room_id",
-        "msg_id",
-        "content"
-    ]
-
-    class EntryType(collections.namedtuple("MessageEntry", fields)):
-
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=MessageEvent.TYPE,
-                room_id=self.room_id,
-                user_id=self.user_id,
-                msg_id=self.msg_id,
-                content=json.loads(self.content),
-            )
diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py
index 13adc581e1..a24ce7ab78 100644
--- a/synapse/storage/pdu.py
+++ b/synapse/storage/pdu.py
@@ -580,7 +580,7 @@ class StatePduStore(SQLBaseStore):
 
         txn.execute(query, query_args)
 
-    def get_current_state(self, context, pdu_type, state_key):
+    def get_current_state_pdu(self, context, pdu_type, state_key):
         """For a given context, pdu_type, state_key 3-tuple, return what is
         currently considered the current state.
 
@@ -595,10 +595,10 @@ class StatePduStore(SQLBaseStore):
         """
 
         return self._db_pool.runInteraction(
-            self._get_current_state, context, pdu_type, state_key
+            self._get_current_state_pdu, context, pdu_type, state_key
         )
 
-    def _get_current_state(self, txn, context, pdu_type, state_key):
+    def _get_current_state_pdu(self, txn, context, pdu_type, state_key):
         return self._get_current_interaction(txn, context, pdu_type, state_key)
 
     def _get_current_interaction(self, txn, context, pdu_type, state_key):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index a97162831b..22f2dcca45 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -76,49 +76,80 @@ class RoomStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_rooms(self, is_public, with_topics):
+    def get_rooms(self, is_public):
         """Retrieve a list of all public rooms.
 
         Args:
             is_public (bool): True if the rooms returned should be public.
-            with_topics (bool): True to include the current topic for the room
-            in the response.
         Returns:
-            A list of room dicts containing at least a "room_id" key, and a
-            "topic" key if one is set and with_topic=True.
+            A list of room dicts containing at least a "room_id" key, a
+            "topic" key if one is set, and a "name" key if one is set
         """
-        room_data_type = RoomTopicEvent.TYPE
-        public = 1 if is_public else 0
-
-        latest_topic = ("SELECT max(room_data.id) FROM room_data WHERE "
-                        + "room_data.type = ? GROUP BY room_id")
-
-        query = ("SELECT rooms.*, room_data.content, room_alias FROM rooms "
-                 + "LEFT JOIN "
-                 + "room_aliases ON room_aliases.room_id = rooms.room_id "
-                 + "LEFT JOIN "
-                 + "room_data ON rooms.room_id = room_data.room_id WHERE "
-                 + "(room_data.id IN (" + latest_topic + ") "
-                 + "OR room_data.id IS NULL) AND rooms.is_public = ?")
-
-        res = yield self._execute(
-            self.cursor_to_dict, query, room_data_type, public
+
+        topic_subquery = (
+            "SELECT topics.event_id as event_id, "
+            "topics.room_id as room_id, topic "
+            "FROM topics "
+            "INNER JOIN current_state_events as c "
+            "ON c.event_id = topics.event_id "
         )
 
-        # return only the keys the specification expects
-        ret_keys = ["room_id", "topic", "room_alias"]
+        name_subquery = (
+            "SELECT room_names.event_id as event_id, "
+            "room_names.room_id as room_id, name "
+            "FROM room_names "
+            "INNER JOIN current_state_events as c "
+            "ON c.event_id = room_names.event_id "
+        )
 
-        # extract topic from the json (icky) FIXME
-        for i, room_row in enumerate(res):
-            try:
-                content_json = json.loads(room_row["content"])
-                room_row["topic"] = content_json["topic"]
-            except:
-                pass  # no topic set
-            # filter the dict based on ret_keys
-            res[i] = {k: v for k, v in room_row.iteritems() if k in ret_keys}
+        # We use non printing ascii character US () as a seperator
+        sql = (
+            "SELECT r.room_id, n.name, t.topic, "
+            "group_concat(a.room_alias, '') "
+            "FROM rooms AS r "
+            "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
+            "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
+            "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
+            "WHERE r.is_public = ? "
+            "GROUP BY r.room_id "
+        ) % {
+            "topic": topic_subquery,
+            "name": name_subquery,
+        }
+
+        rows = yield self._execute(None, sql, is_public)
+
+        ret = [
+            {
+                "room_id": r[0],
+                "name": r[1],
+                "topic": r[2],
+                "aliases": r[3].split(""),
+            }
+            for r in rows
+        ]
+
+        defer.returnValue(ret)
+
+    def _store_room_topic(self, event):
+        return self._simple_insert(
+            "topics",
+            {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "topic": event.topic,
+            }
+        )
 
-        defer.returnValue(res)
+    def _store_room_name(self, event):
+        return self._simple_insert(
+            "room_names",
+            {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "name": event.name,
+            }
+        )
 
 
 class RoomsTable(Table):
diff --git a/synapse/storage/roomdata.py b/synapse/storage/roomdata.py
deleted file mode 100644
index cc04d1ba14..0000000000
--- a/synapse/storage/roomdata.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-
-import collections
-import json
-
-
-class RoomDataStore(SQLBaseStore):
-
-    """Provides various CRUD operations for Room Events. """
-
-    def get_room_data(self, room_id, etype, state_key=""):
-        """Retrieve the data stored under this type and state_key.
-
-        Args:
-            room_id (str)
-            etype (str)
-            state_key (str)
-        Returns:
-            namedtuple: Or None if nothing exists at this path.
-        """
-        query = RoomDataTable.select_statement(
-            "room_id = ? AND type = ? AND state_key = ? "
-            "ORDER BY id DESC LIMIT 1"
-        )
-        return self._execute(
-            RoomDataTable.decode_single_result,
-            query, room_id, etype, state_key,
-        )
-
-    def store_room_data(self, room_id, etype, state_key="", content=None):
-        """Stores room specific data.
-
-        Args:
-            room_id (str)
-            etype (str)
-            state_key (str)
-            data (str)- The data to store for this path in JSON.
-        Returns:
-            The store ID for this data.
-        """
-        return self._simple_insert(RoomDataTable.table_name, dict(
-            etype=etype,
-            state_key=state_key,
-            room_id=room_id,
-            content=content,
-        ))
-
-    def get_max_room_data_id(self):
-        return self._simple_max_id(RoomDataTable.table_name)
-
-
-class RoomDataTable(Table):
-    table_name = "room_data"
-
-    fields = [
-        "id",
-        "room_id",
-        "type",
-        "state_key",
-        "content"
-    ]
-
-    class EntryType(collections.namedtuple("RoomDataEntry", fields)):
-
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=self.type,
-                room_id=self.room_id,
-                content=json.loads(self.content),
-            )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index c45d128f1b..89c87290cf 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -31,6 +31,38 @@ logger = logging.getLogger(__name__)
 
 class RoomMemberStore(SQLBaseStore):
 
+    @defer.inlineCallbacks
+    def _store_room_member(self, event):
+        """Store a room member in the database.
+        """
+        domain = self.hs.parse_userid(event.target_user_id).domain
+
+        yield self._simple_insert(
+            "room_memberships",
+            {
+                "event_id": event.event_id,
+                "user_id": event.target_user_id,
+                "sender": event.user_id,
+                "room_id": event.room_id,
+                "membership": event.membership,
+            }
+        )
+
+        # Update room hosts table
+        if event.membership == Membership.JOIN:
+            sql = (
+                "INSERT OR IGNORE INTO room_hosts (room_id, host) "
+                "VALUES (?, ?)"
+            )
+            yield self._execute(None, sql, event.room_id, domain)
+        else:
+            sql = (
+                "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
+            )
+
+            yield self._execute(None, sql, event.room_id, domain)
+
+    @defer.inlineCallbacks
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
 
@@ -38,36 +70,15 @@ class RoomMemberStore(SQLBaseStore):
             user_id (str): The member's user ID.
             room_id (str): The room the member is in.
         Returns:
-            namedtuple: The room member from the database, or None if this
-            member does not exist.
+            Deferred: Results in a MembershipEvent or None.
         """
-        query = RoomMemberTable.select_statement(
-            "room_id = ? AND user_id = ? ORDER BY id DESC LIMIT 1")
-        return self._execute(
-            RoomMemberTable.decode_single_result,
-            query, room_id, user_id,
-        )
+        rows = yield self._get_members_by_dict({
+            "e.room_id": room_id,
+            "m.user_id": user_id,
+        })
 
-    def store_room_member(self, user_id, sender, room_id, membership, content):
-        """Store a room member in the database.
+        defer.returnValue(rows[0] if rows else None)
 
-        Args:
-            user_id (str): The member's user ID.
-            room_id (str): The room in relation to the member.
-            membership (synapse.api.constants.Membership): The new membership
-            state.
-            content (dict): The content of the membership (JSON).
-        """
-        content_json = json.dumps(content)
-        return self._simple_insert(RoomMemberTable.table_name, dict(
-            user_id=user_id,
-            sender=sender,
-            room_id=room_id,
-            membership=membership,
-            content=content_json,
-        ))
-
-    @defer.inlineCallbacks
     def get_room_members(self, room_id, membership=None):
         """Retrieve the current room member list for a room.
 
@@ -79,17 +90,12 @@ class RoomMemberStore(SQLBaseStore):
         Returns:
             list of namedtuples representing the members in this room.
         """
-        query = RoomMemberTable.select_statement(
-            "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
-            + " WHERE room_id = ? GROUP BY user_id)"
-        )
-        res = yield self._execute(
-            RoomMemberTable.decode_results, query, room_id,
-        )
-        # strip memberships which don't match
+
+        where = {"m.room_id": room_id}
         if membership:
-            res = [entry for entry in res if entry.membership == membership]
-        defer.returnValue(res)
+            where["m.membership"] = membership
+
+        return self._get_members_by_dict(where)
 
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
@@ -106,70 +112,40 @@ class RoomMemberStore(SQLBaseStore):
             return defer.succeed(None)
 
         args = [user_id]
-        membership_placeholder = ["membership=?"] * len(membership_list)
-        where_membership = "(" + " OR ".join(membership_placeholder) + ")"
-        for membership in membership_list:
-            args.append(membership)
-
-        # sub-select finds the row ID for the most recent (i.e. current)
-        # state change of this user per room, then the outer select finds those
-        query = ("SELECT room_id, membership FROM room_memberships"
-                 + " WHERE id IN (SELECT MAX(id) FROM room_memberships"
-                 + "   WHERE user_id=? GROUP BY room_id)"
-                 + " AND " + where_membership)
-        return self._execute(
-            self.cursor_to_dict, query, *args
-        )
+        args.extend(membership_list)
 
-    @defer.inlineCallbacks
-    def get_joined_hosts_for_room(self, room_id):
-        query = RoomMemberTable.select_statement(
-            "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
-            + " WHERE room_id = ? GROUP BY user_id)"
-        )
-
-        res = yield self._execute(
-            RoomMemberTable.decode_results, query, room_id,
+        where_clause = "user_id = ? AND (%s)" % (
+            " OR ".join(["membership = ?" for _ in membership_list]),
         )
 
-        def host_from_user_id_string(user_id):
-            domain = UserID.from_string(entry.user_id, self.hs).domain
-            return domain
-
-        # strip memberships which don't match
-        hosts = [
-            host_from_user_id_string(entry.user_id)
-            for entry in res
-            if entry.membership == Membership.JOIN
-        ]
+        return self._get_members_query(where_clause, args)
 
-        logger.debug("Returning hosts: %s from results: %s", hosts, res)
-
-        defer.returnValue(hosts)
-
-    def get_max_room_member_id(self):
-        return self._simple_max_id(RoomMemberTable.table_name)
-
-
-class RoomMemberTable(Table):
-    table_name = "room_memberships"
-
-    fields = [
-        "id",
-        "user_id",
-        "sender",
-        "room_id",
-        "membership",
-        "content"
-    ]
+    def get_joined_hosts_for_room(self, room_id):
+        return self._simple_select_onecol(
+            "room_hosts",
+            {"room_id": room_id},
+            "host"
+        )
 
-    class EntryType(collections.namedtuple("RoomMemberEntry", fields)):
+    def _get_members_by_dict(self, where_dict):
+        clause = " AND ".join("%s = ?" % k for k in where_dict.keys())
+        vals = where_dict.values()
+        return self._get_members_query(clause, vals)
 
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=RoomMemberEvent.TYPE,
-                room_id=self.room_id,
-                target_user_id=self.user_id,
-                user_id=self.sender,
-                content=json.loads(self.content),
-            )
+    @defer.inlineCallbacks
+    def _get_members_query(self, where_clause, where_values):
+        sql = (
+            "SELECT e.* FROM events as e "
+            "INNER JOIN room_memberships as m "
+            "ON e.event_id = m.event_id "
+            "INNER JOIN current_state_events as c "
+            "ON m.event_id = c.event_id "
+            "WHERE %s "
+        ) % (where_clause,)
+
+        rows = yield self._execute_and_decode(sql, *where_values)
+
+        logger.debug("_get_members_query Got rows %s", rows)
+
+        results = [self._parse_event_from_row(r) for r in rows]
+        defer.returnValue(results)
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 77096546b2..2452890ea4 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -12,43 +12,67 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-CREATE TABLE IF NOT EXISTS rooms(
-    room_id TEXT PRIMARY KEY NOT NULL,
-    is_public INTEGER,
-    creator TEXT
+
+CREATE TABLE IF NOT EXISTS events(
+    ordering INTEGER PRIMARY KEY AUTOINCREMENT,
+    event_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    room_id TEXT,
+    content TEXT,
+    unrecognized_keys TEXT
 );
 
-CREATE TABLE IF NOT EXISTS room_memberships(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    user_id TEXT NOT NULL, -- no foreign key to users table, it could be an id belonging to another home server
-    sender TEXT NOT NULL,
+CREATE TABLE IF NOT EXISTS state_events(
+    event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    membership TEXT NOT NULL,
-    content TEXT NOT NULL
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    prev_state TEXT
 );
 
-CREATE TABLE IF NOT EXISTS messages(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    user_id TEXT, 
-    room_id TEXT,
-    msg_id TEXT,
-    content TEXT
+CREATE TABLE IF NOT EXISTS current_state_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
+);
+
+CREATE TABLE IF NOT EXISTS room_memberships(
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    sender TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    membership TEXT NOT NULL
 );
 
 CREATE TABLE IF NOT EXISTS feedback(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    content TEXT,
+    event_id TEXT NOT NULL,
     feedback_type TEXT,
-    fb_sender_id TEXT,
-    msg_id TEXT,
-    room_id TEXT,
-    msg_sender_id TEXT
+    target_event_id TEXT,
+    sender TEXT,
+    room_id TEXT
 );
 
-CREATE TABLE IF NOT EXISTS room_data(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
+CREATE TABLE IF NOT EXISTS topics(
+    event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    content TEXT
+    topic TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS room_names(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    name TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS rooms(
+    room_id TEXT PRIMARY KEY NOT NULL,
+    is_public INTEGER,
+    creator TEXT
+);
+
+CREATE TABLE IF NOT EXISTS room_hosts(
+    room_id TEXT NOT NULL,
+    host TEXT NOT NULL
 );
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 47a1f2c45a..cf4b1682b6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -13,267 +13,118 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
 
 from ._base import SQLBaseStore
-from .message import MessagesTable
-from .feedback import FeedbackTable
-from .roomdata import RoomDataTable
-from .roommember import RoomMemberTable
+
+from synapse.api.constants import Membership
 
 import json
 import logging
 
-logger = logging.getLogger(__name__)
-
-
-class StreamStore(SQLBaseStore):
-
-    def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0,
-                           with_feedback=False):
-        """Get all messages for this user between the given keys.
 
-        Args:
-            user_id (str): The user who is requesting messages.
-            from_key (int): The ID to start returning results from (exclusive).
-            to_key (int): The ID to stop returning results (exclusive).
-            room_id (str): Gets messages only for this room. Can be None, in
-            which case all room messages will be returned.
-        Returns:
-            A tuple of rows (list of namedtuples), new_id(int)
-        """
-        if with_feedback and room_id:  # with fb MUST specify a room ID
-            return self._db_pool.runInteraction(
-                self._get_message_rows_with_feedback,
-                user_id, from_key, to_key, room_id, limit
-            )
-        else:
-            return self._db_pool.runInteraction(
-                self._get_message_rows,
-                user_id, from_key, to_key, room_id, limit
-            )
+logger = logging.getLogger(__name__)
 
-    def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                          limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the messages table, bounded by the specified pkeys
 
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = ("SELECT messages.* FROM messages WHERE ? IN"
-                 + " (SELECT membership from room_memberships WHERE user_id=?"
-                 + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
+MAX_STREAM_SIZE = 1000
 
-        if room_id:
-            query += " AND messages.room_id=?"
-            query_args.append(room_id)
 
-        (query, query_args) = self._append_stream_operations(
-            "messages", query, query_args, from_pkey, to_pkey, limit=limit
-        )
+class StreamStore(SQLBaseStore):
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, MessagesTable, from_pkey)
+    @defer.inlineCallbacks
+    def get_room_events_stream(self, user_id, from_key, to_key, room_id,
+                               limit=0, with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
 
-    def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey,
-                                        room_id, limit):
-        # this col represents the compressed feedback JSON as per spec
-        compressed_feedback_col = (
-            "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id"
-            + " || '\",\"feedback_type\":\"' || f.feedback_type"
-            + " || '\",\"content\":' || f.content || '}') || ']'"
+        current_room_membership_sql = (
+            "SELECT m.room_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ?"
         )
 
-        global_msg_id_join = ("f.room_id = messages.room_id"
-                              + " and f.msg_id = messages.msg_id"
-                              + " and messages.user_id = f.msg_sender_id")
-
-        select_query = (
-            "SELECT messages.*, f.content AS fb_content, f.fb_sender_id"
-            + ", " + compressed_feedback_col + " AS compressed_fb"
-            + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join)
-
-        current_membership_sub_query = (
-            "(SELECT membership from room_memberships rm"
-            + " WHERE user_id=? AND room_id = rm.room_id"
-            + " ORDER BY id DESC LIMIT 1)")
-
-        where = (" WHERE ? IN " + current_membership_sub_query
-                 + " AND messages.room_id=?")
-
-        query = select_query + where
-        query_args = ["join", user_id, room_id]
-
-        (query, query_args) = self._append_stream_operations(
-            "messages", query, query_args, from_pkey, to_pkey,
-            limit=limit, group_by=" GROUP BY messages.id "
+        invites_sql = (
+            "SELECT m.event_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ? AND m.membership = ?"
         )
 
-        cursor = txn.execute(query, query_args)
-
-        # convert the result set into events
-        entries = self.cursor_to_dict(cursor)
-        events = []
-        for entry in entries:
-            # TODO we should spec the cursor > event mapping somewhere else.
-            event = {}
-            straight_mappings = ["msg_id", "user_id", "room_id"]
-            for key in straight_mappings:
-                event[key] = entry[key]
-            event["content"] = json.loads(entry["content"])
-            if entry["compressed_fb"]:
-                event["feedback"] = json.loads(entry["compressed_fb"])
-            events.append(event)
-
-        latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"]
-
-        return (events, latest_pkey)
-
-    def get_room_member_stream(self, user_id, from_key, to_key):
-        """Get all room membership events for this user between the given keys.
-
-        Args:
-            user_id (str): The user who is requesting membership events.
-            from_key (int): The ID to start returning results from (exclusive).
-            to_key (int): The ID to stop returning results (exclusive).
-        Returns:
-            A tuple of rows (list of namedtuples), new_id(int)
-        """
-        return self._db_pool.runInteraction(
-            self._get_room_member_rows, user_id, from_key, to_key
+        if limit:
+            limit = max(limit, MAX_STREAM_SIZE)
+        else:
+            limit = MAX_STREAM_SIZE
+
+        # From and to keys should be integers from ordering.
+        from_key = int(from_key)
+        to_key = int(to_key)
+
+        if from_key == to_key:
+            defer.returnValue(([], to_key))
+            return
+
+        sql = (
+            "SELECT * FROM events as e WHERE "
+            "((room_id IN (%(current)s)) OR "
+            "(event_id IN (%(invites)s))) "
+        ) % {
+            "current": current_room_membership_sql,
+            "invites": invites_sql,
+        }
+
+        # Constraints and ordering depend on direction.
+        if from_key < to_key:
+            sql += (
+                "AND e.ordering > ? AND e.ordering < ? "
+                "ORDER BY ordering ASC LIMIT %(limit)d "
+            ) % {"limit": limit}
+        else:
+            sql += (
+                "AND e.ordering < ? AND e.ordering > ? "
+                "ORDER BY ordering DESC LIMIT %(limit)d "
+            ) % {"limit": int(limit)}
+
+        rows = yield self._execute_and_decode(
+            sql,
+            user_id, user_id, Membership.INVITE, from_key, to_key
         )
 
-    def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey):
-        # get all room membership events for rooms which the user is
-        # *currently* joined in on, or all invite events for this user.
-        current_membership_sub_query = (
-            "(SELECT membership FROM room_memberships"
-            + " WHERE user_id=? AND room_id = rm.room_id"
-            + " ORDER BY id DESC LIMIT 1)")
-
-        query = ("SELECT rm.* FROM room_memberships rm "
-                 # all membership events for rooms you've currently joined.
-                 + " WHERE (? IN " + current_membership_sub_query
-                 # all invite membership events for this user
-                 + " OR rm.membership=? AND user_id=?)"
-                 + " AND rm.id > ?")
-        query_args = ["join", user_id, "invite", user_id, from_pkey]
-
-        if to_pkey != -1:
-            query += " AND rm.id < ?"
-            query_args.append(to_pkey)
-
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, RoomMemberTable, from_pkey)
+        ret = [self._parse_event_from_row(r) for r in rows]
 
-    def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0):
-        return self._db_pool.runInteraction(
-            self._get_feedback_rows,
-            user_id, from_key, to_key, room_id, limit
-        )
-
-    def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                           limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the feedback table, bounded by the specified pkeys
+        if rows:
+            if from_key < to_key:
+                key = max([r["ordering"] for r in rows])
+            else:
+                key = min([r["ordering"] for r in rows])
+        else:
+            key = to_key
 
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = (
-            "SELECT feedback.* FROM feedback WHERE ? IN "
-            + "(SELECT membership from room_memberships WHERE user_id=?"
-            + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
+        defer.returnValue((ret, key))
 
-        if room_id:
-            query += " AND feedback.room_id=?"
-            query_args.append(room_id)
+    @defer.inlineCallbacks
+    def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
 
-        (query, query_args) = self._append_stream_operations(
-            "feedback", query, query_args, from_pkey, to_pkey, limit=limit
+        sql = (
+            "SELECT * FROM events WHERE room_id = ? "
+            "ORDER BY ordering DESC LIMIT ? "
         )
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, FeedbackTable, from_pkey)
-
-    def get_room_data_stream(self, user_id, from_key, to_key, room_id,
-                             limit=0):
-        return self._db_pool.runInteraction(
-            self._get_room_data_rows,
-            user_id, from_key, to_key, room_id, limit
+        rows = yield self._execute_and_decode(
+            sql,
+            room_id, limit
         )
 
-    def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                            limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the feedback table, bounded by the specified pkeys
+        rows.reverse()  # As we selected with reverse ordering
 
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = (
-            "SELECT room_data.* FROM room_data WHERE ? IN "
-            + "(SELECT membership from room_memberships WHERE user_id=?"
-            + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
+        defer.returnValue([self._parse_event_from_row(r) for r in rows])
 
-        if room_id:
-            query += " AND room_data.room_id=?"
-            query_args.append(room_id)
-
-        (query, query_args) = self._append_stream_operations(
-            "room_data", query, query_args, from_pkey, to_pkey, limit=limit
+    @defer.inlineCallbacks
+    def get_room_events_max_id(self):
+        res = yield self._execute_and_decode(
+            "SELECT MAX(ordering) as m FROM events"
         )
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, RoomDataTable, from_pkey)
-
-    def _append_stream_operations(self, table_name, query, query_args,
-                                  from_pkey, to_pkey, limit=None,
-                                  group_by=""):
-        LATEST_ROW = -1
-        order_by = ""
-        if to_pkey > from_pkey:
-            if from_pkey != LATEST_ROW:
-                # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9
-                query += (" AND %s.id > ? AND %s.id < ?" %
-                         (table_name, table_name))
-                query_args.append(from_pkey)
-                query_args.append(to_pkey)
-            else:
-                # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC
-                query += " AND %s.id > ? " % table_name
-                order_by = "ORDER BY id DESC"
-                query_args.append(to_pkey)
-        elif from_pkey > to_pkey:
-            if to_pkey != LATEST_ROW:
-                # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC
-                query += (" AND %s.id > ? AND %s.id < ? " %
-                          (table_name, table_name))
-                order_by = "ORDER BY id DESC"
-                query_args.append(to_pkey)
-                query_args.append(from_pkey)
-            else:
-                # from=5 to=-1 >> from 5 to now >> id>5
-                query += " AND %s.id > ?" % table_name
-                query_args.append(from_pkey)
-
-        query += group_by + order_by
-
-        if limit and limit > 0:
-            query += " LIMIT ?"
-            query_args.append(str(limit))
-
-        return (query, query_args)
-
-    def _as_events(self, cursor, table, from_pkey):
-        data_entries = table.decode_results(cursor)
-        last_pkey = from_pkey
-        if data_entries:
-            last_pkey = data_entries[-1].id
-
-        events = [
-            entry.as_event(self.event_factory).get_dict()
-            for entry in data_entries
-        ]
+        if not res:
+            defer.returnValue(0)
+            return
 
-        return (events, last_pkey)
+        defer.returnValue(res[0]["m"])