summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py123
-rw-r--r--synapse/storage/_base.py27
-rw-r--r--synapse/storage/feedback.py72
-rw-r--r--synapse/storage/message.py81
-rw-r--r--synapse/storage/room.py90
-rw-r--r--synapse/storage/roomdata.py85
-rw-r--r--synapse/storage/roommember.py166
-rw-r--r--synapse/storage/schema/im.sql75
-rw-r--r--synapse/storage/stream.py282
9 files changed, 339 insertions, 662 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 5d5b5f7c44..f62cee3c39 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -13,21 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
 
 from synapse.api.events.room import (
     RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent,
-    RoomConfigEvent
+    RoomConfigEvent, RoomNameEvent,
 )
 
 from .directory import DirectoryStore
 from .feedback import FeedbackStore
-from .message import MessageStore
 from .presence import PresenceStore
 from .profile import ProfileStore
 from .registration import RegistrationStore
 from .room import RoomStore
 from .roommember import RoomMemberStore
-from .roomdata import RoomDataStore
 from .stream import StreamStore
 from .pdu import StatePduStore, PduStore
 from .transactions import TransactionStore
@@ -36,7 +35,7 @@ import json
 import os
 
 
-class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
+class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
                 PresenceStore, PduStore, StatePduStore, TransactionStore,
                 DirectoryStore):
@@ -45,50 +44,86 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
         super(DataStore, self).__init__(hs)
         self.event_factory = hs.get_event_factory()
 
+    @defer.inlineCallbacks
     def persist_event(self, event):
-        if event.type == MessageEvent.TYPE:
-            return self.store_message(
-                user_id=event.user_id,
-                room_id=event.room_id,
-                msg_id=event.msg_id,
-                content=json.dumps(event.content)
-            )
-        elif event.type == RoomMemberEvent.TYPE:
-            return self.store_room_member(
-                user_id=event.target_user_id,
-                sender=event.user_id,
-                room_id=event.room_id,
-                content=event.content,
-                membership=event.content["membership"]
-            )
+        if event.type == RoomMemberEvent.TYPE:
+            yield self._store_room_member(event)
         elif event.type == FeedbackEvent.TYPE:
-            return self.store_feedback(
-                room_id=event.room_id,
-                msg_id=event.msg_id,
-                msg_sender_id=event.msg_sender_id,
-                fb_sender_id=event.user_id,
-                fb_type=event.feedback_type,
-                content=json.dumps(event.content)
-            )
-        elif event.type == RoomTopicEvent.TYPE:
-            return self.store_room_data(
-                room_id=event.room_id,
-                etype=event.type,
-                state_key=event.state_key,
-                content=json.dumps(event.content)
-            )
+            yield self._store_feedback(event)
         elif event.type == RoomConfigEvent.TYPE:
-            if "visibility" in event.content:
-                visibility = event.content["visibility"]
-                return self.store_room_config(
-                    room_id=event.room_id,
-                    visibility=visibility
-                )
-
+            yield self._store_room_config(event)
+        elif event.type == RoomNameEvent.TYPE:
+            yield self._store_room_name(event)
+        elif event.type == RoomTopicEvent.TYPE:
+            yield self._store_room_topic(event)
+
+        yield self._store_event(event)
+
+    @defer.inlineCallbacks
+    def get_event(self, event_id):
+        events_dict = yield self._simple_select_one(
+            "events",
+            {"event_id": event_id},
+            [
+                "event_id",
+                "type",
+                "sender",
+                "room_id",
+                "content",
+                "unrecognized_keys"
+            ],
+        )
+
+        event = self._parse_event_from_row(events_dict)
+        defer.returnValue(event)
+
+    @defer.inlineCallbacks
+    def _store_event(self, event):
+        vals = {
+            "event_id": event.event_id,
+            "type": event.type,
+            "room_id": event.room_id,
+            "content": json.dumps(event.content),
+        }
+
+        unrec = {k: v for k, v in event.get_full_dict().items() if k not in vals.keys()}
+        vals["unrecognized_keys"] = json.dumps(unrec)
+
+        yield self._simple_insert("events", vals)
+
+        if hasattr(event, "state_key"):
+            vals = {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "type": event.type,
+                "state_key": event.state_key,
+            }
+
+            if hasattr(event, "prev_state"):
+                vals["prev_state"] = event.prev_state
+
+            yield self._simple_insert("state_events", vals)
+
+            # TODO (erikj): We also need to update the current state table?
+
+    @defer.inlineCallbacks
+    def get_current_state(self, room_id, event_type=None, state_key=""):
+        sql = (
+            "SELECT e.* FROM events as e "
+            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+            "INNER JOIN state_events as s ON e.event_id = s.event_id "
+            "WHERE c.room_id = ? "
+        )
+
+        if event_type:
+            sql += " AND s.type = ? AND s.state_key = ? "
+            args = (room_id, event_type, state_key)
         else:
-            raise NotImplementedError(
-                "Don't know how to persist type=%s" % event.type
-            )
+            args = (room_id, )
+
+        results = yield self._execute_and_decode(sql, *args)
+
+        defer.returnValue([self._parse_event_from_row(r) for r in results])
 
 
 def schema_path(schema):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index bf1800f4bf..c26e9a0f98 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 
 from twisted.internet import defer
@@ -20,6 +19,9 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 
 import collections
+import copy
+import json
+
 
 logger = logging.getLogger(__name__)
 
@@ -29,6 +31,7 @@ class SQLBaseStore(object):
     def __init__(self, hs):
         self.hs = hs
         self._db_pool = hs.get_db_pool()
+        self.event_factory = hs.get_event_factory()
         self._clock = hs.get_clock()
 
     def cursor_to_dict(self, cursor):
@@ -57,14 +60,21 @@ class SQLBaseStore(object):
             The result of decoder(results)
         """
         logger.debug(
-            "[SQL] %s  Args=%s Func=%s", query, args, decoder.__name__
+            "[SQL] %s  Args=%s Func=%s", query, args, decoder.__name__ if decoder else None
         )
 
         def interaction(txn):
             cursor = txn.execute(query, args)
-            return decoder(cursor)
+            if decoder:
+                return decoder(cursor)
+            else:
+                return cursor.fetchall()
+
         return self._db_pool.runInteraction(interaction)
 
+    def _execute_and_decode(self, query, *args):
+        return self._execute(self.cursor_to_dict, query, *args)
+
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
 
@@ -281,6 +291,17 @@ class SQLBaseStore(object):
 
         return self._db_pool.runInteraction(func)
 
+    def _parse_event_from_row(self, row_dict):
+        d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
+        d.update(json.loads(json.loads(row_dict["unrecognized_keys"])))
+        d["content"] = json.loads(d["content"])
+        del d["unrecognized_keys"]
+
+        return self.event_factory.create_event(
+            etype=d["type"],
+            **d
+        )
+
 
 class Table(object):
     """ A base class used to store information about a particular table.
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
index 9bd562c762..e60f98d1e1 100644
--- a/synapse/storage/feedback.py
+++ b/synapse/storage/feedback.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from ._base import SQLBaseStore, Table
 from synapse.api.events.room import FeedbackEvent
 
@@ -22,54 +24,28 @@ import json
 
 class FeedbackStore(SQLBaseStore):
 
