summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/events/factory.py10
-rw-r--r--synapse/api/events/room.py30
-rw-r--r--synapse/api/notifier.py3
-rw-r--r--synapse/api/streams/__init__.py19
-rw-r--r--synapse/api/streams/event.py173
-rw-r--r--synapse/federation/handler.py26
-rw-r--r--synapse/federation/replication.py9
-rw-r--r--synapse/handlers/events.py8
-rw-r--r--synapse/handlers/federation.py21
-rw-r--r--synapse/handlers/presence.py2
-rw-r--r--synapse/handlers/room.py102
-rw-r--r--synapse/rest/room.py59
-rw-r--r--synapse/state.py2
-rw-r--r--synapse/storage/__init__.py179
-rw-r--r--synapse/storage/_base.py28
-rw-r--r--synapse/storage/feedback.py72
-rw-r--r--synapse/storage/message.py81
-rw-r--r--synapse/storage/pdu.py22
-rw-r--r--synapse/storage/room.py97
-rw-r--r--synapse/storage/roomdata.py85
-rw-r--r--synapse/storage/roommember.py172
-rw-r--r--synapse/storage/schema/im.sql81
-rw-r--r--synapse/storage/stream.py445
23 files changed, 857 insertions, 869 deletions
diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py
index 12aa04fc6e..b61dac7acd 100644
--- a/synapse/api/events/factory.py
+++ b/synapse/api/events/factory.py
@@ -15,7 +15,7 @@
 
 from synapse.api.events.room import (
     RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent,
-    InviteJoinEvent, RoomConfigEvent
+    InviteJoinEvent, RoomConfigEvent, RoomNameEvent, GenericEvent,
 )
 
 from synapse.util.stringutils import random_string
@@ -25,6 +25,7 @@ class EventFactory(object):
 
     _event_classes = [
         RoomTopicEvent,
+        RoomNameEvent,
         MessageEvent,
         RoomMemberEvent,
         FeedbackEvent,
@@ -42,10 +43,9 @@ class EventFactory(object):
         if "event_id" not in kwargs:
             kwargs["event_id"] = random_string(10)
 
-        try:
+        if etype in self._event_list:
             handler = self._event_list[etype]
-        except KeyError:  # unknown event type
-            # TODO allow custom event types.
-            raise NotImplementedError("Unknown etype=%s" % etype)
+        else:
+            handler = GenericEvent
 
         return handler(**kwargs)
diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py
index f3df849af2..42459f3f21 100644
--- a/synapse/api/events/room.py
+++ b/synapse/api/events/room.py
@@ -16,17 +16,45 @@
 from . import SynapseEvent
 
 
+class GenericEvent(SynapseEvent):
+    def get_content_template(self):
+        return {}
+
+
 class RoomTopicEvent(SynapseEvent):
     TYPE = "m.room.topic"
 
+    internal_keys = SynapseEvent.internal_keys + [
+        "topic",
+    ]
+
     def __init__(self, **kwargs):
         kwargs["state_key"] = ""
+        if "topic" in kwargs["content"]:
+            kwargs["topic"] = kwargs["content"]["topic"]
         super(RoomTopicEvent, self).__init__(**kwargs)
 
     def get_content_template(self):
         return {"topic": u"string"}
 
 
+class RoomNameEvent(SynapseEvent):
+    TYPE = "m.room.name"
+
+    internal_keys = SynapseEvent.internal_keys + [
+        "name",
+    ]
+
+    def __init__(self, **kwargs):
+        kwargs["state_key"] = ""
+        if "name" in kwargs["content"]:
+            kwargs["name"] = kwargs["content"]["name"]
+        super(RoomNameEvent, self).__init__(**kwargs)
+
+    def get_content_template(self):
+        return {"name": u"string"}
+
+
 class RoomMemberEvent(SynapseEvent):
     TYPE = "m.room.member"
 
@@ -38,6 +66,8 @@ class RoomMemberEvent(SynapseEvent):
     def __init__(self, **kwargs):
         if "target_user_id" in kwargs:
             kwargs["state_key"] = kwargs["target_user_id"]
+        if "membership" not in kwargs:
+            kwargs["membership"] = kwargs.get("content", {}).get("membership")
         super(RoomMemberEvent, self).__init__(**kwargs)
 
     def get_content_template(self):
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py
index 65b5a4ebb3..9f622df6bb 100644
--- a/synapse/api/notifier.py
+++ b/synapse/api/notifier.py
@@ -15,6 +15,7 @@
 
 from synapse.api.constants import Membership
 from synapse.api.events.room import RoomMemberEvent
+from synapse.api.streams.event import EventsStreamData
 
 from twisted.internet import defer
 from twisted.internet import reactor
@@ -66,7 +67,7 @@ class Notifier(object):
                 self._notify_and_callback(
                     user_id=user_id,
                     event_data=event.get_dict(),
-                    stream_type=event.type,
+                    stream_type=EventsStreamData.EVENT_TYPE,
                     store_id=store_id)
 
     def on_new_user_event(self, user_id, event_data, stream_type, store_id):
diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py
index 989e63f9ec..44f4cc6078 100644
--- a/synapse/api/streams/__init__.py
+++ b/synapse/api/streams/__init__.py
@@ -20,23 +20,23 @@ class PaginationConfig(object):
 
     """A configuration object which stores pagination parameters."""
 
-    def __init__(self, from_tok=None, to_tok=None, limit=0):
+    def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0):
         self.from_tok = from_tok
         self.to_tok = to_tok
+        self.direction = direction
         self.limit = limit
 
     @classmethod
     def from_request(cls, request, raise_invalid_params=True):
         params = {
-            "from_tok": PaginationStream.TOK_START,
-            "to_tok": PaginationStream.TOK_END,
-            "limit": 0
+            "direction": 'f',
         }
 
         query_param_mappings = [  # 3-tuple of qp_key, attribute, rules
             ("from", "from_tok", lambda x: type(x) == str),
             ("to", "to_tok", lambda x: type(x) == str),
-            ("limit", "limit", lambda x: x.isdigit())
+            ("limit", "limit", lambda x: x.isdigit()),
+            ("dir", "direction", lambda x: x == 'f' or x == 'b'),
         ]
 
         for qp, attr, is_valid in query_param_mappings:
@@ -48,12 +48,17 @@ class PaginationConfig(object):
 
         return PaginationConfig(**params)
 
+    def __str__(self):
+        return (
+            "<PaginationConfig from_tok=%s, to_tok=%s, "
+            "direction=%s, limit=%s>"
+        ) % (self.from_tok, self.to_tok, self.direction, self.limit)
+
 
 class PaginationStream(object):
 
     """ An interface for streaming data as chunks. """
 
-    TOK_START = "START"
     TOK_END = "END"
 
     def get_chunk(self, config=None):
@@ -76,7 +81,7 @@ class StreamData(object):
         self.hs = hs
         self.store = hs.get_datastore()
 
-    def get_rows(self, user_id, from_pkey, to_pkey, limit):
+    def get_rows(self, user_id, from_pkey, to_pkey, limit, direction):
         """ Get event stream data between the specified pkeys.
 
         Args:
diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py
index 4b6d739e54..a5c8b2b31f 100644
--- a/synapse/api/streams/event.py
+++ b/synapse/api/streams/event.py
@@ -18,6 +18,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import EventStreamError
+from synapse.api.events import SynapseEvent
 from synapse.api.events.room import (
     RoomMemberEvent, MessageEvent, FeedbackEvent, RoomTopicEvent
 )
@@ -28,17 +29,17 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class MessagesStreamData(StreamData):
-    EVENT_TYPE = MessageEvent.TYPE
+class EventsStreamData(StreamData):
+    EVENT_TYPE = "EventsStream"
 
     def __init__(self, hs, room_id=None, feedback=False):
-        super(MessagesStreamData, self).__init__(hs)
+        super(EventsStreamData, self).__init__(hs)
         self.room_id = room_id
         self.with_feedback = feedback
 
     @defer.inlineCallbacks
-    def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_message_stream(
+    def get_rows(self, user_id, from_key, to_key, limit, direction):
+        data, latest_ver = yield self.store.get_room_events(
             user_id=user_id,
             from_key=from_key,
             to_key=to_key,
@@ -50,74 +51,7 @@ class MessagesStreamData(StreamData):
 
     @defer.inlineCallbacks
     def max_token(self):
-        val = yield self.store.get_max_message_id()
-        defer.returnValue(val)
-
-
-class RoomMemberStreamData(StreamData):
-    EVENT_TYPE = RoomMemberEvent.TYPE
-
-    @defer.inlineCallbacks
-    def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_room_member_stream(
-            user_id=user_id,
-            from_key=from_key,
-            to_key=to_key
-        )
-
-        defer.returnValue((data, latest_ver))
-
-    @defer.inlineCallbacks
-    def max_token(self):
-        val = yield self.store.get_max_room_member_id()
-        defer.returnValue(val)
-
-
-class FeedbackStreamData(StreamData):
-    EVENT_TYPE = FeedbackEvent.TYPE
-
-    def __init__(self, hs, room_id=None):
-        super(FeedbackStreamData, self).__init__(hs)
-        self.room_id = room_id
-
-    @defer.inlineCallbacks
-    def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_feedback_stream(
-            user_id=user_id,
-            from_key=from_key,
-            to_key=to_key,
-            limit=limit,
-            room_id=self.room_id
-        )
-        defer.returnValue((data, latest_ver))
-
-    @defer.inlineCallbacks
-    def max_token(self):
-        val = yield self.store.get_max_feedback_id()
-        defer.returnValue(val)
-
-
-class RoomDataStreamData(StreamData):
-    EVENT_TYPE = RoomTopicEvent.TYPE  # TODO need multiple event types
-
-    def __init__(self, hs, room_id=None):
-        super(RoomDataStreamData, self).__init__(hs)
-        self.room_id = room_id
-
-    @defer.inlineCallbacks
-    def get_rows(self, user_id, from_key, to_key, limit):
-        (data, latest_ver) = yield self.store.get_room_data_stream(
-            user_id=user_id,
-            from_key=from_key,
-            to_key=to_key,
-            limit=limit,
-            room_id=self.room_id
-        )
-        defer.returnValue((data, latest_ver))
-
-    @defer.inlineCallbacks
-    def max_token(self):
-        val = yield self.store.get_max_room_data_id()
+        val = yield self.store.get_room_events_max_id()
         defer.returnValue(val)
 
 
@@ -136,6 +70,15 @@ class EventStream(PaginationStream):
             pagination_config.from_tok)
         pagination_config.to_tok = yield self.fix_token(
             pagination_config.to_tok)
+
+        if (
+            not pagination_config.to_tok
+            and pagination_config.direction == 'f'
+        ):
+            pagination_config.to_tok = yield self.get_current_max_token()
+
+        logger.debug("pagination_config: %s", pagination_config)
+
         defer.returnValue(pagination_config)
 
     @defer.inlineCallbacks
@@ -147,39 +90,42 @@ class EventStream(PaginationStream):
         Returns:
             The fixed-up token, which may == token.
         """
