summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py498
-rw-r--r--synapse/storage/_base.py238
-rw-r--r--synapse/storage/appservice.py457
-rw-r--r--synapse/storage/directory.py17
-rw-r--r--synapse/storage/events.py395
-rw-r--r--synapse/storage/feedback.py47
-rw-r--r--synapse/storage/filtering.py1
-rw-r--r--synapse/storage/media_repository.py20
-rw-r--r--synapse/storage/presence.py12
-rw-r--r--synapse/storage/profile.py5
-rw-r--r--synapse/storage/push_rule.py9
-rw-r--r--synapse/storage/pusher.py16
-rw-r--r--synapse/storage/registration.py9
-rw-r--r--synapse/storage/rejections.py3
-rw-r--r--synapse/storage/room.py76
-rw-r--r--synapse/storage/roommember.py3
-rw-r--r--synapse/storage/schema/delta/15/appservice_txns.sql30
-rw-r--r--synapse/storage/state.py32
-rw-r--r--synapse/storage/stream.py22
-rw-r--r--synapse/storage/transactions.py20
20 files changed, 1101 insertions, 809 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4b16f445d6..f4dec70393 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -14,13 +14,12 @@
 # limitations under the License.
 
 from twisted.internet import defer
-
-from synapse.util.logutils import log_function
-from synapse.api.constants import EventTypes
-
-from .appservice import ApplicationServiceStore
+from .appservice import (
+    ApplicationServiceStore, ApplicationServiceTransactionStore
+)
+from ._base import Cache
 from .directory import DirectoryStore
-from .feedback import FeedbackStore
+from .events import EventsStore
 from .presence import PresenceStore
 from .profile import ProfileStore
 from .registration import RegistrationStore
@@ -39,11 +38,6 @@ from .state import StateStore
 from .signatures import SignatureStore
 from .filtering import FilteringStore
 
-from syutil.base64util import decode_base64
-from syutil.jsonutil import encode_canonical_json
-
-from synapse.crypto.event_signing import compute_event_reference_hash
-
 
 import fnmatch
 import imp
