diff options
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r-- | synapse/storage/__init__.py | 199 |
1 files changed, 70 insertions, 129 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index ad1765e04d..ac3bf5cee5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -15,12 +15,8 @@ from twisted.internet import defer -from synapse.api.events.room import ( - RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent, - RoomRedactionEvent, -) - from synapse.util.logutils import log_function +from synapse.api.constants import EventTypes from .directory import DirectoryStore from .feedback import FeedbackStore @@ -34,11 +30,13 @@ from .transactions import TransactionStore from .keys import KeyStore from .event_federation import EventFederationStore from .pusher import PusherStore +from .media_repository import MediaRepositoryStore from .state import StateStore from .signatures import SignatureStore from syutil.base64util import decode_base64 +from syutil.jsonutil import encode_canonical_json from synapse.crypto.event_signing import compute_event_reference_hash @@ -63,7 +61,8 @@ SCHEMAS = [ "state", "event_edges", "event_signatures", - "pusher" + "pusher", + "media_repository", ] @@ -83,11 +82,13 @@ class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, - EventFederationStore, PusherStore, ): + EventFederationStore, + MediaRepositoryStore, + PusherStore, + ): def __init__(self, hs): super(DataStore, self).__init__(hs) - self.event_factory = hs.get_event_factory() self.hs = hs self.min_token_deferred = self._get_min_token() @@ -95,8 +96,8 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks @log_function - def persist_event(self, event, backfilled=False, is_new_state=True, - current_state=None): + def persist_event(self, event, context, backfilled=False, + is_new_state=True, current_state=None): stream_ordering = None if backfilled: if not self.min_token_deferred.called: @@ -109,6 +110,7 @@ class DataStore(RoomMemberStore, RoomStore, "persist_event", self._persist_event_txn, event=event, + context=context, backfilled=backfilled, stream_ordering=stream_ordering, is_new_state=is_new_state, @@ -119,50 +121,66 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks def get_event(self, event_id, allow_none=False): - events_dict = yield self._simple_select_one( - "events", - {"event_id": event_id}, - [ - "event_id", - "type", - "room_id", - "content", - "unrecognized_keys", - "depth", - ], - allow_none=allow_none, - ) + events = yield self._get_events([event_id]) - if not events_dict: - defer.returnValue(None) + if not events: + if allow_none: + defer.returnValue(None) + else: + raise RuntimeError("Could not find event %s" % (event_id,)) - event = yield self._parse_events([events_dict]) - defer.returnValue(event[0]) + defer.returnValue(events[0]) @log_function - def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None, - is_new_state=True, current_state=None): - if event.type == RoomMemberEvent.TYPE: + def _persist_event_txn(self, txn, event, context, backfilled, + stream_ordering=None, is_new_state=True, + current_state=None): + if event.type == EventTypes.Member: self._store_room_member_txn(txn, event) - elif event.type == FeedbackEvent.TYPE: + elif event.type == EventTypes.Feedback: self._store_feedback_txn(txn, event) - elif event.type == RoomNameEvent.TYPE: + elif event.type == EventTypes.Name: self._store_room_name_txn(txn, event) - elif event.type == RoomTopicEvent.TYPE: + elif event.type == EventTypes.Topic: self._store_room_topic_txn(txn, event) - elif event.type == RoomRedactionEvent.TYPE: + elif event.type == EventTypes.Redaction: self._store_redaction(txn, event) outlier = False - if hasattr(event, "outlier"): - outlier = event.outlier + if hasattr(event.internal_metadata, "outlier"): + outlier = event.internal_metadata.outlier + + event_dict = { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + + self._simple_insert_txn( + txn, + table="event_json", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "internal_metadata": metadata_json.decode("UTF-8"), + "json": encode_canonical_json(event_dict).decode("UTF-8"), + }, + or_replace=True, + ) vals = { "topological_ordering": event.depth, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, - "content": json.dumps(event.content), + "content": json.dumps(event.get_dict()["content"]), "processed": True, "outlier": outlier, "depth": event.depth, @@ -173,7 +191,7 @@ class DataStore(RoomMemberStore, RoomStore, unrec = { k: v - for k, v in event.get_full_dict().items() + for k, v in event.get_dict().items() if k not in vals.keys() and k not in [ "redacted", "redacted_because", @@ -208,7 +226,8 @@ class DataStore(RoomMemberStore, RoomStore, room_id=event.room_id, ) - self._store_state_groups_txn(txn, event) + if not outlier: + self._store_state_groups_txn(txn, event, context) if current_state: txn.execute( @@ -302,16 +321,6 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, hash_alg, hash_bytes, ) - if hasattr(event, "signatures"): - logger.debug("sigs: %s", event.signatures) - for name, sigs in event.signatures.items(): - for key_id, signature_base64 in sigs.items(): - signature_bytes = decode_base64(signature_base64) - self._store_event_signature_txn( - txn, event.event_id, name, key_id, - signature_bytes, - ) - for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): hash_bytes = decode_base64(hash_base64) @@ -413,86 +422,6 @@ class DataStore(RoomMemberStore, RoomStore, ], ) - def snapshot_room(self, event): - """Snapshot the room for an update by a user - Args: - room_id (synapse.types.RoomId): The room to snapshot. - user_id (synapse.types.UserId): The user to snapshot the room for. - state_type (str): Optional state type to snapshot. - state_key (str): Optional state key to snapshot. - Returns: - synapse.storage.Snapshot: A snapshot of the state of the room. - """ - def _snapshot(txn): - prev_events = self._get_latest_events_in_room( - txn, - event.room_id - ) - - prev_state = None - state_key = None - if hasattr(event, "state_key"): - state_key = event.state_key - prev_state = self._get_latest_state_in_room( - txn, - event.room_id, - type=event.type, - state_key=state_key, - ) - - return Snapshot( - store=self, - room_id=event.room_id, - user_id=event.user_id, - prev_events=prev_events, - prev_state=prev_state, - state_type=event.type, - state_key=state_key, - ) - - return self.runInteraction("snapshot_room", _snapshot) - - -class Snapshot(object): - """Snapshot of the state of a room - Args: - store (DataStore): The datastore. - room_id (RoomId): The room of the snapshot. - user_id (UserId): The user this snapshot is for. - prev_events (list): The list of event ids this snapshot is after. - membership_state (RoomMemberEvent): The current state of the user in - the room. - state_type (str, optional): State type captured by the snapshot - state_key (str, optional): State key captured by the snapshot - prev_state_pdu (PduEntry, optional): pdu id of - the previous value of the state type and key in the room. - """ - - def __init__(self, store, room_id, user_id, prev_events, - prev_state, state_type=None, state_key=None): - self.store = store - self.room_id = room_id - self.user_id = user_id - self.prev_events = prev_events - self.prev_state = prev_state - self.state_type = state_type - self.state_key = state_key - - def fill_out_prev_events(self, event): - if not hasattr(event, "prev_events"): - event.prev_events = [ - (event_id, hashes) - for event_id, hashes, _ in self.prev_events - ] - - if self.prev_events: - event.depth = max([int(v) for _, _, v in self.prev_events]) + 1 - else: - event.depth = 0 - - if not hasattr(event, "prev_state") and self.prev_state is not None: - event.prev_state = self.prev_state - def schema_path(schema): """ Get a filesystem path for the named database schema @@ -520,6 +449,14 @@ def read_schema(schema): return schema_file.read() +class PrepareDatabaseException(Exception): + pass + + +class UpgradeDatabaseException(PrepareDatabaseException): + pass + + def prepare_database(db_conn): """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we don't have to worry about overwriting existing content. @@ -544,6 +481,10 @@ def prepare_database(db_conn): # Run every version since after the current version. for v in range(user_version + 1, SCHEMA_VERSION + 1): + if v == 10: + raise UpgradeDatabaseException( + "No delta for version 10" + ) sql_script = read_schema("delta/v%d" % (v)) c.executescript(sql_script) |