summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-03-20 13:52:56 +0000
committerErik Johnston <erik@matrix.org>2015-03-20 14:11:38 +0000
commit87db64b83962873a3cf2af951e4c4bc2e4d50d76 (patch)
tree326af613b0f40a1c5ad6898bbafb517c18d7ba5c
parentMerge pull request #112 from matrix-org/hotfixes-v0.8.1-r2 (diff)
downloadsynapse-87db64b83962873a3cf2af951e4c4bc2e4d50d76.tar.xz
Rearrange storage modules
-rw-r--r--synapse/storage/__init__.py472
-rw-r--r--synapse/storage/_base.py7
-rw-r--r--synapse/storage/events.py394
-rw-r--r--synapse/storage/feedback.py47
-rw-r--r--synapse/storage/room.py37
-rw-r--r--synapse/storage/state.py32
-rw-r--r--synapse/storage/stream.py19
7 files changed, 493 insertions, 515 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4b16f445d6..4295f7348e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -13,14 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
-from synapse.util.logutils import log_function
-from synapse.api.constants import EventTypes
-
 from .appservice import ApplicationServiceStore
 from .directory import DirectoryStore
-from .feedback import FeedbackStore
+from .events import EventsStore
 from .presence import PresenceStore
 from .profile import ProfileStore
 from .registration import RegistrationStore
@@ -39,11 +34,6 @@ from .state import StateStore
 from .signatures import SignatureStore
 from .filtering import FilteringStore
 
-from syutil.base64util import decode_base64
-from syutil.jsonutil import encode_canonical_json
-
-from synapse.crypto.event_signing import compute_event_reference_hash
-
 
 import fnmatch
 import imp
@@ -62,15 +52,8 @@ SCHEMA_VERSION = 14
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
 