@@ -57,20 +51,18 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 14
+SCHEMA_VERSION = 15
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
-
-class _RollbackButIsFineException(Exception):
-    """ This exception is used to rollback a transaction without implying
-    something went wrong.
-    """
-    pass
+# Number of msec of granularity to store the user IP 'last seen' time. Smaller
+# times give more inserts into the database even for readonly API hits
+# 120 seconds == 2 minutes
+LAST_SEEN_GRANULARITY = 120*1000
 
 
 class DataStore(RoomMemberStore, RoomStore,
-                RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
+                RegistrationStore, StreamStore, ProfileStore,
                 PresenceStore, TransactionStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
                 ApplicationServiceStore,
@@ -79,7 +71,9 @@ class DataStore(RoomMemberStore, RoomStore,
                 RejectionsStore,
                 FilteringStore,
                 PusherStore,
-                PushRuleStore
+                PushRuleStore,
+                ApplicationServiceTransactionStore,
+                EventsStore,
                 ):
 
     def __init__(self, hs):
@@ -89,424 +83,28 @@ class DataStore(RoomMemberStore, RoomStore,
         self.min_token_deferred = self._get_min_token()
         self.min_token = None
 
-    @defer.inlineCallbacks
-    @log_function
-    def persist_event(self, event, context, backfilled=False,
-                      is_new_state=True, current_state=None):
-        stream_ordering = None
-        if backfilled:
-            if not self.min_token_deferred.called:
-                yield self.min_token_deferred
-            self.min_token -= 1
-            stream_ordering = self.min_token
-
-        try:
-            yield self.runInteraction(
-                "persist_event",
-                self._persist_event_txn,
-                event=event,
-                context=context,
-                backfilled=backfilled,
-                stream_ordering=stream_ordering,
-                is_new_state=is_new_state,
-                current_state=current_state,
-            )
-        except _RollbackButIsFineException:
-            pass
-
-    @defer.inlineCallbacks
-    def get_event(self, event_id, check_redacted=True,
-                  get_prev_content=False, allow_rejected=False,
-                  allow_none=False):
-        """Get an event from the database by event_id.
-
-        Args:
-            event_id (str): The event_id of the event to fetch
-            check_redacted (bool): If True, check if event has been redacted
-                and redact it.
-            get_prev_content (bool): If True and event is a state event,
-                include the previous states content in the unsigned field.
-            allow_rejected (bool): If True return rejected events.
-            allow_none (bool): If True, return None if no event found, if
-                False throw an exception.
-
-        Returns:
-            Deferred : A FrozenEvent.
-        """
-        event = yield self.runInteraction(
-            "get_event", self._get_event_txn,
-            event_id,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
-        )
-
-        if not event and not allow_none:
-            raise RuntimeError("Could not find event %s" % (event_id,))
-
-        defer.returnValue(event)
-
-    @log_function
-    def _persist_event_txn(self, txn, event, context, backfilled,
-                           stream_ordering=None, is_new_state=True,
-                           current_state=None):
-
-        # Remove the any existing cache entries for the event_id
-        self._get_event_cache.pop(event.event_id)
-
-        # We purposefully do this first since if we include a `current_state`
-        # key, we *want* to update the `current_state_events` table
-        if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
-            )
-
-            for s in current_state:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": s.event_id,
-                        "room_id": s.room_id,
-                        "type": s.type,
-                        "state_key": s.state_key,
-                    },
-                    or_replace=True,
-                )
-
-        if event.is_state() and is_new_state:
-            if not backfilled and not context.rejected:
-                self._simple_insert_txn(
-                    txn,
-                    table="state_forward_extremities",
-                    values={
-                        "event_id": event.event_id,
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                    or_replace=True,
-                )
-
-                for prev_state_id, _ in event.prev_state:
-                    self._simple_delete_txn(
-                        txn,
-                        table="state_forward_extremities",
-                        keyvalues={
-                            "event_id": prev_state_id,
-                        }
-                    )
-
-        outlier = event.internal_metadata.is_outlier()
-
-        if not outlier:
-            self._store_state_groups_txn(txn, event, context)
-
-            self._update_min_depth_for_room_txn(
-                txn,
-                event.room_id,
-                event.depth
-            )
-
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
-        )
-
-        have_persisted = self._simple_select_one_onecol_txn(
-            txn,
-            table="event_json",
-            keyvalues={"event_id": event.event_id},
-            retcol="event_id",
-            allow_none=True,
-        )
-
-        metadata_json = encode_canonical_json(
-            event.internal_metadata.get_dict()
-        )
-
-        # If we have already persisted this event, we don't need to do any
-        # more processing.
-        # The processing above must be done on every call to persist event,
-        # since they might not have happened on previous calls. For example,
-        # if we are persisting an event that we had persisted as an outlier,
-        # but is no longer one.
-        if have_persisted:
-            if not outlier:
-                sql = (
-                    "UPDATE event_json SET internal_metadata = ?"
-                    " WHERE event_id = ?"
-                )
-                txn.execute(
-                    sql,
-                    (metadata_json.decode("UTF-8"), event.event_id,)
-                )
-
-                sql = (
-                    "UPDATE events SET outlier = 0"
-                    " WHERE event_id = ?"
-                )
-                txn.execute(
-                    sql,
-                    (event.event_id,)
-                )
-            return
-
-        if event.type == EventTypes.Member:
-            self._store_room_member_txn(txn, event)
-        elif event.type == EventTypes.Feedback:
-            self._store_feedback_txn(txn, event)
-        elif event.type == EventTypes.Name:
-            self._store_room_name_txn(txn, event)
-        elif event.type == EventTypes.Topic:
-            self._store_room_topic_txn(txn, event)
-        elif event.type == EventTypes.Redaction:
-            self._store_redaction(txn, event)
-
-        event_dict = {
-            k: v
-            for k, v in event.get_dict().items()
-            if k not in [
-                "redacted",
-                "redacted_because",
-            ]
-        }
-
-        self._simple_insert_txn(
-            txn,
-            table="event_json",
-            values={
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "internal_metadata": metadata_json.decode("UTF-8"),
-                "json": encode_canonical_json(event_dict).decode("UTF-8"),
-            },
-            or_replace=True,
-        )
-
-        content = encode_canonical_json(
-            event.content
-        ).decode("UTF-8")
-
-        vals = {
-            "topological_ordering": event.depth,
-            "event_id": event.event_id,
-            "type": event.type,
-            "room_id": event.room_id,
-            "content": content,
-            "processed": True,
-            "outlier": outlier,
-            "depth": event.depth,
-        }
-
-        if stream_ordering is not None:
-            vals["stream_ordering"] = stream_ordering
-
-        unrec = {
-            k: v
-            for k, v in event.get_dict().items()
-            if k not in vals.keys() and k not in [
-                "redacted",
-                "redacted_because",
-                "signatures",
-                "hashes",
-                "prev_events",
-            ]
-        }
-
-        vals["unrecognized_keys"] = encode_canonical_json(
-            unrec
-        ).decode("UTF-8")
-
-        try:
-            self._simple_insert_txn(
-                txn,
-                "events",
-                vals,
-                or_replace=(not outlier),
-                or_ignore=bool(outlier),
-            )
-        except:
-            logger.warn(
-                "Failed to persist, probably duplicate: %s",
-                event.event_id,
-                exc_info=True,
-            )
-            raise _RollbackButIsFineException("_persist_event")
-
-        if context.rejected:
-            self._store_rejections_txn(txn, event.event_id, context.rejected)
-
-        if event.is_state():
-            vals = {
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "type": event.type,
-                "state_key": event.state_key,
-            }
-
-            # TODO: How does this work with backfilling?
-            if hasattr(event, "replaces_state"):
-                vals["prev_state"] = event.replaces_state
-
-            self._simple_insert_txn(
-                txn,
-                "state_events",
-                vals,
-                or_replace=True,
-            )
-
-            if is_new_state and not context.rejected:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": event.event_id,
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                    or_replace=True,
-                )
-
-            for e_id, h in event.prev_state:
-                self._simple_insert_txn(
-                    txn,
-                    table="event_edges",
-                    values={
-                        "event_id": event.event_id,
-                        "prev_event_id": e_id,
-                        "room_id": event.room_id,
-                        "is_state": 1,
-                    },
-                    or_ignore=True,
-                )
-
-        for hash_alg, hash_base64 in event.hashes.items():
-            hash_bytes = decode_base64(hash_base64)
-            self._store_event_content_hash_txn(
-                txn, event.event_id, hash_alg, hash_bytes,
-            )
-
-        for prev_event_id, prev_hashes in event.prev_events:
-            for alg, hash_base64 in prev_hashes.items():
-                hash_bytes = decode_base64(hash_base64)
-                self._store_prev_event_hash_txn(
-                    txn, event.event_id, prev_event_id, alg, hash_bytes
-                )
-
-        for auth_id, _ in event.auth_events:
-            self._simple_insert_txn(
-                txn,
-                table="event_auth",
-                values={
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "auth_id": auth_id,
-                },
-                or_ignore=True,
-            )
-
-        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
-        self._store_event_reference_hash_txn(
-            txn, event.event_id, ref_alg, ref_hash_bytes
-        )
-
-    def _store_redaction(self, txn, event):
-        # invalidate the cache for the redacted event
-        self._get_event_cache.pop(event.redacts)
-        txn.execute(
-            "INSERT OR IGNORE INTO redactions "
-            "(event_id, redacts) VALUES (?,?)",
-            (event.event_id, event.redacts)
-        )
-
-    @defer.inlineCallbacks
-    def get_current_state(self, room_id, event_type=None, state_key=""):
-        del_sql = (
-            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
-            "LIMIT 1"
-        )
-
-        sql = (
-            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
-            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
-            "INNER JOIN state_events as s ON e.event_id = s.event_id "
-            "WHERE c.room_id = ? "
-        ) % {
-            "redacted": del_sql,
-        }
-
-        if event_type and state_key is not None:
-            sql += " AND s.type = ? AND s.state_key = ? "
-            args = (room_id, event_type, state_key)
-        elif event_type:
-            sql += " AND s.type = ?"
-            args = (room_id, event_type)
-        else:
-            args = (room_id, )
-
-        results = yield self._execute_and_decode("get_current_state", sql, *args)
-
-        events = yield self._parse_events(results)
-        defer.returnValue(events)
-
-    @defer.inlineCallbacks
-    def get_room_name_and_aliases(self, room_id):
-        del_sql = (
-            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
-            "LIMIT 1"
+        self.client_ip_last_seen = Cache(
+            name="client_ip_last_seen",
+            keylen=4,
         )
 
-        sql = (
-            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
-            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
-            "INNER JOIN state_events as s ON e.event_id = s.event_id "
-            "WHERE c.room_id = ? "
-        ) % {
-            "redacted": del_sql,
-        }
-
-        sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
-        sql += " OR s.type = 'm.room.aliases')"
-        args = (room_id,)
-
-        results = yield self._execute_and_decode("get_current_state", sql, *args)
-
-        events = yield self._parse_events(results)
-
-        name = None
-        aliases = []
-
-        for e in events:
-            if e.type == 'm.room.name':
-                if 'name' in e.content:
-                    name = e.content['name']
-            elif e.type == 'm.room.aliases':
-                if 'aliases' in e.content:
-                    aliases.extend(e.content['aliases'])
-
-        defer.returnValue((name, aliases))
-
     @defer.inlineCallbacks
-    def _get_min_token(self):
-        row = yield self._execute(
-            "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
-        )
+    def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
+        now = int(self._clock.time_msec())
+        key = (user.to_string(), access_token, device_id, ip)
 
-        self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
-        self.min_token = min(self.min_token, -1)
+        try:
+            last_seen = self.client_ip_last_seen.get(*key)
+        except KeyError:
+            last_seen = None
 
-        logger.debug("min_token is: %s", self.min_token)
+        # Rate-limited inserts
+        if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
+            defer.returnValue(None)
 
-        defer.returnValue(self.min_token)
+        self.client_ip_last_seen.prefill(*key + (now,))
 
-    def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
-        return self._simple_insert(
+        yield self._simple_insert(
             "user_ips",
             {
                 "user": user.to_string(),
@@ -514,8 +112,9 @@ class DataStore(RoomMemberStore, RoomStore,
                 "device_id": device_id,
                 "ip": ip,
                 "user_agent": user_agent,
-                "last_seen": int(self._clock.time_msec()),
-            }
+                "last_seen": now,
+            },
+            desc="insert_client_ip",
         )
 
     def get_user_ip_and_agents(self, user):
@@ -525,38 +124,7 @@ class DataStore(RoomMemberStore, RoomStore,
             retcols=[
                 "device_id", "access_token", "ip", "user_agent", "last_seen"
             ],
-        )
-
-    def have_events(self, event_ids):
-        """Given a list of event ids, check if we have already processed them.
-
-        Returns:
-            dict: Has an entry for each event id we already have seen. Maps to
-            the rejected reason string if we rejected the event, else maps to
-            None.
-        """
-        if not event_ids:
-            return defer.succeed({})
-
-        def f(txn):
-            sql = (
-                "SELECT e.event_id, reason FROM events as e "
-                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
-                "WHERE e.event_id = ?"
-            )
-
-            res = {}
-            for event_id in event_ids:
-                txn.execute(sql, (event_id,))
-                row = txn.fetchone()
-                if row:
-                    _, rejected = row
-                    res[event_id] = rejected
-
-            return res
-
-        return self.runInteraction(
-            "have_events", f,
+            desc="get_user_ip_and_agents",
         )
 
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 9125bb1198..53eee10d51 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -35,6 +35,7 @@ logger = logging.getLogger(__name__)
 
 sql_logger = logging.getLogger("synapse.storage.SQL")
 transaction_logger = logging.getLogger("synapse.storage.txn")
+perf_logger = logging.getLogger("synapse.storage.TIME")
 
 
 metrics = synapse.metrics.get_metrics_for("synapse.storage")
@@ -53,14 +54,57 @@ cache_counter = metrics.register_cache(
 )
 
 
-# TODO(paul):
-#  * more generic key management
-#  * consider other eviction strategies - LRU?
-def cached(max_entries=1000):
+class Cache(object):
+
+    def __init__(self, name, max_entries=1000, keylen=1, lru=False):
+        if lru:
+            self.cache = LruCache(max_size=max_entries)
+            self.max_entries = None
+        else:
+            self.cache = OrderedDict()
+            self.max_entries = max_entries
+
+        self.name = name
+        self.keylen = keylen
+
+        caches_by_name[name] = self.cache
+
+    def get(self, *keyargs):
+        if len(keyargs) != self.keylen:
+            raise ValueError("Expected a key to have %d items", self.keylen)
+
+        if keyargs in self.cache:
+            cache_counter.inc_hits(self.name)
+            return self.cache[keyargs]
+
+        cache_counter.inc_misses(self.name)
+        raise KeyError()
+
+    def prefill(self, *args):  # because I can't  *keyargs, value
+        keyargs = args[:-1]
+        value = args[-1]
+
+        if len(keyargs) != self.keylen:
+            raise ValueError("Expected a key to have %d items", self.keylen)
+
+        if self.max_entries is not None:
+            while len(self.cache) >= self.max_entries:
+                self.cache.popitem(last=False)
+
+        self.cache[keyargs] = value
+
+    def invalidate(self, *keyargs):
+        if len(keyargs) != self.keylen:
+            raise ValueError("Expected a key to have %d items", self.keylen)
+
+        self.cache.pop(keyargs, None)
+
+
+def cached(max_entries=1000, num_args=1, lru=False):
     """ A method decorator that applies a memoizing cache around the function.
 
-    The function is presumed to take one additional argument, which is used as
-    the key for the cache. Cache hits are served directly from the cache;
+    The function is presumed to take zero or more arguments, which are used in
+    a tuple as the key for the cache. Hits are served directly from the cache;
     misses use the function body to generate the value.
 
     The wrapped function has an additional member, a callable called
@@ -71,34 +115,27 @@ def cached(max_entries=1000):
     calling the calculation function.
     """
     def wrap(orig):
-        cache = OrderedDict()
-        name = orig.__name__
-
-        caches_by_name[name] = cache
-
-        def prefill(key, value):
-            while len(cache) > max_entries:
-                cache.popitem(last=False)
-
-            cache[key] = value
+        cache = Cache(
+            name=orig.__name__,
+            max_entries=max_entries,
+            keylen=num_args,
+            lru=lru,
+        )
 
         @functools.wraps(orig)
         @defer.inlineCallbacks
-        def wrapped(self, key):
-            if key in cache:
-                cache_counter.inc_hits(name)
-                defer.returnValue(cache[key])
+        def wrapped(self, *keyargs):
+            try:
+                defer.returnValue(cache.get(*keyargs))
+            except KeyError:
+                ret = yield orig(self, *keyargs)
 
-            cache_counter.inc_misses(name)
-            ret = yield orig(self, key)
-            prefill(key, ret)
-            defer.returnValue(ret)
+                cache.prefill(*keyargs + (ret,))
 
-        def invalidate(key):
-            cache.pop(key, None)
+                defer.returnValue(ret)
 
-        wrapped.invalidate = invalidate
-        wrapped.prefill = prefill
+        wrapped.invalidate = cache.invalidate
+        wrapped.prefill = cache.prefill
         return wrapped
 
     return wrap
@@ -232,7 +269,7 @@ class SQLBaseStore(object):
                 time_now - time_then, limit=3
             )
 
-            logger.info(
+            perf_logger.info(
                 "Total database time: %.3f%% {%s} {%s}",
                 ratio * 100, top_three_counters, top_3_event_counters
             )
@@ -321,7 +358,8 @@ class SQLBaseStore(object):
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
 
-    def _simple_insert(self, table, values, or_replace=False, or_ignore=False):
+    def _simple_insert(self, table, values, or_replace=False, or_ignore=False,
+                       desc="_simple_insert"):
         """Executes an INSERT query on the named table.
 
         Args:
@@ -330,7 +368,7 @@ class SQLBaseStore(object):
             or_replace : bool; if True performs an INSERT OR REPLACE
         """
         return self.runInteraction(
-            "_simple_insert",
+            desc,
             self._simple_insert_txn, table, values, or_replace=or_replace,
             or_ignore=or_ignore,
         )
@@ -354,7 +392,7 @@ class SQLBaseStore(object):
         txn.execute(sql, values.values())
         return txn.lastrowid
 
-    def _simple_upsert(self, table, keyvalues, values):
+    def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"):
         """
         Args:
             table (str): The table to upsert into
@@ -363,7 +401,7 @@ class SQLBaseStore(object):
         Returns: A deferred
         """
         return self.runInteraction(
-            "_simple_upsert",
+            desc,
             self._simple_upsert_txn, table, keyvalues, values
         )
 
@@ -399,7 +437,7 @@ class SQLBaseStore(object):
             txn.execute(sql, allvalues.values())
 
     def _simple_select_one(self, table, keyvalues, retcols,
-                           allow_none=False):
+                           allow_none=False, desc="_simple_select_one"):
         """Executes a SELECT query on the named table, which is expected to
         return a single row, returning a single column from it.
 
@@ -411,12 +449,15 @@ class SQLBaseStore(object):
             allow_none : If true, return None instead of failing if the SELECT
               statement returns no rows
         """
-        return self._simple_selectupdate_one(
-            table, keyvalues, retcols=retcols, allow_none=allow_none
+        return self.runInteraction(
+            desc,
+            self._simple_select_one_txn,
+            table, keyvalues, retcols, allow_none,
         )
 
     def _simple_select_one_onecol(self, table, keyvalues, retcol,
-                                  allow_none=False):
+                                  allow_none=False,
+                                  desc="_simple_select_one_onecol"):
         """Executes a SELECT query on the named table, which is expected to
         return a single row, returning a single column from it."
 
@@ -426,7 +467,7 @@ class SQLBaseStore(object):
             retcol : string giving the name of the column to return
         """
         return self.runInteraction(
-            "_simple_select_one_onecol",
+            desc,
             self._simple_select_one_onecol_txn,
             table, keyvalues, retcol, allow_none=allow_none,
         )
@@ -462,7 +503,8 @@ class SQLBaseStore(object):
 
         return [r[0] for r in txn.fetchall()]
 
-    def _simple_select_onecol(self, table, keyvalues, retcol):
+    def _simple_select_onecol(self, table, keyvalues, retcol,
+                              desc="_simple_select_onecol"):
         """Executes a SELECT query on the named table, which returns a list
         comprising of the values of the named column from the selected rows.
 
@@ -475,12 +517,13 @@ class SQLBaseStore(object):
             Deferred: Results in a list
         """
         return self.runInteraction(
-            "_simple_select_onecol",
+            desc,
             self._simple_select_onecol_txn,
             table, keyvalues, retcol
         )
 
-    def _simple_select_list(self, table, keyvalues, retcols):
+    def _simple_select_list(self, table, keyvalues, retcols,
+                            desc="_simple_select_list"):
         """Executes a SELECT query on the named table, which may return zero or
         more rows, returning the result as a list of dicts.
 
@@ -491,7 +534,7 @@ class SQLBaseStore(object):
             retcols : list of strings giving the names of the columns to return
         """
         return self.runInteraction(
-            "_simple_select_list",
+            desc,
             self._simple_select_list_txn,
             table, keyvalues, retcols
         )
@@ -523,7 +566,7 @@ class SQLBaseStore(object):
         return self.cursor_to_dict(txn)
 
     def _simple_update_one(self, table, keyvalues, updatevalues,
-                           retcols=None):
+                           desc="_simple_update_one"):
         """Executes an UPDATE query on the named table, setting new values for
         columns in a row matching the key values.
 
@@ -541,56 +584,76 @@ class SQLBaseStore(object):
         get-and-set.  This can be used to implement compare-and-set by putting
         the update column in the 'keyvalues' dict as well.
         """
