author     David Baker <dave@matrix.org>  2014-12-18 15:15:22 +0000
committer  David Baker <dave@matrix.org>  2014-12-18 15:15:22 +0000
commit     b56730bb6e549e6b22c95858cae50e091de96844
tree       0eb011bc6b32a44df8c343ffedbc15e2ddd046e9 /synapse/storage/__init__.py
parent     schema version is now 10
parent     Update README.rst
Merge branch 'develop' into pushers
Conflicts:
	synapse/api/errors.py
	synapse/server.py
	synapse/storage/__init__.py
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r--  synapse/storage/__init__.py | 199
1 file changed, 70 insertions(+), 129 deletions(-)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index ad1765e04d..ac3bf5cee5 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -15,12 +15,8 @@
 
 from twisted.internet import defer
 
-from synapse.api.events.room import (
-    RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent,
-    RoomRedactionEvent,
-)
-
 from synapse.util.logutils import log_function
+from synapse.api.constants import EventTypes
 
 from .directory import DirectoryStore
 from .feedback import FeedbackStore
@@ -34,11 +30,13 @@ from .transactions import TransactionStore
 from .keys import KeyStore
 from .event_federation import EventFederationStore
 from .pusher import PusherStore
+from .media_repository import MediaRepositoryStore
 
 from .state import StateStore
 from .signatures import SignatureStore
 
 from syutil.base64util import decode_base64
+from syutil.jsonutil import encode_canonical_json
 
 from synapse.crypto.event_signing import compute_event_reference_hash
 
@@ -63,7 +61,8 @@ SCHEMAS = [
     "state",
     "event_edges",
     "event_signatures",
-    "pusher"
+    "pusher",
+    "media_repository",
 ]
 
 
@@ -83,11 +82,13 @@ class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
                 PresenceStore, TransactionStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
-                EventFederationStore, PusherStore, ):
+                EventFederationStore,
+                MediaRepositoryStore,
+                PusherStore,
+                ):
 
     def __init__(self, hs):
         super(DataStore, self).__init__(hs)
-        self.event_factory = hs.get_event_factory()
         self.hs = hs
 
         self.min_token_deferred = self._get_min_token()
@@ -95,8 +96,8 @@ class DataStore(RoomMemberStore, RoomStore,
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, backfilled=False, is_new_state=True,
-                      current_state=None):
+    def persist_event(self, event, context, backfilled=False,
+                      is_new_state=True, current_state=None):
         stream_ordering = None
         if backfilled:
             if not self.min_token_deferred.called:
@@ -109,6 +110,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 "persist_event",
                 self._persist_event_txn,
                 event=event,
+                context=context,
                 backfilled=backfilled,
                 stream_ordering=stream_ordering,
                 is_new_state=is_new_state,
@@ -119,50 +121,66 @@ class DataStore(RoomMemberStore, RoomStore,
 
     @defer.inlineCallbacks
     def get_event(self, event_id, allow_none=False):
-        events_dict = yield self._simple_select_one(
-            "events",
-            {"event_id": event_id},
-            [
-                "event_id",
-                "type",
-                "room_id",
-                "content",
-                "unrecognized_keys",
-                "depth",
-            ],
-            allow_none=allow_none,
-        )
+        events = yield self._get_events([event_id])
 
-        if not events_dict:
-            defer.returnValue(None)
+        if not events:
+            if allow_none:
+                defer.returnValue(None)
+            else:
+                raise RuntimeError("Could not find event %s" % (event_id,))
 
-        event = yield self._parse_events([events_dict])
-        defer.returnValue(event[0])
+        defer.returnValue(events[0])
 
     @log_function
-    def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
-                           is_new_state=True, current_state=None):
-        if event.type == RoomMemberEvent.TYPE:
+    def _persist_event_txn(self, txn, event, context, backfilled,
+                           stream_ordering=None, is_new_state=True,
+                           current_state=None):
+        if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
-        elif event.type == FeedbackEvent.TYPE:
+        elif event.type == EventTypes.Feedback:
             self._store_feedback_txn(txn, event)
-        elif event.type == RoomNameEvent.TYPE:
+        elif event.type == EventTypes.Name:
             self._store_room_name_txn(txn, event)
-        elif event.type == RoomTopicEvent.TYPE:
+        elif event.type == EventTypes.Topic:
             self._store_room_topic_txn(txn, event)
-        elif event.type == RoomRedactionEvent.TYPE:
+        elif event.type == EventTypes.Redaction:
             self._store_redaction(txn, event)
 
         outlier = False
-        if hasattr(event, "outlier"):
-            outlier = event.outlier
+        if hasattr(event.internal_metadata, "outlier"):
+            outlier = event.internal_metadata.outlier
+
+        event_dict = {
+            k: v
+            for k, v in event.get_dict().items()
+            if k not in [
+                "redacted",
+                "redacted_because",
+            ]
+        }
+
+        metadata_json = encode_canonical_json(
+            event.internal_metadata.get_dict()
+        )
+
+        self._simple_insert_txn(
+            txn,
+            table="event_json",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "internal_metadata": metadata_json.decode("UTF-8"),
+                "json": encode_canonical_json(event_dict).decode("UTF-8"),
+            },
+            or_replace=True,
+        )
 
         vals = {
             "topological_ordering": event.depth,
             "event_id": event.event_id,
             "type": event.type,
             "room_id": event.room_id,
-            "content": json.dumps(event.content),
+            "content": json.dumps(event.get_dict()["content"]),
             "processed": True,
             "outlier": outlier,
             "depth": event.depth,
@@ -173,7 +191,7 @@ class DataStore(RoomMemberStore, RoomStore,
 
         unrec = {
             k: v
-            for k, v in event.get_full_dict().items()
+            for k, v in event.get_dict().items()
             if k not in vals.keys() and k not in [
                 "redacted",
                 "redacted_because",
@@ -208,7 +226,8 @@ class DataStore(RoomMemberStore, RoomStore,
             room_id=event.room_id,
         )
 
-        self._store_state_groups_txn(txn, event)
+        if not outlier:
+            self._store_state_groups_txn(txn, event, context)
 
         if current_state:
             txn.execute(
@@ -302,16 +321,6 @@ class DataStore(RoomMemberStore, RoomStore,
                 txn, event.event_id, hash_alg, hash_bytes,
             )
 
-        if hasattr(event, "signatures"):
-            logger.debug("sigs: %s", event.signatures)
-            for name, sigs in event.signatures.items():
-                for key_id, signature_base64 in sigs.items():
-                    signature_bytes = decode_base64(signature_base64)
-                    self._store_event_signature_txn(
-                        txn, event.event_id, name, key_id,
-                        signature_bytes,
-                    )
-
         for prev_event_id, prev_hashes in event.prev_events:
             for alg, hash_base64 in prev_hashes.items():
                 hash_bytes = decode_base64(hash_base64)
@@ -413,86 +422,6 @@ class DataStore(RoomMemberStore, RoomStore,
             ],
         )
 
-    def snapshot_room(self, event):
-        """Snapshot the room for an update by a user
-        Args:
-            room_id (synapse.types.RoomId): The room to snapshot.
-            user_id (synapse.types.UserId): The user to snapshot the room for.
-            state_type (str): Optional state type to snapshot.
-            state_key (str): Optional state key to snapshot.
-        Returns:
-            synapse.storage.Snapshot: A snapshot of the state of the room.
-        """
-        def _snapshot(txn):
-            prev_events = self._get_latest_events_in_room(
-                txn,
-                event.room_id
-            )
-
-            prev_state = None
-            state_key = None
-            if hasattr(event, "state_key"):
-                state_key = event.state_key
-                prev_state = self._get_latest_state_in_room(
-                    txn,
-                    event.room_id,
-                    type=event.type,
-                    state_key=state_key,
-                )
-
-            return Snapshot(
-                store=self,
-                room_id=event.room_id,
-                user_id=event.user_id,
-                prev_events=prev_events,
-                prev_state=prev_state,
-                state_type=event.type,
-                state_key=state_key,
-            )
-
-        return self.runInteraction("snapshot_room", _snapshot)
-
-
-class Snapshot(object):
-    """Snapshot of the state of a room
-    Args:
-        store (DataStore): The datastore.
-        room_id (RoomId): The room of the snapshot.
-        user_id (UserId): The user this snapshot is for.
-        prev_events (list): The list of event ids this snapshot is after.
-        membership_state (RoomMemberEvent): The current state of the user in
-            the room.
-        state_type (str, optional): State type captured by the snapshot
-        state_key (str, optional): State key captured by the snapshot
-        prev_state_pdu (PduEntry, optional): pdu id of
-            the previous value of the state type and key in the room.
-    """
-
-    def __init__(self, store, room_id, user_id, prev_events,
-                 prev_state, state_type=None, state_key=None):
-        self.store = store
-        self.room_id = room_id
-        self.user_id = user_id
-        self.prev_events = prev_events
-        self.prev_state = prev_state
-        self.state_type = state_type
-        self.state_key = state_key
-
-    def fill_out_prev_events(self, event):
-        if not hasattr(event, "prev_events"):
-            event.prev_events = [
-                (event_id, hashes)
-                for event_id, hashes, _ in self.prev_events
-            ]
-
-            if self.prev_events:
-                event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
-            else:
-                event.depth = 0
-
-        if not hasattr(event, "prev_state") and self.prev_state is not None:
-            event.prev_state = self.prev_state
-
 
 def schema_path(schema):
     """ Get a filesystem path for the named database schema
@@ -520,6 +449,14 @@ def read_schema(schema):
         return schema_file.read()
 
 
+class PrepareDatabaseException(Exception):
+    pass
+
+
+class UpgradeDatabaseException(PrepareDatabaseException):
+    pass
+
+
 def prepare_database(db_conn):
     """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we
     don't have to worry about overwriting existing content.
@@ -544,6 +481,10 @@ def prepare_database(db_conn):
 
             # Run every version since after the current version.
             for v in range(user_version + 1, SCHEMA_VERSION + 1):
+                if v == 10:
+                    raise UpgradeDatabaseException(
+                        "No delta for version 10"
+                    )
                 sql_script = read_schema("delta/v%d" % (v))
                 c.executescript(sql_script)
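
The central change in `_persist_event_txn` above is that every event is now also written to an `event_json` table as canonical JSON (minus the `redacted`/`redacted_because` bookkeeping keys), with its internal metadata serialised into a separate column. The sketch below illustrates that row shape outside of Twisted and the Synapse storage layer; `FrozenEventStub`, `get_internal_metadata_dict`, and the use of `json.dumps(..., sort_keys=True)` as a stand-in for `syutil.jsonutil.encode_canonical_json` are assumptions for illustration, not Synapse APIs.

    # Minimal sketch of the event_json row written by _persist_event_txn above.
    # FrozenEventStub is a hypothetical stand-in for a Synapse event, and
    # json.dumps(..., sort_keys=True, separators=(",", ":")) approximates
    # syutil.jsonutil.encode_canonical_json.
    import json
    import sqlite3


    class FrozenEventStub(object):
        """Hypothetical event carrying just the fields the diff relies on."""

        def __init__(self, event_id, room_id, event_dict, internal_metadata):
            self.event_id = event_id
            self.room_id = room_id
            self._dict = event_dict
            self._internal = internal_metadata

        def get_dict(self):
            return dict(self._dict)

        def get_internal_metadata_dict(self):
            return dict(self._internal)


    def canonical_json(obj):
        # Stand-in for encode_canonical_json: deterministic key order, no spaces.
        return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")


    def store_event_json(conn, event):
        # Mirror the insert in the diff: strip redaction bookkeeping keys from
        # the stored JSON and keep internal metadata in its own column.
        event_dict = {
            k: v
            for k, v in event.get_dict().items()
            if k not in ("redacted", "redacted_because")
        }
        conn.execute(
            "INSERT OR REPLACE INTO event_json"
            " (event_id, room_id, internal_metadata, json) VALUES (?, ?, ?, ?)",
            (
                event.event_id,
                event.room_id,
                canonical_json(event.get_internal_metadata_dict()).decode("utf-8"),
                canonical_json(event_dict).decode("utf-8"),
            ),
        )


    if __name__ == "__main__":
        conn = sqlite3.connect(":memory:")
        conn.execute(
            "CREATE TABLE event_json ("
            " event_id TEXT PRIMARY KEY, room_id TEXT,"
            " internal_metadata TEXT, json TEXT)"
        )
        ev = FrozenEventStub(
            event_id="$1:example.org",
            room_id="!room:example.org",
            event_dict={"type": "m.room.message", "content": {"body": "hi"}},
            internal_metadata={"outlier": False},
        )
        store_event_json(conn, ev)
        print(conn.execute("SELECT json FROM event_json").fetchone()[0])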
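
The final hunk makes `prepare_database` raise the new `UpgradeDatabaseException` instead of applying a delta when the database is behind version 10, so a caller is expected to abort startup rather than continue on a half-upgraded schema. The sketch below is a hypothetical caller and a simplified version of the loop in the diff; the names here are illustrative and not Synapse's actual startup code.

    # Hypothetical caller-side handling of the new exception classes; the
    # simplified prepare_database below only mirrors the version-walk in the diff.
    import sqlite3
    import sys


    class PrepareDatabaseException(Exception):
        pass


    class UpgradeDatabaseException(PrepareDatabaseException):
        pass


    SCHEMA_VERSION = 10


    def prepare_database(db_conn):
        # Walk forward from the stored user_version and bail out where no
        # automatic delta exists (version 10 in this commit).
        c = db_conn.cursor()
        c.execute("PRAGMA user_version")
        user_version, = c.fetchone()
        for v in range(user_version + 1, SCHEMA_VERSION + 1):
            if v == 10:
                raise UpgradeDatabaseException("No delta for version 10")
            # c.executescript(read_schema("delta/v%d" % (v))) would run here.


    if __name__ == "__main__":
        conn = sqlite3.connect(":memory:")
        conn.execute("PRAGMA user_version = 9")
        try:
            prepare_database(conn)
        except UpgradeDatabaseException as e:
            # Fail loudly instead of starting with an inconsistent schema.
            sys.exit("Database upgrade required: %s" % (e,))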