summary refs log tree commit diff
path: root/synapse/storage/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r--synapse/storage/__init__.py295
1 files changed, 182 insertions, 113 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4e9291fdff..d8f351a675 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -16,14 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.events.room import (
-    RoomMemberEvent, RoomTopicEvent, FeedbackEvent,
-#   RoomConfigEvent,
-    RoomNameEvent,
-    RoomJoinRulesEvent,
-    RoomPowerLevelsEvent,
-    RoomAddStateLevelEvent,
-    RoomSendEventLevelEvent,
-    RoomOpsPowerLevelsEvent,
+    RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent,
     RoomRedactionEvent,
 )
 
@@ -37,9 +30,17 @@ from .registration import RegistrationStore
 from .room import RoomStore
 from .roommember import RoomMemberStore
 from .stream import StreamStore
-from .pdu import StatePduStore, PduStore, PdusTable
 from .transactions import TransactionStore
 from .keys import KeyStore
+from .event_federation import EventFederationStore
+
+from .state import StateStore
+from .signatures import SignatureStore
+
+from syutil.base64util import decode_base64
+
+from synapse.crypto.event_signing import compute_event_reference_hash
+
 
 import json
 import logging
@@ -51,7 +52,6 @@ logger = logging.getLogger(__name__)
 
 SCHEMAS = [
     "transactions",
-    "pdu",
     "users",
     "profiles",
     "presence",
@@ -59,6 +59,9 @@ SCHEMAS = [
     "room_aliases",
     "keys",
     "redactions",
+    "state",
+    "event_edges",
+    "event_signatures",
 ]
 
 
@@ -73,10 +76,12 @@ class _RollbackButIsFineException(Exception):
     """
     pass
 
+
 class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
-                PresenceStore, PduStore, StatePduStore, TransactionStore,
-                DirectoryStore, KeyStore):
+                PresenceStore, TransactionStore,
+                DirectoryStore, KeyStore, StateStore, SignatureStore,
+                EventFederationStore, ):
 
     def __init__(self, hs):
         super(DataStore, self).__init__(hs)
@@ -88,8 +93,7 @@ class DataStore(RoomMemberStore, RoomStore,
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event=None, backfilled=False, pdu=None,
-                      is_new_state=True):
+    def persist_event(self, event, backfilled=False, is_new_state=True):
         stream_ordering = None
         if backfilled:
             if not self.min_token_deferred.called:
@@ -99,8 +103,8 @@ class DataStore(RoomMemberStore, RoomStore,
 
         try:
             yield self.runInteraction(
-                self._persist_pdu_event_txn,
-                pdu=pdu,
+                "persist_event",
+                self._persist_event_txn,
                 event=event,
                 backfilled=backfilled,
                 stream_ordering=stream_ordering,
@@ -119,7 +123,8 @@ class DataStore(RoomMemberStore, RoomStore,
                 "type",
                 "room_id",
                 "content",
-                "unrecognized_keys"
+                "unrecognized_keys",
+                "depth",
             ],
             allow_none=allow_none,
         )
@@ -130,42 +135,6 @@ class DataStore(RoomMemberStore, RoomStore,
         event = self._parse_event_from_row(events_dict)
         defer.returnValue(event)
 
-    def _persist_pdu_event_txn(self, txn, pdu=None, event=None,
-                               backfilled=False, stream_ordering=None,
-                               is_new_state=True):
-        if pdu is not None:
-            self._persist_event_pdu_txn(txn, pdu)
-        if event is not None:
-            return self._persist_event_txn(
-                txn, event, backfilled, stream_ordering,
-                is_new_state=is_new_state,
-            )
-
-    def _persist_event_pdu_txn(self, txn, pdu):
-        cols = dict(pdu.__dict__)
-        unrec_keys = dict(pdu.unrecognized_keys)
-        del cols["content"]
-        del cols["prev_pdus"]
-        cols["content_json"] = json.dumps(pdu.content)
-
-        unrec_keys.update({
-            k: v for k, v in cols.items()
-            if k not in PdusTable.fields
-        })
-
-        cols["unrecognized_keys"] = json.dumps(unrec_keys)
-
-        cols["ts"] = cols.pop("origin_server_ts")
-
-        logger.debug("Persisting: %s", repr(cols))
-
-        if pdu.is_state:
-            self._persist_state_txn(txn, pdu.prev_pdus, cols)
-        else:
-            self._persist_pdu_txn(txn, pdu.prev_pdus, cols)
-
-        self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth)
-
     @log_function
     def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
                            is_new_state=True):
@@ -177,19 +146,13 @@ class DataStore(RoomMemberStore, RoomStore,
             self._store_room_name_txn(txn, event)
         elif event.type == RoomTopicEvent.TYPE:
             self._store_room_topic_txn(txn, event)
-        elif event.type == RoomJoinRulesEvent.TYPE:
-            self._store_join_rule(txn, event)
-        elif event.type == RoomPowerLevelsEvent.TYPE:
-            self._store_power_levels(txn, event)
-        elif event.type == RoomAddStateLevelEvent.TYPE:
-            self._store_add_state_level(txn, event)
-        elif event.type == RoomSendEventLevelEvent.TYPE:
-            self._store_send_event_level(txn, event)
-        elif event.type == RoomOpsPowerLevelsEvent.TYPE:
-            self._store_ops_level(txn, event)
         elif event.type == RoomRedactionEvent.TYPE:
             self._store_redaction(txn, event)
 
+        outlier = False
+        if hasattr(event, "outlier"):
+            outlier = event.outlier
+
         vals = {
             "topological_ordering": event.depth,
             "event_id": event.event_id,
@@ -197,25 +160,34 @@ class DataStore(RoomMemberStore, RoomStore,
             "room_id": event.room_id,
             "content": json.dumps(event.content),
             "processed": True,
+            "outlier": outlier,
+            "depth": event.depth,
         }
 
         if stream_ordering is not None:
             vals["stream_ordering"] = stream_ordering
 
-        if hasattr(event, "outlier"):
-            vals["outlier"] = event.outlier
-        else:
-            vals["outlier"] = False
-
         unrec = {
             k: v
             for k, v in event.get_full_dict().items()
-            if k not in vals.keys() and k not in ["redacted", "redacted_because"]
+            if k not in vals.keys() and k not in [
+                "redacted",
+                "redacted_because",
+                "signatures",
+                "hashes",
+                "prev_events",
+            ]
         }
         vals["unrecognized_keys"] = json.dumps(unrec)
 
         try:
-            self._simple_insert_txn(txn, "events", vals)
+            self._simple_insert_txn(
+                txn,
+                "events",
+                vals,
+                or_replace=(not outlier),
+                or_ignore=bool(outlier),
+            )
         except:
             logger.warn(
                 "Failed to persist, probably duplicate: %s",
@@ -224,6 +196,16 @@ class DataStore(RoomMemberStore, RoomStore,
             )
             raise _RollbackButIsFineException("_persist_event")
 
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
+        self._store_state_groups_txn(txn, event)
+
         is_state = hasattr(event, "state_key") and event.state_key is not None
         if is_new_state and is_state:
             vals = {
@@ -233,10 +215,15 @@ class DataStore(RoomMemberStore, RoomStore,
                 "state_key": event.state_key,
             }
 
-            if hasattr(event, "prev_state"):
-                vals["prev_state"] = event.prev_state
+            if hasattr(event, "replaces_state"):
+                vals["prev_state"] = event.replaces_state
 
-            self._simple_insert_txn(txn, "state_events", vals)
+            self._simple_insert_txn(
+                txn,
+                "state_events",
+                vals,
+                or_replace=True,
+            )
 
             self._simple_insert_txn(
                 txn,
@@ -246,9 +233,87 @@ class DataStore(RoomMemberStore, RoomStore,
                     "room_id": event.room_id,
                     "type": event.type,
                     "state_key": event.state_key,
-                }
+                },
+                or_replace=True,
+            )
+
+            for e_id, h in event.prev_state:
+                self._simple_insert_txn(
+                    txn,
+                    table="event_edges",
+                    values={
+                        "event_id": event.event_id,
+                        "prev_event_id": e_id,
+                        "room_id": event.room_id,
+                        "is_state": 1,
+                    },
+                    or_ignore=True,
+                )
+
+            if not backfilled:
+                self._simple_insert_txn(
+                    txn,
+                    table="state_forward_extremities",
+                    values={
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    or_replace=True,
+                )
+
+                for prev_state_id, _ in event.prev_state:
+                    self._simple_delete_txn(
+                        txn,
+                        table="state_forward_extremities",
+                        keyvalues={
+                            "event_id": prev_state_id,
+                        }
+                    )
+
+        for hash_alg, hash_base64 in event.hashes.items():
+            hash_bytes = decode_base64(hash_base64)
+            self._store_event_content_hash_txn(
+                txn, event.event_id, hash_alg, hash_bytes,
             )
 
+        if hasattr(event, "signatures"):
+            logger.debug("sigs: %s", event.signatures)
+            for name, sigs in event.signatures.items():
+                for key_id, signature_base64 in sigs.items():
+                    signature_bytes = decode_base64(signature_base64)
+                    self._store_event_signature_txn(
+                        txn, event.event_id, name, key_id,
+                        signature_bytes,
+                    )
+
+        for prev_event_id, prev_hashes in event.prev_events:
+            for alg, hash_base64 in prev_hashes.items():
+                hash_bytes = decode_base64(hash_base64)
+                self._store_prev_event_hash_txn(
+                    txn, event.event_id, prev_event_id, alg, hash_bytes
+                )
+
+        for auth_id, _ in event.auth_events:
+            self._simple_insert_txn(
+                txn,
+                table="event_auth",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "auth_id": auth_id,
+                },
+                or_ignore=True,
+            )
+
+        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+        self._store_event_reference_hash_txn(
+            txn, event.event_id, ref_alg, ref_hash_bytes
+        )
+
+        self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
+
     def _store_redaction(self, txn, event):
         txn.execute(
             "INSERT OR IGNORE INTO redactions "
@@ -319,7 +384,7 @@ class DataStore(RoomMemberStore, RoomStore,
             ],
         )
 
-    def snapshot_room(self, room_id, user_id, state_type=None, state_key=None):
+    def snapshot_room(self, event):
         """Snapshot the room for an update by a user
         Args:
             room_id (synapse.types.RoomId): The room to snapshot.
@@ -330,29 +395,33 @@ class DataStore(RoomMemberStore, RoomStore,
             synapse.storage.Snapshot: A snapshot of the state of the room.
         """
         def _snapshot(txn):
-            membership_state = self._get_room_member(txn, user_id, room_id)
-            prev_pdus = self._get_latest_pdus_in_context(
-                txn, room_id
+            prev_events = self._get_latest_events_in_room(
+                txn,
+                event.room_id
             )
-            if state_type is not None and state_key is not None:
-                prev_state_pdu = self._get_current_state_pdu(
-                    txn, room_id, state_type, state_key
+
+            prev_state = None
+            state_key = None
+            if hasattr(event, "state_key"):
+                state_key = event.state_key
+                prev_state = self._get_latest_state_in_room(
+                    txn,
+                    event.room_id,
+                    type=event.type,
+                    state_key=state_key,
                 )
-            else:
-                prev_state_pdu = None
 
             return Snapshot(
                 store=self,
-                room_id=room_id,
-                user_id=user_id,
-                prev_pdus=prev_pdus,
-                membership_state=membership_state,
-                state_type=state_type,
+                room_id=event.room_id,
+                user_id=event.user_id,
+                prev_events=prev_events,
+                prev_state=prev_state,
+                state_type=event.type,
                 state_key=state_key,
-                prev_state_pdu=prev_state_pdu,
             )
 
-        return self.runInteraction(_snapshot)
+        return self.runInteraction("snapshot_room", _snapshot)
 
 
 class Snapshot(object):
@@ -361,7 +430,7 @@ class Snapshot(object):
         store (DataStore): The datastore.
         room_id (RoomId): The room of the snapshot.
         user_id (UserId): The user this snapshot is for.
-        prev_pdus (list): The list of PDU ids this snapshot is after.
+        prev_events (list): The list of event ids this snapshot is after.
         membership_state (RoomMemberEvent): The current state of the user in
             the room.
         state_type (str, optional): State type captured by the snapshot
@@ -370,32 +439,30 @@ class Snapshot(object):
             the previous value of the state type and key in the room.
     """
 
-    def __init__(self, store, room_id, user_id, prev_pdus,
-                 membership_state, state_type=None, state_key=None,
-                 prev_state_pdu=None):
+    def __init__(self, store, room_id, user_id, prev_events,
+                 prev_state, state_type=None, state_key=None):
         self.store = store
         self.room_id = room_id
         self.user_id = user_id
-        self.prev_pdus = prev_pdus
-        self.membership_state = membership_state
+        self.prev_events = prev_events
+        self.prev_state = prev_state
         self.state_type = state_type
         self.state_key = state_key
-        self.prev_state_pdu = prev_state_pdu
 
     def fill_out_prev_events(self, event):
-        if hasattr(event, "prev_events"):
-            return
-
-        es = [
-            "%s@%s" % (p_id, origin) for p_id, origin, _ in self.prev_pdus
-        ]
-
-        event.prev_events = [e for e in es if e != event.event_id]
+        if not hasattr(event, "prev_events"):
+            event.prev_events = [
+                (event_id, hashes)
+                for event_id, hashes, _ in self.prev_events
+            ]
+
+            if self.prev_events:
+                event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
+            else:
+                event.depth = 0
 
-        if self.prev_pdus:
-            event.depth = max([int(v) for _, _, v in self.prev_pdus]) + 1
-        else:
-            event.depth = 0
+        if not hasattr(event, "prev_state") and self.prev_state is not None:
+            event.prev_state = self.prev_state
 
 
 def schema_path(schema):
@@ -436,11 +503,13 @@ def prepare_database(db_conn):
         user_version = row[0]
 
         if user_version > SCHEMA_VERSION:
-            raise ValueError("Cannot use this database as it is too " +
+            raise ValueError(
+                "Cannot use this database as it is too " +
                 "new for the server to understand"
             )
         elif user_version < SCHEMA_VERSION:
-            logging.info("Upgrading database from version %d",
+            logging.info(
+                "Upgrading database from version %d",
                 user_version
             )
 
@@ -452,13 +521,13 @@ def prepare_database(db_conn):
             db_conn.commit()
 
     else:
-        sql_script = "BEGIN TRANSACTION;"
+        sql_script = "BEGIN TRANSACTION;\n"
         for sql_loc in SCHEMAS:
             sql_script += read_schema(sql_loc)
+            sql_script += "\n"
         sql_script += "COMMIT TRANSACTION;"
         c.executescript(sql_script)
         db_conn.commit()
         c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION)
 
     c.close()
-