-        return self._simple_selectupdate_one(table, keyvalues, updatevalues,
-                                             retcols=retcols)
+        return self.runInteraction(
+            desc,
+            self._simple_update_one_txn,
+            table, keyvalues, updatevalues,
+        )
 
-    def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None,
-                                 retcols=None, allow_none=False):
-        """ Combined SELECT then UPDATE."""
-        if retcols:
-            select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
-                ", ".join(retcols),
-                table,
-                " AND ".join("%s = ?" % (k) for k in keyvalues)
-            )
+    def _simple_update_one_txn(self, txn, table, keyvalues, updatevalues):
+        update_sql = "UPDATE %s SET %s WHERE %s" % (
+            table,
+            ", ".join("%s = ?" % (k,) for k in updatevalues),
+            " AND ".join("%s = ?" % (k,) for k in keyvalues)
+        )
 
-        if updatevalues:
-            update_sql = "UPDATE %s SET %s WHERE %s" % (
-                table,
-                ", ".join("%s = ?" % (k,) for k in updatevalues),
-                " AND ".join("%s = ?" % (k,) for k in keyvalues)
-            )
+        txn.execute(
+            update_sql,
+            updatevalues.values() + keyvalues.values()
+        )
 
+        if txn.rowcount == 0:
+            raise StoreError(404, "No row found")
+        if txn.rowcount > 1:
+            raise StoreError(500, "More than one row matched")
+
+    def _simple_select_one_txn(self, txn, table, keyvalues, retcols,
+                               allow_none=False):
+        select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+            ", ".join(retcols),
+            table,
+            " AND ".join("%s = ?" % (k) for k in keyvalues)
+        )
+
+        txn.execute(select_sql, keyvalues.values())
+
+        row = txn.fetchone()
+        if not row:
+            if allow_none:
+                return None
+            raise StoreError(404, "No row found")
+        if txn.rowcount > 1:
+            raise StoreError(500, "More than one row matched")
+
+        return dict(zip(retcols, row))
+
+    def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None,
+                                 retcols=None, allow_none=False,
+                                 desc="_simple_selectupdate_one"):
+        """ Combined SELECT then UPDATE."""
         def func(txn):
             ret = None
             if retcols:
-                txn.execute(select_sql, keyvalues.values())
-
-                row = txn.fetchone()
-                if not row:
-                    if allow_none:
-                        return None
-                    raise StoreError(404, "No row found")
-                if txn.rowcount > 1:
-                    raise StoreError(500, "More than one row matched")
-
-                ret = dict(zip(retcols, row))
+                ret = self._simple_select_one_txn(
+                    txn,
+                    table=table,
+                    keyvalues=keyvalues,
+                    retcols=retcols,
+                    allow_none=allow_none,
+                )
 
             if updatevalues:
-                txn.execute(
-                    update_sql,
-                    updatevalues.values() + keyvalues.values()
+                self._simple_update_one_txn(
+                    txn,
+                    table=table,
+                    keyvalues=keyvalues,
+                    updatevalues=updatevalues,
                 )
 
-                if txn.rowcount == 0:
-                    raise StoreError(404, "No row found")
-                if txn.rowcount > 1:
-                    raise StoreError(500, "More than one row matched")
-
             return ret
-        return self.runInteraction("_simple_selectupdate_one", func)
+        return self.runInteraction(desc, func)
 
-    def _simple_delete_one(self, table, keyvalues):
+    def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"):
         """Executes a DELETE query on the named table, expecting to delete a
         single row.
 
@@ -609,9 +672,9 @@ class SQLBaseStore(object):
                 raise StoreError(404, "No row found")
             if txn.rowcount > 1:
                 raise StoreError(500, "more than one row matched")
-        return self.runInteraction("_simple_delete_one", func)
+        return self.runInteraction(desc, func)
 
-    def _simple_delete(self, table, keyvalues):
+    def _simple_delete(self, table, keyvalues, desc="_simple_delete"):
         """Executes a DELETE query on the named table.
 
         Args:
@@ -619,7 +682,7 @@ class SQLBaseStore(object):
             keyvalues : dict of column names and values to select the row with
         """
 
-        return self.runInteraction("_simple_delete", self._simple_delete_txn)
+        return self.runInteraction(desc, self._simple_delete_txn)
 
     def _simple_delete_txn(self, txn, table, keyvalues):
         sql = "DELETE FROM %s WHERE %s" % (
@@ -789,6 +852,13 @@ class SQLBaseStore(object):
         return result[0] if result else None
 
 
+class _RollbackButIsFineException(Exception):
+    """ This exception is used to rollback a transaction without implying
+    something went wrong.
+    """
+    pass
+
+
 class Table(object):
     """ A base class used to store information about a particular table.
     """
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 850676ce6c..f8cbb3f323 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -13,154 +13,35 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import simplejson
+import urllib
+import yaml
 from simplejson import JSONDecodeError
+import simplejson as json
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
-from synapse.api.errors import StoreError
-from synapse.appservice import ApplicationService
+from synapse.appservice import ApplicationService, AppServiceTransaction
 from synapse.storage.roommember import RoomsForUser
+from synapse.types import UserID
 from ._base import SQLBaseStore
 
 
 logger = logging.getLogger(__name__)
 
 
-def log_failure(failure):
-    logger.error("Failed to detect application services: %s", failure.value)
-    logger.error(failure.getTraceback())
-
-
 class ApplicationServiceStore(SQLBaseStore):
 
     def __init__(self, hs):
         super(ApplicationServiceStore, self).__init__(hs)
+        self.hostname = hs.hostname
         self.services_cache = []
-        self.cache_defer = self._populate_cache()
-        self.cache_defer.addErrback(log_failure)
-
-    @defer.inlineCallbacks
-    def unregister_app_service(self, token):
-        """Unregisters this service.
-
-        This removes all AS specific regex and the base URL. The token is the
-        only thing preserved for future registration attempts.
-        """
-        yield self.cache_defer  # make sure the cache is ready
-        yield self.runInteraction(
-            "unregister_app_service",
-            self._unregister_app_service_txn,
-            token,
-        )
-        # update cache TODO: Should this be in the txn?
-        for service in self.services_cache:
-            if service.token == token:
-                service.url = None
-                service.namespaces = None
-                service.hs_token = None
-
-    def _unregister_app_service_txn(self, txn, token):
-        # kill the url to prevent pushes
-        txn.execute(
-            "UPDATE application_services SET url=NULL WHERE token=?",
-            (token,)
-        )
-
-        # cleanup regex
-        as_id = self._get_as_id_txn(txn, token)
-        if not as_id:
-            logger.warning(
-                "unregister_app_service_txn: Failed to find as_id for token=",
-                token
-            )
-            return False
-
-        txn.execute(
-            "DELETE FROM application_services_regex WHERE as_id=?",
-            (as_id,)
+        self._populate_appservice_cache(
+            hs.config.app_service_config_files
         )
-        return True
 
-    @defer.inlineCallbacks
-    def update_app_service(self, service):
-        """Update an application service, clobbering what was previously there.
-
-        Args:
-            service(ApplicationService): The updated service.
-        """
-        yield self.cache_defer  # make sure the cache is ready
-
-        # NB: There is no "insert" since we provide no public-facing API to
-        # allocate new ASes. It relies on the server admin inserting the AS
-        # token into the database manually.
-
-        if not service.token or not service.url:
-            raise StoreError(400, "Token and url must be specified.")
-
-        if not service.hs_token:
-            raise StoreError(500, "No HS token")
-
-        yield self.runInteraction(
-            "update_app_service",
-            self._update_app_service_txn,
-            service
-        )
-
-        # update cache TODO: Should this be in the txn?
-        for (index, cache_service) in enumerate(self.services_cache):
-            if service.token == cache_service.token:
-                self.services_cache[index] = service
-                logger.info("Updated: %s", service)
-                return
-        # new entry
-        self.services_cache.append(service)
-        logger.info("Updated(new): %s", service)
-
-    def _update_app_service_txn(self, txn, service):
-        as_id = self._get_as_id_txn(txn, service.token)
-        if not as_id:
-            logger.warning(
-                "update_app_service_txn: Failed to find as_id for token=",
-                service.token
-            )
-            return False
-
-        txn.execute(
-            "UPDATE application_services SET url=?, hs_token=?, sender=? "
-            "WHERE id=?",
-            (service.url, service.hs_token, service.sender, as_id,)
-        )
-        # cleanup regex
-        txn.execute(
-            "DELETE FROM application_services_regex WHERE as_id=?",
-            (as_id,)
-        )
-        for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST):
-            if ns_str in service.namespaces:
-                for regex_obj in service.namespaces[ns_str]:
-                    txn.execute(
-                        "INSERT INTO application_services_regex("
-                        "as_id, namespace, regex) values(?,?,?)",
-                        (as_id, ns_int, simplejson.dumps(regex_obj))
-                    )
-        return True
-
-    def _get_as_id_txn(self, txn, token):
-        cursor = txn.execute(
-            "SELECT id FROM application_services WHERE token=?",
-            (token,)
-        )
-        res = cursor.fetchone()
-        if res:
-            return res[0]
-
-    @defer.inlineCallbacks
     def get_app_services(self):
-        yield self.cache_defer  # make sure the cache is ready
-        defer.returnValue(self.services_cache)
+        return defer.succeed(self.services_cache)
 
-    @defer.inlineCallbacks
     def get_app_service_by_user_id(self, user_id):
         """Retrieve an application service from their user ID.
 
@@ -174,37 +55,23 @@ class ApplicationServiceStore(SQLBaseStore):
         Returns:
             synapse.appservice.ApplicationService or None.
         """
-
-        yield self.cache_defer  # make sure the cache is ready
-
         for service in self.services_cache:
             if service.sender == user_id:
-                defer.returnValue(service)
-                return
-        defer.returnValue(None)
+                return defer.succeed(service)
+        return defer.succeed(None)
 
-    @defer.inlineCallbacks
-    def get_app_service_by_token(self, token, from_cache=True):
+    def get_app_service_by_token(self, token):
         """Get the application service with the given appservice token.
 
         Args:
             token (str): The application service token.
-            from_cache (bool): True to get this service from the cache, False to
-                               check the database.
-        Raises:
-            StoreError if there was a problem retrieving this service.
+        Returns:
+            synapse.appservice.ApplicationService or None.
         """
-        yield self.cache_defer  # make sure the cache is ready
-
-        if from_cache:
-            for service in self.services_cache:
-                if service.token == token:
-                    defer.returnValue(service)
-                    return
-            defer.returnValue(None)
-
-        # TODO: The from_cache=False impl
-        # TODO: This should be JOINed with the application_services_regex table.
+        for service in self.services_cache:
+            if service.token == token:
+                return defer.succeed(service)
+        return defer.succeed(None)
 
     def get_app_service_rooms(self, service):
         """Get a list of RoomsForUser for this application service.
@@ -277,12 +144,7 @@ class ApplicationServiceStore(SQLBaseStore):
 
         return rooms_for_user_matching_user_id
 
-    @defer.inlineCallbacks
-    def _populate_cache(self):
-        """Populates the ApplicationServiceCache from the database."""
-        sql = ("SELECT * FROM application_services LEFT JOIN "
-               "application_services_regex ON application_services.id = "
-               "application_services_regex.as_id")
+    def _parse_services_dict(self, results):
         # SQL results in the form:
         # [
         #   {
@@ -296,12 +158,14 @@ class ApplicationServiceStore(SQLBaseStore):
         #   }
         # ]
         services = {}
-        results = yield self._execute_and_decode("_populate_cache", sql)
         for res in results:
             as_token = res["token"]
+            if as_token is None:
+                continue
             if as_token not in services:
                 # add the service
                 services[as_token] = {
+                    "id": res["id"],
                     "url": res["url"],
                     "token": as_token,
                     "hs_token": res["hs_token"],
@@ -319,20 +183,287 @@ class ApplicationServiceStore(SQLBaseStore):
             try:
                 services[as_token]["namespaces"][
                     ApplicationService.NS_LIST[ns_int]].append(
-                    simplejson.loads(res["regex"])
+                    json.loads(res["regex"])
                 )
             except IndexError:
                 logger.error("Bad namespace enum '%s'. %s", ns_int, res)
             except JSONDecodeError:
                 logger.error("Bad regex object '%s'", res["regex"])
 
-        # TODO get last successful txn id f.e. service
+        service_list = []
         for service in services.values():
-            logger.info("Found application service: %s", service)
-            self.services_cache.append(ApplicationService(
+            service_list.append(ApplicationService(
                 token=service["token"],
                 url=service["url"],
                 namespaces=service["namespaces"],
                 hs_token=service["hs_token"],
-                sender=service["sender"]
+                sender=service["sender"],
+                id=service["id"]
             ))
+        return service_list
+
+    def _load_appservice(self, as_info):
+        required_string_fields = [
+            "url", "as_token", "hs_token", "sender_localpart"
+        ]
+        for field in required_string_fields:
+            if not isinstance(as_info.get(field), basestring):
+                raise KeyError("Required string field: '%s'", field)
+
+        localpart = as_info["sender_localpart"]
+        if urllib.quote(localpart) != localpart:
+            raise ValueError(
+                "sender_localpart needs characters which are not URL encoded."
+            )
+        user = UserID(localpart, self.hostname)
+        user_id = user.to_string()
+
+        # namespace checks
+        if not isinstance(as_info.get("namespaces"), dict):
+            raise KeyError("Requires 'namespaces' object.")
+        for ns in ApplicationService.NS_LIST:
+            # specific namespaces are optional
+            if ns in as_info["namespaces"]:
+                # expect a list of dicts with exclusive and regex keys
+                for regex_obj in as_info["namespaces"][ns]:
+                    if not isinstance(regex_obj, dict):
+                        raise ValueError(
+                            "Expected namespace entry in %s to be an object,"
+                            " but got %s", ns, regex_obj
+                        )
+                    if not isinstance(regex_obj.get("regex"), basestring):
+                        raise ValueError(
+                            "Missing/bad type 'regex' key in %s", regex_obj
+                        )
+                    if not isinstance(regex_obj.get("exclusive"), bool):
+                        raise ValueError(
+                            "Missing/bad type 'exclusive' key in %s", regex_obj
+                        )
+        return ApplicationService(
+            token=as_info["as_token"],
+            url=as_info["url"],
+            namespaces=as_info["namespaces"],
+            hs_token=as_info["hs_token"],
+            sender=user_id,
+            id=as_info["as_token"]  # the token is the only unique thing here
+        )
+
+    def _populate_appservice_cache(self, config_files):
+        """Populates a cache of Application Services from the config files."""
+        if not isinstance(config_files, list):
+            logger.warning(
+                "Expected %s to be a list of AS config files.", config_files
+            )
+            return
+
+        for config_file in config_files:
+            try:
+                with open(config_file, 'r') as f:
+                    appservice = self._load_appservice(yaml.load(f))
+                    logger.info("Loaded application service: %s", appservice)
+                    self.services_cache.append(appservice)
+            except Exception as e:
+                logger.error("Failed to load appservice from '%s'", config_file)
+                logger.exception(e)
+
+
+class ApplicationServiceTransactionStore(SQLBaseStore):
+
+    def __init__(self, hs):
+        super(ApplicationServiceTransactionStore, self).__init__(hs)
+
+    @defer.inlineCallbacks
+    def get_appservices_by_state(self, state):
+        """Get a list of application services based on their state.
+
+        Args:
+            state(ApplicationServiceState): The state to filter on.
+        Returns:
+            A Deferred which resolves to a list of ApplicationServices, which
+            may be empty.
+        """
+        results = yield self._simple_select_list(
+            "application_services_state",
+            dict(state=state),
+            ["as_id"]
+        )
+        # NB: This assumes this class is linked with ApplicationServiceStore
+        as_list = yield self.get_app_services()
+        services = []
+
+        for res in results:
+            for service in as_list:
+                if service.id == res["as_id"]:
+                    services.append(service)
+        defer.returnValue(services)
+
+    @defer.inlineCallbacks
+    def get_appservice_state(self, service):
+        """Get the application service state.
+
+        Args:
+            service(ApplicationService): The service whose state to set.
+        Returns:
+            A Deferred which resolves to ApplicationServiceState.
+        """
+        result = yield self._simple_select_one(
+            "application_services_state",
+            dict(as_id=service.id),
+            ["state"],
+            allow_none=True
+        )
+        if result:
+            defer.returnValue(result.get("state"))
+            return
+        defer.returnValue(None)
+
+    def set_appservice_state(self, service, state):
+        """Set the application service state.
+
+        Args:
+            service(ApplicationService): The service whose state to set.
+            state(ApplicationServiceState): The connectivity state to apply.
+        Returns:
+            A Deferred which resolves when the state was set successfully.
+        """
+        return self._simple_upsert(
+            "application_services_state",
+            dict(as_id=service.id),
+            dict(state=state)
+        )
+
+    def create_appservice_txn(self, service, events):
+        """Atomically creates a new transaction for this application service
+        with the given list of events.
+
+        Args:
+            service(ApplicationService): The service who the transaction is for.
+            events(list<Event>): A list of events to put in the transaction.
+        Returns:
+            AppServiceTransaction: A new transaction.
+        """
+        return self.runInteraction(
+            "create_appservice_txn",
+            self._create_appservice_txn,
+            service, events
+        )
+
+    def _create_appservice_txn(self, txn, service, events):
+        # work out new txn id (highest txn id for this service += 1)
+        # The highest id may be the last one sent (in which case it is last_txn)
+        # or it may be the highest in the txns list (which are waiting to be/are
+        # being sent)
+        last_txn_id = self._get_last_txn(txn, service.id)
+
+        result = txn.execute(
+            "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
+            (service.id,)
+        )
+        highest_txn_id = result.fetchone()[0]
+        if highest_txn_id is None:
+            highest_txn_id = 0
+
+        new_txn_id = max(highest_txn_id, last_txn_id) + 1
+
+        # Insert new txn into txn table
+        event_ids = [e.event_id for e in events]
+        txn.execute(
+            "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
+            "VALUES(?,?,?)",
+            (service.id, new_txn_id, json.dumps(event_ids))
+        )
+        return AppServiceTransaction(
+            service=service, id=new_txn_id, events=events
+        )
+
+    def complete_appservice_txn(self, txn_id, service):
+        """Completes an application service transaction.
+
+        Args:
+            txn_id(str): The transaction ID being completed.
+            service(ApplicationService): The application service which was sent
+            this transaction.
+        Returns:
+            A Deferred which resolves if this transaction was stored
+            successfully.
+        """
+        return self.runInteraction(
+            "complete_appservice_txn",
+            self._complete_appservice_txn,
+            txn_id, service
+        )
+
+    def _complete_appservice_txn(self, txn, txn_id, service):
+        txn_id = int(txn_id)
+
+        # Debugging query: Make sure the txn being completed is EXACTLY +1 from
+        # what was there before. If it isn't, we've got problems (e.g. the AS
+        # has probably missed some events), so whine loudly but still continue,
+        # since it shouldn't fail completion of the transaction.
+        last_txn_id = self._get_last_txn(txn, service.id)
+        if (last_txn_id + 1) != txn_id:
+            logger.error(
+                "appservice: Completing a transaction which has an ID > 1 from "
+                "the last ID sent to this AS. We've either dropped events or "
+                "sent it to the AS out of order. FIX ME. last_txn=%s "
+                "completing_txn=%s service_id=%s", last_txn_id, txn_id,
+                service.id
+            )
+
+        # Set current txn_id for AS to 'txn_id'
+        self._simple_upsert_txn(
+            txn, "application_services_state", dict(as_id=service.id),
+            dict(last_txn=txn_id)
+        )
+
+        # Delete txn
+        self._simple_delete_txn(
+            txn, "application_services_txns",
+            dict(txn_id=txn_id, as_id=service.id)
+        )
+
+    def get_oldest_unsent_txn(self, service):
+        """Get the oldest transaction which has not been sent for this
+        service.
+
+        Args:
+            service(ApplicationService): The app service to get the oldest txn.
+        Returns:
+            A Deferred which resolves to an AppServiceTransaction or
+            None.
+        """
+        return self.runInteraction(
+            "get_oldest_unsent_appservice_txn",
+            self._get_oldest_unsent_txn,
+            service
+        )
+
+    def _get_oldest_unsent_txn(self, txn, service):
+        # Monotonically increasing txn ids, so just select the smallest
+        # one in the txns table (we delete them when they are sent)
+        result = txn.execute(
+            "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?",
+            (service.id,)
+        )
+        entry = self.cursor_to_dict(result)[0]
+        if not entry or entry["txn_id"] is None:
+            # the min(txn_id) part will force a row, so entry may not be None
+            return None
+
+        event_ids = json.loads(entry["event_ids"])
+        events = self._get_events_txn(txn, event_ids)
+
+        return AppServiceTransaction(
+            service=service, id=entry["txn_id"], events=events
+        )
+
+    def _get_last_txn(self, txn, service_id):
+        result = txn.execute(
+            "SELECT last_txn FROM application_services_state WHERE as_id=?",
+            (service_id,)
+        )
+        last_txn_id = result.fetchone()
+        if last_txn_id is None or last_txn_id[0] is None:  # no row exists
+            return 0
+        else:
+            return int(last_txn_id[0])  # select 'last_txn' col
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 68b7d59693..0199539fea 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, cached
 
 from synapse.api.errors import SynapseError
 
@@ -48,6 +48,7 @@ class DirectoryStore(SQLBaseStore):
             {"room_alias": room_alias.to_string()},
             "room_id",
             allow_none=True,
+            desc="get_association_from_room_alias",
         )
 
         if not room_id:
@@ -58,6 +59,7 @@ class DirectoryStore(SQLBaseStore):
             "room_alias_servers",
             {"room_alias": room_alias.to_string()},
             "server",
+            desc="get_association_from_room_alias",
         )
 
         if not servers:
@@ -87,6 +89,7 @@ class DirectoryStore(SQLBaseStore):
                     "room_alias": room_alias.to_string(),
                     "room_id": room_id,
                 },
+                desc="create_room_alias_association",
             )
         except sqlite3.IntegrityError:
             raise SynapseError(
@@ -100,16 +103,22 @@ class DirectoryStore(SQLBaseStore):
                 {
                     "room_alias": room_alias.to_string(),
                     "server": server,
-                }
+                },
+                desc="create_room_alias_association",
             )