-        # replace TOK_START and TOK_END with 0_0_0 or -1_-1_-1 depending.
-        replacements = [
-            (PaginationStream.TOK_START, "0"),
-            (PaginationStream.TOK_END, "-1")
-        ]
-        for magic_token, key in replacements:
-            if magic_token == token:
-                token = EventStream.SEPARATOR.join(
-                    [key] * len(self.stream_data)
-                )
-
-        # replace -1 values with an actual pkey
-        token_segments = self._split_token(token)
-        for i, tok in enumerate(token_segments):
-            if tok == -1:
-                # add 1 to the max token because results are EXCLUSIVE from the
-                # latest version.
-                token_segments[i] = 1 + (yield self.stream_data[i].max_token())
-        defer.returnValue(EventStream.SEPARATOR.join(
-            str(x) for x in token_segments
-        ))
+        if token == PaginationStream.TOK_END:
+            new_token = yield self.get_current_max_token()
+
+            logger.debug("fix_token: From %s to %s", token, new_token)
+
+            token = new_token
+
+        defer.returnValue(token)
 
     @defer.inlineCallbacks
-    def get_chunk(self, config=None):
+    def get_current_max_token(self):
+        new_token_parts = []
+        for s in self.stream_data:
+            mx = yield s.max_token()
+            new_token_parts.append(str(mx))
+
+        new_token = EventStream.SEPARATOR.join(new_token_parts)
+
+        logger.debug("get_current_max_token: %s", new_token)
+
+        defer.returnValue(new_token)
+
+    @defer.inlineCallbacks
+    def get_chunk(self, config):
         # no support for limit on >1 streams, makes no sense.
         if config.limit and len(self.stream_data) > 1:
             raise EventStreamError(
                 400, "Limit not supported on multiplexed streams."
             )
 
-        (chunk_data, next_tok) = yield self._get_chunk_data(config.from_tok,
-                                                            config.to_tok,
-                                                            config.limit)
+        chunk_data, next_tok = yield self._get_chunk_data(
+            config.from_tok,
+            config.to_tok,
+            config.limit,
+            config.direction,
+        )
 
         defer.returnValue({
             "chunk": chunk_data,
@@ -188,7 +134,7 @@ class EventStream(PaginationStream):
         })
 
     @defer.inlineCallbacks
-    def _get_chunk_data(self, from_tok, to_tok, limit):
+    def _get_chunk_data(self, from_tok, to_tok, limit, direction):
         """ Get event data between the two tokens.
 
         Tokens are SEPARATOR separated values representing pkey values of
@@ -206,11 +152,12 @@ class EventStream(PaginationStream):
             EventStreamError if something went wrong.
         """
         # sanity check
-        if (from_tok.count(EventStream.SEPARATOR) !=
-                to_tok.count(EventStream.SEPARATOR) or
-                (from_tok.count(EventStream.SEPARATOR) + 1) !=
-                len(self.stream_data)):
-            raise EventStreamError(400, "Token lengths don't match.")
+        if to_tok is not None:
+            if (from_tok.count(EventStream.SEPARATOR) !=
+                    to_tok.count(EventStream.SEPARATOR) or
+                    (from_tok.count(EventStream.SEPARATOR) + 1) !=
+                    len(self.stream_data)):
+                raise EventStreamError(400, "Token lengths don't match.")
 
         chunk = []
         next_ver = []
@@ -224,10 +171,13 @@ class EventStream(PaginationStream):
                 continue
 
             (event_chunk, max_pkey) = yield self.stream_data[i].get_rows(
-                self.user_id, from_pkey, to_pkey, limit
+                self.user_id, from_pkey, to_pkey, limit, direction,
             )
 
-            chunk += event_chunk
+            chunk.extend([
+                e.get_dict() if isinstance(e, SynapseEvent) else e
+                for e in event_chunk
+            ])
             next_ver.append(str(max_pkey))
 
         defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))
@@ -240,9 +190,8 @@ class EventStream(PaginationStream):
         Returns:
             A list of ints.
         """
-        segments = token.split(EventStream.SEPARATOR)
-        try:
-            int_segments = [int(x) for x in segments]
-        except ValueError:
-            raise EventStreamError(400, "Bad token: %s" % token)
-        return int_segments
+        if token:
+            segments = token.split(EventStream.SEPARATOR)
+        else:
+            segments = [None] * len(self.stream_data)
+        return segments
diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py
index 580e591aca..984c1558e9 100644
--- a/synapse/federation/handler.py
+++ b/synapse/federation/handler.py
@@ -63,7 +63,7 @@ class FederationEventHandler(object):
             Deferred: Resolved when it has successfully been queued for
             processing.
         """
-        yield self._fill_out_prev_events(event)
+        yield self.fill_out_prev_events(event)
 
         pdu = self.pdu_codec.pdu_from_event(event)
 
@@ -74,10 +74,18 @@ class FederationEventHandler(object):
 
     @log_function
     @defer.inlineCallbacks
-    def backfill(self, room_id, limit):
-        # TODO: Work out which destinations to ask for backfill
-        # self.replication_layer.backfill(dest, room_id, limit)
-        pass
+    def backfill(self, dest, room_id, limit):
+        pdus = yield self.replication_layer.backfill(dest, room_id, limit)
+
+        if not pdus:
+            defer.returnValue([])
+
+        events = [
+            self.pdu_codec.event_from_pdu(pdu)
+            for pdu in pdus
+        ]
+
+        defer.returnValue(events)
 
     @log_function
     def get_state_for_room(self, destination, room_id):
@@ -87,7 +95,7 @@ class FederationEventHandler(object):
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, pdu):
+    def on_receive_pdu(self, pdu, backfilled):
         """ Called by the ReplicationLayer when we have a new pdu. We need to
         do auth checks and put it throught the StateHandler.
         """
@@ -95,7 +103,7 @@ class FederationEventHandler(object):
 
         try:
             with (yield self.lock_manager.lock(pdu.context)):
-                if event.is_state:
+                if event.is_state and not backfilled:
                     is_new_state = yield self.state_handler.handle_new_state(
                         pdu
                     )
@@ -104,7 +112,7 @@ class FederationEventHandler(object):
                 else:
                     is_new_state = False
 
-            yield self.event_handler.on_receive(event, is_new_state)
+            yield self.event_handler.on_receive(event, is_new_state, backfilled)
 
         except AuthError:
             # TODO: Implement something in federation that allows us to
@@ -129,7 +137,7 @@ class FederationEventHandler(object):
         yield self.event_handler.on_receive(new_state_event)
 
     @defer.inlineCallbacks
-    def _fill_out_prev_events(self, event):
+    def fill_out_prev_events(self, event):
         if hasattr(event, "prev_events"):
             return
 
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index c9f2e06b7b..8030d0963f 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -209,7 +209,7 @@ class ReplicationLayer(object):
 
         pdus = [Pdu(outlier=False, **p) for p in transaction.pdus]
         for pdu in pdus:
-            yield self._handle_new_pdu(pdu)
+            yield self._handle_new_pdu(pdu, backfilled=True)
 
         defer.returnValue(pdus)
 
@@ -416,7 +416,7 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
-    def _handle_new_pdu(self, pdu):
+    def _handle_new_pdu(self, pdu, backfilled=False):
         # We reprocess pdus when we have seen them only as outliers
         existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin)
 
@@ -452,7 +452,10 @@ class ReplicationLayer(object):
         # Persist the Pdu, but don't mark it as processed yet.
         yield self.pdu_actions.persist_received(pdu)
 
-        ret = yield self.handler.on_receive_pdu(pdu)
+        if not backfilled:
+            ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled)
+        else:
+            ret = None
 
         yield self.pdu_actions.mark_as_processed(pdu)
 
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 3af7d824a2..6bb797caf2 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,8 +17,7 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 from synapse.api.streams.event import (
-    EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
-    RoomDataStreamData
+    EventStream, EventsStreamData
 )
 from synapse.handlers.presence import PresenceStreamData
 