-class _RollbackButIsFineException(Exception):
-    """ This exception is used to rollback a transaction without implying
-    something went wrong.
-    """
-    pass
-
-
 class DataStore(RoomMemberStore, RoomStore,
-                RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
+                RegistrationStore, StreamStore, ProfileStore,
                 PresenceStore, TransactionStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
                 ApplicationServiceStore,
@@ -79,7 +62,8 @@ class DataStore(RoomMemberStore, RoomStore,
                 RejectionsStore,
                 FilteringStore,
                 PusherStore,
-                PushRuleStore
+                PushRuleStore,
+                EventsStore,
                 ):
 
     def __init__(self, hs):
@@ -89,422 +73,6 @@ class DataStore(RoomMemberStore, RoomStore,
         self.min_token_deferred = self._get_min_token()
         self.min_token = None
 
-    @defer.inlineCallbacks
-    @log_function
-    def persist_event(self, event, context, backfilled=False,
-                      is_new_state=True, current_state=None):
-        stream_ordering = None
-        if backfilled:
-            if not self.min_token_deferred.called:
-                yield self.min_token_deferred
-            self.min_token -= 1
-            stream_ordering = self.min_token
-
-        try:
-            yield self.runInteraction(
-                "persist_event",
-                self._persist_event_txn,
-                event=event,
-                context=context,
-                backfilled=backfilled,
-                stream_ordering=stream_ordering,
-                is_new_state=is_new_state,
-                current_state=current_state,
-            )
-        except _RollbackButIsFineException:
-            pass
-
-    @defer.inlineCallbacks
-    def get_event(self, event_id, check_redacted=True,
-                  get_prev_content=False, allow_rejected=False,
-                  allow_none=False):
-        """Get an event from the database by event_id.
-
-        Args:
-            event_id (str): The event_id of the event to fetch
-            check_redacted (bool): If True, check if event has been redacted
-                and redact it.
-            get_prev_content (bool): If True and event is a state event,
-                include the previous states content in the unsigned field.
-            allow_rejected (bool): If True return rejected events.
-            allow_none (bool): If True, return None if no event found, if
-                False throw an exception.
-
-        Returns:
-            Deferred : A FrozenEvent.
-        """
-        event = yield self.runInteraction(
-            "get_event", self._get_event_txn,
-            event_id,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
-        )
-
-        if not event and not allow_none:
-            raise RuntimeError("Could not find event %s" % (event_id,))
-
-        defer.returnValue(event)
-
-    @log_function
-    def _persist_event_txn(self, txn, event, context, backfilled,
-                           stream_ordering=None, is_new_state=True,
-                           current_state=None):
-
-        # Remove the any existing cache entries for the event_id
-        self._get_event_cache.pop(event.event_id)
-
-        # We purposefully do this first since if we include a `current_state`
-        # key, we *want* to update the `current_state_events` table
-        if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
-            )
-
-            for s in current_state:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": s.event_id,
-                        "room_id": s.room_id,
-                        "type": s.type,
-                        "state_key": s.state_key,
-                    },
-                    or_replace=True,
-                )
-
-        if event.is_state() and is_new_state:
-            if not backfilled and not context.rejected:
-                self._simple_insert_txn(
-                    txn,
-                    table="state_forward_extremities",
-                    values={
-                        "event_id": event.event_id,
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                    or_replace=True,
-                )
-
-                for prev_state_id, _ in event.prev_state:
-                    self._simple_delete_txn(
-                        txn,
-                        table="state_forward_extremities",
-                        keyvalues={
-                            "event_id": prev_state_id,
-                        }
-                    )
-
-        outlier = event.internal_metadata.is_outlier()
-
-        if not outlier:
-            self._store_state_groups_txn(txn, event, context)
-
-            self._update_min_depth_for_room_txn(
-                txn,
-                event.room_id,
-                event.depth
-            )
-
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
-        )
-
-        have_persisted = self._simple_select_one_onecol_txn(
-            txn,
-            table="event_json",
-            keyvalues={"event_id": event.event_id},
-            retcol="event_id",
-            allow_none=True,
-        )
-
-        metadata_json = encode_canonical_json(
-            event.internal_metadata.get_dict()
-        )
-
-        # If we have already persisted this event, we don't need to do any
-        # more processing.
-        # The processing above must be done on every call to persist event,
-        # since they might not have happened on previous calls. For example,
-        # if we are persisting an event that we had persisted as an outlier,
-        # but is no longer one.
-        if have_persisted:
-            if not outlier:
-                sql = (
-                    "UPDATE event_json SET internal_metadata = ?"
-                    " WHERE event_id = ?"
-                )
-                txn.execute(
-                    sql,
-                    (metadata_json.decode("UTF-8"), event.event_id,)
-                )
-
-                sql = (
-                    "UPDATE events SET outlier = 0"
-                    " WHERE event_id = ?"
-                )
-                txn.execute(
-                    sql,
-                    (event.event_id,)
-                )
-            return
-
-        if event.type == EventTypes.Member:
-            self._store_room_member_txn(txn, event)
-        elif event.type == EventTypes.Feedback:
-            self._store_feedback_txn(txn, event)
-        elif event.type == EventTypes.Name:
-            self._store_room_name_txn(txn, event)
-        elif event.type == EventTypes.Topic:
-            self._store_room_topic_txn(txn, event)
-        elif event.type == EventTypes.Redaction:
-            self._store_redaction(txn, event)
-
-        event_dict = {
-            k: v
-            for k, v in event.get_dict().items()
-            if k not in [
-                "redacted",
-                "redacted_because",
-            ]
-        }
-
-        self._simple_insert_txn(
-            txn,
-            table="event_json",
-            values={
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "internal_metadata": metadata_json.decode("UTF-8"),
-                "json": encode_canonical_json(event_dict).decode("UTF-8"),
-            },
-            or_replace=True,
-        )
-
-        content = encode_canonical_json(
-            event.content
-        ).decode("UTF-8")
-
-        vals = {
-            "topological_ordering": event.depth,
-            "event_id": event.event_id,
-            "type": event.type,
-            "room_id": event.room_id,
-            "content": content,
-            "processed": True,
-            "outlier": outlier,
-            "depth": event.depth,
-        }
-
-        if stream_ordering is not None:
-            vals["stream_ordering"] = stream_ordering
-
-        unrec = {
-            k: v
-            for k, v in event.get_dict().items()
-            if k not in vals.keys() and k not in [
-                "redacted",
-                "redacted_because",
-                "signatures",
-                "hashes",
-                "prev_events",
-            ]
-        }
-
-        vals["unrecognized_keys"] = encode_canonical_json(
-            unrec
-        ).decode("UTF-8")
-
-        try:
-            self._simple_insert_txn(
-                txn,
-                "events",
-                vals,
-                or_replace=(not outlier),
-                or_ignore=bool(outlier),
-            )
-        except:
-            logger.warn(
-                "Failed to persist, probably duplicate: %s",
-                event.event_id,
-                exc_info=True,
-            )
-            raise _RollbackButIsFineException("_persist_event")
-
-        if context.rejected:
-            self._store_rejections_txn(txn, event.event_id, context.rejected)
-
-        if event.is_state():
-            vals = {
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "type": event.type,
-                "state_key": event.state_key,
-            }
-
-            # TODO: How does this work with backfilling?
-            if hasattr(event, "replaces_state"):
-                vals["prev_state"] = event.replaces_state
-
-            self._simple_insert_txn(
-                txn,
-                "state_events",
-                vals,
-                or_replace=True,
-            )
-
-            if is_new_state and not context.rejected:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": event.event_id,
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                    or_replace=True,
-                )
-
-            for e_id, h in event.prev_state:
-                self._simple_insert_txn(
-                    txn,
-                    table="event_edges",
-                    values={
-                        "event_id": event.event_id,
-                        "prev_event_id": e_id,
-                        "room_id": event.room_id,
-                        "is_state": 1,
-                    },
-                    or_ignore=True,
-                )
-
-        for hash_alg, hash_base64 in event.hashes.items():
-            hash_bytes = decode_base64(hash_base64)
-            self._store_event_content_hash_txn(
-                txn, event.event_id, hash_alg, hash_bytes,
-            )
-
-        for prev_event_id, prev_hashes in event.prev_events:
-            for alg, hash_base64 in prev_hashes.items():
-                hash_bytes = decode_base64(hash_base64)
-                self._store_prev_event_hash_txn(
-                    txn, event.event_id, prev_event_id, alg, hash_bytes
-                )
-
-        for auth_id, _ in event.auth_events:
-            self._simple_insert_txn(
-                txn,
-                table="event_auth",
-                values={
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "auth_id": auth_id,
-                },
-                or_ignore=True,
-            )
-
-        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
-        self._store_event_reference_hash_txn(
-            txn, event.event_id, ref_alg, ref_hash_bytes
-        )
-
-    def _store_redaction(self, txn, event):
-        # invalidate the cache for the redacted event
-        self._get_event_cache.pop(event.redacts)
-        txn.execute(
-            "INSERT OR IGNORE INTO redactions "
-            "(event_id, redacts) VALUES (?,?)",
-            (event.event_id, event.redacts)
-        )
-
-    @defer.inlineCallbacks
-    def get_current_state(self, room_id, event_type=None, state_key=""):
-        del_sql = (
-            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
-            "LIMIT 1"
-        )
-
-        sql = (
-            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
-            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
-            "INNER JOIN state_events as s ON e.event_id = s.event_id "
-            "WHERE c.room_id = ? "
-        ) % {
-            "redacted": del_sql,
-        }
-
-        if event_type and state_key is not None:
-            sql += " AND s.type = ? AND s.state_key = ? "
-            args = (room_id, event_type, state_key)
-        elif event_type:
-            sql += " AND s.type = ?"
-            args = (room_id, event_type)
-        else:
-            args = (room_id, )
-
-        results = yield self._execute_and_decode("get_current_state", sql, *args)
-
-        events = yield self._parse_events(results)
-        defer.returnValue(events)
-
-    @defer.inlineCallbacks
-    def get_room_name_and_aliases(self, room_id):
-        del_sql = (
-            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
-            "LIMIT 1"
-        )
-
-        sql = (
-            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
-            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
-            "INNER JOIN state_events as s ON e.event_id = s.event_id "
-            "WHERE c.room_id = ? "
-        ) % {
-            "redacted": del_sql,
-        }
-
-        sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
-        sql += " OR s.type = 'm.room.aliases')"
-        args = (room_id,)
-
-        results = yield self._execute_and_decode("get_current_state", sql, *args)
-
-        events = yield self._parse_events(results)
-
-        name = None
-        aliases = []
-
-        for e in events:
-            if e.type == 'm.room.name':
-                if 'name' in e.content:
-                    name = e.content['name']
-            elif e.type == 'm.room.aliases':
-                if 'aliases' in e.content:
-                    aliases.extend(e.content['aliases'])
-
-        defer.returnValue((name, aliases))
-
-    @defer.inlineCallbacks
-    def _get_min_token(self):
-        row = yield self._execute(
-            "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
-        )
-
-        self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
-        self.min_token = min(self.min_token, -1)
-
-        logger.debug("min_token is: %s", self.min_token)
-
-        defer.returnValue(self.min_token)
-
     def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
         return self._simple_insert(
             "user_ips",
@@ -527,38 +95,6 @@ class DataStore(RoomMemberStore, RoomStore,
             ],
         )
 
