diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4e9291fdff..d8f351a675 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -16,14 +16,7 @@
from twisted.internet import defer
from synapse.api.events.room import (
- RoomMemberEvent, RoomTopicEvent, FeedbackEvent,
-# RoomConfigEvent,
- RoomNameEvent,
- RoomJoinRulesEvent,
- RoomPowerLevelsEvent,
- RoomAddStateLevelEvent,
- RoomSendEventLevelEvent,
- RoomOpsPowerLevelsEvent,
+ RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent,
RoomRedactionEvent,
)
@@ -37,9 +30,17 @@ from .registration import RegistrationStore
from .room import RoomStore
from .roommember import RoomMemberStore
from .stream import StreamStore
-from .pdu import StatePduStore, PduStore, PdusTable
from .transactions import TransactionStore
from .keys import KeyStore
+from .event_federation import EventFederationStore
+
+from .state import StateStore
+from .signatures import SignatureStore
+
+from syutil.base64util import decode_base64
+
+from synapse.crypto.event_signing import compute_event_reference_hash
+
import json
import logging
@@ -51,7 +52,6 @@ logger = logging.getLogger(__name__)
SCHEMAS = [
"transactions",
- "pdu",
"users",
"profiles",
"presence",
@@ -59,6 +59,9 @@ SCHEMAS = [
"room_aliases",
"keys",
"redactions",
+ "state",
+ "event_edges",
+ "event_signatures",
]
@@ -73,10 +76,12 @@ class _RollbackButIsFineException(Exception):
"""
pass
+
class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
- PresenceStore, PduStore, StatePduStore, TransactionStore,
- DirectoryStore, KeyStore):
+ PresenceStore, TransactionStore,
+ DirectoryStore, KeyStore, StateStore, SignatureStore,
+ EventFederationStore, ):
def __init__(self, hs):
super(DataStore, self).__init__(hs)
@@ -88,8 +93,7 @@ class DataStore(RoomMemberStore, RoomStore,
@defer.inlineCallbacks
@log_function
- def persist_event(self, event=None, backfilled=False, pdu=None,
- is_new_state=True):
+ def persist_event(self, event, backfilled=False, is_new_state=True):
stream_ordering = None
if backfilled:
if not self.min_token_deferred.called:
@@ -99,8 +103,8 @@ class DataStore(RoomMemberStore, RoomStore,
try:
yield self.runInteraction(
- self._persist_pdu_event_txn,
- pdu=pdu,
+ "persist_event",
+ self._persist_event_txn,
event=event,
backfilled=backfilled,
stream_ordering=stream_ordering,
@@ -119,7 +123,8 @@ class DataStore(RoomMemberStore, RoomStore,
"type",
"room_id",
"content",
- "unrecognized_keys"
+ "unrecognized_keys",
+ "depth",
],
allow_none=allow_none,
)
@@ -130,42 +135,6 @@ class DataStore(RoomMemberStore, RoomStore,
event = self._parse_event_from_row(events_dict)
defer.returnValue(event)
- def _persist_pdu_event_txn(self, txn, pdu=None, event=None,
- backfilled=False, stream_ordering=None,
- is_new_state=True):
- if pdu is not None:
- self._persist_event_pdu_txn(txn, pdu)
- if event is not None:
- return self._persist_event_txn(
- txn, event, backfilled, stream_ordering,
- is_new_state=is_new_state,
- )
-
- def _persist_event_pdu_txn(self, txn, pdu):
- cols = dict(pdu.__dict__)
- unrec_keys = dict(pdu.unrecognized_keys)
- del cols["content"]
- del cols["prev_pdus"]
- cols["content_json"] = json.dumps(pdu.content)
-
- unrec_keys.update({
- k: v for k, v in cols.items()
- if k not in PdusTable.fields
- })
-
- cols["unrecognized_keys"] = json.dumps(unrec_keys)
-
- cols["ts"] = cols.pop("origin_server_ts")
-
- logger.debug("Persisting: %s", repr(cols))
-
- if pdu.is_state:
- self._persist_state_txn(txn, pdu.prev_pdus, cols)
- else:
- self._persist_pdu_txn(txn, pdu.prev_pdus, cols)
-
- self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth)
-
@log_function
def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
is_new_state=True):
@@ -177,19 +146,13 @@ class DataStore(RoomMemberStore, RoomStore,
self._store_room_name_txn(txn, event)
elif event.type == RoomTopicEvent.TYPE:
self._store_room_topic_txn(txn, event)
- elif event.type == RoomJoinRulesEvent.TYPE:
- self._store_join_rule(txn, event)
- elif event.type == RoomPowerLevelsEvent.TYPE:
- self._store_power_levels(txn, event)
- elif event.type == RoomAddStateLevelEvent.TYPE:
- self._store_add_state_level(txn, event)
- elif event.type == RoomSendEventLevelEvent.TYPE:
- self._store_send_event_level(txn, event)
- elif event.type == RoomOpsPowerLevelsEvent.TYPE:
- self._store_ops_level(txn, event)
elif event.type == RoomRedactionEvent.TYPE:
self._store_redaction(txn, event)
+ outlier = False
+ if hasattr(event, "outlier"):
+ outlier = event.outlier
+
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
@@ -197,25 +160,34 @@ class DataStore(RoomMemberStore, RoomStore,
"room_id": event.room_id,
"content": json.dumps(event.content),
"processed": True,
+ "outlier": outlier,
+ "depth": event.depth,
}
if stream_ordering is not None:
vals["stream_ordering"] = stream_ordering
- if hasattr(event, "outlier"):
- vals["outlier"] = event.outlier
- else:
- vals["outlier"] = False
-
unrec = {
k: v
for k, v in event.get_full_dict().items()
- if k not in vals.keys() and k not in ["redacted", "redacted_because"]
+ if k not in vals.keys() and k not in [
+ "redacted",
+ "redacted_because",
+ "signatures",
+ "hashes",
+ "prev_events",
+ ]
}
vals["unrecognized_keys"] = json.dumps(unrec)
try:
- self._simple_insert_txn(txn, "events", vals)
+ self._simple_insert_txn(
+ txn,
+ "events",
+ vals,
+ or_replace=(not outlier),
+ or_ignore=bool(outlier),
+ )
except:
logger.warn(
"Failed to persist, probably duplicate: %s",
@@ -224,6 +196,16 @@ class DataStore(RoomMemberStore, RoomStore,
)
raise _RollbackButIsFineException("_persist_event")
+ self._handle_prev_events(
+ txn,
+ outlier=outlier,
+ event_id=event.event_id,
+ prev_events=event.prev_events,
+ room_id=event.room_id,
+ )
+
+ self._store_state_groups_txn(txn, event)
+
is_state = hasattr(event, "state_key") and event.state_key is not None
if is_new_state and is_state:
vals = {
@@ -233,10 +215,15 @@ class DataStore(RoomMemberStore, RoomStore,
"state_key": event.state_key,
}
- if hasattr(event, "prev_state"):
- vals["prev_state"] = event.prev_state
+ if hasattr(event, "replaces_state"):
+ vals["prev_state"] = event.replaces_state
- self._simple_insert_txn(txn, "state_events", vals)
+ self._simple_insert_txn(
+ txn,
+ "state_events",
+ vals,
+ or_replace=True,
+ )
self._simple_insert_txn(
txn,
@@ -246,9 +233,87 @@ class DataStore(RoomMemberStore, RoomStore,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
- }
+ },
+ or_replace=True,
+ )
+
+ for e_id, h in event.prev_state:
+ self._simple_insert_txn(
+ txn,
+ table="event_edges",
+ values={
+ "event_id": event.event_id,
+ "prev_event_id": e_id,
+ "room_id": event.room_id,
+ "is_state": 1,
+ },
+ or_ignore=True,
+ )
+
+ if not backfilled:
+ self._simple_insert_txn(
+ txn,
+ table="state_forward_extremities",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ or_replace=True,
+ )
+
+ for prev_state_id, _ in event.prev_state:
+ self._simple_delete_txn(
+ txn,
+ table="state_forward_extremities",
+ keyvalues={
+ "event_id": prev_state_id,
+ }
+ )
+
+ for hash_alg, hash_base64 in event.hashes.items():
+ hash_bytes = decode_base64(hash_base64)
+ self._store_event_content_hash_txn(
+ txn, event.event_id, hash_alg, hash_bytes,
)
+ if hasattr(event, "signatures"):
+ logger.debug("sigs: %s", event.signatures)
+ for name, sigs in event.signatures.items():
+ for key_id, signature_base64 in sigs.items():
+ signature_bytes = decode_base64(signature_base64)
+ self._store_event_signature_txn(
+ txn, event.event_id, name, key_id,
+ signature_bytes,
+ )
+
+ for prev_event_id, prev_hashes in event.prev_events:
+ for alg, hash_base64 in prev_hashes.items():
+ hash_bytes = decode_base64(hash_base64)
+ self._store_prev_event_hash_txn(
+ txn, event.event_id, prev_event_id, alg, hash_bytes
+ )
+
+ for auth_id, _ in event.auth_events:
+ self._simple_insert_txn(
+ txn,
+ table="event_auth",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "auth_id": auth_id,
+ },
+ or_ignore=True,
+ )
+
+ (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+ self._store_event_reference_hash_txn(
+ txn, event.event_id, ref_alg, ref_hash_bytes
+ )
+
+ self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
+
def _store_redaction(self, txn, event):
txn.execute(
"INSERT OR IGNORE INTO redactions "
@@ -319,7 +384,7 @@ class DataStore(RoomMemberStore, RoomStore,
],
)
- def snapshot_room(self, room_id, user_id, state_type=None, state_key=None):
+ def snapshot_room(self, event):
"""Snapshot the room for an update by a user
Args:
room_id (synapse.types.RoomId): The room to snapshot.
@@ -330,29 +395,33 @@ class DataStore(RoomMemberStore, RoomStore,
synapse.storage.Snapshot: A snapshot of the state of the room.
"""
def _snapshot(txn):
- membership_state = self._get_room_member(txn, user_id, room_id)
- prev_pdus = self._get_latest_pdus_in_context(
- txn, room_id
+ prev_events = self._get_latest_events_in_room(
+ txn,
+ event.room_id
)
- if state_type is not None and state_key is not None:
- prev_state_pdu = self._get_current_state_pdu(
- txn, room_id, state_type, state_key
+
+ prev_state = None
+ state_key = None
+ if hasattr(event, "state_key"):
+ state_key = event.state_key
+ prev_state = self._get_latest_state_in_room(
+ txn,
+ event.room_id,
+ type=event.type,
+ state_key=state_key,
)
- else:
- prev_state_pdu = None
return Snapshot(
store=self,
- room_id=room_id,
- user_id=user_id,
- prev_pdus=prev_pdus,
- membership_state=membership_state,
- state_type=state_type,
+ room_id=event.room_id,
+ user_id=event.user_id,
+ prev_events=prev_events,
+ prev_state=prev_state,
+ state_type=event.type,
state_key=state_key,
- prev_state_pdu=prev_state_pdu,
)
- return self.runInteraction(_snapshot)
+ return self.runInteraction("snapshot_room", _snapshot)
class Snapshot(object):
@@ -361,7 +430,7 @@ class Snapshot(object):
store (DataStore): The datastore.
room_id (RoomId): The room of the snapshot.
user_id (UserId): The user this snapshot is for.
- prev_pdus (list): The list of PDU ids this snapshot is after.
+ prev_events (list): The list of event ids this snapshot is after.
membership_state (RoomMemberEvent): The current state of the user in
the room.
state_type (str, optional): State type captured by the snapshot
@@ -370,32 +439,30 @@ class Snapshot(object):
the previous value of the state type and key in the room.
"""
- def __init__(self, store, room_id, user_id, prev_pdus,
- membership_state, state_type=None, state_key=None,
- prev_state_pdu=None):
+ def __init__(self, store, room_id, user_id, prev_events,
+ prev_state, state_type=None, state_key=None):
self.store = store
self.room_id = room_id
self.user_id = user_id
- self.prev_pdus = prev_pdus
- self.membership_state = membership_state
+ self.prev_events = prev_events
+ self.prev_state = prev_state
self.state_type = state_type
self.state_key = state_key
- self.prev_state_pdu = prev_state_pdu
def fill_out_prev_events(self, event):
- if hasattr(event, "prev_events"):
- return
-
- es = [
- "%s@%s" % (p_id, origin) for p_id, origin, _ in self.prev_pdus
- ]
-
- event.prev_events = [e for e in es if e != event.event_id]
+ if not hasattr(event, "prev_events"):
+ event.prev_events = [
+ (event_id, hashes)
+ for event_id, hashes, _ in self.prev_events
+ ]
+
+ if self.prev_events:
+ event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
+ else:
+ event.depth = 0
- if self.prev_pdus:
- event.depth = max([int(v) for _, _, v in self.prev_pdus]) + 1
- else:
- event.depth = 0
+ if not hasattr(event, "prev_state") and self.prev_state is not None:
+ event.prev_state = self.prev_state
def schema_path(schema):
@@ -436,11 +503,13 @@ def prepare_database(db_conn):
user_version = row[0]
if user_version > SCHEMA_VERSION:
- raise ValueError("Cannot use this database as it is too " +
+ raise ValueError(
+ "Cannot use this database as it is too " +
"new for the server to understand"
)
elif user_version < SCHEMA_VERSION:
- logging.info("Upgrading database from version %d",
+ logging.info(
+ "Upgrading database from version %d",
user_version
)
@@ -452,13 +521,13 @@ def prepare_database(db_conn):
db_conn.commit()
else:
- sql_script = "BEGIN TRANSACTION;"
+ sql_script = "BEGIN TRANSACTION;\n"
for sql_loc in SCHEMAS:
sql_script += read_schema(sql_loc)
+ sql_script += "\n"
sql_script += "COMMIT TRANSACTION;"
c.executescript(sql_script)
db_conn.commit()
c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION)
c.close()
-
|