+        self.get_aliases_for_room.invalidate(room_id)
 
+    @defer.inlineCallbacks
     def delete_room_alias(self, room_alias):
-        return self.runInteraction(
+        room_id = yield self.runInteraction(
             "delete_room_alias",
             self._delete_room_alias_txn,
             room_alias,
         )
 
+        self.get_aliases_for_room.invalidate(room_id)
+        defer.returnValue(room_id)
+
     def _delete_room_alias_txn(self, txn, room_alias):
         cursor = txn.execute(
             "SELECT room_id FROM room_aliases WHERE room_alias = ?",
@@ -134,9 +143,11 @@ class DirectoryStore(SQLBaseStore):
 
         return room_id
 
+    @cached()
     def get_aliases_for_room(self, room_id):
         return self._simple_select_onecol(
             "room_aliases",
             {"room_id": room_id},
             "room_alias",
+            desc="get_aliases_for_room",
         )
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
new file mode 100644
index 0000000000..a86230d92c
--- /dev/null
+++ b/synapse/storage/events.py
@@ -0,0 +1,395 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from _base import SQLBaseStore, _RollbackButIsFineException
+
+from twisted.internet import defer
+
+from synapse.util.logutils import log_function
+from synapse.api.constants import EventTypes
+from synapse.crypto.event_signing import compute_event_reference_hash
+
+from syutil.base64util import decode_base64
+from syutil.jsonutil import encode_canonical_json
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class EventsStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    @log_function
+    def persist_event(self, event, context, backfilled=False,
+                      is_new_state=True, current_state=None):
+        stream_ordering = None
+        if backfilled:
+            if not self.min_token_deferred.called:
+                yield self.min_token_deferred
+            self.min_token -= 1
+            stream_ordering = self.min_token
+
+        try:
+            yield self.runInteraction(
+                "persist_event",
+                self._persist_event_txn,
+                event=event,
+                context=context,
+                backfilled=backfilled,
+                stream_ordering=stream_ordering,
+                is_new_state=is_new_state,
+                current_state=current_state,
+            )
+            self.get_room_events_max_id.invalidate()
+        except _RollbackButIsFineException:
+            pass
+
+    @defer.inlineCallbacks
+    def get_event(self, event_id, check_redacted=True,
+                  get_prev_content=False, allow_rejected=False,
+                  allow_none=False):
+        """Get an event from the database by event_id.
+
+        Args:
+            event_id (str): The event_id of the event to fetch
+            check_redacted (bool): If True, check if event has been redacted
+                and redact it.
+            get_prev_content (bool): If True and event is a state event,
+                include the previous states content in the unsigned field.
+            allow_rejected (bool): If True return rejected events.
+            allow_none (bool): If True, return None if no event found, if
+                False throw an exception.
+
+        Returns:
+            Deferred : A FrozenEvent.
+        """
+        event = yield self.runInteraction(
+            "get_event", self._get_event_txn,
+            event_id,
+            check_redacted=check_redacted,
+            get_prev_content=get_prev_content,
+            allow_rejected=allow_rejected,
+        )
+
+        if not event and not allow_none:
+            raise RuntimeError("Could not find event %s" % (event_id,))
+
+        defer.returnValue(event)
+
+    @log_function
+    def _persist_event_txn(self, txn, event, context, backfilled,
+                           stream_ordering=None, is_new_state=True,
+                           current_state=None):
+
+        # Remove the any existing cache entries for the event_id
+        self._get_event_cache.pop(event.event_id)
+
+        # We purposefully do this first since if we include a `current_state`
+        # key, we *want* to update the `current_state_events` table
+        if current_state:
+            txn.execute(
+                "DELETE FROM current_state_events WHERE room_id = ?",
+                (event.room_id,)
+            )
+
+            for s in current_state:
+                self._simple_insert_txn(
+                    txn,
+                    "current_state_events",
+                    {
+                        "event_id": s.event_id,
+                        "room_id": s.room_id,
+                        "type": s.type,
+                        "state_key": s.state_key,
+                    },
+                    or_replace=True,
+                )
+
+        if event.is_state() and is_new_state:
+            if not backfilled and not context.rejected:
+                self._simple_insert_txn(
+                    txn,
+                    table="state_forward_extremities",
+                    values={
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    or_replace=True,
+                )
+
+                for prev_state_id, _ in event.prev_state:
+                    self._simple_delete_txn(
+                        txn,
+                        table="state_forward_extremities",
+                        keyvalues={
+                            "event_id": prev_state_id,
+                        }
+                    )
+
+        outlier = event.internal_metadata.is_outlier()
+
+        if not outlier:
+            self._store_state_groups_txn(txn, event, context)
+
+            self._update_min_depth_for_room_txn(
+                txn,
+                event.room_id,
+                event.depth
+            )
+
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
+        have_persisted = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_json",
+            keyvalues={"event_id": event.event_id},
+            retcol="event_id",
+            allow_none=True,
+        )
+
+        metadata_json = encode_canonical_json(
+            event.internal_metadata.get_dict()
+        )
+
+        # If we have already persisted this event, we don't need to do any
+        # more processing.
+        # The processing above must be done on every call to persist event,
+        # since they might not have happened on previous calls. For example,
+        # if we are persisting an event that we had persisted as an outlier,
+        # but is no longer one.
+        if have_persisted:
+            if not outlier:
+                sql = (
+                    "UPDATE event_json SET internal_metadata = ?"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (metadata_json.decode("UTF-8"), event.event_id,)
+                )
+
+                sql = (
+                    "UPDATE events SET outlier = 0"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (event.event_id,)
+                )
+            return
+
+        if event.type == EventTypes.Member:
+            self._store_room_member_txn(txn, event)
+        elif event.type == EventTypes.Feedback:
+            self._store_feedback_txn(txn, event)
+        elif event.type == EventTypes.Name:
+            self._store_room_name_txn(txn, event)
+        elif event.type == EventTypes.Topic:
+            self._store_room_topic_txn(txn, event)
+        elif event.type == EventTypes.Redaction:
+            self._store_redaction(txn, event)
+
+        event_dict = {
+            k: v
+            for k, v in event.get_dict().items()
+            if k not in [
+                "redacted",
+                "redacted_because",
+            ]
+        }
+
+        self._simple_insert_txn(
+            txn,
+            table="event_json",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "internal_metadata": metadata_json.decode("UTF-8"),
+                "json": encode_canonical_json(event_dict).decode("UTF-8"),
+            },
+            or_replace=True,
+        )
+
+        content = encode_canonical_json(
+            event.content
+        ).decode("UTF-8")
+
+        vals = {
+            "topological_ordering": event.depth,
+            "event_id": event.event_id,
+            "type": event.type,
+            "room_id": event.room_id,
+            "content": content,
+            "processed": True,
+            "outlier": outlier,
+            "depth": event.depth,
+        }
+
+        if stream_ordering is not None:
+            vals["stream_ordering"] = stream_ordering
+
+        unrec = {
+            k: v
+            for k, v in event.get_dict().items()
+            if k not in vals.keys() and k not in [
+                "redacted",
+                "redacted_because",
+                "signatures",
+                "hashes",
+                "prev_events",
+            ]
+        }
+
+        vals["unrecognized_keys"] = encode_canonical_json(
+            unrec
+        ).decode("UTF-8")
+
+        try:
+            self._simple_insert_txn(
+                txn,
+                "events",
+                vals,
+                or_replace=(not outlier),
+                or_ignore=bool(outlier),
+            )
+        except:
+            logger.warn(
+                "Failed to persist, probably duplicate: %s",
+                event.event_id,
+                exc_info=True,
+            )
+            raise _RollbackButIsFineException("_persist_event")
+
+        if context.rejected:
+            self._store_rejections_txn(txn, event.event_id, context.rejected)
+
+        if event.is_state():
+            vals = {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "type": event.type,
+                "state_key": event.state_key,
+            }
+
+            # TODO: How does this work with backfilling?
+            if hasattr(event, "replaces_state"):
+                vals["prev_state"] = event.replaces_state
+
+            self._simple_insert_txn(
+                txn,
+                "state_events",
+                vals,
+            )
+
+            if is_new_state and not context.rejected:
+                self._simple_insert_txn(
+                    txn,
+                    "current_state_events",
+                    {
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                )
+
+            for e_id, h in event.prev_state:
+                self._simple_insert_txn(
+                    txn,
+                    table="event_edges",
+                    values={
+                        "event_id": event.event_id,
+                        "prev_event_id": e_id,
+                        "room_id": event.room_id,
+                        "is_state": 1,
+                    },
+                )
+
+        for hash_alg, hash_base64 in event.hashes.items():
+            hash_bytes = decode_base64(hash_base64)
+            self._store_event_content_hash_txn(
+                txn, event.event_id, hash_alg, hash_bytes,
+            )
+
+        for prev_event_id, prev_hashes in event.prev_events:
+            for alg, hash_base64 in prev_hashes.items():
+                hash_bytes = decode_base64(hash_base64)
+                self._store_prev_event_hash_txn(
+                    txn, event.event_id, prev_event_id, alg, hash_bytes
+                )
+
+        for auth_id, _ in event.auth_events:
+            self._simple_insert_txn(
+                txn,
+                table="event_auth",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "auth_id": auth_id,
+                },
+            )
+
+        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+        self._store_event_reference_hash_txn(
+            txn, event.event_id, ref_alg, ref_hash_bytes
+        )
+
+    def _store_redaction(self, txn, event):
+        # invalidate the cache for the redacted event
+        self._get_event_cache.pop(event.redacts)
+        txn.execute(
+            "INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
+            (event.event_id, event.redacts)
+        )
+
+    def have_events(self, event_ids):
+        """Given a list of event ids, check if we have already processed them.
+
+        Returns:
+            dict: Has an entry for each event id we already have seen. Maps to
+            the rejected reason string if we rejected the event, else maps to
+            None.
+        """
+        if not event_ids:
+            return defer.succeed({})
+
+        def f(txn):
+            sql = (
+                "SELECT e.event_id, reason FROM events as e "
+                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+                "WHERE e.event_id = ?"
+            )
+
+            res = {}
+            for event_id in event_ids:
+                txn.execute(sql, (event_id,))
+                row = txn.fetchone()
+                if row:
+                    _, rejected = row
+                    res[event_id] = rejected
+
+            return res
+
+        return self.runInteraction(
+            "have_events", f,
+        )
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
deleted file mode 100644
index 8eab769b71..0000000000
--- a/synapse/storage/feedback.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from twisted.internet import defer
-
-from ._base import SQLBaseStore
-
-
-class FeedbackStore(SQLBaseStore):
-
-    def _store_feedback_txn(self, txn, event):
-        self._simple_insert_txn(txn, "feedback", {
-            "event_id": event.event_id,
-            "feedback_type": event.content["type"],
-            "room_id": event.room_id,
-            "target_event_id": event.content["target_event_id"],
-            "sender": event.user_id,
-        })
-
-    @defer.inlineCallbacks
-    def get_feedback_for_event(self, event_id):
-        sql = (
-            "SELECT events.* FROM events INNER JOIN feedback "
-            "ON events.event_id = feedback.event_id "
-            "WHERE feedback.target_event_id = ? "
-        )
-
-        rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id)
-
-        defer.returnValue(
-            [
-                (yield self._parse_events(r))
-                for r in rows
-            ]
-        )
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 457a11fd02..8800116570 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -31,6 +31,7 @@ class FilteringStore(SQLBaseStore):
             },
             retcol="filter_json",
             allow_none=False,
