diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index ad1765e04d..ac3bf5cee5 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -15,12 +15,8 @@
from twisted.internet import defer
-from synapse.api.events.room import (
- RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent,
- RoomRedactionEvent,
-)
-
from synapse.util.logutils import log_function
+from synapse.api.constants import EventTypes
from .directory import DirectoryStore
from .feedback import FeedbackStore
@@ -34,11 +30,13 @@ from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
from .pusher import PusherStore
+from .media_repository import MediaRepositoryStore
from .state import StateStore
from .signatures import SignatureStore
from syutil.base64util import decode_base64
+from syutil.jsonutil import encode_canonical_json
from synapse.crypto.event_signing import compute_event_reference_hash
@@ -63,7 +61,8 @@ SCHEMAS = [
"state",
"event_edges",
"event_signatures",
- "pusher"
+ "pusher",
+ "media_repository",
]
@@ -83,11 +82,13 @@ class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
- EventFederationStore, PusherStore, ):
+ EventFederationStore,
+ MediaRepositoryStore,
+ PusherStore,
+ ):
def __init__(self, hs):
super(DataStore, self).__init__(hs)
- self.event_factory = hs.get_event_factory()
self.hs = hs
self.min_token_deferred = self._get_min_token()
@@ -95,8 +96,8 @@ class DataStore(RoomMemberStore, RoomStore,
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, backfilled=False, is_new_state=True,
- current_state=None):
+ def persist_event(self, event, context, backfilled=False,
+ is_new_state=True, current_state=None):
stream_ordering = None
if backfilled:
if not self.min_token_deferred.called:
@@ -109,6 +110,7 @@ class DataStore(RoomMemberStore, RoomStore,
"persist_event",
self._persist_event_txn,
event=event,
+ context=context,
backfilled=backfilled,
stream_ordering=stream_ordering,
is_new_state=is_new_state,
@@ -119,50 +121,66 @@ class DataStore(RoomMemberStore, RoomStore,
@defer.inlineCallbacks
def get_event(self, event_id, allow_none=False):
- events_dict = yield self._simple_select_one(
- "events",
- {"event_id": event_id},
- [
- "event_id",
- "type",
- "room_id",
- "content",
- "unrecognized_keys",
- "depth",
- ],
- allow_none=allow_none,
- )
+ events = yield self._get_events([event_id])
- if not events_dict:
- defer.returnValue(None)
+ if not events:
+ if allow_none:
+ defer.returnValue(None)
+ else:
+ raise RuntimeError("Could not find event %s" % (event_id,))
- event = yield self._parse_events([events_dict])
- defer.returnValue(event[0])
+ defer.returnValue(events[0])
@log_function
- def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
- is_new_state=True, current_state=None):
- if event.type == RoomMemberEvent.TYPE:
+ def _persist_event_txn(self, txn, event, context, backfilled,
+ stream_ordering=None, is_new_state=True,
+ current_state=None):
+ if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
- elif event.type == FeedbackEvent.TYPE:
+ elif event.type == EventTypes.Feedback:
self._store_feedback_txn(txn, event)
- elif event.type == RoomNameEvent.TYPE:
+ elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
- elif event.type == RoomTopicEvent.TYPE:
+ elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
- elif event.type == RoomRedactionEvent.TYPE:
+ elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
outlier = False
- if hasattr(event, "outlier"):
- outlier = event.outlier
+ if hasattr(event.internal_metadata, "outlier"):
+ outlier = event.internal_metadata.outlier
+
+ event_dict = {
+ k: v
+ for k, v in event.get_dict().items()
+ if k not in [
+ "redacted",
+ "redacted_because",
+ ]
+ }
+
+ metadata_json = encode_canonical_json(
+ event.internal_metadata.get_dict()
+ )
+
+ self._simple_insert_txn(
+ txn,
+ table="event_json",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "internal_metadata": metadata_json.decode("UTF-8"),
+ "json": encode_canonical_json(event_dict).decode("UTF-8"),
+ },
+ or_replace=True,
+ )
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
- "content": json.dumps(event.content),
+ "content": json.dumps(event.get_dict()["content"]),
"processed": True,
"outlier": outlier,
"depth": event.depth,
@@ -173,7 +191,7 @@ class DataStore(RoomMemberStore, RoomStore,
unrec = {
k: v
- for k, v in event.get_full_dict().items()
+ for k, v in event.get_dict().items()
if k not in vals.keys() and k not in [
"redacted",
"redacted_because",
@@ -208,7 +226,8 @@ class DataStore(RoomMemberStore, RoomStore,
room_id=event.room_id,
)
- self._store_state_groups_txn(txn, event)
+ if not outlier:
+ self._store_state_groups_txn(txn, event, context)
if current_state:
txn.execute(
@@ -302,16 +321,6 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, hash_alg, hash_bytes,
)
- if hasattr(event, "signatures"):
- logger.debug("sigs: %s", event.signatures)
- for name, sigs in event.signatures.items():
- for key_id, signature_base64 in sigs.items():
- signature_bytes = decode_base64(signature_base64)
- self._store_event_signature_txn(
- txn, event.event_id, name, key_id,
- signature_bytes,
- )
-
for prev_event_id, prev_hashes in event.prev_events:
for alg, hash_base64 in prev_hashes.items():
hash_bytes = decode_base64(hash_base64)
@@ -413,86 +422,6 @@ class DataStore(RoomMemberStore, RoomStore,
],
)
- def snapshot_room(self, event):
- """Snapshot the room for an update by a user
- Args:
- room_id (synapse.types.RoomId): The room to snapshot.
- user_id (synapse.types.UserId): The user to snapshot the room for.
- state_type (str): Optional state type to snapshot.
- state_key (str): Optional state key to snapshot.
- Returns:
- synapse.storage.Snapshot: A snapshot of the state of the room.
- """
- def _snapshot(txn):
- prev_events = self._get_latest_events_in_room(
- txn,
- event.room_id
- )
-
- prev_state = None
- state_key = None
- if hasattr(event, "state_key"):
- state_key = event.state_key
- prev_state = self._get_latest_state_in_room(
- txn,
- event.room_id,
- type=event.type,
- state_key=state_key,
- )
-
- return Snapshot(
- store=self,
- room_id=event.room_id,
- user_id=event.user_id,
- prev_events=prev_events,
- prev_state=prev_state,
- state_type=event.type,
- state_key=state_key,
- )
-
- return self.runInteraction("snapshot_room", _snapshot)
-
-
-class Snapshot(object):
- """Snapshot of the state of a room
- Args:
- store (DataStore): The datastore.
- room_id (RoomId): The room of the snapshot.
- user_id (UserId): The user this snapshot is for.
- prev_events (list): The list of event ids this snapshot is after.
- membership_state (RoomMemberEvent): The current state of the user in
- the room.
- state_type (str, optional): State type captured by the snapshot
- state_key (str, optional): State key captured by the snapshot
- prev_state_pdu (PduEntry, optional): pdu id of
- the previous value of the state type and key in the room.
- """
-
- def __init__(self, store, room_id, user_id, prev_events,
- prev_state, state_type=None, state_key=None):
- self.store = store
- self.room_id = room_id
- self.user_id = user_id
- self.prev_events = prev_events
- self.prev_state = prev_state
- self.state_type = state_type
- self.state_key = state_key
-
- def fill_out_prev_events(self, event):
- if not hasattr(event, "prev_events"):
- event.prev_events = [
- (event_id, hashes)
- for event_id, hashes, _ in self.prev_events
- ]
-
- if self.prev_events:
- event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
- else:
- event.depth = 0
-
- if not hasattr(event, "prev_state") and self.prev_state is not None:
- event.prev_state = self.prev_state
-
def schema_path(schema):
""" Get a filesystem path for the named database schema
@@ -520,6 +449,14 @@ def read_schema(schema):
return schema_file.read()
+class PrepareDatabaseException(Exception):
+ pass
+
+
+class UpgradeDatabaseException(PrepareDatabaseException):
+ pass
+
+
def prepare_database(db_conn):
""" Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we
don't have to worry about overwriting existing content.
@@ -544,6 +481,10 @@ def prepare_database(db_conn):
# Run every version since after the current version.
for v in range(user_version + 1, SCHEMA_VERSION + 1):
+ if v == 10:
+ raise UpgradeDatabaseException(
+ "No delta for version 10"
+ )
sql_script = read_schema("delta/v%d" % (v))
c.executescript(sql_script)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index eb8cc4a9f3..efb2664680 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -15,15 +15,14 @@
import logging
from synapse.api.errors import StoreError
-from synapse.api.events.utils import prune_event
+from synapse.events import FrozenEvent
+from synapse.events.utils import prune_event
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
-from syutil.base64util import encode_base64
from twisted.internet import defer
import collections
-import copy
import json
import sys
import time
@@ -84,7 +83,6 @@ class SQLBaseStore(object):
def __init__(self, hs):
self.hs = hs
self._db_pool = hs.get_db_pool()
- self.event_factory = hs.get_event_factory()
self._clock = hs.get_clock()
@defer.inlineCallbacks
@@ -481,123 +479,77 @@ class SQLBaseStore(object):
return self.runInteraction("_simple_max_id", func)
- def _parse_event_from_row(self, row_dict):
- d = copy.deepcopy({k: v for k, v in row_dict.items()})
-
- d.pop("stream_ordering", None)
- d.pop("topological_ordering", None)
- d.pop("processed", None)
- d["origin_server_ts"] = d.pop("ts", 0)
- replaces_state = d.pop("prev_state", None)
+ def _get_events(self, event_ids):
+ return self.runInteraction(
+ "_get_events", self._get_events_txn, event_ids
+ )
- if replaces_state:
- d["replaces_state"] = replaces_state
+ def _get_events_txn(self, txn, event_ids):
+ events = []
+ for e_id in event_ids:
+ ev = self._get_event_txn(txn, e_id)
- d.update(json.loads(row_dict["unrecognized_keys"]))
- d["content"] = json.loads(d["content"])
- del d["unrecognized_keys"]
+ if ev:
+ events.append(ev)
- if "age_ts" not in d:
- # For compatibility
- d["age_ts"] = d.get("origin_server_ts", 0)
+ return events
- return self.event_factory.create_event(
- etype=d["type"],
- **d
+ def _get_event_txn(self, txn, event_id, check_redacted=True,
+ get_prev_content=True):
+ sql = (
+ "SELECT internal_metadata, json, r.event_id FROM event_json as e "
+ "LEFT JOIN redactions as r ON e.event_id = r.redacts "
+ "WHERE e.event_id = ? "
+ "LIMIT 1 "
)
- def _get_events_txn(self, txn, event_ids):
- # FIXME (erikj): This should be batched?
+ txn.execute(sql, (event_id,))
- sql = "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc"
+ res = txn.fetchone()
- event_rows = []
- for e_id in event_ids:
- c = txn.execute(sql, (e_id,))
- event_rows.extend(self.cursor_to_dict(c))
+ if not res:
+ return None
- return self._parse_events_txn(txn, event_rows)
+ internal_metadata, js, redacted = res
- def _parse_events(self, rows):
- return self.runInteraction(
- "_parse_events", self._parse_events_txn, rows
- )
+ d = json.loads(js)
+ internal_metadata = json.loads(internal_metadata)
- def _parse_events_txn(self, txn, rows):
- events = [self._parse_event_from_row(r) for r in rows]
+ ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
- select_event_sql = (
- "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc"
- )
+ if check_redacted and redacted:
+ ev = prune_event(ev)
- for i, ev in enumerate(events):
- signatures = self._get_event_signatures_txn(
- txn, ev.event_id,
+ ev.unsigned["redacted_by"] = redacted
+ # Get the redaction event.
+
+ because = self._get_event_txn(
+ txn,
+ redacted,
+ check_redacted=False
)
- ev.signatures = {
- n: {
- k: encode_base64(v) for k, v in s.items()
- }
- for n, s in signatures.items()
- }
+ if because:
+ ev.unsigned["redacted_because"] = because
- hashes = self._get_event_content_hashes_txn(
- txn, ev.event_id,
- )
+ if get_prev_content and "replaces_state" in ev.unsigned:
+ ev.unsigned["prev_content"] = self._get_event_txn(
+ txn,
+ ev.unsigned["replaces_state"],
+ get_prev_content=False,
+ ).get_dict()["content"]
- ev.hashes = {
- k: encode_base64(v) for k, v in hashes.items()
- }
-
- prevs = self._get_prev_events_and_state(txn, ev.event_id)
-
- ev.prev_events = [
- (e_id, h)
- for e_id, h, is_state in prevs
- if is_state == 0
- ]
-
- ev.auth_events = self._get_auth_events(txn, ev.event_id)
-
- if hasattr(ev, "state_key"):
- ev.prev_state = [
- (e_id, h)
- for e_id, h, is_state in prevs
- if is_state == 1
- ]
-
- if hasattr(ev, "replaces_state"):
- # Load previous state_content.
- # FIXME (erikj): Handle multiple prev_states.
- cursor = txn.execute(
- select_event_sql,
- (ev.replaces_state,)
- )
- prevs = self.cursor_to_dict(cursor)
- if prevs:
- prev = self._parse_event_from_row(prevs[0])
- ev.prev_content = prev.content
-
- if not hasattr(ev, "redacted"):
- logger.debug("Doesn't have redacted key: %s", ev)
- ev.redacted = self._has_been_redacted_txn(txn, ev)
-
- if ev.redacted:
- # Get the redaction event.
- select_event_sql = "SELECT * FROM events WHERE event_id = ?"
- txn.execute(select_event_sql, (ev.redacted,))
-
- del_evs = self._parse_events_txn(
- txn, self.cursor_to_dict(txn)
- )
+ return ev
- if del_evs:
- ev = prune_event(ev)
- events[i] = ev
- ev.redacted_because = del_evs[0]
+ def _parse_events(self, rows):
+ return self.runInteraction(
+ "_parse_events", self._parse_events_txn, rows
+ )
- return events
+ def _parse_events_txn(self, txn, rows):
+ event_ids = [r["event_id"] for r in rows]
+
+ return self._get_events_txn(txn, event_ids)
def _has_been_redacted_txn(self, txn, event):
sql = "SELECT event_id FROM redactions WHERE redacts = ?"
@@ -695,7 +647,7 @@ class JoinHelper(object):
to dump the results into.
Attributes:
- taples (list): List of `Table` classes
+ tables (list): List of `Table` classes
EntryType (type)
"""
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 6c559f8f63..ced066f407 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -177,14 +177,15 @@ class EventFederationStore(SQLBaseStore):
retcols=["prev_event_id", "is_state"],
)
+ hashes = self._get_prev_event_hashes_txn(txn, event_id)
+
results = []
for d in res:
- hashes = self._get_event_reference_hashes_txn(
- txn,
- d["prev_event_id"]
- )
+ edge_hash = self._get_event_reference_hashes_txn(txn, d["prev_event_id"])
+ edge_hash.update(hashes.get(d["prev_event_id"], {}))
prev_hashes = {
- k: encode_base64(v) for k, v in hashes.items()
+ k: encode_base64(v)
+ for k, v in edge_hash.items()
if k == "sha256"
}
results.append((d["prev_event_id"], prev_hashes, d["is_state"]))
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
new file mode 100644
index 0000000000..18c068d3d9
--- /dev/null
+++ b/synapse/storage/media_repository.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from _base import SQLBaseStore
+
+
+class MediaRepositoryStore(SQLBaseStore):
+ """Persistence for attachments and avatars"""
+
+ def get_default_thumbnails(self, top_level_type, sub_type):
+ return []
+
+ def get_local_media(self, media_id):
+ """Get the metadata for a local piece of media
+ Returns:
+ None if the meia_id doesn't exist.
+ """
+ return self._simple_select_one(
+ "local_media_repository",
+ {"media_id": media_id},
+ ("media_type", "media_length", "upload_name", "created_ts"),
+ allow_none=True,
+ )
+
+ def store_local_media(self, media_id, media_type, time_now_ms, upload_name,
+ media_length, user_id):
+ return self._simple_insert(
+ "local_media_repository",
+ {
+ "media_id": media_id,
+ "media_type": media_type,
+ "created_ts": time_now_ms,
+ "upload_name": upload_name,
+ "media_length": media_length,
+ "user_id": user_id.to_string(),
+ }
+ )
+
+ def get_local_media_thumbnails(self, media_id):
+ return self._simple_select_list(
+ "local_media_repository_thumbnails",
+ {"media_id": media_id},
+ (
+ "thumbnail_width", "thumbnail_height", "thumbnail_method",
+ "thumbnail_type", "thumbnail_length",
+ )
+ )
+
+ def store_local_thumbnail(self, media_id, thumbnail_width,
+ thumbnail_height, thumbnail_type,
+ thumbnail_method, thumbnail_length):
+ return self._simple_insert(
+ "local_media_repository_thumbnails",
+ {
+ "media_id": media_id,
+ "thumbnail_width": thumbnail_width,
+ "thumbnail_height": thumbnail_height,
+ "thumbnail_method": thumbnail_method,
+ "thumbnail_type": thumbnail_type,
+ "thumbnail_length": thumbnail_length,
+ }
+ )
+
+ def get_cached_remote_media(self, origin, media_id):
+ return self._simple_select_one(
+ "remote_media_cache",
+ {"media_origin": origin, "media_id": media_id},
+ (
+ "media_type", "media_length", "upload_name", "created_ts",
+ "filesystem_id",
+ ),
+ allow_none=True,
+ )
+
+ def store_cached_remote_media(self, origin, media_id, media_type,
+ media_length, time_now_ms, upload_name,
+ filesystem_id):
+ return self._simple_insert(
+ "remote_media_cache",
+ {
+ "media_origin": origin,
+ "media_id": media_id,
+ "media_type": media_type,
+ "media_length": media_length,
+ "created_ts": time_now_ms,
+ "upload_name": upload_name,
+ "filesystem_id": filesystem_id,
+ }
+ )
+
+ def get_remote_media_thumbnails(self, origin, media_id):
+ return self._simple_select_list(
+ "remote_media_cache_thumbnails",
+ {"media_origin": origin, "media_id": media_id},
+ (
+ "thumbnail_width", "thumbnail_height", "thumbnail_method",
+ "thumbnail_type", "thumbnail_length", "filesystem_id",
+ )
+ )
+
+ def store_remote_media_thumbnail(self, origin, media_id, filesystem_id,
+ thumbnail_width, thumbnail_height,
+ thumbnail_type, thumbnail_method,
+ thumbnail_length):
+ return self._simple_insert(
+ "remote_media_cache_thumbnails",
+ {
+ "media_origin": origin,
+ "media_id": media_id,
+ "thumbnail_width": thumbnail_width,
+ "thumbnail_height": thumbnail_height,
+ "thumbnail_method": thumbnail_method,
+ "thumbnail_type": thumbnail_type,
+ "thumbnail_length": thumbnail_length,
+ "filesystem_id": filesystem_id,
+ }
+ )
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
new file mode 100644
index 0000000000..0af29733a0
--- /dev/null
+++ b/synapse/storage/schema/delta/v9.sql
@@ -0,0 +1,79 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+ destination TEXT PRIMARY KEY,
+ retry_last_ts INTEGER,
+ retry_interval INTEGER
+);
+
+
+CREATE TABLE IF NOT EXISTS local_media_repository (
+ media_id TEXT, -- The id used to refer to the media.
+ media_type TEXT, -- The MIME-type of the media.
+ media_length INTEGER, -- Length of the media in bytes.
+ created_ts INTEGER, -- When the content was uploaded in ms.
+ upload_name TEXT, -- The name the media was uploaded with.
+ user_id TEXT, -- The user who uploaded the file.
+ CONSTRAINT uniqueness UNIQUE (media_id)
+);
+
+CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
+ media_id TEXT, -- The id used to refer to the media.
+ thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+ thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+ thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+ thumbnail_method TEXT, -- The method used to make the thumbnail.
+ thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+ CONSTRAINT uniqueness UNIQUE (
+ media_id, thumbnail_width, thumbnail_height, thumbnail_type
+ )
+);
+
+CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
+ ON local_media_repository_thumbnails (media_id);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache (
+ media_origin TEXT, -- The remote HS the media came from.
+ media_id TEXT, -- The id used to refer to the media on that server.
+ media_type TEXT, -- The MIME-type of the media.
+ created_ts INTEGER, -- When the content was uploaded in ms.
+ upload_name TEXT, -- The name the media was uploaded with.
+ media_length INTEGER, -- Length of the media in bytes.
+ filesystem_id TEXT, -- The name used to store the media on disk.
+ CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
+);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
+ media_origin TEXT, -- The remote HS the media came from.
+ media_id TEXT, -- The id used to refer to the media.
+ thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+ thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+ thumbnail_method TEXT, -- The method used to make the thumbnail
+ thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+ thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+ filesystem_id TEXT, -- The name used to store the media on disk.
+ CONSTRAINT uniqueness UNIQUE (
+ media_origin, media_id, thumbnail_width, thumbnail_height,
+ thumbnail_type, thumbnail_type
+ )
+);
+
+CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
+ ON local_media_repository_thumbnails (media_id);
+
+
+PRAGMA user_version = 9;
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 8ba732a23b..253f9f779b 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -32,6 +32,19 @@ CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id);
+
+CREATE TABLE IF NOT EXISTS event_json(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ internal_metadata NOT NULL,
+ json BLOB NOT NULL,
+ CONSTRAINT ev_j_uniq UNIQUE (event_id)
+);
+
+CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id);
+CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id);
+
+
CREATE TABLE IF NOT EXISTS state_events(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
diff --git a/synapse/storage/schema/media_repository.sql b/synapse/storage/schema/media_repository.sql
new file mode 100644
index 0000000000..b785fa0208
--- /dev/null
+++ b/synapse/storage/schema/media_repository.sql
@@ -0,0 +1,68 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS local_media_repository (
+ media_id TEXT, -- The id used to refer to the media.
+ media_type TEXT, -- The MIME-type of the media.
+ media_length INTEGER, -- Length of the media in bytes.
+ created_ts INTEGER, -- When the content was uploaded in ms.
+ upload_name TEXT, -- The name the media was uploaded with.
+ user_id TEXT, -- The user who uploaded the file.
+ CONSTRAINT uniqueness UNIQUE (media_id)
+);
+
+CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
+ media_id TEXT, -- The id used to refer to the media.
+ thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+ thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+ thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+ thumbnail_method TEXT, -- The method used to make the thumbnail.
+ thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+ CONSTRAINT uniqueness UNIQUE (
+ media_id, thumbnail_width, thumbnail_height, thumbnail_type
+ )
+);
+
+CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
+ ON local_media_repository_thumbnails (media_id);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache (
+ media_origin TEXT, -- The remote HS the media came from.
+ media_id TEXT, -- The id used to refer to the media on that server.
+ media_type TEXT, -- The MIME-type of the media.
+ created_ts INTEGER, -- When the content was uploaded in ms.
+ upload_name TEXT, -- The name the media was uploaded with.
+ media_length INTEGER, -- Length of the media in bytes.
+ filesystem_id TEXT, -- The name used to store the media on disk.
+ CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
+);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
+ media_origin TEXT, -- The remote HS the media came from.
+ media_id TEXT, -- The id used to refer to the media.
+ thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+ thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+ thumbnail_method TEXT, -- The method used to make the thumbnail
+ thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+ thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+ filesystem_id TEXT, -- The name used to store the media on disk.
+ CONSTRAINT uniqueness UNIQUE (
+ media_origin, media_id, thumbnail_width, thumbnail_height,
+ thumbnail_type, thumbnail_type
+ )
+);
+
+CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
+ ON local_media_repository_thumbnails (media_id);
diff --git a/synapse/storage/schema/state.sql b/synapse/storage/schema/state.sql
index 44f7aafb27..2c48d6daca 100644
--- a/synapse/storage/schema/state.sql
+++ b/synapse/storage/schema/state.sql
@@ -29,7 +29,8 @@ CREATE TABLE IF NOT EXISTS state_groups_state(
CREATE TABLE IF NOT EXISTS event_to_state_groups(
event_id TEXT NOT NULL,
- state_group INTEGER NOT NULL
+ state_group INTEGER NOT NULL,
+ CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id)
);
CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id);
diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql
index 88e3e4e04d..de461bfa15 100644
--- a/synapse/storage/schema/transactions.sql
+++ b/synapse/storage/schema/transactions.sql
@@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+ destination TEXT PRIMARY KEY,
+ retry_last_ts INTEGER,
+ retry_interval INTEGER
+);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index eea4f21065..3a705119fd 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -13,8 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
+
from _base import SQLBaseStore
+from syutil.base64util import encode_base64
+
class SignatureStore(SQLBaseStore):
"""Persistence for event signatures and hashes"""
@@ -67,6 +71,21 @@ class SignatureStore(SQLBaseStore):
f
)
+ @defer.inlineCallbacks
+ def add_event_hashes(self, event_ids):
+ hashes = yield self.get_event_reference_hashes(
+ event_ids
+ )
+ hashes = [
+ {
+ k: encode_base64(v) for k, v in h.items()
+ if k == "sha256"
+ }
+ for h in hashes
+ ]
+
+ defer.returnValue(zip(event_ids, hashes))
+
def _get_event_reference_hashes_txn(self, txn, event_id):
"""Get all the hashes for a given PDU.
Args:
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index e0f44b3e59..afe3e5edea 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -86,11 +86,16 @@ class StateStore(SQLBaseStore):
self._store_state_groups_txn, event
)
- def _store_state_groups_txn(self, txn, event):
- if event.state_events is None:
+ def _store_state_groups_txn(self, txn, event, context):
+ if context.current_state is None:
return
- state_group = event.state_group
+ state_events = context.current_state
+
+ if event.is_state():
+ state_events[(event.type, event.state_key)] = event
+
+ state_group = context.state_group
if not state_group:
state_group = self._simple_insert_txn(
txn,
@@ -102,7 +107,7 @@ class StateStore(SQLBaseStore):
or_ignore=True,
)
- for state in event.state_events.values():
+ for state in state_events.values():
self._simple_insert_txn(
txn,
table="state_groups_state",
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 00d0f48082..423cc3f02a 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table
from collections import namedtuple
+from twisted.internet import defer
+
import logging
logger = logging.getLogger(__name__)
@@ -26,6 +28,10 @@ class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
"""
+ # a write-through cache of DestinationsTable.EntryType indexed by
+ # destination string
+ destination_retry_cache = {}
+
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
@@ -114,7 +120,7 @@ class TransactionStore(SQLBaseStore):
def _prep_send_transaction(self, txn, transaction_id, destination,
origin_server_ts):
- # First we find out what the prev_txs should be.
+ # First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time,
# we can simply take the last one.
query = "%s ORDER BY id DESC LIMIT 1" % (
@@ -205,6 +211,92 @@ class TransactionStore(SQLBaseStore):
return ReceivedTransactionsTable.decode_results(txn.fetchall())
+ def get_destination_retry_timings(self, destination):
+ """Gets the current retry timings (if any) for a given destination.
+
+ Args:
+ destination (str)
+
+ Returns:
+ None if not retrying
+ Otherwise a DestinationsTable.EntryType for the retry scheme
+ """
+ if destination in self.destination_retry_cache:
+ return defer.succeed(self.destination_retry_cache[destination])
+
+ return self.runInteraction(
+ "get_destination_retry_timings",
+ self._get_destination_retry_timings, destination)
+
+ def _get_destination_retry_timings(cls, txn, destination):
+ query = DestinationsTable.select_statement("destination = ?")
+ txn.execute(query, (destination,))
+ result = txn.fetchall()
+ if result:
+ result = DestinationsTable.decode_single_result(result)
+ if result.retry_last_ts > 0:
+ return result
+ else:
+ return None
+
+ def set_destination_retry_timings(self, destination,
+ retry_last_ts, retry_interval):
+ """Sets the current retry timings for a given destination.
+ Both timings should be zero if retrying is no longer occuring.
+
+ Args:
+ destination (str)
+ retry_last_ts (int) - time of last retry attempt in unix epoch ms
+ retry_interval (int) - how long until next retry in ms
+ """
+
+ self.destination_retry_cache[destination] = (
+ DestinationsTable.EntryType(
+ destination,
+ retry_last_ts,
+ retry_interval
+ )
+ )
+
+ # XXX: we could chose to not bother persisting this if our cache thinks
+ # this is a NOOP
+ return self.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings,
+ destination,
+ retry_last_ts,
+ retry_interval,
+ )
+
+ def _set_destination_retry_timings(cls, txn, destination,
+ retry_last_ts, retry_interval):
+
+ query = (
+ "INSERT OR REPLACE INTO %s "
+ "(destination, retry_last_ts, retry_interval) "
+ "VALUES (?, ?, ?) "
+ ) % DestinationsTable.table_name
+
+ txn.execute(query, (destination, retry_last_ts, retry_interval))
+
+ def get_destinations_needing_retry(self):
+ """Get all destinations which are due a retry for sending a transaction.
+
+ Returns:
+ list: A list of `DestinationsTable.EntryType`
+ """
+
+ return self.runInteraction(
+ "get_destinations_needing_retry",
+ self._get_destinations_needing_retry
+ )
+
+ def _get_destinations_needing_retry(cls, txn):
+ where = "retry_last_ts > 0 and retry_next_ts < now()"
+ query = DestinationsTable.select_statement(where)
+ txn.execute(query)
+ return DestinationsTable.decode_results(txn.fetchall())
+
class ReceivedTransactionsTable(Table):
table_name = "received_transactions"
@@ -247,3 +339,15 @@ class TransactionsToPduTable(Table):
]
EntryType = namedtuple("TransactionsToPduEntry", fields)
+
+
+class DestinationsTable(Table):
+ table_name = "destinations"
+
+ fields = [
+ "destination",
+ "retry_last_ts",
+ "retry_interval",
+ ]
+
+ EntryType = namedtuple("DestinationsEntry", fields)
|