@@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData
 class EventStreamHandler(BaseHandler):
 
     stream_data_classes = [
-        MessagesStreamData,
-        RoomMemberStreamData,
-        FeedbackStreamData,
-        RoomDataStreamData,
+        EventsStreamData,
         PresenceStreamData,
     ]
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7026df90a2..0430a8307e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -35,7 +35,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive(self, event, is_new_state):
+    def on_receive(self, event, is_new_state, backfilled):
         if hasattr(event, "state_key") and not is_new_state:
             logger.debug("Ignoring old state.")
             return
@@ -70,6 +70,21 @@ class FederationHandler(BaseHandler):
 
         else:
             with (yield self.room_lock.lock(event.room_id)):
-                store_id = yield self.store.persist_event(event)
+                store_id = yield self.store.persist_event(event, backfilled)
 
-            yield self.notifier.on_new_room_event(event, store_id)
+            if not backfilled:
+                yield self.notifier.on_new_room_event(event, store_id)
+
+
+    @log_function
+    @defer.inlineCallbacks
+    def backfill(self, dest, room_id, limit):
+        events = yield self.hs.get_federation().backfill(dest, room_id, limit)
+
+        for event in events:
+            try:
+                yield self.store.persist_event(event, backfilled=True)
+            except:
+                logger.exception("Failed to persist event: %s", event)
+
+        defer.returnValue(events)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7e725d1027..60684f17d7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -688,7 +688,7 @@ class PresenceStreamData(StreamData):
         super(PresenceStreamData, self).__init__(hs)
         self.presence = hs.get_handlers().presence_handler
 
-    def get_rows(self, user_id, from_key, to_key, limit):
+    def get_rows(self, user_id, from_key, to_key, limit, direction):
         cachemap = self.presence._user_cachemap
 
         # TODO(paul): limit, and filter by visibility
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5d0379254b..40867ae2e0 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,7 @@ from synapse.api.events.room import (
     RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
     RoomConfigEvent
 )
-from synapse.api.streams.event import EventStream, MessagesStreamData
+from synapse.api.streams.event import EventStream, EventsStreamData
 from synapse.util import stringutils
 from ._base import BaseHandler
 
@@ -59,12 +59,14 @@ class MessageHandler(BaseHandler):
         yield self.auth.check_joined_room(room_id, user_id)
 
         # Pull out the message from the db
-        msg = yield self.store.get_message(room_id=room_id,
-                                           msg_id=msg_id,
-                                           user_id=sender_id)
+#        msg = yield self.store.get_message(
+#            room_id=room_id,
+#            msg_id=msg_id,
+#            user_id=sender_id
+#        )
+
+        # TODO (erikj): Once we work out the correct c-s api we need to think on how to do this.
 
-        if msg:
-            defer.returnValue(msg)
         defer.returnValue(None)
 
     @defer.inlineCallbacks
@@ -114,8 +116,9 @@ class MessageHandler(BaseHandler):
         """
         yield self.auth.check_joined_room(room_id, user_id)
 
-        data_source = [MessagesStreamData(self.hs, room_id=room_id,
-                                          feedback=feedback)]
+        data_source = [
+            EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
+        ]
         event_stream = EventStream(user_id, data_source)
         pagin_config = yield event_stream.fix_tokens(pagin_config)
         data_chunk = yield event_stream.get_chunk(config=pagin_config)
@@ -141,12 +144,7 @@ class MessageHandler(BaseHandler):
             yield self.state_handler.handle_new_event(event)
 
             # store in db
-            store_id = yield self.store.store_room_data(
-                room_id=event.room_id,
-                etype=event.type,
-                state_key=event.state_key,
-                content=json.dumps(event.content)
-            )
+            store_id = yield self.store.persist_event(event)
 
             event.destinations = yield self.store.get_joined_hosts_for_room(
                 event.room_id
@@ -201,19 +199,17 @@ class MessageHandler(BaseHandler):
                 raise RoomError(
                     403, "Member does not meet private room rules.")
 
-        data = yield self.store.get_room_data(room_id, event_type, state_key)
+        data = yield self.store.get_current_state(
+            room_id, event_type, state_key
+        )
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None,
-                     user_id=None, fb_sender_id=None, fb_type=None):
-        yield self.auth.check_joined_room(room_id, user_id)
+    def get_feedback(self, event_id):
+        # yield self.auth.check_joined_room(room_id, user_id)
 
         # Pull out the feedback from the db
-        fb = yield self.store.get_feedback(
-            room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id,
-            fb_sender_id=fb_sender_id, fb_type=fb_type
-        )
+        fb = yield self.store.get_feedback(event_id)
 
         if fb:
             defer.returnValue(fb)
@@ -260,20 +256,35 @@ class MessageHandler(BaseHandler):
             user_id=user_id,
             membership_list=[Membership.INVITE, Membership.JOIN]
         )
-        for room_info in room_list:
-            if room_info["membership"] != Membership.JOIN:
+
+        ret = []
+
+        for event in room_list:
+            d = {
+                "room_id": event.room_id,
+                "membership": event.membership,
+            }
+            ret.append(d)
+
+            if event.membership != Membership.JOIN:
                 continue
             try:
-                event_chunk = yield self.get_messages(
-                    user_id=user_id,
-                    pagin_config=pagin_config,
-                    feedback=feedback,
-                    room_id=room_info["room_id"]
+                messages, token = yield self.store.get_recent_events_for_room(
+                    event.room_id,
+                    limit=50,
                 )
-                room_info["messages"] = event_chunk
+
+                d["messages"] = {
+                    "chunk": [m.get_dict() for m in messages],
+                    "start": token[0],
+                    "end": token[1],
+                }
             except:
-                pass
-        defer.returnValue(room_list)
+                logger.exception("Failed to get snapshot")
+
+        logger.debug("snapshot_all_rooms returning: %s", ret)
+
+        defer.returnValue(ret)
 
 
 class RoomCreationHandler(BaseHandler):
@@ -451,11 +462,11 @@ class RoomMemberHandler(BaseHandler):
 
         member_list = yield self.store.get_room_members(room_id=room_id)
         event_list = [
-            entry.as_event(self.event_factory).get_dict()
+            entry.get_dict()
             for entry in member_list
         ]
         chunk_data = {
-            "start": "START",
+            "start": "START",  # FIXME (erikj): START is no longer a valid value
             "end": "END",
             "chunk": event_list
         }
@@ -495,7 +506,7 @@ class RoomMemberHandler(BaseHandler):
             SynapseError if there was a problem changing the membership.
         """
 
-        #broadcast_msg = False
+        # broadcast_msg = False
 
         prev_state = yield self.store.get_room_member(
             event.target_user_id, event.room_id
@@ -569,7 +580,8 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue({"room_id": room_id})
 
     @defer.inlineCallbacks
-    def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True):
+    def _do_join(self, event, room_host=None, do_auth=True,
+                 broadcast_msg=True):
         joinee = self.hs.parse_userid(event.target_user_id)
         # room_id = RoomID.from_string(event.room_id, self.hs)
         room_id = event.room_id
@@ -598,7 +610,7 @@ class RoomMemberHandler(BaseHandler):
             if prev_state and prev_state.membership == Membership.INVITE:
                 room = yield self.store.get_room(room_id)
                 inviter = UserID.from_string(
-                    prev_state.sender, self.hs
+                    prev_state.user_id, self.hs
                 )
 
                 should_do_dance = not inviter.is_mine and not room
@@ -620,7 +632,6 @@ class RoomMemberHandler(BaseHandler):
                 broadcast_msg=broadcast_msg,
             )
 
-
         if should_do_dance:
             yield self._do_invite_join_dance(
                 room_id=room_id,
@@ -694,18 +705,12 @@ class RoomMemberHandler(BaseHandler):
             user_id=user.to_string(), membership_list=membership_list
         )
 
-        defer.returnValue([r["room_id"] for r in rooms])
+        defer.returnValue([r.room_id for r in rooms])
 
     @defer.inlineCallbacks
     def _do_local_membership_update(self, event, membership, broadcast_msg):
         # store membership
-        store_id = yield self.store.store_room_member(
-            user_id=event.target_user_id,
-            sender=event.user_id,
-            room_id=event.room_id,
-            content=event.content,
-            membership=membership
-        )
+        store_id = yield self.store.persist_event(event)
 
         # Send a PDU to all hosts who have joined the room.
         destinations = yield self.store.get_joined_hosts_for_room(
@@ -760,7 +765,7 @@ class RoomMemberHandler(BaseHandler):
             room_id, "", is_public=False
         )
 