-    def store_feedback(self, room_id, msg_id, msg_sender_id,
-                       fb_sender_id, fb_type, content):
-        return self._simple_insert(FeedbackTable.table_name, dict(
-            room_id=room_id,
-            msg_id=msg_id,
-            msg_sender_id=msg_sender_id,
-            fb_sender_id=fb_sender_id,
-            fb_type=fb_type,
-            content=content,
-        ))
-
-    def get_feedback(self, room_id=None, msg_id=None, msg_sender_id=None,
-                     fb_sender_id=None, fb_type=None):
-        query = FeedbackTable.select_statement(
-            "msg_sender_id = ? AND room_id = ? AND msg_id = ? " +
-            "AND fb_sender_id = ? AND feedback_type = ? " +
-            "ORDER BY id DESC LIMIT 1")
-        return self._execute(
-            FeedbackTable.decode_single_result,
-            query, msg_sender_id, room_id, msg_id, fb_sender_id, fb_type,
+    def _store_feedback(self, event):
+        return self._simple_insert("feedback", {
+            "event_id": event.event_id,
+            "feedback_type": event.feedback_type,
+            "room_id": event.room_id,
+            "target_event_id": event.target_event,
+            "sender": event.user_id,
+        })
+
+    @defer.inlineCallbacks
+    def get_feedback_for_event(self, event_id):
+        sql = (
+            "SELECT events.* FROM events INNER JOIN feedback "
+            "ON events.event_id = feedback.event_id "
+            "WHERE feedback.target_event_id = ? "
         )
 
-    def get_max_feedback_id(self):
-        return self._simple_max_id(FeedbackTable.table_name)
-
-
-class FeedbackTable(Table):
-    table_name = "feedback"
+        rows = yield self._execute_and_decode(sql, event_id)
 
-    fields = [
-        "id",
-        "content",
-        "feedback_type",
-        "fb_sender_id",
-        "msg_id",
-        "room_id",
-        "msg_sender_id"
-    ]
-
-    class EntryType(collections.namedtuple("FeedbackEntry", fields)):
-
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=FeedbackEvent.TYPE,
-                room_id=self.room_id,
-                msg_id=self.msg_id,
-                msg_sender_id=self.msg_sender_id,
-                user_id=self.fb_sender_id,
-                feedback_type=self.feedback_type,
-                content=json.loads(self.content),
-            )
+        defer.returnValue(
+            [
+                self._parse_event_from_row(r)
+                for r in rows
+            ]
+        )
diff --git a/synapse/storage/message.py b/synapse/storage/message.py
deleted file mode 100644
index 7bb69c1384..0000000000
--- a/synapse/storage/message.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-from synapse.api.events.room import MessageEvent
-
-import collections
-import json
-
-
-class MessageStore(SQLBaseStore):
-
-    def get_message(self, user_id, room_id, msg_id):
-        """Get a message from the store.
-
-        Args:
-            user_id (str): The ID of the user who sent the message.
-            room_id (str): The room the message was sent in.
-            msg_id (str): The unique ID for this user/room combo.
-        """
-        query = MessagesTable.select_statement(
-            "user_id = ? AND room_id = ? AND msg_id = ? " +
-            "ORDER BY id DESC LIMIT 1")
-        return self._execute(
-            MessagesTable.decode_single_result,
-            query, user_id, room_id, msg_id,
-        )
-
-    def store_message(self, user_id, room_id, msg_id, content):
-        """Store a message in the store.
-
-        Args:
-            user_id (str): The ID of the user who sent the message.
-            room_id (str): The room the message was sent in.
-            msg_id (str): The unique ID for this user/room combo.
-            content (str): The content of the message (JSON)
-        """
-        return self._simple_insert(MessagesTable.table_name, dict(
-            user_id=user_id,
-            room_id=room_id,
-            msg_id=msg_id,
-            content=content,
-        ))
-
-    def get_max_message_id(self):
-        return self._simple_max_id(MessagesTable.table_name)
-
-
-class MessagesTable(Table):
-    table_name = "messages"
-
-    fields = [
-        "id",
-        "user_id",
-        "room_id",
-        "msg_id",
-        "content"
-    ]
-
-    class EntryType(collections.namedtuple("MessageEntry", fields)):
-
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=MessageEvent.TYPE,
-                room_id=self.room_id,
-                user_id=self.user_id,
-                msg_id=self.msg_id,
-                content=json.loads(self.content),
-            )
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index a97162831b..8f44b67d8c 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -76,49 +76,73 @@ class RoomStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_rooms(self, is_public, with_topics):
+    def get_rooms(self, is_public):
         """Retrieve a list of all public rooms.
 
         Args:
             is_public (bool): True if the rooms returned should be public.
-            with_topics (bool): True to include the current topic for the room
-            in the response.
         Returns:
-            A list of room dicts containing at least a "room_id" key, and a
-            "topic" key if one is set and with_topic=True.
+            A list of room dicts containing at least a "room_id" key, a
+            "topic" key if one is set, and a "name" key if one is set
         """
