diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index ad1765e04d..ac3bf5cee5 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -15,12 +15,8 @@
from twisted.internet import defer
-from synapse.api.events.room import (
- RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent,
- RoomRedactionEvent,
-)
-
from synapse.util.logutils import log_function
+from synapse.api.constants import EventTypes
from .directory import DirectoryStore
from .feedback import FeedbackStore
@@ -34,11 +30,13 @@ from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
from .pusher import PusherStore
+from .media_repository import MediaRepositoryStore
from .state import StateStore
from .signatures import SignatureStore
from syutil.base64util import decode_base64
+from syutil.jsonutil import encode_canonical_json
from synapse.crypto.event_signing import compute_event_reference_hash
@@ -63,7 +61,8 @@ SCHEMAS = [
"state",
"event_edges",
"event_signatures",
- "pusher"
+ "pusher",
+ "media_repository",
]
@@ -83,11 +82,13 @@ class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
- EventFederationStore, PusherStore, ):
+ EventFederationStore,
+ MediaRepositoryStore,
+ PusherStore,
+ ):
def __init__(self, hs):
super(DataStore, self).__init__(hs)
- self.event_factory = hs.get_event_factory()
self.hs = hs
self.min_token_deferred = self._get_min_token()
@@ -95,8 +96,8 @@ class DataStore(RoomMemberStore, RoomStore,
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, backfilled=False, is_new_state=True,
- current_state=None):
+ def persist_event(self, event, context, backfilled=False,
+ is_new_state=True, current_state=None):
stream_ordering = None
if backfilled:
if not self.min_token_deferred.called:
@@ -109,6 +110,7 @@ class DataStore(RoomMemberStore, RoomStore,
"persist_event",
self._persist_event_txn,
event=event,
+ context=context,
backfilled=backfilled,
stream_ordering=stream_ordering,
is_new_state=is_new_state,
@@ -119,50 +121,66 @@ class DataStore(RoomMemberStore, RoomStore,
@defer.inlineCallbacks
def get_event(self, event_id, allow_none=False):
- events_dict = yield self._simple_select_one(
- "events",
- {"event_id": event_id},
- [
- "event_id",
- "type",
- "room_id",
- "content",
- "unrecognized_keys",
- "depth",
- ],
- allow_none=allow_none,
- )
+ events = yield self._get_events([event_id])
- if not events_dict:
- defer.returnValue(None)
+ if not events:
+ if allow_none:
+ defer.returnValue(None)
+ else:
+ raise RuntimeError("Could not find event %s" % (event_id,))
- event = yield self._parse_events([events_dict])
- defer.returnValue(event[0])
+ defer.returnValue(events[0])
@log_function
- def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
- is_new_state=True, current_state=None):
- if event.type == RoomMemberEvent.TYPE:
+ def _persist_event_txn(self, txn, event, context, backfilled,
+ stream_ordering=None, is_new_state=True,
+ current_state=None):
+ if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
- elif event.type == FeedbackEvent.TYPE:
+ elif event.type == EventTypes.Feedback:
self._store_feedback_txn(txn, event)
- elif event.type == RoomNameEvent.TYPE:
+ elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
- elif event.type == RoomTopicEvent.TYPE:
+ elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
- elif event.type == RoomRedactionEvent.TYPE:
+ elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
outlier = False
- if hasattr(event, "outlier"):
- outlier = event.outlier
+ if hasattr(event.internal_metadata, "outlier"):
+ outlier = event.internal_metadata.outlier
+
+ event_dict = {
+ k: v
+ for k, v in event.get_dict().items()
+ if k not in [
+ "redacted",
+ "redacted_because",
+ ]
+ }
+
+ metadata_json = encode_canonical_json(
+ event.internal_metadata.get_dict()
+ )
+
+ self._simple_insert_txn(
+ txn,
+ table="event_json",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "internal_metadata": metadata_json.decode("UTF-8"),
+ "json": encode_canonical_json(event_dict).decode("UTF-8"),
+ },
+ or_replace=True,
+ )
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
- "content": json.dumps(event.content),
+ "content": json.dumps(event.get_dict()["content"]),
"processed": True,
"outlier": outlier,
"depth": event.depth,
@@ -173,7 +191,7 @@ class DataStore(RoomMemberStore, RoomStore,
unrec = {
k: v
- for k, v in event.get_full_dict().items()
+ for k, v in event.get_dict().items()
if k not in vals.keys() and k not in [
"redacted",
"redacted_because",
@@ -208,7 +226,8 @@ class DataStore(RoomMemberStore, RoomStore,
room_id=event.room_id,
)
- self._store_state_groups_txn(txn, event)
+ if not outlier:
+ self._store_state_groups_txn(txn, event, context)
if current_state:
txn.execute(
@@ -302,16 +321,6 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, hash_alg, hash_bytes,
)
- if hasattr(event, "signatures"):
- logger.debug("sigs: %s", event.signatures)
- for name, sigs in event.signatures.items():
- for key_id, signature_base64 in sigs.items():
- signature_bytes = decode_base64(signature_base64)
- self._store_event_signature_txn(
- txn, event.event_id, name, key_id,
- signature_bytes,
- )
-
for prev_event_id, prev_hashes in event.prev_events:
for alg, hash_base64 in prev_hashes.items():
hash_bytes = decode_base64(hash_base64)
@@ -413,86 +422,6 @@ class DataStore(RoomMemberStore, RoomStore,
],
)
- def snapshot_room(self, event):
- """Snapshot the room for an update by a user
- Args:
- room_id (synapse.types.RoomId): The room to snapshot.
- user_id (synapse.types.UserId): The user to snapshot the room for.
- state_type (str): Optional state type to snapshot.
- state_key (str): Optional state key to snapshot.
- Returns:
- synapse.storage.Snapshot: A snapshot of the state of the room.
- """
- def _snapshot(txn):
- prev_events = self._get_latest_events_in_room(
- txn,
- event.room_id
- )
-
- prev_state = None
- state_key = None
- if hasattr(event, "state_key"):
- state_key = event.state_key
- prev_state = self._get_latest_state_in_room(
- txn,
- event.room_id,
- type=event.type,
- state_key=state_key,
- )
-
- return Snapshot(
- store=self,
- room_id=event.room_id,
- user_id=event.user_id,
- prev_events=prev_events,
- prev_state=prev_state,
- state_type=event.type,
- state_key=state_key,
- )
-
- return self.runInteraction("snapshot_room", _snapshot)
-
-
-class Snapshot(object):
- """Snapshot of the state of a room
- Args:
- store (DataStore): The datastore.
- room_id (RoomId): The room of the snapshot.
- user_id (UserId): The user this snapshot is for.
- prev_events (list): The list of event ids this snapshot is after.
- membership_state (RoomMemberEvent): The current state of the user in
- the room.
- state_type (str, optional): State type captured by the snapshot
- state_key (str, optional): State key captured by the snapshot
- prev_state_pdu (PduEntry, optional): pdu id of
- the previous value of the state type and key in the room.
- """
-
- def __init__(self, store, room_id, user_id, prev_events,
- prev_state, state_type=None, state_key=None):
- self.store = store
- self.room_id = room_id
- self.user_id = user_id
- self.prev_events = prev_events
- self.prev_state = prev_state
- self.state_type = state_type
- self.state_key = state_key
-
- def fill_out_prev_events(self, event):
- if not hasattr(event, "prev_events"):
- event.prev_events = [
- (event_id, hashes)
- for event_id, hashes, _ in self.prev_events
- ]
-
- if self.prev_events:
- event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
- else:
- event.depth = 0
-
- if not hasattr(event, "prev_state") and self.prev_state is not None:
- event.prev_state = self.prev_state
-
def schema_path(schema):
""" Get a filesystem path for the named database schema
@@ -520,6 +449,14 @@ def read_schema(schema):
return schema_file.read()
+class PrepareDatabaseException(Exception):
+ pass
+
+
+class UpgradeDatabaseException(PrepareDatabaseException):
+ pass
+
+
def prepare_database(db_conn):
""" Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we
don't have to worry about overwriting existing content.
@@ -544,6 +481,10 @@ def prepare_database(db_conn):
# Run every version since after the current version.
for v in range(user_version + 1, SCHEMA_VERSION + 1):
+ if v == 10:
+ raise UpgradeDatabaseException(
+ "No delta for version 10"
+ )
sql_script = read_schema("delta/v%d" % (v))
c.executescript(sql_script)
|