+            desc="get_user_filter",
         )
 
         defer.returnValue(json.loads(def_json))
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 7101d2beec..7bf57234f6 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -32,6 +32,7 @@ class MediaRepositoryStore(SQLBaseStore):
             {"media_id": media_id},
             ("media_type", "media_length", "upload_name", "created_ts"),
             allow_none=True,
+            desc="get_local_media",
         )
 
     def store_local_media(self, media_id, media_type, time_now_ms, upload_name,
@@ -45,7 +46,8 @@ class MediaRepositoryStore(SQLBaseStore):
                 "upload_name": upload_name,
                 "media_length": media_length,
                 "user_id": user_id.to_string(),
-            }
+            },
+            desc="store_local_media",
         )
 
     def get_local_media_thumbnails(self, media_id):
@@ -55,7 +57,8 @@ class MediaRepositoryStore(SQLBaseStore):
             (
                 "thumbnail_width", "thumbnail_height", "thumbnail_method",
                 "thumbnail_type", "thumbnail_length",
-            )
+            ),
+            desc="get_local_media_thumbnails",
         )
 
     def store_local_thumbnail(self, media_id, thumbnail_width,
@@ -70,7 +73,8 @@ class MediaRepositoryStore(SQLBaseStore):
                 "thumbnail_method": thumbnail_method,
                 "thumbnail_type": thumbnail_type,
                 "thumbnail_length": thumbnail_length,
-            }
+            },
+            desc="store_local_thumbnail",
         )
 
     def get_cached_remote_media(self, origin, media_id):