-        #yield self.state_handler.handle_new_event(event)
+        # yield self.state_handler.handle_new_event(event)
         yield federation.handle_new_event(new_event)
         yield federation.get_state_for_room(
             target_host, room_id
@@ -805,5 +810,6 @@ class RoomListHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_public_room_list(self):
-        chunk = yield self.store.get_rooms(is_public=True, with_topics=True)
+        chunk = yield self.store.get_rooms(is_public=True)
+        # FIXME (erikj): START is no longer a valid value
         defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index db8f18e8b3..1c48e63628 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -115,7 +115,7 @@ class RoomTopicRestServlet(RestServlet):
 
         if not data:
             raise SynapseError(404, "Topic not found.", errcode=Codes.NOT_FOUND)
-        defer.returnValue((200, json.loads(data.content)))
+        defer.returnValue((200, data.content))
 
     @defer.inlineCallbacks
     def on_PUT(self, request, room_id):
@@ -177,7 +177,7 @@ class RoomMemberRestServlet(RestServlet):
         if not member:
             raise SynapseError(404, "Member not found.",
                                errcode=Codes.NOT_FOUND)
-        defer.returnValue((200, json.loads(member.content)))
+        defer.returnValue((200, member.content))
 
     @defer.inlineCallbacks
     def on_DELETE(self, request, roomid, target_user_id):
@@ -287,25 +287,28 @@ class FeedbackRestServlet(RestServlet):
                feedback_type):
         user = yield (self.auth.get_user_by_req(request))
 
-        if feedback_type not in Feedback.LIST:
-            raise SynapseError(400, "Bad feedback type.",
-                               errcode=Codes.BAD_JSON)
-
-        msg_handler = self.handlers.message_handler
-        feedback = yield msg_handler.get_feedback(
-            room_id=urllib.unquote(room_id),
-            msg_sender_id=msg_sender_id,
-            msg_id=msg_id,
-            user_id=user.to_string(),
-            fb_sender_id=fb_sender_id,
-            fb_type=feedback_type
-        )
-
-        if not feedback:
-            raise SynapseError(404, "Feedback not found.",
-                               errcode=Codes.NOT_FOUND)
+        # TODO (erikj): Implement this?
+        raise NotImplementedError("Getting feedback is not supported")
 
-        defer.returnValue((200, json.loads(feedback.content)))
+#        if feedback_type not in Feedback.LIST:
+#            raise SynapseError(400, "Bad feedback type.",
+#                               errcode=Codes.BAD_JSON)
+#
+#        msg_handler = self.handlers.message_handler
+#        feedback = yield msg_handler.get_feedback(
+#            room_id=urllib.unquote(room_id),
+#            msg_sender_id=msg_sender_id,
+#            msg_id=msg_id,
+#            user_id=user.to_string(),
+#            fb_sender_id=fb_sender_id,
+#            fb_type=feedback_type
+#        )
+#
+#        if not feedback:
+#            raise SynapseError(404, "Feedback not found.",
+#                               errcode=Codes.NOT_FOUND)
+#
+#        defer.returnValue((200, json.loads(feedback.content)))
 
     @defer.inlineCallbacks
     def on_PUT(self, request, room_id, sender_id, msg_id, fb_sender_id,
@@ -382,6 +385,21 @@ class RoomMessageListRestServlet(RestServlet):
         defer.returnValue((200, msgs))
 
 
+class RoomTriggerBackfill(RestServlet):
+    PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/backfill$")
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, room_id):
+        remote_server = urllib.unquote(request.args["remote"][0])
+        room_id = urllib.unquote(room_id)
+        limit = int(request.args["limit"][0])
+
+        handler = self.handlers.federation_handler
+        events = yield handler.backfill(remote_server, room_id, limit)
+
+        res = [event.get_dict() for event in events]
+        defer.returnValue((200, res))
+
 def _parse_json(request):
     try:
         content = json.loads(request.content.read())
@@ -402,3 +420,4 @@ def register_servlets(hs, http_server):
     RoomMemberListRestServlet(hs).register(http_server)
     RoomMessageListRestServlet(hs).register(http_server)
     JoinRoomAliasServlet(hs).register(http_server)
+    RoomTriggerBackfill(hs).register(http_server)
diff --git a/synapse/state.py b/synapse/state.py
index 4f8b4d9760..ca8e1ca630 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -86,7 +86,7 @@ class StateHandler(object):
         else:
             event.depth = 0
 
-        current_state = yield self.store.get_current_state(
+        current_state = yield self.store.get_current_state_pdu(
             key.context, key.type, key.state_key
         )
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 5d5b5f7c44..470b7b7663 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -13,30 +13,35 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
 
 from synapse.api.events.room import (
     RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent,
-    RoomConfigEvent
+    RoomConfigEvent, RoomNameEvent,
 )
 
+from synapse.util.logutils import log_function
+
 from .directory import DirectoryStore
 from .feedback import FeedbackStore
-from .message import MessageStore
 from .presence import PresenceStore
 from .profile import ProfileStore
 from .registration import RegistrationStore
 from .room import RoomStore
 from .roommember import RoomMemberStore
-from .roomdata import RoomDataStore
 from .stream import StreamStore
 from .pdu import StatePduStore, PduStore
 from .transactions import TransactionStore
 
 import json
+import logging
 import os
 
 
-class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
+logger = logging.getLogger(__name__)
+
+
+class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
                 PresenceStore, PduStore, StatePduStore, TransactionStore,
                 DirectoryStore):
@@ -44,51 +49,139 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
     def __init__(self, hs):
         super(DataStore, self).__init__(hs)
         self.event_factory = hs.get_event_factory()
+        self.hs = hs
 
-    def persist_event(self, event):
-        if event.type == MessageEvent.TYPE:
-            return self.store_message(
-                user_id=event.user_id,
-                room_id=event.room_id,
-                msg_id=event.msg_id,
-                content=json.dumps(event.content)
-            )
-        elif event.type == RoomMemberEvent.TYPE:
-            return self.store_room_member(
-                user_id=event.target_user_id,
-                sender=event.user_id,
-                room_id=event.room_id,
-                content=event.content,
-                membership=event.content["membership"]
-            )
+        self.min_token_deferred = self._get_min_token()
+        self.min_token = None
+
+    @defer.inlineCallbacks
+    @log_function
+    def persist_event(self, event, backfilled=False):
+        if event.type == RoomMemberEvent.TYPE:
+            yield self._store_room_member(event)
         elif event.type == FeedbackEvent.TYPE:
-            return self.store_feedback(
-                room_id=event.room_id,
-                msg_id=event.msg_id,
-                msg_sender_id=event.msg_sender_id,
-                fb_sender_id=event.user_id,
-                fb_type=event.feedback_type,
-                content=json.dumps(event.content)
-            )
+            yield self._store_feedback(event)
+#        elif event.type == RoomConfigEvent.TYPE:
+#            yield self._store_room_config(event)
+        elif event.type == RoomNameEvent.TYPE:
+            yield self._store_room_name(event)
         elif event.type == RoomTopicEvent.TYPE:
-            return self.store_room_data(
-                room_id=event.room_id,
-                etype=event.type,
-                state_key=event.state_key,
-                content=json.dumps(event.content)
+            yield self._store_room_topic(event)
+
+        ret = yield self._store_event(event, backfilled)
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def get_event(self, event_id):
+        events_dict = yield self._simple_select_one(
+            "events",
+            {"event_id": event_id},
+            [
+                "event_id",
+                "type",
+                "sender",
+                "room_id",
+                "content",
+                "unrecognized_keys"
+            ],
+        )
+
+        event = self._parse_event_from_row(events_dict)
+        defer.returnValue(event)
+
+    @defer.inlineCallbacks
+    @log_function
+    def _store_event(self, event, backfilled):
+        # FIXME (erikj): This should be removed when we start amalgamating
+        # event and pdu storage
+        yield self.hs.get_federation().fill_out_prev_events(event)
+
+        vals = {
+            "topological_ordering": event.depth,
+            "event_id": event.event_id,
+            "type": event.type,
+            "room_id": event.room_id,
+            "content": json.dumps(event.content),
+            "processed": True,
+        }
+
+        if backfilled:
+            if not self.min_token_deferred.called:
+                yield self.min_token_deferred
+            self.min_token -= 1
+            vals["stream_ordering"] = self.min_token
+
+        unrec = {
+            k: v
+            for k, v in event.get_full_dict().items()
+            if k not in vals.keys()
+        }
+        vals["unrecognized_keys"] = json.dumps(unrec)
+
+        try:
+            yield self._simple_insert("events", vals)
+        except:
+            logger.exception("Failed to persist, probably duplicate")
+            return
+
+        if not backfilled and hasattr(event, "state_key"):
+            vals = {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "type": event.type,
+                "state_key": event.state_key,
+            }
+
+            if hasattr(event, "prev_state"):
+                vals["prev_state"] = event.prev_state
+
+            yield self._simple_insert("state_events", vals)
+
+            yield self._simple_insert(
+                "current_state_events",
+                {
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "type": event.type,
+                    "state_key": event.state_key,
+                }
             )
-        elif event.type == RoomConfigEvent.TYPE:
-            if "visibility" in event.content:
-                visibility = event.content["visibility"]
-                return self.store_room_config(
-                    room_id=event.room_id,
-                    visibility=visibility
-                )
 
+        latest = yield self.get_room_events_max_id()
+        defer.returnValue(latest)
+
+    @defer.inlineCallbacks
+    def get_current_state(self, room_id, event_type=None, state_key=""):
+        sql = (
+            "SELECT e.* FROM events as e "
+            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+            "INNER JOIN state_events as s ON e.event_id = s.event_id "
+            "WHERE c.room_id = ? "
+        )
+
+        if event_type:
+            sql += " AND s.type = ? AND s.state_key = ? "
+            args = (room_id, event_type, state_key)
         else:
-            raise NotImplementedError(
-                "Don't know how to persist type=%s" % event.type
-            )
+            args = (room_id, )
+
+        results = yield self._execute_and_decode(sql, *args)
+
+        defer.returnValue([self._parse_event_from_row(r) for r in results])
+
+    @defer.inlineCallbacks
+    def _get_min_token(self):
+        row = yield self._execute(
+            None,
+            "SELECT MIN(stream_ordering) FROM events"
+        )
+
+        self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
+        self.min_token = min(self.min_token, -1)
+
+        logger.debug("min_token is: %s", self.min_token)
+
+        defer.returnValue(self.min_token)
 
 
 def schema_path(schema):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index bf1800f4bf..36cc57c1b8 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 
 from twisted.internet import defer
@@ -20,6 +19,9 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 
 import collections
+import copy
+import json
+
 
 logger = logging.getLogger(__name__)
 
@@ -29,6 +31,7 @@ class SQLBaseStore(object):
     def __init__(self, hs):
         self.hs = hs
         self._db_pool = hs.get_db_pool()
+        self.event_factory = hs.get_event_factory()
         self._clock = hs.get_clock()
 
     def cursor_to_dict(self, cursor):
@@ -57,14 +60,22 @@ class SQLBaseStore(object):
             The result of decoder(results)
         """
         logger.debug(
-            "[SQL] %s  Args=%s Func=%s", query, args, decoder.__name__
+            "[SQL] %s  Args=%s Func=%s",
+            query, args, decoder.__name__ if decoder else None
         )
 
         def interaction(txn):
             cursor = txn.execute(query, args)
-            return decoder(cursor)
+            if decoder:
+                return decoder(cursor)
+            else:
+                return cursor.fetchall()
+
         return self._db_pool.runInteraction(interaction)
 
+    def _execute_and_decode(self, query, *args):
+        return self._execute(self.cursor_to_dict, query, *args)
+
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
 
@@ -281,6 +292,17 @@ class SQLBaseStore(object):
 
         return self._db_pool.runInteraction(func)
 
+    def _parse_event_from_row(self, row_dict):
+        d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
+        d.update(json.loads(row_dict["unrecognized_keys"]))
+        d["content"] = json.loads(d["content"])
+        del d["unrecognized_keys"]
+
+        return self.event_factory.create_event(
+            etype=d["type"],
+            **d
+        )
+
 
 class Table(object):
     """ A base class used to store information about a particular table.
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
index 9bd562c762..e60f98d1e1 100644
--- a/synapse/storage/feedback.py
+++ b/synapse/storage/feedback.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from ._base import SQLBaseStore, Table
 from synapse.api.events.room import FeedbackEvent
 
@@ -22,54 +24,28 @@ import json
 
 class FeedbackStore(SQLBaseStore):
 
-    def store_feedback(self, room_id, msg_id, msg_sender_id,
-                       fb_sender_id, fb_type, content):
-        return self._simple_insert(FeedbackTable.table_name, dict(
-            room_id=room_id,
-            msg_id=msg_id,
-            msg_sender_id=msg_sender_id,
-            fb_sender_id=fb_sender_id,
-            fb_type=fb_type,
-            content=content,
-        ))
-
-    def get_feedback(self, room_id=None, msg_id=None, msg_sender_id=None,
-                     fb_sender_id=None, fb_type=None):
-        query = FeedbackTable.select_statement(
-            "msg_sender_id = ? AND room_id = ? AND msg_id = ? " +
-            "AND fb_sender_id = ? AND feedback_type = ? " +
-            "ORDER BY id DESC LIMIT 1")
-        return self._execute(
-            FeedbackTable.decode_single_result,
-            query, msg_sender_id, room_id, msg_id, fb_sender_id, fb_type,
+    def _store_feedback(self, event):
+        return self._simple_insert("feedback", {
+            "event_id": event.event_id,
+            "feedback_type": event.feedback_type,
+            "room_id": event.room_id,
+            "target_event_id": event.target_event,
+            "sender": event.user_id,
+        })
+
+    @defer.inlineCallbacks
+    def get_feedback_for_event(self, event_id):
+        sql = (
+            "SELECT events.* FROM events INNER JOIN feedback "
+            "ON events.event_id = feedback.event_id "
+            "WHERE feedback.target_event_id = ? "
         )
 
-    def get_max_feedback_id(self):
-        return self._simple_max_id(FeedbackTable.table_name)
-
-
-class FeedbackTable(Table):
-    table_name = "feedback"
+        rows = yield self._execute_and_decode(sql, event_id)
 
-    fields = [
-        "id",
-        "content",
-        "feedback_type",
-        "fb_sender_id",
-        "msg_id",
-        "room_id",
-        "msg_sender_id"
-    ]
-
-    class EntryType(collections.namedtuple("FeedbackEntry", fields)):
-
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=FeedbackEvent.TYPE,
-                room_id=self.room_id,
-                msg_id=self.msg_id,
-                msg_sender_id=self.msg_sender_id,
-                user_id=self.fb_sender_id,
-                feedback_type=self.feedback_type,
-                content=json.loads(self.content),
-            )
+        defer.returnValue(
+            [
+                self._parse_event_from_row(r)
+                for r in rows
+            ]
+        )
diff --git a/synapse/storage/message.py b/synapse/storage/message.py
deleted file mode 100644
index 7bb69c1384..0000000000
--- a/synapse/storage/message.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-from synapse.api.events.room import MessageEvent
-
-import collections
-import json
-
-
-class MessageStore(SQLBaseStore):
-
-    def get_message(self, user_id, room_id, msg_id):
-        """Get a message from the store.
-
-        Args:
-            user_id (str): The ID of the user who sent the message.
-            room_id (str): The room the message was sent in.
-            msg_id (str): The unique ID for this user/room combo.
-        """
-        query = MessagesTable.select_statement(
-            "user_id = ? AND room_id = ? AND msg_id = ? " +
-            "ORDER BY id DESC LIMIT 1")
-        return self._execute(
-            MessagesTable.decode_single_result,
-            query, user_id, room_id, msg_id,
-        )
-
-    def store_message(self, user_id, room_id, msg_id, content):
-        """Store a message in the store.
-
-        Args:
-            user_id (str): The ID of the user who sent the message.
-            room_id (str): The room the message was sent in.
-            msg_id (str): The unique ID for this user/room combo.
-            content (str): The content of the message (JSON)
-        """
-        return self._simple_insert(MessagesTable.table_name, dict(
-            user_id=user_id,
-            room_id=room_id,
-            msg_id=msg_id,
-            content=content,
-        ))
-
-    def get_max_message_id(self):
-        return self._simple_max_id(MessagesTable.table_name)
-
-
-class MessagesTable(Table):
-    table_name = "messages"
-
-    fields = [
-        "id",
-        "user_id",
-        "room_id",
-        "msg_id",
-        "content"
-    ]
-
-    class EntryType(collections.namedtuple("MessageEntry", fields)):
-
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=MessageEvent.TYPE,
-                room_id=self.room_id,
-                user_id=self.user_id,
-                msg_id=self.msg_id,
-                content=json.loads(self.content),
-            )
diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py
index 13adc581e1..7655f43ede 100644
--- a/synapse/storage/pdu.py
+++ b/synapse/storage/pdu.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from ._base import SQLBaseStore, Table, JoinHelper
 
 from synapse.util.logutils import log_function
@@ -319,6 +321,7 @@ class PduStore(SQLBaseStore):
 
         return [(row[0], row[1], row[2]) for row in results]
 
+    @defer.inlineCallbacks
     def get_oldest_pdus_in_context(self, context):
         """Get a list of Pdus that we haven't backfilled beyond yet (and haven't    
         seen). This list is used when we want to backfill backwards and is the 
@@ -331,17 +334,14 @@ class PduStore(SQLBaseStore):
         Returns:
             list: A list of PduIdTuple.
         """