-        room_data_type = RoomTopicEvent.TYPE
-        public = 1 if is_public else 0
-
-        latest_topic = ("SELECT max(room_data.id) FROM room_data WHERE "
-                        + "room_data.type = ? GROUP BY room_id")
-
-        query = ("SELECT rooms.*, room_data.content, room_alias FROM rooms "
-                 + "LEFT JOIN "
-                 + "room_aliases ON room_aliases.room_id = rooms.room_id "
-                 + "LEFT JOIN "
-                 + "room_data ON rooms.room_id = room_data.room_id WHERE "
-                 + "(room_data.id IN (" + latest_topic + ") "
-                 + "OR room_data.id IS NULL) AND rooms.is_public = ?")
-
-        res = yield self._execute(
-            self.cursor_to_dict, query, room_data_type, public
+
+        topic_subquery = (
+            "SELECT topics.event_id as event_id, topics.room_id as room_id, topic FROM topics "
+            "INNER JOIN current_state_events as c "
+            "ON c.event_id = topics.event_id "
         )
 
-        # return only the keys the specification expects
-        ret_keys = ["room_id", "topic", "room_alias"]
+        name_subquery = (
+            "SELECT room_names.event_id as event_id, room_names.room_id as room_id, name FROM room_names "
+            "INNER JOIN current_state_events as c "
+            "ON c.event_id = room_names.event_id "
+        )
 
-        # extract topic from the json (icky) FIXME
-        for i, room_row in enumerate(res):
-            try:
-                content_json = json.loads(room_row["content"])
-                room_row["topic"] = content_json["topic"]
-            except:
-                pass  # no topic set
-            # filter the dict based on ret_keys
-            res[i] = {k: v for k, v in room_row.iteritems() if k in ret_keys}
+        sql = (
+            "SELECT r.room_id, n.name, t.topic, group_concat(a.room_alias) FROM rooms AS r "
+            "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
+            "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
+            "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
+            "WHERE r.is_public = ? "
+            "GROUP BY r.room_id "
+        ) % {
+            "topic": topic_subquery,
+            "name": name_subquery,
+        }
+
+        rows = yield self._execute(None, sql, is_public)
+
+        ret = [
+            {
+                "room_id": r[0],
+                "name": r[1],
+                "topic": r[2],
+                "aliases": r[3].split(","),
+            }
+            for r in rows
+        ]
+
+        defer.returnValue(ret)
+
+    def _store_room_topic(self, event):
+        return self._simple_insert(
+            "topics",
+            {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "topic": event.topic,
+            }
+        )
 
-        defer.returnValue(res)
+    def _store_room_name(self, event):
+        return self._simple_insert(
+            "room_names",
+            {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "name": event.name,
+            }
+        )
 
 
 class RoomsTable(Table):
diff --git a/synapse/storage/roomdata.py b/synapse/storage/roomdata.py
deleted file mode 100644
index cc04d1ba14..0000000000
--- a/synapse/storage/roomdata.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-
-import collections
-import json
-
-
-class RoomDataStore(SQLBaseStore):
-
-    """Provides various CRUD operations for Room Events. """
-
-    def get_room_data(self, room_id, etype, state_key=""):
-        """Retrieve the data stored under this type and state_key.
-
-        Args:
-            room_id (str)
-            etype (str)
-            state_key (str)
-        Returns:
-            namedtuple: Or None if nothing exists at this path.
-        """
-        query = RoomDataTable.select_statement(
-            "room_id = ? AND type = ? AND state_key = ? "
-            "ORDER BY id DESC LIMIT 1"
-        )
-        return self._execute(
-            RoomDataTable.decode_single_result,
-            query, room_id, etype, state_key,
-        )
-
-    def store_room_data(self, room_id, etype, state_key="", content=None):
-        """Stores room specific data.
-
-        Args:
-            room_id (str)
-            etype (str)
-            state_key (str)
-            data (str)- The data to store for this path in JSON.
-        Returns:
-            The store ID for this data.
-        """
-        return self._simple_insert(RoomDataTable.table_name, dict(
-            etype=etype,
-            state_key=state_key,
-            room_id=room_id,
-            content=content,
-        ))
-
-    def get_max_room_data_id(self):
-        return self._simple_max_id(RoomDataTable.table_name)
-
-
-class RoomDataTable(Table):
-    table_name = "room_data"
-
-    fields = [
-        "id",
-        "room_id",
-        "type",
-        "state_key",
-        "content"
-    ]
-
-    class EntryType(collections.namedtuple("RoomDataEntry", fields)):
-
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=self.type,
-                room_id=self.room_id,
-                content=json.loads(self.content),
-            )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index ef73be4af4..8c4b04f190 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -31,6 +31,38 @@ logger = logging.getLogger(__name__)
 
 class RoomMemberStore(SQLBaseStore):
 