@@ -82,6 +86,7 @@ class MediaRepositoryStore(SQLBaseStore):
                 "filesystem_id",
             ),
             allow_none=True,
+            desc="get_cached_remote_media",
         )
 
     def store_cached_remote_media(self, origin, media_id, media_type,
@@ -97,7 +102,8 @@ class MediaRepositoryStore(SQLBaseStore):
                 "created_ts": time_now_ms,
                 "upload_name": upload_name,
                 "filesystem_id": filesystem_id,
-            }
+            },
+            desc="store_cached_remote_media",
         )
 
     def get_remote_media_thumbnails(self, origin, media_id):
@@ -107,7 +113,8 @@ class MediaRepositoryStore(SQLBaseStore):
             (
                 "thumbnail_width", "thumbnail_height", "thumbnail_method",
                 "thumbnail_type", "thumbnail_length", "filesystem_id",
-            )
+            ),
+            desc="get_remote_media_thumbnails",
         )
 
     def store_remote_media_thumbnail(self, origin, media_id, filesystem_id,
@@ -125,5 +132,6 @@ class MediaRepositoryStore(SQLBaseStore):
                 "thumbnail_type": thumbnail_type,
                 "thumbnail_length": thumbnail_length,
                 "filesystem_id": filesystem_id,
-            }
+            },
+            desc="store_remote_media_thumbnail",
         )
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 1dcd34723b..87fba55439 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -21,6 +21,7 @@ class PresenceStore(SQLBaseStore):
         return self._simple_insert(
             table="presence",
             values={"user_id": user_localpart},
+            desc="create_presence",
         )
 
     def has_presence_state(self, user_localpart):
@@ -29,6 +30,7 @@ class PresenceStore(SQLBaseStore):
             keyvalues={"user_id": user_localpart},
             retcols=["user_id"],
             allow_none=True,
+            desc="has_presence_state",
         )
 
     def get_presence_state(self, user_localpart):
@@ -36,6 +38,7 @@ class PresenceStore(SQLBaseStore):
             table="presence",
             keyvalues={"user_id": user_localpart},
             retcols=["state", "status_msg", "mtime"],
+            desc="get_presence_state",
         )
 
     def set_presence_state(self, user_localpart, new_state):
@@ -45,7 +48,7 @@ class PresenceStore(SQLBaseStore):
             updatevalues={"state": new_state["state"],
                           "status_msg": new_state["status_msg"],
                           "mtime": self._clock.time_msec()},
-            retcols=["state"],
+            desc="set_presence_state",
         )
 
     def allow_presence_visible(self, observed_localpart, observer_userid):
@@ -53,6 +56,7 @@ class PresenceStore(SQLBaseStore):
             table="presence_allow_inbound",
             values={"observed_user_id": observed_localpart,
                     "observer_user_id": observer_userid},
+            desc="allow_presence_visible",
         )
 
     def disallow_presence_visible(self, observed_localpart, observer_userid):
@@ -60,6 +64,7 @@ class PresenceStore(SQLBaseStore):
             table="presence_allow_inbound",
             keyvalues={"observed_user_id": observed_localpart,
                        "observer_user_id": observer_userid},
+            desc="disallow_presence_visible",
         )
 
     def is_presence_visible(self, observed_localpart, observer_userid):
@@ -69,6 +74,7 @@ class PresenceStore(SQLBaseStore):
                        "observer_user_id": observer_userid},
             retcols=["observed_user_id"],
             allow_none=True,
+            desc="is_presence_visible",
         )
 
     def add_presence_list_pending(self, observer_localpart, observed_userid):
@@ -77,6 +83,7 @@ class PresenceStore(SQLBaseStore):
             values={"user_id": observer_localpart,
                     "observed_user_id": observed_userid,
                     "accepted": False},
+            desc="add_presence_list_pending",
         )
 
     def set_presence_list_accepted(self, observer_localpart, observed_userid):
@@ -85,6 +92,7 @@ class PresenceStore(SQLBaseStore):
             keyvalues={"user_id": observer_localpart,
                        "observed_user_id": observed_userid},
             updatevalues={"accepted": True},
+            desc="set_presence_list_accepted",
         )
 
     def get_presence_list(self, observer_localpart, accepted=None):
@@ -96,6 +104,7 @@ class PresenceStore(SQLBaseStore):
             table="presence_list",
             keyvalues=keyvalues,
             retcols=["observed_user_id", "accepted"],
+            desc="get_presence_list",
         )
 
     def del_presence_list(self, observer_localpart, observed_userid):
@@ -103,4 +112,5 @@ class PresenceStore(SQLBaseStore):
             table="presence_list",
             keyvalues={"user_id": observer_localpart,
                        "observed_user_id": observed_userid},
+            desc="del_presence_list",
         )
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 153c7ad027..a6e52cb248 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -21,6 +21,7 @@ class ProfileStore(SQLBaseStore):
         return self._simple_insert(
             table="profiles",
             values={"user_id": user_localpart},
+            desc="create_profile",
         )
 
     def get_profile_displayname(self, user_localpart):
@@ -28,6 +29,7 @@ class ProfileStore(SQLBaseStore):
             table="profiles",
             keyvalues={"user_id": user_localpart},
             retcol="displayname",
+            desc="get_profile_displayname",
         )
 
     def set_profile_displayname(self, user_localpart, new_displayname):
@@ -35,6 +37,7 @@ class ProfileStore(SQLBaseStore):
             table="profiles",
             keyvalues={"user_id": user_localpart},
             updatevalues={"displayname": new_displayname},
+            desc="set_profile_displayname",
         )
 
     def get_profile_avatar_url(self, user_localpart):
@@ -42,6 +45,7 @@ class ProfileStore(SQLBaseStore):
             table="profiles",
             keyvalues={"user_id": user_localpart},
             retcol="avatar_url",
+            desc="get_profile_avatar_url",
         )
 
     def set_profile_avatar_url(self, user_localpart, new_avatar_url):
@@ -49,4 +53,5 @@ class ProfileStore(SQLBaseStore):
             table="profiles",
             keyvalues={"user_id": user_localpart},
             updatevalues={"avatar_url": new_avatar_url},
+            desc="set_profile_avatar_url",
         )
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index d769db2c78..c47bdc2861 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -50,7 +50,8 @@ class PushRuleStore(SQLBaseStore):
         results = yield self._simple_select_list(
             PushRuleEnableTable.table_name,
             {'user_name': user_name},
-            PushRuleEnableTable.fields
+            PushRuleEnableTable.fields,
+            desc="get_push_rules_enabled_for_user",
         )
         defer.returnValue(
             {r['rule_id']: False if r['enabled'] == 0 else True for r in results}
@@ -201,7 +202,8 @@ class PushRuleStore(SQLBaseStore):
         """
         yield self._simple_delete_one(
             PushRuleTable.table_name,
-            {'user_name': user_name, 'rule_id': rule_id}
+            {'user_name': user_name, 'rule_id': rule_id},
+            desc="delete_push_rule",
         )
 
     @defer.inlineCallbacks
@@ -209,7 +211,8 @@ class PushRuleStore(SQLBaseStore):
         yield self._simple_upsert(
             PushRuleEnableTable.table_name,
             {'user_name': user_name, 'rule_id': rule_id},
-            {'enabled': enabled}
+            {'enabled': enabled},
+            desc="set_push_rule_enabled",
         )
 
 
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 587dada68f..000502b4ff 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -114,7 +114,9 @@ class PusherStore(SQLBaseStore):
                     ts=pushkey_ts,
                     lang=lang,
                     data=data
-                ))
+                ),
+                desc="add_pusher",
+            )
         except Exception as e:
             logger.error("create_pusher with failed: %s", e)
             raise StoreError(500, "Problem creating pusher.")
@@ -123,7 +125,8 @@ class PusherStore(SQLBaseStore):
     def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
         yield self._simple_delete_one(
             PushersTable.table_name,
-            dict(app_id=app_id, pushkey=pushkey)
+            {"app_id": app_id, "pushkey": pushkey},
+            desc="delete_pusher_by_app_id_pushkey",
         )
 
     @defer.inlineCallbacks
@@ -131,7 +134,8 @@ class PusherStore(SQLBaseStore):
         yield self._simple_update_one(
             PushersTable.table_name,
             {'app_id': app_id, 'pushkey': pushkey},
-            {'last_token': last_token}
+            {'last_token': last_token},
+            desc="update_pusher_last_token",
         )
 
     @defer.inlineCallbacks
@@ -140,7 +144,8 @@ class PusherStore(SQLBaseStore):
         yield self._simple_update_one(
             PushersTable.table_name,
             {'app_id': app_id, 'pushkey': pushkey},
-            {'last_token': last_token, 'last_success': last_success}
+            {'last_token': last_token, 'last_success': last_success},
+            desc="update_pusher_last_token_and_success",
         )
 
     @defer.inlineCallbacks
@@ -148,7 +153,8 @@ class PusherStore(SQLBaseStore):
         yield self._simple_update_one(
             PushersTable.table_name,
             {'app_id': app_id, 'pushkey': pushkey},
-            {'failing_since': failing_since}
+            {'failing_since': failing_since},
+            desc="update_pusher_failing_since",
         )
 
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 3c2f1d6a15..f24154f146 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -39,7 +39,10 @@ class RegistrationStore(SQLBaseStore):
         Raises:
             StoreError if there was a problem adding this.
         """
-        row = yield self._simple_select_one("users", {"name": user_id}, ["id"])
+        row = yield self._simple_select_one(
+            "users", {"name": user_id}, ["id"],
+            desc="add_access_token_to_user",
+        )
         if not row:
             raise StoreError(400, "Bad user ID supplied.")
         row_id = row["id"]
@@ -48,7 +51,8 @@ class RegistrationStore(SQLBaseStore):
             {
                 "user_id": row_id,
                 "token": token
-            }
+            },
+            desc="add_access_token_to_user",
         )
 
     @defer.inlineCallbacks