-        return self._db_pool.runInteraction(
-            self._get_oldest_pdus_in_context, context
-        )
-
-    def _get_oldest_pdus_in_context(self, txn, context):
-        txn.execute(
+        results = yield self._execute(
+            None,
             "SELECT pdu_id, origin FROM %(back)s WHERE context = ?"
             % {"back": PduBackwardExtremitiesTable.table_name, },
-            (context,)
+            context
         )
-        return [PduIdTuple(i, o) for i, o in txn.fetchall()]
+
+        defer.returnValue([PduIdTuple(i, o) for i, o in results])
 
     def is_pdu_new(self, pdu_id, origin, context, depth):
         """For a given Pdu, try and figure out if it's 'new', i.e., if it's
@@ -580,7 +580,7 @@ class StatePduStore(SQLBaseStore):
 
         txn.execute(query, query_args)
 
-    def get_current_state(self, context, pdu_type, state_key):
+    def get_current_state_pdu(self, context, pdu_type, state_key):
         """For a given context, pdu_type, state_key 3-tuple, return what is
         currently considered the current state.
 
@@ -595,10 +595,10 @@ class StatePduStore(SQLBaseStore):
         """
 
         return self._db_pool.runInteraction(
-            self._get_current_state, context, pdu_type, state_key
+            self._get_current_state_pdu, context, pdu_type, state_key
         )
 
-    def _get_current_state(self, txn, context, pdu_type, state_key):
+    def _get_current_state_pdu(self, txn, context, pdu_type, state_key):
         return self._get_current_interaction(txn, context, pdu_type, state_key)
 
     def _get_current_interaction(self, txn, context, pdu_type, state_key):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index a97162831b..22f2dcca45 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -76,49 +76,80 @@ class RoomStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_rooms(self, is_public, with_topics):
+    def get_rooms(self, is_public):
         """Retrieve a list of all public rooms.
 
         Args:
             is_public (bool): True if the rooms returned should be public.
-            with_topics (bool): True to include the current topic for the room
-            in the response.
         Returns:
-            A list of room dicts containing at least a "room_id" key, and a
-            "topic" key if one is set and with_topic=True.
+            A list of room dicts containing at least a "room_id" key, a
+            "topic" key if one is set, and a "name" key if one is set
         """
-        room_data_type = RoomTopicEvent.TYPE
-        public = 1 if is_public else 0
-
-        latest_topic = ("SELECT max(room_data.id) FROM room_data WHERE "
-                        + "room_data.type = ? GROUP BY room_id")
-
-        query = ("SELECT rooms.*, room_data.content, room_alias FROM rooms "
-                 + "LEFT JOIN "
-                 + "room_aliases ON room_aliases.room_id = rooms.room_id "
-                 + "LEFT JOIN "
-                 + "room_data ON rooms.room_id = room_data.room_id WHERE "
-                 + "(room_data.id IN (" + latest_topic + ") "
-                 + "OR room_data.id IS NULL) AND rooms.is_public = ?")
-
-        res = yield self._execute(
-            self.cursor_to_dict, query, room_data_type, public
+
+        topic_subquery = (
+            "SELECT topics.event_id as event_id, "
+            "topics.room_id as room_id, topic "
+            "FROM topics "
+            "INNER JOIN current_state_events as c "
+            "ON c.event_id = topics.event_id "
         )
 
-        # return only the keys the specification expects
-        ret_keys = ["room_id", "topic", "room_alias"]
+        name_subquery = (
+            "SELECT room_names.event_id as event_id, "
+            "room_names.room_id as room_id, name "
+            "FROM room_names "
+            "INNER JOIN current_state_events as c "
+            "ON c.event_id = room_names.event_id "
+        )
 
-        # extract topic from the json (icky) FIXME
-        for i, room_row in enumerate(res):
-            try:
-                content_json = json.loads(room_row["content"])
-                room_row["topic"] = content_json["topic"]
-            except:
-                pass  # no topic set
-            # filter the dict based on ret_keys
-            res[i] = {k: v for k, v in room_row.iteritems() if k in ret_keys}
+        # We use non printing ascii character US () as a seperator
+        sql = (
+            "SELECT r.room_id, n.name, t.topic, "
+            "group_concat(a.room_alias, '') "
+            "FROM rooms AS r "
+            "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
+            "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
+            "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
+            "WHERE r.is_public = ? "
+            "GROUP BY r.room_id "
+        ) % {
+            "topic": topic_subquery,
+            "name": name_subquery,
+        }
+
+        rows = yield self._execute(None, sql, is_public)
+
+        ret = [
+            {
+                "room_id": r[0],
+                "name": r[1],
+                "topic": r[2],
+                "aliases": r[3].split(""),
+            }
+            for r in rows
+        ]
+
+        defer.returnValue(ret)
+
+    def _store_room_topic(self, event):
+        return self._simple_insert(
+            "topics",
+            {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "topic": event.topic,
+            }
+        )
 
-        defer.returnValue(res)
+    def _store_room_name(self, event):
+        return self._simple_insert(
+            "room_names",
+            {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "name": event.name,
+            }
+        )
 
 
 class RoomsTable(Table):
diff --git a/synapse/storage/roomdata.py b/synapse/storage/roomdata.py
deleted file mode 100644
index cc04d1ba14..0000000000
--- a/synapse/storage/roomdata.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-
-import collections
-import json
-
-
-class RoomDataStore(SQLBaseStore):
-
-    """Provides various CRUD operations for Room Events. """
-
-    def get_room_data(self, room_id, etype, state_key=""):
-        """Retrieve the data stored under this type and state_key.
-
-        Args:
-            room_id (str)
-            etype (str)
-            state_key (str)
-        Returns:
-            namedtuple: Or None if nothing exists at this path.
-        """
-        query = RoomDataTable.select_statement(
-            "room_id = ? AND type = ? AND state_key = ? "
-            "ORDER BY id DESC LIMIT 1"
-        )
-        return self._execute(
-            RoomDataTable.decode_single_result,
-            query, room_id, etype, state_key,
-        )
-
-    def store_room_data(self, room_id, etype, state_key="", content=None):
-        """Stores room specific data.
-
-        Args:
-            room_id (str)
-            etype (str)
-            state_key (str)
-            data (str)- The data to store for this path in JSON.
-        Returns:
-            The store ID for this data.
-        """
-        return self._simple_insert(RoomDataTable.table_name, dict(
-            etype=etype,
-            state_key=state_key,
-            room_id=room_id,
-            content=content,
-        ))
-
-    def get_max_room_data_id(self):
-        return self._simple_max_id(RoomDataTable.table_name)
-
-
-class RoomDataTable(Table):
-    table_name = "room_data"
-
-    fields = [
-        "id",
-        "room_id",
-        "type",
-        "state_key",
-        "content"
-    ]
-
-    class EntryType(collections.namedtuple("RoomDataEntry", fields)):
-
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=self.type,
-                room_id=self.room_id,
-                content=json.loads(self.content),
-            )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index c45d128f1b..89c87290cf 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -31,6 +31,38 @@ logger = logging.getLogger(__name__)
 
 class RoomMemberStore(SQLBaseStore):
 
+    @defer.inlineCallbacks
+    def _store_room_member(self, event):
+        """Store a room member in the database.
+        """
+        domain = self.hs.parse_userid(event.target_user_id).domain
+
+        yield self._simple_insert(
+            "room_memberships",
+            {
+                "event_id": event.event_id,
+                "user_id": event.target_user_id,
+                "sender": event.user_id,
+                "room_id": event.room_id,
+                "membership": event.membership,
+            }
+        )
+
+        # Update room hosts table
+        if event.membership == Membership.JOIN:
+            sql = (
+                "INSERT OR IGNORE INTO room_hosts (room_id, host) "
+                "VALUES (?, ?)"
+            )
+            yield self._execute(None, sql, event.room_id, domain)
+        else:
+            sql = (
+                "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
+            )
+
+            yield self._execute(None, sql, event.room_id, domain)
+
+    @defer.inlineCallbacks
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
 
@@ -38,36 +70,15 @@ class RoomMemberStore(SQLBaseStore):
             user_id (str): The member's user ID.
             room_id (str): The room the member is in.
         Returns:
-            namedtuple: The room member from the database, or None if this
-            member does not exist.
+            Deferred: Results in a MembershipEvent or None.
         """
-        query = RoomMemberTable.select_statement(
-            "room_id = ? AND user_id = ? ORDER BY id DESC LIMIT 1")
-        return self._execute(
-            RoomMemberTable.decode_single_result,
-            query, room_id, user_id,
-        )
+        rows = yield self._get_members_by_dict({
+            "e.room_id": room_id,
+            "m.user_id": user_id,
+        })
 
-    def store_room_member(self, user_id, sender, room_id, membership, content):
-        """Store a room member in the database.
+        defer.returnValue(rows[0] if rows else None)
 
-        Args:
-            user_id (str): The member's user ID.
-            room_id (str): The room in relation to the member.
-            membership (synapse.api.constants.Membership): The new membership
-            state.
-            content (dict): The content of the membership (JSON).
-        """
-        content_json = json.dumps(content)
-        return self._simple_insert(RoomMemberTable.table_name, dict(
-            user_id=user_id,
-            sender=sender,
-            room_id=room_id,
-            membership=membership,
-            content=content_json,
-        ))
-
-    @defer.inlineCallbacks
     def get_room_members(self, room_id, membership=None):
         """Retrieve the current room member list for a room.
 
@@ -79,17 +90,12 @@ class RoomMemberStore(SQLBaseStore):
         Returns:
             list of namedtuples representing the members in this room.
         """
-        query = RoomMemberTable.select_statement(
-            "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
-            + " WHERE room_id = ? GROUP BY user_id)"
-        )
-        res = yield self._execute(
-            RoomMemberTable.decode_results, query, room_id,
-        )
-        # strip memberships which don't match
+
+        where = {"m.room_id": room_id}
         if membership:
-            res = [entry for entry in res if entry.membership == membership]
-        defer.returnValue(res)
+            where["m.membership"] = membership
+
+        return self._get_members_by_dict(where)
 
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
@@ -106,70 +112,40 @@ class RoomMemberStore(SQLBaseStore):
             return defer.succeed(None)
 
         args = [user_id]
-        membership_placeholder = ["membership=?"] * len(membership_list)
-        where_membership = "(" + " OR ".join(membership_placeholder) + ")"
-        for membership in membership_list:
-            args.append(membership)
-
-        # sub-select finds the row ID for the most recent (i.e. current)
-        # state change of this user per room, then the outer select finds those
-        query = ("SELECT room_id, membership FROM room_memberships"
-                 + " WHERE id IN (SELECT MAX(id) FROM room_memberships"
-                 + "   WHERE user_id=? GROUP BY room_id)"
-                 + " AND " + where_membership)
-        return self._execute(
-            self.cursor_to_dict, query, *args
-        )
+        args.extend(membership_list)
 
-    @defer.inlineCallbacks
-    def get_joined_hosts_for_room(self, room_id):
-        query = RoomMemberTable.select_statement(
-            "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
-            + " WHERE room_id = ? GROUP BY user_id)"
-        )
-
-        res = yield self._execute(
-            RoomMemberTable.decode_results, query, room_id,
+        where_clause = "user_id = ? AND (%s)" % (
+            " OR ".join(["membership = ?" for _ in membership_list]),
         )
 
-        def host_from_user_id_string(user_id):
-            domain = UserID.from_string(entry.user_id, self.hs).domain
-            return domain
-
-        # strip memberships which don't match
-        hosts = [
-            host_from_user_id_string(entry.user_id)
-            for entry in res
-            if entry.membership == Membership.JOIN
-        ]
+        return self._get_members_query(where_clause, args)
 
-        logger.debug("Returning hosts: %s from results: %s", hosts, res)
-
-        defer.returnValue(hosts)
-
-    def get_max_room_member_id(self):
-        return self._simple_max_id(RoomMemberTable.table_name)
-
-
-class RoomMemberTable(Table):
-    table_name = "room_memberships"
-
-    fields = [
-        "id",
-        "user_id",
-        "sender",
-        "room_id",
-        "membership",
-        "content"
-    ]
+    def get_joined_hosts_for_room(self, room_id):
+        return self._simple_select_onecol(
+            "room_hosts",
+            {"room_id": room_id},
+            "host"
+        )
 
-    class EntryType(collections.namedtuple("RoomMemberEntry", fields)):
+    def _get_members_by_dict(self, where_dict):
+        clause = " AND ".join("%s = ?" % k for k in where_dict.keys())
+        vals = where_dict.values()
+        return self._get_members_query(clause, vals)
 
-        def as_event(self, event_factory):
-            return event_factory.create_event(
-                etype=RoomMemberEvent.TYPE,
-                room_id=self.room_id,
-                target_user_id=self.user_id,
-                user_id=self.sender,
-                content=json.loads(self.content),
-            )
+    @defer.inlineCallbacks
+    def _get_members_query(self, where_clause, where_values):
+        sql = (
+            "SELECT e.* FROM events as e "
+            "INNER JOIN room_memberships as m "
+            "ON e.event_id = m.event_id "
+            "INNER JOIN current_state_events as c "
+            "ON m.event_id = c.event_id "
+            "WHERE %s "
+        ) % (where_clause,)
+
+        rows = yield self._execute_and_decode(sql, *where_values)
+
+        logger.debug("_get_members_query Got rows %s", rows)
+
+        results = [self._parse_event_from_row(r) for r in rows]
+        defer.returnValue(results)
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 77096546b2..ea04261ff0 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -12,43 +12,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-CREATE TABLE IF NOT EXISTS rooms(
-    room_id TEXT PRIMARY KEY NOT NULL,
-    is_public INTEGER,
-    creator TEXT
+
+CREATE TABLE IF NOT EXISTS events(
+    stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT,
+    topological_ordering INTEGER NOT NULL,
+    event_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    content TEXT NOT NULL,
+    unrecognized_keys TEXT,
+    processed BOOL NOT NULL,
+    CONSTRAINT ev_uniq UNIQUE (event_id)
 );
 
-CREATE TABLE IF NOT EXISTS room_memberships(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    user_id TEXT NOT NULL, -- no foreign key to users table, it could be an id belonging to another home server
-    sender TEXT NOT NULL,
+CREATE TABLE IF NOT EXISTS state_events(
+    event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    membership TEXT NOT NULL,
-    content TEXT NOT NULL
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    prev_state TEXT
 );
 
-CREATE TABLE IF NOT EXISTS messages(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    user_id TEXT, 
-    room_id TEXT,
-    msg_id TEXT,
-    content TEXT
+CREATE TABLE IF NOT EXISTS current_state_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
+);
+
+CREATE TABLE IF NOT EXISTS room_memberships(
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    sender TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    membership TEXT NOT NULL
 );
 
 CREATE TABLE IF NOT EXISTS feedback(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    content TEXT,
+    event_id TEXT NOT NULL,
     feedback_type TEXT,
-    fb_sender_id TEXT,
-    msg_id TEXT,
-    room_id TEXT,
-    msg_sender_id TEXT
+    target_event_id TEXT,
+    sender TEXT,
+    room_id TEXT
 );
 
-CREATE TABLE IF NOT EXISTS room_data(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
+CREATE TABLE IF NOT EXISTS topics(
+    event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    content TEXT
+    topic TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS room_names(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    name TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS rooms(
+    room_id TEXT PRIMARY KEY NOT NULL,
+    is_public INTEGER,
+    creator TEXT
+);
+
+CREATE TABLE IF NOT EXISTS room_hosts(
+    room_id TEXT NOT NULL,
+    host TEXT NOT NULL
 );
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 47a1f2c45a..87fc978813 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -13,267 +13,264 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
 
 from ._base import SQLBaseStore
-from .message import MessagesTable
-from .feedback import FeedbackTable
-from .roomdata import RoomDataTable
-from .roommember import RoomMemberTable
+from synapse.api.errors import SynapseError
+from synapse.api.constants import Membership
+from synapse.util.logutils import log_function
 
 import json
 import logging
 
+
 logger = logging.getLogger(__name__)
 
 
+MAX_STREAM_SIZE = 1000
+
+
+_STREAM_TOKEN = "stream"
+_TOPOLOGICAL_TOKEN = "topological"
+
+
+def _parse_stream_token(string):
+    try:
+        if string[0] != 's':
+            raise
+        return int(string[1:])
+    except:
+        logger.debug("Not stream token: %s", string)
+        raise SynapseError(400, "Invalid token")
+
+
+def _parse_topological_token(string):
+    try:
+        if string[0] != 't':
+            raise
+        parts = string[1:].split('-', 1)
+        return (int(parts[0]), int(parts[1]))
+    except:
+        logger.debug("Not topological token: %s", string)
+        raise SynapseError(400, "Invalid token")
+
+
+def is_stream_token(string):
+    try:
+        _parse_stream_token(string)
+        return True
+    except:
+        return False
+
+
+def is_topological_token(string):
+    try:
+        _parse_topological_token(string)
+        return True
+    except:
+        return False
+
+
+def _get_token_bound(token, comparison):
+    try:
+        s = _parse_stream_token(token)
+        return "%s %s %d" % ("stream_ordering", comparison, s)
+    except:
+        pass
+
+    try:
+        top, stream = _parse_topological_token(token)
+        return "%s %s %d AND %s %s %d" % (
+            "topological_ordering", comparison, top,
+            "stream_ordering", comparison, stream,
+        )
+    except:
+        pass
+
+    raise SynapseError(400, "Invalid token")
+
+
 class StreamStore(SQLBaseStore):
+    @log_function
+    def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
+                        direction='f', with_feedback=False):
+        is_events = (
+            direction == 'f'
+            and is_stream_token(from_key)
+            and to_key and is_stream_token(to_key)
+        )
 
-    def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0,
-                           with_feedback=False):
-        """Get all messages for this user between the given keys.
-
-        Args:
-            user_id (str): The user who is requesting messages.
-            from_key (int): The ID to start returning results from (exclusive).
-            to_key (int): The ID to stop returning results (exclusive).
-            room_id (str): Gets messages only for this room. Can be None, in
-            which case all room messages will be returned.
-        Returns:
-            A tuple of rows (list of namedtuples), new_id(int)
-        """
-        if with_feedback and room_id:  # with fb MUST specify a room ID
-            return self._db_pool.runInteraction(
-                self._get_message_rows_with_feedback,
-                user_id, from_key, to_key, room_id, limit
+        if is_events:
+            return self.get_room_events_stream(
+                user_id=user_id,
+                from_key=from_key,
+                to_key=to_key,
+                room_id=room_id,
+                limit=limit,
+                with_feedback=with_feedback,
             )
         else:
-            return self._db_pool.runInteraction(
-                self._get_message_rows,
-                user_id, from_key, to_key, room_id, limit
+            return self.paginate_room_events(
+                from_key=from_key,
+                to_key=to_key,
+                room_id=room_id,
+                limit=limit,
+                with_feedback=with_feedback,
             )
 
-    def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                          limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the messages table, bounded by the specified pkeys
+    @defer.inlineCallbacks
+    @log_function
+    def get_room_events_stream(self, user_id, from_key, to_key, room_id,
+                               limit=0, with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
 
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = ("SELECT messages.* FROM messages WHERE ? IN"
-                 + " (SELECT membership from room_memberships WHERE user_id=?"
-                 + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
+        current_room_membership_sql = (
+            "SELECT m.room_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ?"
+        )
 
-        if room_id:
-            query += " AND messages.room_id=?"
-            query_args.append(room_id)
+        invites_sql = (
+            "SELECT m.event_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ? AND m.membership = ?"
+        )
 
-        (query, query_args) = self._append_stream_operations(
-            "messages", query, query_args, from_pkey, to_pkey, limit=limit
+        if limit:
+            limit = max(limit, MAX_STREAM_SIZE)
+        else:
+            limit = MAX_STREAM_SIZE
+
+        # From and to keys should be integers from ordering.
+        from_id = _parse_stream_token(from_key)
+        to_id = _parse_stream_token(to_key)
+
+        if from_key == to_key:
+            defer.returnValue(([], to_key))
+            return
+
+        sql = (
+            "SELECT * FROM events as e WHERE "
+            "((room_id IN (%(current)s)) OR "
+            "(event_id IN (%(invites)s))) "
+            "AND e.stream_ordering > ? AND e.stream_ordering < ? "
+            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+        ) % {
+            "current": current_room_membership_sql,
+            "invites": invites_sql,
+            "limit": limit
+        }
+
+        rows = yield self._execute_and_decode(
+            sql,
+            user_id, user_id, Membership.INVITE, from_id, to_id
         )
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, MessagesTable, from_pkey)
+        ret = [self._parse_event_from_row(r) for r in rows]
 
-    def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey,
-                                        room_id, limit):
-        # this col represents the compressed feedback JSON as per spec
-        compressed_feedback_col = (
-            "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id"
-            + " || '\",\"feedback_type\":\"' || f.feedback_type"
-            + " || '\",\"content\":' || f.content || '}') || ']'"
-        )
+        if rows:
+            key = "s%d" % max([r["stream_ordering"] for r in rows])
+        else:
+            # Assume we didn't get anything because there was nothing to get.
+            key = to_key
 
-        global_msg_id_join = ("f.room_id = messages.room_id"
-                              + " and f.msg_id = messages.msg_id"
-                              + " and messages.user_id = f.msg_sender_id")
+        defer.returnValue((ret, key))
 
-        select_query = (
-            "SELECT messages.*, f.content AS fb_content, f.fb_sender_id"
-            + ", " + compressed_feedback_col + " AS compressed_fb"
-            + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join)
+    @defer.inlineCallbacks
+    @log_function
+    def paginate_room_events(self, room_id, from_key, to_key=None,
+                             direction='b', limit=-1,
+                             with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
 
-        current_membership_sub_query = (
-            "(SELECT membership from room_memberships rm"
-            + " WHERE user_id=? AND room_id = rm.room_id"
-            + " ORDER BY id DESC LIMIT 1)")
+        from_comp = '<' if direction =='b' else '>'
+        to_comp = '>' if direction =='b' else '<'
+        order = "DESC" if direction == 'b' else "ASC"
 
-        where = (" WHERE ? IN " + current_membership_sub_query
-                 + " AND messages.room_id=?")
+        args = [room_id]
 
-        query = select_query + where
-        query_args = ["join", user_id, room_id]
+        bounds = _get_token_bound(from_key, from_comp)
+        if to_key:
+            bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp))
 
-        (query, query_args) = self._append_stream_operations(
-            "messages", query, query_args, from_pkey, to_pkey,
-            limit=limit, group_by=" GROUP BY messages.id "
+        if int(limit) > 0:
+            args.append(int(limit))
+            limit_str = " LIMIT ?"
+        else:
+            limit_str = ""
+
+        sql = (
+            "SELECT * FROM events "
+            "WHERE room_id = ? AND %(bounds)s "
+            "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
+        ) % {"bounds": bounds, "order": order, "limit": limit_str}
+
+        rows = yield self._execute_and_decode(
+            sql,
+            *args
         )
 
-        cursor = txn.execute(query, query_args)
-
-        # convert the result set into events
-        entries = self.cursor_to_dict(cursor)
-        events = []
-        for entry in entries:
-            # TODO we should spec the cursor > event mapping somewhere else.
-            event = {}
-            straight_mappings = ["msg_id", "user_id", "room_id"]
-            for key in straight_mappings:
-                event[key] = entry[key]
-            event["content"] = json.loads(entry["content"])
-            if entry["compressed_fb"]:
-                event["feedback"] = json.loads(entry["compressed_fb"])
-            events.append(event)
-
-        latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"]
-
-        return (events, latest_pkey)
-
-    def get_room_member_stream(self, user_id, from_key, to_key):
-        """Get all room membership events for this user between the given keys.
-
-        Args:
-            user_id (str): The user who is requesting membership events.
-            from_key (int): The ID to start returning results from (exclusive).
-            to_key (int): The ID to stop returning results (exclusive).
-        Returns:
-            A tuple of rows (list of namedtuples), new_id(int)
-        """
-        return self._db_pool.runInteraction(
-            self._get_room_member_rows, user_id, from_key, to_key
+        if rows:
+            topo = rows[-1]["topological_ordering"]
+            toke = rows[-1]["stream_ordering"]
+            next_token = "t%s-%s" % (topo, toke)
+        else:
+            # TODO (erikj): We should work out what to do here instead.
+            next_token = to_key if to_key else from_key
+
+        defer.returnValue(
+            (
+                [self._parse_event_from_row(r) for r in rows],
+                next_token
+            )
         )
 
-    def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey):
-        # get all room membership events for rooms which the user is
-        # *currently* joined in on, or all invite events for this user.
-        current_membership_sub_query = (
-            "(SELECT membership FROM room_memberships"
-            + " WHERE user_id=? AND room_id = rm.room_id"
-            + " ORDER BY id DESC LIMIT 1)")
-
-        query = ("SELECT rm.* FROM room_memberships rm "
-                 # all membership events for rooms you've currently joined.
-                 + " WHERE (? IN " + current_membership_sub_query
-                 # all invite membership events for this user
-                 + " OR rm.membership=? AND user_id=?)"
-                 + " AND rm.id > ?")
-        query_args = ["join", user_id, "invite", user_id, from_pkey]
-
-        if to_pkey != -1:
-            query += " AND rm.id < ?"
-            query_args.append(to_pkey)
-
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, RoomMemberTable, from_pkey)
-
-    def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0):
-        return self._db_pool.runInteraction(
-            self._get_feedback_rows,
-            user_id, from_key, to_key, room_id, limit
+    @defer.inlineCallbacks
+    def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
+
+        end_token = yield self.get_room_events_max_id()
+
+        sql = (
+            "SELECT * FROM events "
+            "WHERE room_id = ? AND stream_ordering <= ? "
+            "ORDER BY topological_ordering, stream_ordering DESC LIMIT ? "
         )
 
-    def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                           limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the feedback table, bounded by the specified pkeys
-
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = (
-            "SELECT feedback.* FROM feedback WHERE ? IN "
-            + "(SELECT membership from room_memberships WHERE user_id=?"
-            + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
-
-        if room_id:
-            query += " AND feedback.room_id=?"
-            query_args.append(room_id)
-
-        (query, query_args) = self._append_stream_operations(
-            "feedback", query, query_args, from_pkey, to_pkey, limit=limit
+        rows = yield self._execute_and_decode(
+            sql,
+            room_id, end_token, limit
         )
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, FeedbackTable, from_pkey)
+        rows.reverse()  # As we selected with reverse ordering
 
-    def get_room_data_stream(self, user_id, from_key, to_key, room_id,
-                             limit=0):
-        return self._db_pool.runInteraction(
-            self._get_room_data_rows,
-            user_id, from_key, to_key, room_id, limit
+        if rows:
+            topo = rows[0]["topological_ordering"]
+            toke = rows[0]["stream_ordering"]
+            start_token = "p%s-%s" % (topo, toke)
+
+            token = (start_token, end_token)
+        else:
+            token = (end_token, end_token)
+
+        defer.returnValue(
+            (
+                [self._parse_event_from_row(r) for r in rows],
+                token
+            )
         )
 
-    def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                            limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the feedback table, bounded by the specified pkeys
-
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = (
-            "SELECT room_data.* FROM room_data WHERE ? IN "
-            + "(SELECT membership from room_memberships WHERE user_id=?"
-            + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
-
-        if room_id:
-            query += " AND room_data.room_id=?"
-            query_args.append(room_id)
-
-        (query, query_args) = self._append_stream_operations(
-            "room_data", query, query_args, from_pkey, to_pkey, limit=limit
+    @defer.inlineCallbacks
+    def get_room_events_max_id(self):
+        res = yield self._execute_and_decode(
+            "SELECT MAX(stream_ordering) as m FROM events"
         )
 
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, RoomDataTable, from_pkey)
-
-    def _append_stream_operations(self, table_name, query, query_args,
-                                  from_pkey, to_pkey, limit=None,
-                                  group_by=""):
-        LATEST_ROW = -1
-        order_by = ""
-        if to_pkey > from_pkey:
-            if from_pkey != LATEST_ROW:
-                # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9
-                query += (" AND %s.id > ? AND %s.id < ?" %
-                         (table_name, table_name))
-                query_args.append(from_pkey)
-                query_args.append(to_pkey)
-            else:
-                # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC
-                query += " AND %s.id > ? " % table_name
-                order_by = "ORDER BY id DESC"
-                query_args.append(to_pkey)
-        elif from_pkey > to_pkey:
-            if to_pkey != LATEST_ROW:
-                # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC
-                query += (" AND %s.id > ? AND %s.id < ? " %
-                          (table_name, table_name))
-                order_by = "ORDER BY id DESC"
-                query_args.append(to_pkey)
-                query_args.append(from_pkey)
-            else:
-                # from=5 to=-1 >> from 5 to now >> id>5
-                query += " AND %s.id > ?" % table_name
-                query_args.append(from_pkey)
-
-        query += group_by + order_by
-
-        if limit and limit > 0:
-            query += " LIMIT ?"
-            query_args.append(str(limit))
-
-        return (query, query_args)
-
-    def _as_events(self, cursor, table, from_pkey):
-        data_entries = table.decode_results(cursor)
-        last_pkey = from_pkey
-        if data_entries:
-            last_pkey = data_entries[-1].id
-
-        events = [
-            entry.as_event(self.event_factory).get_dict()
-            for entry in data_entries
-        ]
-
-        return (events, last_pkey)
+        logger.debug("get_room_events_max_id: %s", res)
+
+        if not res or not res[0] or not res[0]["m"]:
+            defer.returnValue("s1")
+            return
+
+        key = res[0]["m"] + 1
+        defer.returnValue("s%d" % (key,))