-    def have_events(self, event_ids):
-        """Given a list of event ids, check if we have already processed them.
-
-        Returns:
-            dict: Has an entry for each event id we already have seen. Maps to
-            the rejected reason string if we rejected the event, else maps to
-            None.
-        """
-        if not event_ids:
-            return defer.succeed({})
-
-        def f(txn):
-            sql = (
-                "SELECT e.event_id, reason FROM events as e "
-                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
-                "WHERE e.event_id = ?"
-            )
-
-            res = {}
-            for event_id in event_ids:
-                txn.execute(sql, (event_id,))
-                row = txn.fetchone()
-                if row:
-                    _, rejected = row
-                    res[event_id] = rejected
-
-            return res
-
-        return self.runInteraction(
-            "have_events", f,
-        )
-
 
 def read_schema(path):
     """ Read the named database schema.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 9125bb1198..0260b4e645 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -789,6 +789,13 @@ class SQLBaseStore(object):
         return result[0] if result else None
 
 
+class _RollbackButIsFineException(Exception):
+    """ This exception is used to rollback a transaction without implying
+    something went wrong.
+    """
+    pass
+
+
 class Table(object):
     """ A base class used to store information about a particular table.
     """
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
new file mode 100644
index 0000000000..b295dc5b27
--- /dev/null
+++ b/synapse/storage/events.py
@@ -0,0 +1,394 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from _base import SQLBaseStore, _RollbackButIsFineException
+
+from twisted.internet import defer
+
+from synapse.util.logutils import log_function
+from synapse.api.constants import EventTypes
+from synapse.crypto.event_signing import compute_event_reference_hash
+
+from syutil.base64util import decode_base64
+from syutil.jsonutil import encode_canonical_json
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class EventsStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    @log_function
+    def persist_event(self, event, context, backfilled=False,
+                      is_new_state=True, current_state=None):
+        stream_ordering = None
+        if backfilled:
+            if not self.min_token_deferred.called:
+                yield self.min_token_deferred
+            self.min_token -= 1
+            stream_ordering = self.min_token
+
+        try:
+            yield self.runInteraction(
+                "persist_event",
+                self._persist_event_txn,
+                event=event,
+                context=context,
+                backfilled=backfilled,
+                stream_ordering=stream_ordering,
+                is_new_state=is_new_state,
+                current_state=current_state,
+            )
+        except _RollbackButIsFineException:
+            pass
+
+    @defer.inlineCallbacks
+    def get_event(self, event_id, check_redacted=True,
+                  get_prev_content=False, allow_rejected=False,
+                  allow_none=False):
+        """Get an event from the database by event_id.
+
+        Args:
+            event_id (str): The event_id of the event to fetch
+            check_redacted (bool): If True, check if event has been redacted
+                and redact it.
+            get_prev_content (bool): If True and event is a state event,
+                include the previous states content in the unsigned field.
+            allow_rejected (bool): If True return rejected events.
+            allow_none (bool): If True, return None if no event found, if
+                False throw an exception.
+
+        Returns:
+            Deferred : A FrozenEvent.
+        """
+        event = yield self.runInteraction(
+            "get_event", self._get_event_txn,
+            event_id,
+            check_redacted=check_redacted,
+            get_prev_content=get_prev_content,
+            allow_rejected=allow_rejected,
+        )
+
+        if not event and not allow_none:
+            raise RuntimeError("Could not find event %s" % (event_id,))
+
+        defer.returnValue(event)
+
+    @log_function
+    def _persist_event_txn(self, txn, event, context, backfilled,
+                           stream_ordering=None, is_new_state=True,
+                           current_state=None):
+
+        # Remove the any existing cache entries for the event_id
+        self._get_event_cache.pop(event.event_id)
+
+        # We purposefully do this first since if we include a `current_state`
+        # key, we *want* to update the `current_state_events` table
+        if current_state:
+            txn.execute(
+                "DELETE FROM current_state_events WHERE room_id = ?",
+                (event.room_id,)
+            )
+
+            for s in current_state:
+                self._simple_insert_txn(
+                    txn,
+                    "current_state_events",
+                    {
+                        "event_id": s.event_id,
+                        "room_id": s.room_id,
+                        "type": s.type,
+                        "state_key": s.state_key,
+                    },
+                    or_replace=True,
+                )
+
+        if event.is_state() and is_new_state:
+            if not backfilled and not context.rejected:
+                self._simple_insert_txn(
+                    txn,
+                    table="state_forward_extremities",
+                    values={
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    or_replace=True,
+                )
+
+                for prev_state_id, _ in event.prev_state:
+                    self._simple_delete_txn(
+                        txn,
+                        table="state_forward_extremities",
+                        keyvalues={
+                            "event_id": prev_state_id,
+                        }
+                    )
+
+        outlier = event.internal_metadata.is_outlier()
+
+        if not outlier:
+            self._store_state_groups_txn(txn, event, context)
+
+            self._update_min_depth_for_room_txn(
+                txn,
+                event.room_id,
+                event.depth
+            )
+
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
+        have_persisted = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_json",
+            keyvalues={"event_id": event.event_id},
+            retcol="event_id",
+            allow_none=True,
+        )
+
+        metadata_json = encode_canonical_json(
+            event.internal_metadata.get_dict()
+        )
+
+        # If we have already persisted this event, we don't need to do any
+        # more processing.
+        # The processing above must be done on every call to persist event,
+        # since they might not have happened on previous calls. For example,
+        # if we are persisting an event that we had persisted as an outlier,
+        # but is no longer one.
+        if have_persisted:
+            if not outlier:
+                sql = (
+                    "UPDATE event_json SET internal_metadata = ?"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (metadata_json.decode("UTF-8"), event.event_id,)
+                )
+
+                sql = (
+                    "UPDATE events SET outlier = 0"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (event.event_id,)
+                )
+            return
+
+        if event.type == EventTypes.Member:
+            self._store_room_member_txn(txn, event)
+        elif event.type == EventTypes.Feedback:
+            self._store_feedback_txn(txn, event)
+        elif event.type == EventTypes.Name:
+            self._store_room_name_txn(txn, event)
+        elif event.type == EventTypes.Topic:
+            self._store_room_topic_txn(txn, event)
+        elif event.type == EventTypes.Redaction:
+            self._store_redaction(txn, event)
+
+        event_dict = {
+            k: v
+            for k, v in event.get_dict().items()
+            if k not in [
+                "redacted",
+                "redacted_because",
+            ]
+        }
+
+        self._simple_insert_txn(
+            txn,
+            table="event_json",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "internal_metadata": metadata_json.decode("UTF-8"),
+                "json": encode_canonical_json(event_dict).decode("UTF-8"),
+            },
+            or_replace=True,
+        )
+
+        content = encode_canonical_json(
+            event.content
+        ).decode("UTF-8")
+
+        vals = {
+            "topological_ordering": event.depth,
+            "event_id": event.event_id,
+            "type": event.type,
+            "room_id": event.room_id,
+            "content": content,
+            "processed": True,
+            "outlier": outlier,
+            "depth": event.depth,
+        }
+
+        if stream_ordering is not None:
+            vals["stream_ordering"] = stream_ordering
+
+        unrec = {
+            k: v
+            for k, v in event.get_dict().items()
+            if k not in vals.keys() and k not in [
+                "redacted",
+                "redacted_because",
+                "signatures",
+                "hashes",
+                "prev_events",
+            ]
+        }
+
+        vals["unrecognized_keys"] = encode_canonical_json(
+            unrec
+        ).decode("UTF-8")
+
+        try:
+            self._simple_insert_txn(
+                txn,
+                "events",
+                vals,
+                or_replace=(not outlier),
+                or_ignore=bool(outlier),
+            )
+        except:
+            logger.warn(
+                "Failed to persist, probably duplicate: %s",
+                event.event_id,
+                exc_info=True,
+            )
+            raise _RollbackButIsFineException("_persist_event")
+
+        if context.rejected:
+            self._store_rejections_txn(txn, event.event_id, context.rejected)
+
+        if event.is_state():
+            vals = {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "type": event.type,
+                "state_key": event.state_key,
+            }
+
+            # TODO: How does this work with backfilling?
+            if hasattr(event, "replaces_state"):
+                vals["prev_state"] = event.replaces_state
+
+            self._simple_insert_txn(
+                txn,
+                "state_events",
+                vals,
+            )
+
+            if is_new_state and not context.rejected:
+                self._simple_insert_txn(
+                    txn,
+                    "current_state_events",
+                    {
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                )
+
+            for e_id, h in event.prev_state:
+                self._simple_insert_txn(
+                    txn,
+                    table="event_edges",
+                    values={
+                        "event_id": event.event_id,
+                        "prev_event_id": e_id,
+                        "room_id": event.room_id,
+                        "is_state": 1,
+                    },
+                )
+
+        for hash_alg, hash_base64 in event.hashes.items():
+            hash_bytes = decode_base64(hash_base64)
+            self._store_event_content_hash_txn(
+                txn, event.event_id, hash_alg, hash_bytes,
+            )
+
+        for prev_event_id, prev_hashes in event.prev_events:
+            for alg, hash_base64 in prev_hashes.items():
+                hash_bytes = decode_base64(hash_base64)
+                self._store_prev_event_hash_txn(
+                    txn, event.event_id, prev_event_id, alg, hash_bytes
+                )
+
+        for auth_id, _ in event.auth_events:
+            self._simple_insert_txn(
+                txn,
+                table="event_auth",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "auth_id": auth_id,
+                },
+            )
+
+        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+        self._store_event_reference_hash_txn(
+            txn, event.event_id, ref_alg, ref_hash_bytes
+        )
+
+    def _store_redaction(self, txn, event):
+        # invalidate the cache for the redacted event
+        self._get_event_cache.pop(event.redacts)
+        txn.execute(
+            "INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
+            (event.event_id, event.redacts)
+        )
+
+    def have_events(self, event_ids):
+        """Given a list of event ids, check if we have already processed them.
+
+        Returns:
+            dict: Has an entry for each event id we already have seen. Maps to
+            the rejected reason string if we rejected the event, else maps to
+            None.
+        """
+        if not event_ids:
+            return defer.succeed({})
+
+        def f(txn):
+            sql = (
+                "SELECT e.event_id, reason FROM events as e "
+                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+                "WHERE e.event_id = ?"
+            )
+
+            res = {}
+            for event_id in event_ids:
+                txn.execute(sql, (event_id,))
+                row = txn.fetchone()
+                if row:
+                    _, rejected = row
+                    res[event_id] = rejected
+
+            return res
+
+        return self.runInteraction(
+            "have_events", f,
+        )
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
deleted file mode 100644
index 8eab769b71..0000000000
--- a/synapse/storage/feedback.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from twisted.internet import defer
-
-from ._base import SQLBaseStore
-
-
-class FeedbackStore(SQLBaseStore):
-
-    def _store_feedback_txn(self, txn, event):
-        self._simple_insert_txn(txn, "feedback", {
-            "event_id": event.event_id,
-            "feedback_type": event.content["type"],
-            "room_id": event.room_id,
-            "target_event_id": event.content["target_event_id"],
-            "sender": event.user_id,
-        })
-
-    @defer.inlineCallbacks
-    def get_feedback_for_event(self, event_id):
-        sql = (
-            "SELECT events.* FROM events INNER JOIN feedback "
-            "ON events.event_id = feedback.event_id "
-            "WHERE feedback.target_event_id = ? "
-        )
-
-        rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id)
-
-        defer.returnValue(
-            [
-                (yield self._parse_events(r))
-                for r in rows
-            ]
-        )
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 549c9af393..71bae15450 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -158,6 +158,43 @@ class RoomStore(SQLBaseStore):
                 }
             )
 
+    @defer.inlineCallbacks
+    def get_room_name_and_aliases(self, room_id):
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+            "LIMIT 1"
+        )
+
+        sql = (
+            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+            "INNER JOIN state_events as s ON e.event_id = s.event_id "
+            "WHERE c.room_id = ? "
+        ) % {
+            "redacted": del_sql,
+        }
+
+        sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
+        sql += " OR s.type = 'm.room.aliases')"
+        args = (room_id,)
+
+        results = yield self._execute_and_decode("get_current_state", sql, *args)
+
+        events = yield self._parse_events(results)
+
+        name = None
+        aliases = []
+
+        for e in events:
+            if e.type == 'm.room.name':
+                if 'name' in e.content:
+                    name = e.content['name']
+            elif e.type == 'm.room.aliases':
+                if 'aliases' in e.content:
+                    aliases.extend(e.content['aliases'])
+
+        defer.returnValue((name, aliases))
+
 
 class RoomsTable(Table):
     table_name = "rooms"
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 456e4bd45d..58dbf2802b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -15,6 +15,8 @@
 
 from ._base import SQLBaseStore
 
+from twisted.internet import defer
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -122,3 +124,33 @@ class StateStore(SQLBaseStore):
             },
             or_replace=True,
         )
+
+    @defer.inlineCallbacks
+    def get_current_state(self, room_id, event_type=None, state_key=""):
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+            "LIMIT 1"
+        )
+
+        sql = (
+            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+            "INNER JOIN state_events as s ON e.event_id = s.event_id "
+            "WHERE c.room_id = ? "
+        ) % {
+            "redacted": del_sql,
+        }
+
+        if event_type and state_key is not None:
+            sql += " AND s.type = ? AND s.state_key = ? "
+            args = (room_id, event_type, state_key)
+        elif event_type:
+            sql += " AND s.type = ?"
+            args = (room_id, event_type)
+        else:
+            args = (room_id, )
+
+        results = yield self._execute_and_decode("get_current_state", sql, *args)
+
+        events = yield self._parse_events(results)
+        defer.returnValue(events)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 09bc522210..df234efdff 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -419,6 +419,25 @@ class StreamStore(SQLBaseStore):
             self._get_room_events_max_id_txn
         )
 
+    @defer.inlineCallbacks
+    def _get_min_token(self):
+        row = yield self._execute(
+            "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
+        )
+
+        self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
+        self.min_token = min(self.min_token, -1)
+
+        logger.debug("min_token is: %s", self.min_token)
+
+        defer.returnValue(self.min_token)
+
+    def get_next_stream_id(self):
+        with self._next_stream_id_lock:
+            i = self._next_stream_id
+            self._next_stream_id += 1
+            return i
+
     def _get_room_events_max_id_txn(self, txn):
         txn.execute(
             "SELECT MAX(stream_ordering) as m FROM events"