@@ -120,6 +124,7 @@ class RegistrationStore(SQLBaseStore):
             keyvalues={"name": user.to_string()},
             retcol="admin",
             allow_none=True,
+            desc="is_server_admin",
         )
 
         defer.returnValue(res if res else False)
diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py
index 4e1a9a2783..0838eb3d12 100644
--- a/synapse/storage/rejections.py
+++ b/synapse/storage/rejections.py
@@ -29,7 +29,7 @@ class RejectionsStore(SQLBaseStore):
                 "event_id": event_id,
                 "reason": reason,
                 "last_check": self._clock.time_msec(),
-            }
+            },
         )
 
     def get_rejection_reason(self, event_id):
@@ -40,4 +40,5 @@ class RejectionsStore(SQLBaseStore):
                 "event_id": event_id,
             },
             allow_none=True,
+            desc="get_rejection_reason",
         )
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 549c9af393..be3e28c2ea 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -15,11 +15,9 @@
 
 from twisted.internet import defer
 
-from sqlite3 import IntegrityError
-
 from synapse.api.errors import StoreError
 
-from ._base import SQLBaseStore, Table
+from ._base import SQLBaseStore
 
 import collections
 import logging
@@ -27,8 +25,9 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-OpsLevel = collections.namedtuple("OpsLevel", (
-    "ban_level", "kick_level", "redact_level")
+OpsLevel = collections.namedtuple(
+    "OpsLevel",
+    ("ban_level", "kick_level", "redact_level",)
 )
 
 
@@ -47,13 +46,15 @@ class RoomStore(SQLBaseStore):
             StoreError if the room could not be stored.
         """
         try:
-            yield self._simple_insert(RoomsTable.table_name, dict(
-                room_id=room_id,
-                creator=room_creator_user_id,
-                is_public=is_public
-            ))
-        except IntegrityError:
-            raise StoreError(409, "Room ID in use.")
+            yield self._simple_insert(
+                RoomsTable.table_name,
+                {
+                    "room_id": room_id,
+                    "creator": room_creator_user_id,
+                    "is_public": is_public,
+                },
+                desc="store_room",
+            )
         except Exception as e:
             logger.error("store_room with room_id=%s failed: %s", room_id, e)
             raise StoreError(500, "Problem creating room.")
@@ -66,9 +67,11 @@ class RoomStore(SQLBaseStore):
         Returns:
             A namedtuple containing the room information, or an empty list.
         """
-        query = RoomsTable.select_statement("room_id=?")
-        return self._execute(
-            "get_room", RoomsTable.decode_single_result, query, room_id,
+        return self._simple_select_one(
+            table=RoomsTable.table_name,
+            keyvalues={"room_id": room_id},
+            retcols=RoomsTable.fields,
+            desc="get_room",
         )
 
     @defer.inlineCallbacks
@@ -143,7 +146,7 @@ class RoomStore(SQLBaseStore):
                     "event_id": event.event_id,
                     "room_id": event.room_id,
                     "topic": event.content["topic"],
-                }
+                },
             )
 
     def _store_room_name_txn(self, txn, event):
@@ -158,8 +161,45 @@ class RoomStore(SQLBaseStore):
                 }
             )
 
+    @defer.inlineCallbacks
+    def get_room_name_and_aliases(self, room_id):
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+            "LIMIT 1"
+        )
+
+        sql = (
+            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+            "INNER JOIN state_events as s ON e.event_id = s.event_id "
+            "WHERE c.room_id = ? "
+        ) % {
+            "redacted": del_sql,
+        }
+
+        sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
+        sql += " OR s.type = 'm.room.aliases')"
+        args = (room_id,)
 
-class RoomsTable(Table):
+        results = yield self._execute_and_decode("get_current_state", sql, *args)
+
+        events = yield self._parse_events(results)
+
+        name = None
+        aliases = []
+
+        for e in events:
+            if e.type == 'm.room.name':
+                if 'name' in e.content:
+                    name = e.content['name']
+            elif e.type == 'm.room.aliases':
+                if 'aliases' in e.content:
+                    aliases.extend(e.content['aliases'])
+
+        defer.returnValue((name, aliases))
+
+
+class RoomsTable(object):
     table_name = "rooms"
 
     fields = [
@@ -167,5 +207,3 @@ class RoomsTable(Table):
         "is_public",
         "creator"
     ]
-
-    EntryType = collections.namedtuple("RoomEntry", fields)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 65ffb4627f..52c37c76f5 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -212,7 +212,8 @@ class RoomMemberStore(SQLBaseStore):
         return self._simple_select_onecol(
             "room_hosts",
             {"room_id": room_id},
-            "host"
+            "host",
+            desc="get_joined_hosts_for_room",
         )
 
     def _get_members_by_dict(self, where_dict):
diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql
new file mode 100644
index 0000000000..2b27e2a429
--- /dev/null
+++ b/synapse/storage/schema/delta/15/appservice_txns.sql
@@ -0,0 +1,30 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS application_services_state(
+    as_id TEXT PRIMARY KEY,
+    state TEXT,
+    last_txn TEXT
+);
+
+CREATE TABLE IF NOT EXISTS application_services_txns(
+    as_id TEXT NOT NULL,
+    txn_id INTEGER NOT NULL,
+    event_ids TEXT NOT NULL,
+    UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK
+);
+
+
+
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 456e4bd45d..58dbf2802b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -15,6 +15,8 @@
 
 from ._base import SQLBaseStore
 
+from twisted.internet import defer
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -122,3 +124,33 @@ class StateStore(SQLBaseStore):
             },
             or_replace=True,
         )
+
+    @defer.inlineCallbacks
+    def get_current_state(self, room_id, event_type=None, state_key=""):
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+            "LIMIT 1"
+        )
+
+        sql = (
+            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+            "INNER JOIN state_events as s ON e.event_id = s.event_id "
+            "WHERE c.room_id = ? "
+        ) % {
+            "redacted": del_sql,
+        }
+
+        if event_type and state_key is not None:
+            sql += " AND s.type = ? AND s.state_key = ? "
+            args = (room_id, event_type, state_key)
+        elif event_type:
+            sql += " AND s.type = ?"
+            args = (room_id, event_type)
+        else:
+            args = (room_id, )
+
+        results = yield self._execute_and_decode("get_current_state", sql, *args)
+
+        events = yield self._parse_events(results)
+        defer.returnValue(events)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 09bc522210..66f307e640 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,7 +35,7 @@ what sort order was used:
 
 from twisted.internet import defer
 
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, cached
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.logutils import log_function
@@ -413,12 +413,32 @@ class StreamStore(SQLBaseStore):
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
+    @cached(num_args=0)
     def get_room_events_max_id(self):
         return self.runInteraction(
             "get_room_events_max_id",
             self._get_room_events_max_id_txn
         )
 
+    @defer.inlineCallbacks
+    def _get_min_token(self):
+        row = yield self._execute(
+            "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
+        )
+
+        self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
+        self.min_token = min(self.min_token, -1)
+
+        logger.debug("min_token is: %s", self.min_token)
+
+        defer.returnValue(self.min_token)
+
+    def get_next_stream_id(self):
+        with self._next_stream_id_lock:
+            i = self._next_stream_id
+            self._next_stream_id += 1
+            return i
+
     def _get_room_events_max_id_txn(self, txn):
         txn.execute(
             "SELECT MAX(stream_ordering) as m FROM events"
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 0b8a3b7a07..b777395e06 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -46,15 +46,19 @@ class TransactionStore(SQLBaseStore):
         )
 
     def _get_received_txn_response(self, txn, transaction_id, origin):
-        where_clause = "transaction_id = ? AND origin = ?"
-        query = ReceivedTransactionsTable.select_statement(where_clause)
-
-        txn.execute(query, (transaction_id, origin))
-
-        results = ReceivedTransactionsTable.decode_results(txn.fetchall())
+        result = self._simple_select_one_txn(
+            txn,
+            table=ReceivedTransactionsTable.table_name,
+            keyvalues={
+                "transaction_id": transaction_id,
+                "origin": origin,
+            },
+            retcols=ReceivedTransactionsTable.fields,
+            allow_none=True,
+        )
 
-        if results and results[0].response_code:
-            return (results[0].response_code, results[0].response_json)
+        if result and result.response_code:
+            return result["response_code"], result["response_json"]
         else:
             return None