diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f15e3dfe62..c9ab434b4e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -33,6 +33,7 @@ from .stream import StreamStore
from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
+from .media_repository import MediaRepositoryStore
from .state import StateStore
from .signatures import SignatureStore
@@ -62,12 +63,13 @@ SCHEMAS = [
"state",
"event_edges",
"event_signatures",
+ "media_repository",
]
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 8
+SCHEMA_VERSION = 9
class _RollbackButIsFineException(Exception):
@@ -81,7 +83,9 @@ class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
- EventFederationStore, ):
+ EventFederationStore,
+ MediaRepositoryStore,
+ ):
def __init__(self, hs):
super(DataStore, self).__init__(hs)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4881f03368..e72200e2f7 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -650,7 +650,7 @@ class JoinHelper(object):
to dump the results into.
Attributes:
- taples (list): List of `Table` classes
+ tables (list): List of `Table` classes
EntryType (type)
"""
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
new file mode 100644
index 0000000000..18c068d3d9
--- /dev/null
+++ b/synapse/storage/media_repository.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from _base import SQLBaseStore
+
+
+class MediaRepositoryStore(SQLBaseStore):
+ """Persistence for attachments and avatars"""
+
+ def get_default_thumbnails(self, top_level_type, sub_type):
+ return []
+
+ def get_local_media(self, media_id):
+ """Get the metadata for a local piece of media
+ Returns:
+ None if the meia_id doesn't exist.
+ """
+ return self._simple_select_one(
+ "local_media_repository",
+ {"media_id": media_id},
+ ("media_type", "media_length", "upload_name", "created_ts"),
+ allow_none=True,
+ )
+
+ def store_local_media(self, media_id, media_type, time_now_ms, upload_name,
+ media_length, user_id):
+ return self._simple_insert(
+ "local_media_repository",
+ {
+ "media_id": media_id,
+ "media_type": media_type,
+ "created_ts": time_now_ms,
+ "upload_name": upload_name,
+ "media_length": media_length,
+ "user_id": user_id.to_string(),
+ }
+ )
+
+ def get_local_media_thumbnails(self, media_id):
+ return self._simple_select_list(
+ "local_media_repository_thumbnails",
+ {"media_id": media_id},
+ (
+ "thumbnail_width", "thumbnail_height", "thumbnail_method",
+ "thumbnail_type", "thumbnail_length",
+ )
+ )
+
+ def store_local_thumbnail(self, media_id, thumbnail_width,
+ thumbnail_height, thumbnail_type,
+ thumbnail_method, thumbnail_length):
+ return self._simple_insert(
+ "local_media_repository_thumbnails",
+ {
+ "media_id": media_id,
+ "thumbnail_width": thumbnail_width,
+ "thumbnail_height": thumbnail_height,
+ "thumbnail_method": thumbnail_method,
+ "thumbnail_type": thumbnail_type,
+ "thumbnail_length": thumbnail_length,
+ }
+ )
+
+ def get_cached_remote_media(self, origin, media_id):
+ return self._simple_select_one(
+ "remote_media_cache",
+ {"media_origin": origin, "media_id": media_id},
+ (
+ "media_type", "media_length", "upload_name", "created_ts",
+ "filesystem_id",
+ ),
+ allow_none=True,
+ )
+
+ def store_cached_remote_media(self, origin, media_id, media_type,
+ media_length, time_now_ms, upload_name,
+ filesystem_id):
+ return self._simple_insert(
+ "remote_media_cache",
+ {
+ "media_origin": origin,
+ "media_id": media_id,
+ "media_type": media_type,
+ "media_length": media_length,
+ "created_ts": time_now_ms,
+ "upload_name": upload_name,
+ "filesystem_id": filesystem_id,
+ }
+ )
+
+ def get_remote_media_thumbnails(self, origin, media_id):
+ return self._simple_select_list(
+ "remote_media_cache_thumbnails",
+ {"media_origin": origin, "media_id": media_id},
+ (
+ "thumbnail_width", "thumbnail_height", "thumbnail_method",
+ "thumbnail_type", "thumbnail_length", "filesystem_id",
+ )
+ )
+
+ def store_remote_media_thumbnail(self, origin, media_id, filesystem_id,
+ thumbnail_width, thumbnail_height,
+ thumbnail_type, thumbnail_method,
+ thumbnail_length):
+ return self._simple_insert(
+ "remote_media_cache_thumbnails",
+ {
+ "media_origin": origin,
+ "media_id": media_id,
+ "thumbnail_width": thumbnail_width,
+ "thumbnail_height": thumbnail_height,
+ "thumbnail_method": thumbnail_method,
+ "thumbnail_type": thumbnail_type,
+ "thumbnail_length": thumbnail_length,
+ "filesystem_id": filesystem_id,
+ }
+ )
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
new file mode 100644
index 0000000000..ad680c64da
--- /dev/null
+++ b/synapse/storage/schema/delta/v9.sql
@@ -0,0 +1,23 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+ destination TEXT PRIMARY KEY,
+ retry_last_ts INTEGER,
+ retry_interval INTEGER
+);
+
+PRAGMA user_version = 9;
\ No newline at end of file
diff --git a/synapse/storage/schema/media_repository.sql b/synapse/storage/schema/media_repository.sql
new file mode 100644
index 0000000000..b785fa0208
--- /dev/null
+++ b/synapse/storage/schema/media_repository.sql
@@ -0,0 +1,68 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS local_media_repository (
+ media_id TEXT, -- The id used to refer to the media.
+ media_type TEXT, -- The MIME-type of the media.
+ media_length INTEGER, -- Length of the media in bytes.
+ created_ts INTEGER, -- When the content was uploaded in ms.
+ upload_name TEXT, -- The name the media was uploaded with.
+ user_id TEXT, -- The user who uploaded the file.
+ CONSTRAINT uniqueness UNIQUE (media_id)
+);
+
+CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
+ media_id TEXT, -- The id used to refer to the media.
+ thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+ thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+ thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+ thumbnail_method TEXT, -- The method used to make the thumbnail.
+ thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+ CONSTRAINT uniqueness UNIQUE (
+ media_id, thumbnail_width, thumbnail_height, thumbnail_type
+ )
+);
+
+CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
+ ON local_media_repository_thumbnails (media_id);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache (
+ media_origin TEXT, -- The remote HS the media came from.
+ media_id TEXT, -- The id used to refer to the media on that server.
+ media_type TEXT, -- The MIME-type of the media.
+ created_ts INTEGER, -- When the content was uploaded in ms.
+ upload_name TEXT, -- The name the media was uploaded with.
+ media_length INTEGER, -- Length of the media in bytes.
+ filesystem_id TEXT, -- The name used to store the media on disk.
+ CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
+);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
+ media_origin TEXT, -- The remote HS the media came from.
+ media_id TEXT, -- The id used to refer to the media.
+ thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+ thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+ thumbnail_method TEXT, -- The method used to make the thumbnail
+ thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+ thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+ filesystem_id TEXT, -- The name used to store the media on disk.
+ CONSTRAINT uniqueness UNIQUE (
+ media_origin, media_id, thumbnail_width, thumbnail_height,
+ thumbnail_type, thumbnail_type
+ )
+);
+
+CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
+ ON local_media_repository_thumbnails (media_id);
diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql
index 88e3e4e04d..de461bfa15 100644
--- a/synapse/storage/schema/transactions.sql
+++ b/synapse/storage/schema/transactions.sql
@@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+ destination TEXT PRIMARY KEY,
+ retry_last_ts INTEGER,
+ retry_interval INTEGER
+);
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 00d0f48082..423cc3f02a 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table
from collections import namedtuple
+from twisted.internet import defer
+
import logging
logger = logging.getLogger(__name__)
@@ -26,6 +28,10 @@ class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
"""
+ # a write-through cache of DestinationsTable.EntryType indexed by
+ # destination string
+ destination_retry_cache = {}
+
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
@@ -114,7 +120,7 @@ class TransactionStore(SQLBaseStore):
def _prep_send_transaction(self, txn, transaction_id, destination,
origin_server_ts):
- # First we find out what the prev_txs should be.
+ # First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time,
# we can simply take the last one.
query = "%s ORDER BY id DESC LIMIT 1" % (
@@ -205,6 +211,92 @@ class TransactionStore(SQLBaseStore):
return ReceivedTransactionsTable.decode_results(txn.fetchall())
+ def get_destination_retry_timings(self, destination):
+ """Gets the current retry timings (if any) for a given destination.
+
+ Args:
+ destination (str)
+
+ Returns:
+ None if not retrying
+ Otherwise a DestinationsTable.EntryType for the retry scheme
+ """
+ if destination in self.destination_retry_cache:
+ return defer.succeed(self.destination_retry_cache[destination])
+
+ return self.runInteraction(
+ "get_destination_retry_timings",
+ self._get_destination_retry_timings, destination)
+
+ def _get_destination_retry_timings(cls, txn, destination):
+ query = DestinationsTable.select_statement("destination = ?")
+ txn.execute(query, (destination,))
+ result = txn.fetchall()
+ if result:
+ result = DestinationsTable.decode_single_result(result)
+ if result.retry_last_ts > 0:
+ return result
+ else:
+ return None
+
+ def set_destination_retry_timings(self, destination,
+ retry_last_ts, retry_interval):
+ """Sets the current retry timings for a given destination.
+ Both timings should be zero if retrying is no longer occuring.
+
+ Args:
+ destination (str)
+ retry_last_ts (int) - time of last retry attempt in unix epoch ms
+ retry_interval (int) - how long until next retry in ms
+ """
+
+ self.destination_retry_cache[destination] = (
+ DestinationsTable.EntryType(
+ destination,
+ retry_last_ts,
+ retry_interval
+ )
+ )
+
+ # XXX: we could chose to not bother persisting this if our cache thinks
+ # this is a NOOP
+ return self.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings,
+ destination,
+ retry_last_ts,
+ retry_interval,
+ )
+
+ def _set_destination_retry_timings(cls, txn, destination,
+ retry_last_ts, retry_interval):
+
+ query = (
+ "INSERT OR REPLACE INTO %s "
+ "(destination, retry_last_ts, retry_interval) "
+ "VALUES (?, ?, ?) "
+ ) % DestinationsTable.table_name
+
+ txn.execute(query, (destination, retry_last_ts, retry_interval))
+
+ def get_destinations_needing_retry(self):
+ """Get all destinations which are due a retry for sending a transaction.
+
+ Returns:
+ list: A list of `DestinationsTable.EntryType`
+ """
+
+ return self.runInteraction(
+ "get_destinations_needing_retry",
+ self._get_destinations_needing_retry
+ )
+
+ def _get_destinations_needing_retry(cls, txn):
+ where = "retry_last_ts > 0 and retry_next_ts < now()"
+ query = DestinationsTable.select_statement(where)
+ txn.execute(query)
+ return DestinationsTable.decode_results(txn.fetchall())
+
class ReceivedTransactionsTable(Table):
table_name = "received_transactions"
@@ -247,3 +339,15 @@ class TransactionsToPduTable(Table):
]
EntryType = namedtuple("TransactionsToPduEntry", fields)
+
+
+class DestinationsTable(Table):
+ table_name = "destinations"
+
+ fields = [
+ "destination",
+ "retry_last_ts",
+ "retry_interval",
+ ]
+
+ EntryType = namedtuple("DestinationsEntry", fields)
|