diff options
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r-- | synapse/storage/__init__.py | 295 |
1 files changed, 182 insertions, 113 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4e9291fdff..d8f351a675 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -16,14 +16,7 @@ from twisted.internet import defer from synapse.api.events.room import ( - RoomMemberEvent, RoomTopicEvent, FeedbackEvent, -# RoomConfigEvent, - RoomNameEvent, - RoomJoinRulesEvent, - RoomPowerLevelsEvent, - RoomAddStateLevelEvent, - RoomSendEventLevelEvent, - RoomOpsPowerLevelsEvent, + RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent, RoomRedactionEvent, ) @@ -37,9 +30,17 @@ from .registration import RegistrationStore from .room import RoomStore from .roommember import RoomMemberStore from .stream import StreamStore -from .pdu import StatePduStore, PduStore, PdusTable from .transactions import TransactionStore from .keys import KeyStore +from .event_federation import EventFederationStore + +from .state import StateStore +from .signatures import SignatureStore + +from syutil.base64util import decode_base64 + +from synapse.crypto.event_signing import compute_event_reference_hash + import json import logging @@ -51,7 +52,6 @@ logger = logging.getLogger(__name__) SCHEMAS = [ "transactions", - "pdu", "users", "profiles", "presence", @@ -59,6 +59,9 @@ SCHEMAS = [ "room_aliases", "keys", "redactions", + "state", + "event_edges", + "event_signatures", ] @@ -73,10 +76,12 @@ class _RollbackButIsFineException(Exception): """ pass + class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, - PresenceStore, PduStore, StatePduStore, TransactionStore, - DirectoryStore, KeyStore): + PresenceStore, TransactionStore, + DirectoryStore, KeyStore, StateStore, SignatureStore, + EventFederationStore, ): def __init__(self, hs): super(DataStore, self).__init__(hs) @@ -88,8 +93,7 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks @log_function - def persist_event(self, event=None, backfilled=False, pdu=None, - is_new_state=True): + def persist_event(self, event, backfilled=False, is_new_state=True): stream_ordering = None if backfilled: if not self.min_token_deferred.called: @@ -99,8 +103,8 @@ class DataStore(RoomMemberStore, RoomStore, try: yield self.runInteraction( - self._persist_pdu_event_txn, - pdu=pdu, + "persist_event", + self._persist_event_txn, event=event, backfilled=backfilled, stream_ordering=stream_ordering, @@ -119,7 +123,8 @@ class DataStore(RoomMemberStore, RoomStore, "type", "room_id", "content", - "unrecognized_keys" + "unrecognized_keys", + "depth", ], allow_none=allow_none, ) @@ -130,42 +135,6 @@ class DataStore(RoomMemberStore, RoomStore, event = self._parse_event_from_row(events_dict) defer.returnValue(event) - def _persist_pdu_event_txn(self, txn, pdu=None, event=None, - backfilled=False, stream_ordering=None, - is_new_state=True): - if pdu is not None: - self._persist_event_pdu_txn(txn, pdu) - if event is not None: - return self._persist_event_txn( - txn, event, backfilled, stream_ordering, - is_new_state=is_new_state, - ) - - def _persist_event_pdu_txn(self, txn, pdu): - cols = dict(pdu.__dict__) - unrec_keys = dict(pdu.unrecognized_keys) - del cols["content"] - del cols["prev_pdus"] - cols["content_json"] = json.dumps(pdu.content) - - unrec_keys.update({ - k: v for k, v in cols.items() - if k not in PdusTable.fields - }) - - cols["unrecognized_keys"] = json.dumps(unrec_keys) - - cols["ts"] = cols.pop("origin_server_ts") - - logger.debug("Persisting: %s", repr(cols)) - - if pdu.is_state: - self._persist_state_txn(txn, pdu.prev_pdus, cols) - else: - self._persist_pdu_txn(txn, pdu.prev_pdus, cols) - - self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth) - @log_function def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None, is_new_state=True): @@ -177,19 +146,13 @@ class DataStore(RoomMemberStore, RoomStore, self._store_room_name_txn(txn, event) elif event.type == RoomTopicEvent.TYPE: self._store_room_topic_txn(txn, event) - elif event.type == RoomJoinRulesEvent.TYPE: - self._store_join_rule(txn, event) - elif event.type == RoomPowerLevelsEvent.TYPE: - self._store_power_levels(txn, event) - elif event.type == RoomAddStateLevelEvent.TYPE: - self._store_add_state_level(txn, event) - elif event.type == RoomSendEventLevelEvent.TYPE: - self._store_send_event_level(txn, event) - elif event.type == RoomOpsPowerLevelsEvent.TYPE: - self._store_ops_level(txn, event) elif event.type == RoomRedactionEvent.TYPE: self._store_redaction(txn, event) + outlier = False + if hasattr(event, "outlier"): + outlier = event.outlier + vals = { "topological_ordering": event.depth, "event_id": event.event_id, @@ -197,25 +160,34 @@ class DataStore(RoomMemberStore, RoomStore, "room_id": event.room_id, "content": json.dumps(event.content), "processed": True, + "outlier": outlier, + "depth": event.depth, } if stream_ordering is not None: vals["stream_ordering"] = stream_ordering - if hasattr(event, "outlier"): - vals["outlier"] = event.outlier - else: - vals["outlier"] = False - unrec = { k: v for k, v in event.get_full_dict().items() - if k not in vals.keys() and k not in ["redacted", "redacted_because"] + if k not in vals.keys() and k not in [ + "redacted", + "redacted_because", + "signatures", + "hashes", + "prev_events", + ] } vals["unrecognized_keys"] = json.dumps(unrec) try: - self._simple_insert_txn(txn, "events", vals) + self._simple_insert_txn( + txn, + "events", + vals, + or_replace=(not outlier), + or_ignore=bool(outlier), + ) except: logger.warn( "Failed to persist, probably duplicate: %s", @@ -224,6 +196,16 @@ class DataStore(RoomMemberStore, RoomStore, ) raise _RollbackButIsFineException("_persist_event") + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + + self._store_state_groups_txn(txn, event) + is_state = hasattr(event, "state_key") and event.state_key is not None if is_new_state and is_state: vals = { @@ -233,10 +215,15 @@ class DataStore(RoomMemberStore, RoomStore, "state_key": event.state_key, } - if hasattr(event, "prev_state"): - vals["prev_state"] = event.prev_state + if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state - self._simple_insert_txn(txn, "state_events", vals) + self._simple_insert_txn( + txn, + "state_events", + vals, + or_replace=True, + ) self._simple_insert_txn( txn, @@ -246,9 +233,87 @@ class DataStore(RoomMemberStore, RoomStore, "room_id": event.room_id, "type": event.type, "state_key": event.state_key, - } + }, + or_replace=True, + ) + + for e_id, h in event.prev_state: + self._simple_insert_txn( + txn, + table="event_edges", + values={ + "event_id": event.event_id, + "prev_event_id": e_id, + "room_id": event.room_id, + "is_state": 1, + }, + or_ignore=True, + ) + + if not backfilled: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + or_replace=True, + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) + + for hash_alg, hash_base64 in event.hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_event_content_hash_txn( + txn, event.event_id, hash_alg, hash_bytes, ) + if hasattr(event, "signatures"): + logger.debug("sigs: %s", event.signatures) + for name, sigs in event.signatures.items(): + for key_id, signature_base64 in sigs.items(): + signature_bytes = decode_base64(signature_base64) + self._store_event_signature_txn( + txn, event.event_id, name, key_id, + signature_bytes, + ) + + for prev_event_id, prev_hashes in event.prev_events: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_event_hash_txn( + txn, event.event_id, prev_event_id, alg, hash_bytes + ) + + for auth_id, _ in event.auth_events: + self._simple_insert_txn( + txn, + table="event_auth", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + }, + or_ignore=True, + ) + + (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) + self._store_event_reference_hash_txn( + txn, event.event_id, ref_alg, ref_hash_bytes + ) + + self._update_min_depth_for_room_txn(txn, event.room_id, event.depth) + def _store_redaction(self, txn, event): txn.execute( "INSERT OR IGNORE INTO redactions " @@ -319,7 +384,7 @@ class DataStore(RoomMemberStore, RoomStore, ], ) - def snapshot_room(self, room_id, user_id, state_type=None, state_key=None): + def snapshot_room(self, event): """Snapshot the room for an update by a user Args: room_id (synapse.types.RoomId): The room to snapshot. @@ -330,29 +395,33 @@ class DataStore(RoomMemberStore, RoomStore, synapse.storage.Snapshot: A snapshot of the state of the room. """ def _snapshot(txn): - membership_state = self._get_room_member(txn, user_id, room_id) - prev_pdus = self._get_latest_pdus_in_context( - txn, room_id + prev_events = self._get_latest_events_in_room( + txn, + event.room_id ) - if state_type is not None and state_key is not None: - prev_state_pdu = self._get_current_state_pdu( - txn, room_id, state_type, state_key + + prev_state = None + state_key = None + if hasattr(event, "state_key"): + state_key = event.state_key + prev_state = self._get_latest_state_in_room( + txn, + event.room_id, + type=event.type, + state_key=state_key, ) - else: - prev_state_pdu = None return Snapshot( store=self, - room_id=room_id, - user_id=user_id, - prev_pdus=prev_pdus, - membership_state=membership_state, - state_type=state_type, + room_id=event.room_id, + user_id=event.user_id, + prev_events=prev_events, + prev_state=prev_state, + state_type=event.type, state_key=state_key, - prev_state_pdu=prev_state_pdu, ) - return self.runInteraction(_snapshot) + return self.runInteraction("snapshot_room", _snapshot) class Snapshot(object): @@ -361,7 +430,7 @@ class Snapshot(object): store (DataStore): The datastore. room_id (RoomId): The room of the snapshot. user_id (UserId): The user this snapshot is for. - prev_pdus (list): The list of PDU ids this snapshot is after. + prev_events (list): The list of event ids this snapshot is after. membership_state (RoomMemberEvent): The current state of the user in the room. state_type (str, optional): State type captured by the snapshot @@ -370,32 +439,30 @@ class Snapshot(object): the previous value of the state type and key in the room. """ - def __init__(self, store, room_id, user_id, prev_pdus, - membership_state, state_type=None, state_key=None, - prev_state_pdu=None): + def __init__(self, store, room_id, user_id, prev_events, + prev_state, state_type=None, state_key=None): self.store = store self.room_id = room_id self.user_id = user_id - self.prev_pdus = prev_pdus - self.membership_state = membership_state + self.prev_events = prev_events + self.prev_state = prev_state self.state_type = state_type self.state_key = state_key - self.prev_state_pdu = prev_state_pdu def fill_out_prev_events(self, event): - if hasattr(event, "prev_events"): - return - - es = [ - "%s@%s" % (p_id, origin) for p_id, origin, _ in self.prev_pdus - ] - - event.prev_events = [e for e in es if e != event.event_id] + if not hasattr(event, "prev_events"): + event.prev_events = [ + (event_id, hashes) + for event_id, hashes, _ in self.prev_events + ] + + if self.prev_events: + event.depth = max([int(v) for _, _, v in self.prev_events]) + 1 + else: + event.depth = 0 - if self.prev_pdus: - event.depth = max([int(v) for _, _, v in self.prev_pdus]) + 1 - else: - event.depth = 0 + if not hasattr(event, "prev_state") and self.prev_state is not None: + event.prev_state = self.prev_state def schema_path(schema): @@ -436,11 +503,13 @@ def prepare_database(db_conn): user_version = row[0] if user_version > SCHEMA_VERSION: - raise ValueError("Cannot use this database as it is too " + + raise ValueError( + "Cannot use this database as it is too " + "new for the server to understand" ) elif user_version < SCHEMA_VERSION: - logging.info("Upgrading database from version %d", + logging.info( + "Upgrading database from version %d", user_version ) @@ -452,13 +521,13 @@ def prepare_database(db_conn): db_conn.commit() else: - sql_script = "BEGIN TRANSACTION;" + sql_script = "BEGIN TRANSACTION;\n" for sql_loc in SCHEMAS: sql_script += read_schema(sql_loc) + sql_script += "\n" sql_script += "COMMIT TRANSACTION;" c.executescript(sql_script) db_conn.commit() c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION) c.close() - |