+    @defer.inlineCallbacks
+    def _store_room_member(self, event):
+        """Store a room member in the database.
+        """
+        domain = self.hs.parse_userid(event.target_user_id).domain
+
+        yield self._simple_insert(
+            "room_memberships",
+            {
+                "event_id": event.event_id,
+                "user_id": event.target_user_id,
+                "sender": event.user_id,
+                "room_id": event.room_id,
+                "membership": event.membership,
+            }
+        )
+
+        # Update room hosts table
+        if event.membership == Membership.JOIN:
+            sql = (
+                "INSERT OR IGNORE INTO room_hosts (room_id, host) "
+                "VALUES (?, ?)"
+            )
+            yield self._execute(None, sql, event.room_id, domain)
+        else:
+            sql = (
+                "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
+            )
+
+            yield self._execute(None, sql, event.room_id, domain)
+
+
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
 
@@ -38,36 +70,13 @@ class RoomMemberStore(SQLBaseStore):
             user_id (str): The member's user ID.
             room_id (str): The room the member is in.
         Returns:
-            namedtuple: The room member from the database, or None if this
-            member does not exist.
+            Deferred: Results in a MembershipEvent or None.
         """
-        query = RoomMemberTable.select_statement(
-            "room_id = ? AND user_id = ? ORDER BY id DESC LIMIT 1")
-        return self._execute(
-            RoomMemberTable.decode_single_result,
-            query, room_id, user_id,
-        )
-
-    def store_room_member(self, user_id, sender, room_id, membership, content):
-        """Store a room member in the database.
+        return self._get_members_by_dict({
+            "e.room_id": room_id,
+            "m.user_id": user_id,
+        })
 
-        Args:
-            user_id (str): The member's user ID.
-            room_id (str): The room in relation to the member.
-            membership (synapse.api.constants.Membership): The new membership
-            state.
-            content (dict): The content of the membership (JSON).
-        """
-        content_json = json.dumps(content)
-        return self._simple_insert(RoomMemberTable.table_name, dict(
-            user_id=user_id,
-            sender=sender,
-            room_id=room_id,
-            membership=membership,
-            content=content_json,
-        ))
-
-    @defer.inlineCallbacks
     def get_room_members(self, room_id, membership=None):
         """Retrieve the current room member list for a room.
 
