Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/__init__.py                  | 198
-rw-r--r--  synapse/storage/_base.py                     | 158
-rw-r--r--  synapse/storage/event_federation.py          |  11
-rw-r--r--  synapse/storage/media_repository.py          | 129
-rw-r--r--  synapse/storage/schema/delta/v9.sql          |  79
-rw-r--r--  synapse/storage/schema/im.sql                |  13
-rw-r--r--  synapse/storage/schema/media_repository.sql  |  68
-rw-r--r--  synapse/storage/schema/state.sql             |   3
-rw-r--r--  synapse/storage/schema/transactions.sql      |   6
-rw-r--r--  synapse/storage/signatures.py                |  19
-rw-r--r--  synapse/storage/state.py                     |  13
-rw-r--r--  synapse/storage/transactions.py              | 106
12 files changed, 560 insertions(+), 243 deletions(-)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f15e3dfe62..e6bb665932 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -15,12 +15,8 @@
 
 from twisted.internet import defer
 
-from synapse.api.events.room import (
-    RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent,
-    RoomRedactionEvent,
-)
-
 from synapse.util.logutils import log_function
+from synapse.api.constants import EventTypes
 
 from .directory import DirectoryStore
 from .feedback import FeedbackStore
@@ -33,11 +29,13 @@ from .stream import StreamStore
 from .transactions import TransactionStore
 from .keys import KeyStore
 from .event_federation import EventFederationStore
+from .media_repository import MediaRepositoryStore
 
 from .state import StateStore
 from .signatures import SignatureStore
 
 from syutil.base64util import decode_base64
+from syutil.jsonutil import encode_canonical_json
 
 from synapse.crypto.event_signing import compute_event_reference_hash
 
@@ -62,12 +60,13 @@ SCHEMAS = [
     "state",
     "event_edges",
     "event_signatures",
+    "media_repository",
 ]
 
 
 # Remember to update this number every time an incompatible change is made to
 # the database schema files, so that users are informed on server restarts.
-SCHEMA_VERSION = 8
+SCHEMA_VERSION = 10
 
 
 class _RollbackButIsFineException(Exception):
@@ -81,11 +80,12 @@ class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
                 PresenceStore, TransactionStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
-                EventFederationStore, ):
+                EventFederationStore,
+                MediaRepositoryStore,
+                ):
 
     def __init__(self, hs):
         super(DataStore, self).__init__(hs)
-        self.event_factory = hs.get_event_factory()
         self.hs = hs
 
         self.min_token_deferred = self._get_min_token()
@@ -93,8 +93,8 @@ class DataStore(RoomMemberStore, RoomStore,
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, backfilled=False, is_new_state=True,
-                      current_state=None):
+    def persist_event(self, event, context, backfilled=False,
+                      is_new_state=True, current_state=None):
         stream_ordering = None
         if backfilled:
             if not self.min_token_deferred.called:
@@ -107,6 +107,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 "persist_event",
                 self._persist_event_txn,
                 event=event,
+                context=context,
                 backfilled=backfilled,
                 stream_ordering=stream_ordering,
                 is_new_state=is_new_state,
@@ -117,50 +118,64 @@ class DataStore(RoomMemberStore, RoomStore,
 
     @defer.inlineCallbacks
     def get_event(self, event_id, allow_none=False):
-        events_dict = yield self._simple_select_one(
-            "events",
-            {"event_id": event_id},
-            [
-                "event_id",
-                "type",
-                "room_id",
-                "content",
-                "unrecognized_keys",
-                "depth",
-            ],
-            allow_none=allow_none,
-        )
+        events = yield self._get_events([event_id])
 
-        if not events_dict:
-            defer.returnValue(None)
+        if not events:
+            if allow_none:
+                defer.returnValue(None)
+            else:
+                raise RuntimeError("Could not find event %s" % (event_id,))
 
-        event = yield self._parse_events([events_dict])
-        defer.returnValue(event[0])
+        defer.returnValue(events[0])
 
     @log_function
-    def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
-                           is_new_state=True, current_state=None):
-        if event.type == RoomMemberEvent.TYPE:
+    def _persist_event_txn(self, txn, event, context, backfilled,
+                           stream_ordering=None, is_new_state=True,
+                           current_state=None):
+        if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
-        elif event.type == FeedbackEvent.TYPE:
+        elif event.type == EventTypes.Feedback:
             self._store_feedback_txn(txn, event)
-        elif event.type == RoomNameEvent.TYPE:
+        elif event.type == EventTypes.Name:
             self._store_room_name_txn(txn, event)
-        elif event.type == RoomTopicEvent.TYPE:
+        elif event.type == EventTypes.Topic:
             self._store_room_topic_txn(txn, event)
-        elif event.type == RoomRedactionEvent.TYPE:
+        elif event.type == EventTypes.Redaction:
             self._store_redaction(txn, event)
 
-        outlier = False
-        if hasattr(event, "outlier"):
-            outlier = event.outlier
+        outlier = event.internal_metadata.is_outlier()
+
+        event_dict = {
+            k: v
+            for k, v in event.get_dict().items()
+            if k not in [
+                "redacted",
+                "redacted_because",
+            ]
+        }
+
+        metadata_json = encode_canonical_json(
+            event.internal_metadata.get_dict()
+        )
+
+        self._simple_insert_txn(
+            txn,
+            table="event_json",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "internal_metadata": metadata_json.decode("UTF-8"),
+                "json": encode_canonical_json(event_dict).decode("UTF-8"),
+            },
+            or_replace=True,
+        )
 
         vals = {
             "topological_ordering": event.depth,
             "event_id": event.event_id,
             "type": event.type,
             "room_id": event.room_id,
-            "content": json.dumps(event.content),
+            "content": json.dumps(event.get_dict()["content"]),
             "processed": True,
             "outlier": outlier,
             "depth": event.depth,
@@ -171,7 +186,7 @@ class DataStore(RoomMemberStore, RoomStore,
 
         unrec = {
             k: v
-            for k, v in event.get_full_dict().items()
+            for k, v in event.get_dict().items()
             if k not in vals.keys() and k not in [
                 "redacted",
                 "redacted_because",
@@ -206,7 +221,8 @@ class DataStore(RoomMemberStore, RoomStore,
             room_id=event.room_id,
         )
 
-        self._store_state_groups_txn(txn, event)
+        if not outlier:
+            self._store_state_groups_txn(txn, event, context)
 
         if current_state:
             txn.execute(
@@ -300,16 +316,6 @@ class DataStore(RoomMemberStore, RoomStore,
                 txn, event.event_id, hash_alg, hash_bytes,
             )
 
-        if hasattr(event, "signatures"):
-            logger.debug("sigs: %s", event.signatures)
-            for name, sigs in event.signatures.items():
-                for key_id, signature_base64 in sigs.items():
-                    signature_bytes = decode_base64(signature_base64)
-                    self._store_event_signature_txn(
-                        txn, event.event_id, name, key_id,
-                        signature_bytes,
-                    )
-
         for prev_event_id, prev_hashes in event.prev_events:
             for alg, hash_base64 in prev_hashes.items():
                 hash_bytes = decode_base64(hash_base64)
@@ -411,86 +417,6 @@ class DataStore(RoomMemberStore, RoomStore,
             ],
         )
 
-    def snapshot_room(self, event):
-        """Snapshot the room for an update by a user
-        Args:
-            room_id (synapse.types.RoomId): The room to snapshot.
-            user_id (synapse.types.UserId): The user to snapshot the room for.
-            state_type (str): Optional state type to snapshot.
-            state_key (str): Optional state key to snapshot.
-        Returns:
-            synapse.storage.Snapshot: A snapshot of the state of the room.
-        """
-        def _snapshot(txn):
-            prev_events = self._get_latest_events_in_room(
-                txn,
-                event.room_id
-            )
-
-            prev_state = None
-            state_key = None
-            if hasattr(event, "state_key"):
-                state_key = event.state_key
-                prev_state = self._get_latest_state_in_room(
-                    txn,
-                    event.room_id,
-                    type=event.type,
-                    state_key=state_key,
-                )
-
-            return Snapshot(
-                store=self,
-                room_id=event.room_id,
-                user_id=event.user_id,
-                prev_events=prev_events,
-                prev_state=prev_state,
-                state_type=event.type,
-                state_key=state_key,
-            )
-
-        return self.runInteraction("snapshot_room", _snapshot)
-
-
-class Snapshot(object):
-    """Snapshot of the state of a room
-    Args:
-        store (DataStore): The datastore.
-        room_id (RoomId): The room of the snapshot.
-        user_id (UserId): The user this snapshot is for.
-        prev_events (list): The list of event ids this snapshot is after.
-        membership_state (RoomMemberEvent): The current state of the user in
-            the room.
-        state_type (str, optional): State type captured by the snapshot
-        state_key (str, optional): State key captured by the snapshot
-        prev_state_pdu (PduEntry, optional): pdu id of
-            the previous value of the state type and key in the room.
-    """
-
-    def __init__(self, store, room_id, user_id, prev_events,
-                 prev_state, state_type=None, state_key=None):
-        self.store = store
-        self.room_id = room_id
-        self.user_id = user_id
-        self.prev_events = prev_events
-        self.prev_state = prev_state
-        self.state_type = state_type
-        self.state_key = state_key
-
-    def fill_out_prev_events(self, event):
-        if not hasattr(event, "prev_events"):
-            event.prev_events = [
-                (event_id, hashes)
-                for event_id, hashes, _ in self.prev_events
-            ]
-
-            if self.prev_events:
-                event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
-            else:
-                event.depth = 0
-
-        if not hasattr(event, "prev_state") and self.prev_state is not None:
-            event.prev_state = self.prev_state
-
 
 def schema_path(schema):
     """ Get a filesystem path for the named database schema
@@ -518,6 +444,14 @@ def read_schema(schema):
         return schema_file.read()
 
 
+class PrepareDatabaseException(Exception):
+    pass
+
+
+class UpgradeDatabaseException(PrepareDatabaseException):
+    pass
+
+
 def prepare_database(db_conn):
     """ Set up all the dbs. Since all the *.sql files use IF NOT EXISTS, we
     don't have to worry about overwriting existing content.
@@ -542,6 +476,10 @@ def prepare_database(db_conn):
 
             # Run every version since after the current version.
             for v in range(user_version + 1, SCHEMA_VERSION + 1):
+                if v == 10:
+                    raise UpgradeDatabaseException(
+                        "No delta for version 10"
+                    )
                 sql_script = read_schema("delta/v%d" % (v))
                 c.executescript(sql_script)
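For context, prepare_database applies every delta between the database's
recorded PRAGMA user_version and SCHEMA_VERSION in order, so an installation
can skip several releases and still upgrade cleanly (version 10 excepted,
which deliberately has no delta). A minimal standalone sketch of the same
pattern, assuming sqlite3 and a read_schema-style loader for the delta files:

    import sqlite3

    SCHEMA_VERSION = 10

    def apply_deltas(db_path, read_schema):
        conn = sqlite3.connect(db_path)
        c = conn.cursor()
        c.execute("PRAGMA user_version")
        user_version = c.fetchone()[0]
        for v in range(user_version + 1, SCHEMA_VERSION + 1):
            c.executescript(read_schema("delta/v%d" % v))
        # PRAGMA does not accept bound parameters, hence the formatting
        c.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
        conn.commit()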
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4881f03368..e0d97f440b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -15,15 +15,14 @@
 import logging
 
 from synapse.api.errors import StoreError
-from synapse.api.events.utils import prune_event
+from synapse.events import FrozenEvent
+from synapse.events.utils import prune_event
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
-from syutil.base64util import encode_base64
 
 from twisted.internet import defer
 
 import collections
-import copy
 import json
 import sys
 import time
@@ -84,7 +83,6 @@ class SQLBaseStore(object):
     def __init__(self, hs):
         self.hs = hs
         self._db_pool = hs.get_db_pool()
-        self.event_factory = hs.get_event_factory()
         self._clock = hs.get_clock()
 
     @defer.inlineCallbacks
@@ -436,123 +434,79 @@ class SQLBaseStore(object):
 
         return self.runInteraction("_simple_max_id", func)
 
-    def _parse_event_from_row(self, row_dict):
-        d = copy.deepcopy({k: v for k, v in row_dict.items()})
-
-        d.pop("stream_ordering", None)
-        d.pop("topological_ordering", None)
-        d.pop("processed", None)
-        d["origin_server_ts"] = d.pop("ts", 0)
-        replaces_state = d.pop("prev_state", None)
+    def _get_events(self, event_ids):
+        return self.runInteraction(
+            "_get_events", self._get_events_txn, event_ids
+        )
 
-        if replaces_state:
-            d["replaces_state"] = replaces_state
+    def _get_events_txn(self, txn, event_ids):
+        events = []
+        for e_id in event_ids:
+            ev = self._get_event_txn(txn, e_id)
 
-        d.update(json.loads(row_dict["unrecognized_keys"]))
-        d["content"] = json.loads(d["content"])
-        del d["unrecognized_keys"]
+            if ev:
+                events.append(ev)
 
-        if "age_ts" not in d:
-            # For compatibility
-            d["age_ts"] = d.get("origin_server_ts", 0)
+        return events
 
-        return self.event_factory.create_event(
-            etype=d["type"],
-            **d
+    def _get_event_txn(self, txn, event_id, check_redacted=True,
+                       get_prev_content=True):
+        sql = (
+            "SELECT internal_metadata, json, r.event_id FROM event_json as e "
+            "LEFT JOIN redactions as r ON e.event_id = r.redacts "
+            "WHERE e.event_id = ? "
+            "LIMIT 1 "
         )
 
-    def _get_events_txn(self, txn, event_ids):
-        # FIXME (erikj): This should be batched?
+        txn.execute(sql, (event_id,))
 
-        sql = "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc"
+        res = txn.fetchone()
 
-        event_rows = []
-        for e_id in event_ids:
-            c = txn.execute(sql, (e_id,))
-            event_rows.extend(self.cursor_to_dict(c))
+        if not res:
+            return None
 
-        return self._parse_events_txn(txn, event_rows)
+        internal_metadata, js, redacted = res
 
-    def _parse_events(self, rows):
-        return self.runInteraction(
-            "_parse_events", self._parse_events_txn, rows
-        )
+        d = json.loads(js)
+        internal_metadata = json.loads(internal_metadata)
 
-    def _parse_events_txn(self, txn, rows):
-        events = [self._parse_event_from_row(r) for r in rows]
+        ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
 
-        select_event_sql = (
-            "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc"
-        )
+        if check_redacted and redacted:
+            ev = prune_event(ev)
+
+            ev.unsigned["redacted_by"] = redacted
+            # Get the redaction event.
 
-        for i, ev in enumerate(events):
-            signatures = self._get_event_signatures_txn(
-                txn, ev.event_id,
+            because = self._get_event_txn(
+                txn,
+                redacted,
+                check_redacted=False
             )
 
-            ev.signatures = {
-                n: {
-                    k: encode_base64(v) for k, v in s.items()
-                }
-                for n, s in signatures.items()
-            }
+            if because:
+                ev.unsigned["redacted_because"] = because
 
-            hashes = self._get_event_content_hashes_txn(
-                txn, ev.event_id,
+        if get_prev_content and "replaces_state" in ev.unsigned:
+            prev = self._get_event_txn(
+                txn,
+                ev.unsigned["replaces_state"],
+                get_prev_content=False,
             )
+            if prev:
+                ev.unsigned["prev_content"] = prev.get_dict()["content"]
 
-            ev.hashes = {
-                k: encode_base64(v) for k, v in hashes.items()
-            }
-
-            prevs = self._get_prev_events_and_state(txn, ev.event_id)
-
-            ev.prev_events = [
-                (e_id, h)
-                for e_id, h, is_state in prevs
-                if is_state == 0
-            ]
-
-            ev.auth_events = self._get_auth_events(txn, ev.event_id)
-
-            if hasattr(ev, "state_key"):
-                ev.prev_state = [
-                    (e_id, h)
-                    for e_id, h, is_state in prevs
-                    if is_state == 1
-                ]
-
-                if hasattr(ev, "replaces_state"):
-                    # Load previous state_content.
-                    # FIXME (erikj): Handle multiple prev_states.
-                    cursor = txn.execute(
-                        select_event_sql,
-                        (ev.replaces_state,)
-                    )
-                    prevs = self.cursor_to_dict(cursor)
-                    if prevs:
-                        prev = self._parse_event_from_row(prevs[0])
-                        ev.prev_content = prev.content
-
-            if not hasattr(ev, "redacted"):
-                logger.debug("Doesn't have redacted key: %s", ev)
-                ev.redacted = self._has_been_redacted_txn(txn, ev)
-
-            if ev.redacted:
-                # Get the redaction event.
-                select_event_sql = "SELECT * FROM events WHERE event_id = ?"
-                txn.execute(select_event_sql, (ev.redacted,))
-
-                del_evs = self._parse_events_txn(
-                    txn, self.cursor_to_dict(txn)
-                )
+        return ev
 
-                if del_evs:
-                    ev = prune_event(ev)
-                    events[i] = ev
-                    ev.redacted_because = del_evs[0]
+    def _parse_events(self, rows):
+        return self.runInteraction(
+            "_parse_events", self._parse_events_txn, rows
+        )
 
-        return events
+    def _parse_events_txn(self, txn, rows):
+        event_ids = [r["event_id"] for r in rows]
+
+        return self._get_events_txn(txn, event_ids)
 
     def _has_been_redacted_txn(self, txn, event):
         sql = "SELECT event_id FROM redactions WHERE redacts = ?"
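Taken together with the event_json writes in __init__.py above, the rewritten
read path fetches the stored canonical JSON and any redaction in a single
LEFT JOIN, rebuilds a FrozenEvent, prunes it if it was redacted, and inlines
the previous content of state events. A hedged usage sketch (the store, txn
and event id here are hypothetical):

    ev = store._get_event_txn(txn, "$some_event:example.com")
    if ev is not None:
        if "redacted_because" in ev.unsigned:
            # content was stripped by prune_event(); the redaction event is
            # kept so clients can still show who redacted it and why
            redaction_event = ev.unsigned["redacted_because"]
        # for state events, the content of the state entry this replaced
        prev_content = ev.unsigned.get("prev_content")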
@@ -650,7 +604,7 @@ class JoinHelper(object):
     to dump the results into.
 
     Attributes:
-        taples (list): List of `Table` classes
+        tables (list): List of `Table` classes
         EntryType (type)
     """
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 0ff9a23ee0..7a6009c9ee 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -177,14 +177,15 @@ class EventFederationStore(SQLBaseStore):
             retcols=["prev_event_id", "is_state"],
         )
 
+        hashes = self._get_prev_event_hashes_txn(txn, event_id)
+
         results = []
         for d in res:
-            hashes = self._get_event_reference_hashes_txn(
-                txn,
-                d["prev_event_id"]
-            )
+            edge_hash = self._get_event_reference_hashes_txn(
+                txn, d["prev_event_id"]
+            )
+            edge_hash.update(hashes.get(d["prev_event_id"], {}))
             prev_hashes = {
-                k: encode_base64(v) for k, v in hashes.items()
+                k: encode_base64(v)
+                for k, v in edge_hash.items()
                 if k == "sha256"
             }
             results.append((d["prev_event_id"], prev_hashes, d["is_state"]))
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
new file mode 100644
index 0000000000..18c068d3d9
--- /dev/null
+++ b/synapse/storage/media_repository.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from _base import SQLBaseStore
+
+
+class MediaRepositoryStore(SQLBaseStore):
+    """Persistence for attachments and avatars"""
+
+    def get_default_thumbnails(self, top_level_type, sub_type):
+        return []
+
+    def get_local_media(self, media_id):
+        """Get the metadata for a local piece of media
+        Returns:
+            None if the media_id doesn't exist.
+        """
+        return self._simple_select_one(
+            "local_media_repository",
+            {"media_id": media_id},
+            ("media_type", "media_length", "upload_name", "created_ts"),
+            allow_none=True,
+        )
+
+    def store_local_media(self, media_id, media_type, time_now_ms, upload_name,
+                          media_length, user_id):
+        return self._simple_insert(
+            "local_media_repository",
+            {
+                "media_id": media_id,
+                "media_type": media_type,
+                "created_ts": time_now_ms,
+                "upload_name": upload_name,
+                "media_length": media_length,
+                "user_id": user_id.to_string(),
+            }
+        )
+
+    def get_local_media_thumbnails(self, media_id):
+        return self._simple_select_list(
+            "local_media_repository_thumbnails",
+            {"media_id": media_id},
+            (
+                "thumbnail_width", "thumbnail_height", "thumbnail_method",
+                "thumbnail_type", "thumbnail_length",
+            )
+        )
+
+    def store_local_thumbnail(self, media_id, thumbnail_width,
+                              thumbnail_height, thumbnail_type,
+                              thumbnail_method, thumbnail_length):
+        return self._simple_insert(
+            "local_media_repository_thumbnails",
+            {
+                "media_id": media_id,
+                "thumbnail_width": thumbnail_width,
+                "thumbnail_height": thumbnail_height,
+                "thumbnail_method": thumbnail_method,
+                "thumbnail_type": thumbnail_type,
+                "thumbnail_length": thumbnail_length,
+            }
+        )
+
+    def get_cached_remote_media(self, origin, media_id):
+        return self._simple_select_one(
+            "remote_media_cache",
+            {"media_origin": origin, "media_id": media_id},
+            (
+                "media_type", "media_length", "upload_name", "created_ts",
+                "filesystem_id",
+            ),
+            allow_none=True,
+        )
+
+    def store_cached_remote_media(self, origin, media_id, media_type,
+                                  media_length, time_now_ms, upload_name,
+                                  filesystem_id):
+        return self._simple_insert(
+            "remote_media_cache",
+            {
+                "media_origin": origin,
+                "media_id": media_id,
+                "media_type": media_type,
+                "media_length": media_length,
+                "created_ts": time_now_ms,
+                "upload_name": upload_name,
+                "filesystem_id": filesystem_id,
+            }
+        )
+
+    def get_remote_media_thumbnails(self, origin, media_id):
+        return self._simple_select_list(
+            "remote_media_cache_thumbnails",
+            {"media_origin": origin, "media_id": media_id},
+            (
+                "thumbnail_width", "thumbnail_height", "thumbnail_method",
+                "thumbnail_type", "thumbnail_length", "filesystem_id",
+            )
+        )
+
+    def store_remote_media_thumbnail(self, origin, media_id, filesystem_id,
+                                     thumbnail_width, thumbnail_height,
+                                     thumbnail_type, thumbnail_method,
+                                     thumbnail_length):
+        return self._simple_insert(
+            "remote_media_cache_thumbnails",
+            {
+                "media_origin": origin,
+                "media_id": media_id,
+                "thumbnail_width": thumbnail_width,
+                "thumbnail_height": thumbnail_height,
+                "thumbnail_method": thumbnail_method,
+                "thumbnail_type": thumbnail_type,
+                "thumbnail_length": thumbnail_length,
+                "filesystem_id": filesystem_id,
+            }
+        )
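These methods all return Deferreds via the generic _simple_insert and
_simple_select helpers. A hedged usage sketch, assuming a configured store, a
synapse UserID (store_local_media calls user_id.to_string()) and the
homeserver clock; the literal values are illustrative:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def upload_then_lookup(store, media_id, user_id, clock):
        yield store.store_local_media(
            media_id=media_id,
            media_type="image/png",
            time_now_ms=clock.time_msec(),
            upload_name="avatar.png",
            media_length=1234,
            user_id=user_id,
        )
        media = yield store.get_local_media(media_id)
        # a dict of media_type/media_length/upload_name/created_ts,
        # or None if the media_id is unknown
        defer.returnValue(media)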
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
new file mode 100644
index 0000000000..0af29733a0
--- /dev/null
+++ b/synapse/storage/schema/delta/v9.sql
@@ -0,0 +1,79 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+    destination TEXT PRIMARY KEY,
+    retry_last_ts INTEGER,
+    retry_interval INTEGER
+);
+
+
+CREATE TABLE IF NOT EXISTS local_media_repository (
+    media_id TEXT, -- The id used to refer to the media.
+    media_type TEXT, -- The MIME-type of the media.
+    media_length INTEGER, -- Length of the media in bytes.
+    created_ts INTEGER, -- When the content was uploaded in ms.
+    upload_name TEXT, -- The name the media was uploaded with.
+    user_id TEXT, -- The user who uploaded the file.
+    CONSTRAINT uniqueness UNIQUE (media_id)
+);
+
+CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
+    media_id TEXT, -- The id used to refer to the media.
+    thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+    thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+    thumbnail_method TEXT, -- The method used to make the thumbnail.
+    thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+    CONSTRAINT uniqueness UNIQUE (
+        media_id, thumbnail_width, thumbnail_height, thumbnail_type
+    )
+);
+
+CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
+    ON local_media_repository_thumbnails (media_id);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache (
+    media_origin TEXT, -- The remote HS the media came from.
+    media_id TEXT, -- The id used to refer to the media on that server.
+    media_type TEXT, -- The MIME-type of the media.
+    created_ts INTEGER, -- When the content was uploaded in ms.
+    upload_name TEXT, -- The name the media was uploaded with.
+    media_length INTEGER, -- Length of the media in bytes.
+    filesystem_id TEXT, -- The name used to store the media on disk.
+    CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
+);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
+    media_origin TEXT, -- The remote HS the media came from.
+    media_id TEXT, -- The id used to refer to the media.
+    thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+    thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+    thumbnail_method TEXT, -- The method used to make the thumbnail
+    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+    thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+    filesystem_id TEXT, -- The name used to store the media on disk.
+    CONSTRAINT uniqueness UNIQUE (
+        media_origin, media_id, thumbnail_width, thumbnail_height,
+        thumbnail_type
+    )
+);
+
+CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
+    ON remote_media_cache_thumbnails (media_id);
+
+
+PRAGMA user_version = 9;
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 8ba732a23b..253f9f779b 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -32,6 +32,19 @@ CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
 CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
 CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id);
 
+
+CREATE TABLE IF NOT EXISTS event_json(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    internal_metadata NOT NULL,
+    json BLOB NOT NULL,
+    CONSTRAINT ev_j_uniq UNIQUE (event_id)
+);
+
+CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id);
+CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id);
+
+
 CREATE TABLE IF NOT EXISTS state_events(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
diff --git a/synapse/storage/schema/media_repository.sql b/synapse/storage/schema/media_repository.sql
new file mode 100644
index 0000000000..b785fa0208
--- /dev/null
+++ b/synapse/storage/schema/media_repository.sql
@@ -0,0 +1,68 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS local_media_repository (
+    media_id TEXT, -- The id used to refer to the media.
+    media_type TEXT, -- The MIME-type of the media.
+    media_length INTEGER, -- Length of the media in bytes.
+    created_ts INTEGER, -- When the content was uploaded in ms.
+    upload_name TEXT, -- The name the media was uploaded with.
+    user_id TEXT, -- The user who uploaded the file.
+    CONSTRAINT uniqueness UNIQUE (media_id)
+);
+
+CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
+    media_id TEXT, -- The id used to refer to the media.
+    thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+    thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+    thumbnail_method TEXT, -- The method used to make the thumbnail.
+    thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+    CONSTRAINT uniqueness UNIQUE (
+        media_id, thumbnail_width, thumbnail_height, thumbnail_type
+    )
+);
+
+CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
+    ON local_media_repository_thumbnails (media_id);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache (
+    media_origin TEXT, -- The remote HS the media came from.
+    media_id TEXT, -- The id used to refer to the media on that server.
+    media_type TEXT, -- The MIME-type of the media.
+    created_ts INTEGER, -- When the content was uploaded in ms.
+    upload_name TEXT, -- The name the media was uploaded with.
+    media_length INTEGER, -- Length of the media in bytes.
+    filesystem_id TEXT, -- The name used to store the media on disk.
+    CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
+);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
+    media_origin TEXT, -- The remote HS the media came from.
+    media_id TEXT, -- The id used to refer to the media.
+    thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+    thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+    thumbnail_method TEXT, -- The method used to make the thumbnail
+    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+    thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+    filesystem_id TEXT, -- The name used to store the media on disk.
+    CONSTRAINT uniqueness UNIQUE (
+        media_origin, media_id, thumbnail_width, thumbnail_height,
+        thumbnail_type, thumbnail_type
+    )
+);
+
+CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
+    ON remote_media_cache_thumbnails (media_id);
diff --git a/synapse/storage/schema/state.sql b/synapse/storage/schema/state.sql
index 44f7aafb27..2c48d6daca 100644
--- a/synapse/storage/schema/state.sql
+++ b/synapse/storage/schema/state.sql
@@ -29,7 +29,8 @@ CREATE TABLE IF NOT EXISTS state_groups_state(
 
 CREATE TABLE IF NOT EXISTS event_to_state_groups(
     event_id TEXT NOT NULL,
-    state_group INTEGER NOT NULL
+    state_group INTEGER NOT NULL,
+    CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id)
 );
 
 CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id);
diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql
index 88e3e4e04d..de461bfa15 100644
--- a/synapse/storage/schema/transactions.sql
+++ b/synapse/storage/schema/transactions.sql
@@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra
 CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
 CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
 
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+    destination TEXT PRIMARY KEY,
+    retry_last_ts INTEGER,
+    retry_interval INTEGER
+);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index eea4f21065..3a705119fd 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -13,8 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from _base import SQLBaseStore
 
+from syutil.base64util import encode_base64
+
 
 class SignatureStore(SQLBaseStore):
     """Persistence for event signatures and hashes"""
@@ -67,6 +71,21 @@ class SignatureStore(SQLBaseStore):
             f
         )
 
+    @defer.inlineCallbacks
+    def add_event_hashes(self, event_ids):
+        hashes = yield self.get_event_reference_hashes(
+            event_ids
+        )
+        hashes = [
+            {
+                k: encode_base64(v) for k, v in h.items()
+                if k == "sha256"
+            }
+            for h in hashes
+        ]
+
+        defer.returnValue(zip(event_ids, hashes))
+
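add_event_hashes pairs each event id with its base64-encoded sha256 reference
hash, the same (event_id, {alg: b64hash}) shape used by prev_events. A hedged
illustration of the per-hash transformation it applies (the digest bytes are
hypothetical):

    from syutil.base64util import encode_base64

    raw_hashes = [{"sha256": b"\x00\x01...digest bytes..."}]
    encoded = [
        {k: encode_base64(v) for k, v in h.items() if k == "sha256"}
        for h in raw_hashes
    ]
    # each digest becomes an unpadded base64 string; non-sha256 keys dropped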
     def _get_event_reference_hashes_txn(self, txn, event_id):
         """Get all the hashes for a given PDU.
         Args:
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index e0f44b3e59..afe3e5edea 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -86,11 +86,16 @@ class StateStore(SQLBaseStore):
             self._store_state_groups_txn, event
         )
 
-    def _store_state_groups_txn(self, txn, event):
-        if event.state_events is None:
+    def _store_state_groups_txn(self, txn, event, context):
+        if context.current_state is None:
             return
 
-        state_group = event.state_group
+        state_events = context.current_state
+
+        if event.is_state():
+            state_events[(event.type, event.state_key)] = event
+
+        state_group = context.state_group
         if not state_group:
             state_group = self._simple_insert_txn(
                 txn,
@@ -102,7 +107,7 @@ class StateStore(SQLBaseStore):
                 or_ignore=True,
             )
 
-            for state in event.state_events.values():
+            for state in state_events.values():
                 self._simple_insert_txn(
                     txn,
                     table="state_groups_state",
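For reference, context.current_state maps (event type, state key) pairs to
the events currently holding that piece of state, and the event being
persisted is folded into the map when it is itself a state event. An
illustrative, entirely hypothetical shape of that map:

    # hypothetical stand-ins for real FrozenEvents
    create_event = object()
    alice_join = object()

    current_state = {
        ("m.room.create", ""): create_event,
        ("m.room.member", "@alice:example.com"): alice_join,
    }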
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 00d0f48082..423cc3f02a 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table
 
 from collections import namedtuple
 
+from twisted.internet import defer
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -26,6 +28,10 @@ class TransactionStore(SQLBaseStore):
     """A collection of queries for sending and receiving transactions.
     """
 
+    # a write-through cache of DestinationsTable.EntryType indexed by
+    # destination string
+    destination_retry_cache = {}
+
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
         already responded to it. If so, return the response code and response
@@ -114,7 +120,7 @@ class TransactionStore(SQLBaseStore):
     def _prep_send_transaction(self, txn, transaction_id, destination,
                                origin_server_ts):
 
-        # First we find out what the prev_txs should be.
+        # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
         query = "%s ORDER BY id DESC LIMIT 1" % (
@@ -205,6 +211,92 @@ class TransactionStore(SQLBaseStore):
 
         return ReceivedTransactionsTable.decode_results(txn.fetchall())
 
+    def get_destination_retry_timings(self, destination):
+        """Gets the current retry timings (if any) for a given destination.
+
+        Args:
+            destination (str)
+
+        Returns:
+            None if not retrying.
+            Otherwise, a DestinationsTable.EntryType for the retry scheme.
+        """
+        if destination in self.destination_retry_cache:
+            return defer.succeed(self.destination_retry_cache[destination])
+
+        return self.runInteraction(
+            "get_destination_retry_timings",
+            self._get_destination_retry_timings, destination)
+
+    def _get_destination_retry_timings(self, txn, destination):
+        query = DestinationsTable.select_statement("destination = ?")
+        txn.execute(query, (destination,))
+        result = txn.fetchall()
+        if result:
+            result = DestinationsTable.decode_single_result(result)
+            if result.retry_last_ts > 0:
+                return result
+            else:
+                return None
+
+    def set_destination_retry_timings(self, destination,
+                                      retry_last_ts, retry_interval):
+        """Sets the current retry timings for a given destination.
+        Both timings should be zero if retrying is no longer occurring.
+
+        Args:
+            destination (str)
+            retry_last_ts (int) - time of last retry attempt in unix epoch ms
+            retry_interval (int) - how long until next retry in ms
+        """
+
+        self.destination_retry_cache[destination] = (
+            DestinationsTable.EntryType(
+                destination,
+                retry_last_ts,
+                retry_interval
+            )
+        )
+
+        # XXX: we could choose not to bother persisting this if our cache
+        # thinks this is a NOOP
+        return self.runInteraction(
+            "set_destination_retry_timings",
+            self._set_destination_retry_timings,
+            destination,
+            retry_last_ts,
+            retry_interval,
+        )
+
+    def _set_destination_retry_timings(self, txn, destination,
+                                       retry_last_ts, retry_interval):
+
+        query = (
+            "INSERT OR REPLACE INTO %s "
+            "(destination, retry_last_ts, retry_interval) "
+            "VALUES (?, ?, ?) "
+        ) % DestinationsTable.table_name
+
+        txn.execute(query, (destination, retry_last_ts, retry_interval))
+
+    def get_destinations_needing_retry(self):
+        """Get all destinations which are due a retry for sending a transaction.
+
+        Returns:
+            list: A list of `DestinationsTable.EntryType`
+        """
+
+        return self.runInteraction(
+            "get_destinations_needing_retry",
+            self._get_destinations_needing_retry
+        )
+
+    def _get_destinations_needing_retry(self, txn):
+        now_ms = int(self._clock.time_msec())
+        where = "retry_last_ts > 0 AND retry_last_ts + retry_interval < ?"
+        query = DestinationsTable.select_statement(where)
+        txn.execute(query, (now_ms,))
+        return DestinationsTable.decode_results(txn.fetchall())
+
 
 class ReceivedTransactionsTable(Table):
     table_name = "received_transactions"
@@ -247,3 +339,15 @@ class TransactionsToPduTable(Table):
     ]
 
     EntryType = namedtuple("TransactionsToPduEntry", fields)
+
+
+class DestinationsTable(Table):
+    table_name = "destinations"
+
+    fields = [
+        "destination",
+        "retry_last_ts",
+        "retry_interval",
+    ]
+
+    EntryType = namedtuple("DestinationsEntry", fields)
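The retry helpers put a write-through cache (destination_retry_cache) in
front of the destinations table, so hot destinations skip a database round
trip. A hedged sketch of an exponential back-off built on top of them,
assuming a configured store and a millisecond timestamp; the initial interval
is illustrative:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def note_send_failure(store, destination, now_ms):
        entry = yield store.get_destination_retry_timings(destination)
        if entry is not None and entry.retry_interval:
            retry_interval = entry.retry_interval * 2  # back off further
        else:
            retry_interval = 1000  # first failure: retry in 1s
        yield store.set_destination_retry_timings(
            destination, now_ms, retry_interval,
        )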