@@ -79,17 +88,12 @@ class RoomMemberStore(SQLBaseStore):
         Returns:
             list of namedtuples representing the members in this room.
         """
-        query = RoomMemberTable.select_statement(
-            "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
-            + " WHERE room_id = ? GROUP BY user_id)"
-        )
-        res = yield self._execute(
-            RoomMemberTable.decode_results, query, room_id,
-        )
-        # strip memberships which don't match
+
+        where = {"m.room_id": room_id}
         if membership:
-            res = [entry for entry in res if entry.membership == membership]
-        defer.returnValue(res)
+            where["m.membership"] = membership
+
+        return self._get_members_by_dict(where)
 
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
@@ -106,67 +110,37 @@ class RoomMemberStore(SQLBaseStore):
             return defer.succeed(None)
 
         args = [user_id]
-        membership_placeholder = ["membership=?"] * len(membership_list)
-        where_membership = "(" + " OR ".join(membership_placeholder) + ")"
-        for membership in membership_list:
-            args.append(membership)
-
-        query = ("SELECT room_id, membership FROM room_memberships"
-                 + " WHERE user_id=? AND " + where_membership
-                 + " GROUP BY room_id ORDER BY id DESC")
-        return self._execute(
-            self.cursor_to_dict, query, *args
-        )
+        args.extend(membership_list)
 
-    @defer.inlineCallbacks
-    def get_joined_hosts_for_room(self, room_id):
-        query = RoomMemberTable.select_statement(
-            "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
-            + " WHERE room_id = ? GROUP BY user_id)"
-        )
-
-        res = yield self._execute(
-            RoomMemberTable.decode_results, query, room_id,
+        where_clause = "user_id = ? AND (%s)" % (
+            " OR ".join(["membership = ?" for _ in membership_list]),
         )
 
-        def host_from_user_id_string(user_id):
-            domain = UserID.from_string(entry.user_id, self.hs).domain
-            return domain
-
-        # strip memberships which don't match
-        hosts = [
-            host_from_user_id_string(entry.user_id)
-            for entry in res
-            if entry.membership == Membership.JOIN
-        ]
+        return self._get_members_query(where_clause, args)
 
-        logger.debug("Returning hosts: %s from results: %s", hosts, res)
-
-        defer.returnValue(hosts)
-
-    def get_max_room_member_id(self):
-        return self._simple_max_id(RoomMemberTable.table_name)
-
-
-class RoomMemberTable(Table):
-    table_name = "room_memberships"
-
-    fields = [
-        "id",
-        "user_id",
-        "sender",
-        "room_id",
-        "membership",
-        "content"
-    ]
+    def get_joined_hosts_for_room(self, room_id):
+        return self._simple_select_onecol(
+            "room_hosts",
+            {"room_id": room_id},
+            "host"
+        )
 
-    class EntryType(collections.namedtuple("RoomMemberEntry", fields)):
+    def _get_members_by_dict(self, where_dict):
+        clause = " AND ".join("%s = ?" % k for k in where_dict.keys())
+        vals = where_dict.values()
+        return self._get_members_query(clause, vals)
 
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=RoomMemberEvent.TYPE,
-                room_id=self.room_id,
-                target_user_id=self.user_id,
-                user_id=self.sender,
-                content=json.loads(self.content),
-            )
+    @defer.inlineCallbacks
+    def _get_members_query(self, where_clause, where_values):
+        sql = (
+            "SELECT e.* FROM events as e "
+            "INNER JOIN room_memberships as m "
+            "ON e.event_id = m.event_id "
+            "INNER JOIN current_state_events as c "
+            "ON m.event_id = c.event_id "
+            "WHERE %s "
+        ) % (where_clause,)
+
+        rows = yield self._execute_and_decode(sql, *where_values)
+        results = [self._parse_event_from_row(r) for r in rows]
+        defer.returnValue(results)
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 77096546b2..9a0f2145d5 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -12,43 +12,64 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-CREATE TABLE IF NOT EXISTS rooms(
-    room_id TEXT PRIMARY KEY NOT NULL,
-    is_public INTEGER,
-    creator TEXT
+
+CREATE TABLE IF NOT EXISTS events(
+    ordering INTEGER PRIMARY KEY AUTOINCREMENT,
+    event_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    room_id TEXT,
+    content TEXT,
+    unrecognized_keys TEXT
 );
 
-CREATE TABLE IF NOT EXISTS room_memberships(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    user_id TEXT NOT NULL, -- no foreign key to users table, it could be an id belonging to another home server
-    sender TEXT NOT NULL,
+CREATE TABLE IF NOT EXISTS state_events(
+    event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    membership TEXT NOT NULL,
-    content TEXT NOT NULL
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    prev_state TEXT
 );
 
-CREATE TABLE IF NOT EXISTS messages(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    user_id TEXT, 
-    room_id TEXT,
-    msg_id TEXT,
-    content TEXT
+CREATE TABLE IF NOT EXISTS current_state_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS room_memberships(
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    sender TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    membership TEXT NOT NULL
 );
 
 CREATE TABLE IF NOT EXISTS feedback(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    content TEXT,
+    event_id TEXT NOT NULL,
     feedback_type TEXT,
-    fb_sender_id TEXT,
-    msg_id TEXT,
-    room_id TEXT,
-    msg_sender_id TEXT
+    target_event_id TEXT,
+    sender TEXT,
+    room_id TEXT
 );
 
-CREATE TABLE IF NOT EXISTS room_data(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
+CREATE TABLE IF NOT EXISTS topics(
+    event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    content TEXT
+    topic TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS room_names(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    name TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS rooms(
+    room_id TEXT PRIMARY KEY NOT NULL,
+    is_public INTEGER,
+    creator TEXT
+);
+
+CREATE TABLE IF NOT EXISTS room_hosts(
+    room_id TEXT NOT NULL,
+    host TEXT NOT NULL
 );
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 47a1f2c45a..9937239c22 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -13,267 +13,59 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
 
 from ._base import SQLBaseStore
-from .message import MessagesTable
-from .feedback import FeedbackTable
-from .roomdata import RoomDataTable
-from .roommember import RoomMemberTable
+
+from synapse.api.constants import Membership
 
 import json
 import logging
 
-logger = logging.getLogger(__name__)
-
-
-class StreamStore(SQLBaseStore):
-
-    def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0,
-                           with_feedback=False):
-        """Get all messages for this user between the given keys.
-
-        Args:
-            user_id (str): The user who is requesting messages.
-            from_key (int): The ID to start returning results from (exclusive).
-            to_key (int): The ID to stop returning results (exclusive).
-            room_id (str): Gets messages only for this room. Can be None, in
-            which case all room messages will be returned.
-        Returns:
-            A tuple of rows (list of namedtuples), new_id(int)
-        """
-        if with_feedback and room_id:  # with fb MUST specify a room ID
-            return self._db_pool.runInteraction(
-                self._get_message_rows_with_feedback,
-                user_id, from_key, to_key, room_id, limit
-            )
-        else:
-            return self._db_pool.runInteraction(
-                self._get_message_rows,
-                user_id, from_key, to_key, room_id, limit
-            )
-
-    def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                          limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the messages table, bounded by the specified pkeys
-
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = ("SELECT messages.* FROM messages WHERE ? IN"
-                 + " (SELECT membership from room_memberships WHERE user_id=?"
-                 + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
-
-        if room_id:
-            query += " AND messages.room_id=?"
-            query_args.append(room_id)
-
-        (query, query_args) = self._append_stream_operations(
-            "messages", query, query_args, from_pkey, to_pkey, limit=limit
-        )
-
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, MessagesTable, from_pkey)
-
-    def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey,
-                                        room_id, limit):
-        # this col represents the compressed feedback JSON as per spec
-        compressed_feedback_col = (
-            "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id"
-            + " || '\",\"feedback_type\":\"' || f.feedback_type"
-            + " || '\",\"content\":' || f.content || '}') || ']'"
-        )
-
-        global_msg_id_join = ("f.room_id = messages.room_id"
-                              + " and f.msg_id = messages.msg_id"
-                              + " and messages.user_id = f.msg_sender_id")
-
-        select_query = (
-            "SELECT messages.*, f.content AS fb_content, f.fb_sender_id"
-            + ", " + compressed_feedback_col + " AS compressed_fb"
-            + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join)
-
-        current_membership_sub_query = (
-            "(SELECT membership from room_memberships rm"
-            + " WHERE user_id=? AND room_id = rm.room_id"
-            + " ORDER BY id DESC LIMIT 1)")
-
-        where = (" WHERE ? IN " + current_membership_sub_query
-                 + " AND messages.room_id=?")
-
-        query = select_query + where
-        query_args = ["join", user_id, room_id]
-
-        (query, query_args) = self._append_stream_operations(
-            "messages", query, query_args, from_pkey, to_pkey,
-            limit=limit, group_by=" GROUP BY messages.id "
-        )
-
-        cursor = txn.execute(query, query_args)
 
-        # convert the result set into events
-        entries = self.cursor_to_dict(cursor)
-        events = []
-        for entry in entries:
-            # TODO we should spec the cursor > event mapping somewhere else.
-            event = {}
-            straight_mappings = ["msg_id", "user_id", "room_id"]
-            for key in straight_mappings:
-                event[key] = entry[key]
-            event["content"] = json.loads(entry["content"])
-            if entry["compressed_fb"]:
-                event["feedback"] = json.loads(entry["compressed_fb"])
-            events.append(event)
-
-        latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"]
-
-        return (events, latest_pkey)
-
-    def get_room_member_stream(self, user_id, from_key, to_key):
-        """Get all room membership events for this user between the given keys.
-
-        Args:
-            user_id (str): The user who is requesting membership events.
-            from_key (int): The ID to start returning results from (exclusive).
-            to_key (int): The ID to stop returning results (exclusive).
-        Returns:
-            A tuple of rows (list of namedtuples), new_id(int)
-        """
-        return self._db_pool.runInteraction(
-            self._get_room_member_rows, user_id, from_key, to_key
-        )
-
-    def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey):
-        # get all room membership events for rooms which the user is
-        # *currently* joined in on, or all invite events for this user.
-        current_membership_sub_query = (
-            "(SELECT membership FROM room_memberships"
-            + " WHERE user_id=? AND room_id = rm.room_id"
-            + " ORDER BY id DESC LIMIT 1)")
-
-        query = ("SELECT rm.* FROM room_memberships rm "
-                 # all membership events for rooms you've currently joined.
-                 + " WHERE (? IN " + current_membership_sub_query
-                 # all invite membership events for this user
-                 + " OR rm.membership=? AND user_id=?)"
-                 + " AND rm.id > ?")
-        query_args = ["join", user_id, "invite", user_id, from_pkey]
-
-        if to_pkey != -1:
-            query += " AND rm.id < ?"
-            query_args.append(to_pkey)
+logger = logging.getLogger(__name__)
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, RoomMemberTable, from_pkey)
 
-    def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0):
-        return self._db_pool.runInteraction(
-            self._get_feedback_rows,
-            user_id, from_key, to_key, room_id, limit
-        )
+MAX_STREAM_SIZE = 1000
 
-    def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                           limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the feedback table, bounded by the specified pkeys
 
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = (
-            "SELECT feedback.* FROM feedback WHERE ? IN "
-            + "(SELECT membership from room_memberships WHERE user_id=?"
-            + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
+class StreamStore(SQLBaseStore):
 
-        if room_id:
-            query += " AND feedback.room_id=?"
-            query_args.append(room_id)
+    @defer.inlineCallbacks
+    def get_room_events_stream(self, user_id, from_key, to_key, room_id,
+                               limit=0, with_feedback=False):
 
-        (query, query_args) = self._append_stream_operations(
-            "feedback", query, query_args, from_pkey, to_pkey, limit=limit
+        current_room_membership_sql = (
+            "SELECT m.room_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ?"
         )
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, FeedbackTable, from_pkey)
-
-    def get_room_data_stream(self, user_id, from_key, to_key, room_id,
-                             limit=0):
-        return self._db_pool.runInteraction(
-            self._get_room_data_rows,
-            user_id, from_key, to_key, room_id, limit
+        invites_sql = (
+            "SELECT m.event_id FROM room_membershipas as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ? AND m.membership = ?"
         )
 
-    def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                            limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the feedback table, bounded by the specified pkeys
-
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = (
-            "SELECT room_data.* FROM room_data WHERE ? IN "
-            + "(SELECT membership from room_memberships WHERE user_id=?"
-            + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
-
-        if room_id:
-            query += " AND room_data.room_id=?"
-            query_args.append(room_id)
-
-        (query, query_args) = self._append_stream_operations(
-            "room_data", query, query_args, from_pkey, to_pkey, limit=limit
+        if limit:
+            limit = max(limit, MAX_STREAM_SIZE)
+        else:
+            limit = 1000
+
+        sql = (
+            "SELECT * FROM events as e WHERE "
+            "(room_id IN (%(current)s)) OR "
+            "(event_id IN (%(invites)s)) "
+            "ORDER BY ordering ASC LIMIT %(limit)d"
+        ) % {
+            "current": current_room_membership_sql,
+            "invites": invites_sql,
+            "limit": limit,
+        }
+
+        rows = yield self._execute_and_decode(
+            sql,
+            user_id, user_id, Membership.INVITE
         )
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, RoomDataTable, from_pkey)
-
-    def _append_stream_operations(self, table_name, query, query_args,
-                                  from_pkey, to_pkey, limit=None,
-                                  group_by=""):
-        LATEST_ROW = -1
-        order_by = ""
-        if to_pkey > from_pkey:
-            if from_pkey != LATEST_ROW:
-                # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9
-                query += (" AND %s.id > ? AND %s.id < ?" %
-                         (table_name, table_name))
-                query_args.append(from_pkey)
-                query_args.append(to_pkey)
-            else:
-                # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC
-                query += " AND %s.id > ? " % table_name
-                order_by = "ORDER BY id DESC"
-                query_args.append(to_pkey)
-        elif from_pkey > to_pkey:
-            if to_pkey != LATEST_ROW:
-                # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC
-                query += (" AND %s.id > ? AND %s.id < ? " %
-                          (table_name, table_name))
-                order_by = "ORDER BY id DESC"
-                query_args.append(to_pkey)
-                query_args.append(from_pkey)
-            else:
-                # from=5 to=-1 >> from 5 to now >> id>5
-                query += " AND %s.id > ?" % table_name
-                query_args.append(from_pkey)
-
-        query += group_by + order_by
-
-        if limit and limit > 0:
-            query += " LIMIT ?"
-            query_args.append(str(limit))
-
-        return (query, query_args)
-
-    def _as_events(self, cursor, table, from_pkey):
-        data_entries = table.decode_results(cursor)
-        last_pkey = from_pkey
-        if data_entries:
-            last_pkey = data_entries[-1].id
-
-        events = [
-            entry.as_event(self.event_factory).get_dict()
-            for entry in data_entries
-        ]
-
-        return (events, last_pkey)
+        defer.returnValue([self._parse_event_from_row(r) for r in rows])