diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 6baa474931..25ea77738a 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -16,6 +16,8 @@
from ._base import Config, ConfigError
from collections import namedtuple
+from synapse.util.module_loader import load_module
+
MISSING_NETADDR = (
"Missing netaddr library. This is required for URL preview API."
@@ -36,6 +38,14 @@ ThumbnailRequirement = namedtuple(
"ThumbnailRequirement", ["width", "height", "method", "media_type"]
)
+MediaStorageProviderConfig = namedtuple(
+ "MediaStorageProviderConfig", (
+ "store_local", # Whether to store newly uploaded local files
+ "store_remote", # Whether to store newly downloaded remote files
+ "store_synchronous", # Whether to wait for successful storage for local uploads
+ ),
+)
+
def parse_thumbnail_requirements(thumbnail_sizes):
""" Takes a list of dictionaries with "width", "height", and "method" keys
@@ -73,16 +83,61 @@ class ContentRepositoryConfig(Config):
self.media_store_path = self.ensure_directory(config["media_store_path"])
- self.backup_media_store_path = config.get("backup_media_store_path")
- if self.backup_media_store_path:
- self.backup_media_store_path = self.ensure_directory(
- self.backup_media_store_path
- )
+ backup_media_store_path = config.get("backup_media_store_path")
- self.synchronous_backup_media_store = config.get(
+ synchronous_backup_media_store = config.get(
"synchronous_backup_media_store", False
)
+ storage_providers = config.get("media_storage_providers", [])
+
+ if backup_media_store_path:
+ if storage_providers:
+ raise ConfigError(
+ "Cannot use both 'backup_media_store_path' and 'storage_providers'"
+ )
+
+ storage_providers = [{
+ "module": "file_system",
+ "store_local": True,
+ "store_synchronous": synchronous_backup_media_store,
+ "store_remote": True,
+ "config": {
+ "directory": backup_media_store_path,
+ }
+ }]
+
+ # This is a list of config that can be used to create the storage
+ # providers. The entries are tuples of (Class, class_config,
+ # MediaStorageProviderConfig), where Class is the class of the provider,
+ # the class_config the config to pass to it, and
+ # MediaStorageProviderConfig are options for StorageProviderWrapper.
+ #
+ # We don't create the storage providers here as not all workers need
+ # them to be started.
+ self.media_storage_providers = []
+
+ for provider_config in storage_providers:
+ # We special case the module "file_system" so as not to need to
+ # expose FileStorageProviderBackend
+ if provider_config["module"] == "file_system":
+ provider_config["module"] = (
+ "synapse.rest.media.v1.storage_provider"
+ ".FileStorageProviderBackend"
+ )
+
+ provider_class, parsed_config = load_module(provider_config)
+
+ wrapper_config = MediaStorageProviderConfig(
+ provider_config.get("store_local", False),
+ provider_config.get("store_remote", False),
+ provider_config.get("store_synchronous", False),
+ )
+
+ self.media_storage_providers.append(
+ (provider_class, parsed_config, wrapper_config,)
+ )
+
self.uploads_path = self.ensure_directory(config["uploads_path"])
self.dynamic_thumbnails = config["dynamic_thumbnails"]
self.thumbnail_requirements = parse_thumbnail_requirements(
@@ -127,13 +182,19 @@ class ContentRepositoryConfig(Config):
# Directory where uploaded images and attachments are stored.
media_store_path: "%(media_store)s"
- # A secondary directory where uploaded images and attachments are
- # stored as a backup.
- # backup_media_store_path: "%(media_store)s"
-
- # Whether to wait for successful write to backup media store before
- # returning successfully.
- # synchronous_backup_media_store: false
+ # Media storage providers allow media to be stored in different
+ # locations.
+ # media_storage_providers:
+ # - module: file_system
+ # # Whether to write new local files.
+ # store_local: false
+ # # Whether to write new remote media
+ # store_remote: false
+ # # Whether to block upload requests waiting for write to this
+ # # provider to complete
+ # store_synchronous: false
+ # config:
+ # directory: /mnt/some/other/directory
# Directory where in-progress uploads are stored.
uploads_path: "%(uploads_path)s"
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index f480aae614..1e783e5ff4 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -15,6 +15,9 @@
from itertools import chain
+import logging
+
+logger = logging.getLogger(__name__)
def flatten(items):
@@ -153,7 +156,11 @@ class CallbackMetric(BaseMetric):
self.callback = callback
def render(self):
- value = self.callback()
+ try:
+ value = self.callback()
+ except Exception:
+ logger.exception("Failed to render %s", self.name)
+ return ["# FAILED to render " + self.name]
if self.is_scalar():
return list(self._render_for_labels([], value))
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index b2c76440b7..4f56bcf577 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -27,9 +27,7 @@ from .identicon_resource import IdenticonResource
from .preview_url_resource import PreviewUrlResource
from .filepath import MediaFilePaths
from .thumbnailer import Thumbnailer
-from .storage_provider import (
- StorageProviderWrapper, FileStorageProviderBackend,
-)
+from .storage_provider import StorageProviderWrapper
from .media_storage import MediaStorage
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
@@ -81,17 +79,13 @@ class MediaRepository(object):
# potentially upload to.
storage_providers = []
- # TODO: Move this into config and allow other storage providers to be
- # defined.
- if hs.config.backup_media_store_path:
- backend = FileStorageProviderBackend(
- self.primary_base_path, hs.config.backup_media_store_path,
- )
+ for clz, provider_config, wrapper_config in hs.config.media_storage_providers:
+ backend = clz(hs, provider_config)
provider = StorageProviderWrapper(
backend,
- store=True,
- store_synchronous=hs.config.synchronous_backup_media_store,
- store_remote=True,
+ store_local=wrapper_config.store_local,
+ store_remote=wrapper_config.store_remote,
+ store_synchronous=wrapper_config.store_synchronous,
)
storage_providers.append(provider)
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 001e84578e..041ae396cd 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -164,6 +164,14 @@ class MediaStorage(object):
str
"""
if file_info.url_cache:
+ if file_info.thumbnail:
+ return self.filepaths.url_cache_thumbnail_rel(
+ media_id=file_info.file_id,
+ width=file_info.thumbnail_width,
+ height=file_info.thumbnail_height,
+ content_type=file_info.thumbnail_type,
+ method=file_info.thumbnail_method,
+ )
return self.filepaths.url_cache_filepath_rel(file_info.file_id)
if file_info.server_name:
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 2ad602e101..c188192f2b 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -17,6 +17,7 @@ from twisted.internet import defer, threads
from .media_storage import FileResponder
+from synapse.config._base import Config
from synapse.util.logcontext import preserve_fn
import logging
@@ -64,19 +65,19 @@ class StorageProviderWrapper(StorageProvider):
Args:
backend (StorageProvider)
- store (bool): Whether to store new files or not.
+ store_local (bool): Whether to store new local files or not.
store_synchronous (bool): Whether to wait for file to be successfully
uploaded, or todo the upload in the backgroud.
store_remote (bool): Whether remote media should be uploaded
"""
- def __init__(self, backend, store, store_synchronous, store_remote):
+ def __init__(self, backend, store_local, store_synchronous, store_remote):
self.backend = backend
- self.store = store
+ self.store_local = store_local
self.store_synchronous = store_synchronous
self.store_remote = store_remote
def store_file(self, path, file_info):
- if not self.store:
+ if not file_info.server_name and not self.store_local:
return defer.succeed(None)
if file_info.server_name and not self.store_remote:
@@ -97,13 +98,13 @@ class FileStorageProviderBackend(StorageProvider):
"""A storage provider that stores files in a directory on a filesystem.
Args:
- cache_directory (str): Base path of the local media repository
- base_directory (str): Base path to store new files
+ hs (HomeServer)
+ config: The config returned by `parse_config`.
"""
- def __init__(self, cache_directory, base_directory):
- self.cache_directory = cache_directory
- self.base_directory = base_directory
+ def __init__(self, hs, config):
+ self.cache_directory = hs.config.media_store_path
+ self.base_directory = config
def store_file(self, path, file_info):
"""See StorageProvider.store_file"""
@@ -125,3 +126,15 @@ class FileStorageProviderBackend(StorageProvider):
backup_fname = os.path.join(self.base_directory, path)
if os.path.isfile(backup_fname):
return FileResponder(open(backup_fname, "rb"))
+
+ @staticmethod
+ def parse_config(config):
+ """Called on startup to parse config supplied. This should parse
+ the config and raise if there is a problem.
+
+ The returned value is passed into the constructor.
+
+ In this case we only care about a single param, the directory, so let's
+ just pull that out.
+ """
+ return Config.ensure_directory(config["directory"])
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index c09f2dec41..12e84a2b7c 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -67,7 +67,7 @@ class ThumbnailResource(Resource):
yield self._respond_local_thumbnail(
request, media_id, width, height, method, m_type
)
- self.media_repo.mark_recently_accessed(server_name, media_id)
+ self.media_repo.mark_recently_accessed(None, media_id)
else:
if self.dynamic_thumbnails:
yield self._select_or_generate_remote_thumbnail(
@@ -79,7 +79,7 @@ class ThumbnailResource(Resource):
request, server_name, media_id,
width, height, method, m_type
)
- self.media_repo.mark_recently_accessed(None, media_id)
+ self.media_repo.mark_recently_accessed(server_name, media_id)
@defer.inlineCallbacks
def _respond_local_thumbnail(self, request, media_id, width, height,
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
new file mode 100644
index 0000000000..90a2608d6f
--- /dev/null
+++ b/synapse/util/file_consumer.py
@@ -0,0 +1,139 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import threads, reactor
+
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+
+import Queue
+
+
+class BackgroundFileConsumer(object):
+ """A consumer that writes to a file like object. Supports both push
+ and pull producers
+
+ Args:
+ file_obj (file): The file like object to write to. Closed when
+ finished.
+ """
+
+ # For PushProducers pause if we have this many unwritten slices
+ _PAUSE_ON_QUEUE_SIZE = 5
+ # And resume once the size of the queue is less than this
+ _RESUME_ON_QUEUE_SIZE = 2
+
+ def __init__(self, file_obj):
+ self._file_obj = file_obj
+
+ # Producer we're registered with
+ self._producer = None
+
+ # True if PushProducer, false if PullProducer
+ self.streaming = False
+
+ # For PushProducers, indicates whether we've paused the producer and
+ # need to call resumeProducing before we get more data.
+ self._paused_producer = False
+
+ # Queue of slices of bytes to be written. When producer calls
+ # unregister a final None is sent.
+ self._bytes_queue = Queue.Queue()
+
+ # Deferred that is resolved when finished writing
+ self._finished_deferred = None
+
+ # If the _writer thread throws an exception it gets stored here.
+ self._write_exception = None
+
+ def registerProducer(self, producer, streaming):
+ """Part of IConsumer interface
+
+ Args:
+ producer (IProducer)
+ streaming (bool): True if push based producer, False if pull
+ based.
+ """
+ if self._producer:
+ raise Exception("registerProducer called twice")
+
+ self._producer = producer
+ self.streaming = streaming
+ self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer)
+ if not streaming:
+ self._producer.resumeProducing()
+
+ def unregisterProducer(self):
+ """Part of IProducer interface
+ """
+ self._producer = None
+ if not self._finished_deferred.called:
+ self._bytes_queue.put_nowait(None)
+
+ def write(self, bytes):
+ """Part of IProducer interface
+ """
+ if self._write_exception:
+ raise self._write_exception
+
+ if self._finished_deferred.called:
+ raise Exception("consumer has closed")
+
+ self._bytes_queue.put_nowait(bytes)
+
+ # If this is a PushProducer and the queue is getting behind
+ # then we pause the producer.
+ if self.streaming and self._bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE:
+ self._paused_producer = True
+ self._producer.pauseProducing()
+
+ def _writer(self):
+ """This is run in a background thread to write to the file.
+ """
+ try:
+ while self._producer or not self._bytes_queue.empty():
+ # If we've paused the producer check if we should resume the
+ # producer.
+ if self._producer and self._paused_producer:
+ if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
+ reactor.callFromThread(self._resume_paused_producer)
+
+ bytes = self._bytes_queue.get()
+
+ # If we get a None (or empty list) then that's a signal used
+ # to indicate we should check if we should stop.
+ if bytes:
+ self._file_obj.write(bytes)
+
+ # If its a pull producer then we need to explicitly ask for
+ # more stuff.
+ if not self.streaming and self._producer:
+ reactor.callFromThread(self._producer.resumeProducing)
+ except Exception as e:
+ self._write_exception = e
+ raise
+ finally:
+ self._file_obj.close()
+
+ def wait(self):
+ """Returns a deferred that resolves when finished writing to file
+ """
+ return make_deferred_yieldable(self._finished_deferred)
+
+ def _resume_paused_producer(self):
+ """Gets called if we should resume producing after being paused
+ """
+ if self._paused_producer and self._producer:
+ self._paused_producer = False
+ self._producer.resumeProducing()
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 059bb7fedf..e4b5687a4b 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -28,7 +28,7 @@ logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
# total number of times we have hit this block
-response_count = metrics.register_counter(
+block_counter = metrics.register_counter(
"block_count",
labels=["block_name"],
alternative_names=(
@@ -76,7 +76,7 @@ block_db_txn_count = metrics.register_counter(
block_db_txn_duration = metrics.register_counter(
"block_db_txn_duration_seconds", labels=["block_name"],
alternative_names=(
- metrics.name_prefix + "_block_db_txn_count:total",
+ metrics.name_prefix + "_block_db_txn_duration:total",
),
)
@@ -131,6 +131,8 @@ class Measure(object):
return
duration = self.clock.time_msec() - self.start
+
+ block_counter.inc(self.name)
block_timer.inc_by(duration, self.name)
context = LoggingContext.current_context()
diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py
new file mode 100644
index 0000000000..76e2234255
--- /dev/null
+++ b/tests/util/test_file_consumer.py
@@ -0,0 +1,176 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from twisted.internet import defer, reactor
+from mock import NonCallableMock
+
+from synapse.util.file_consumer import BackgroundFileConsumer
+
+from tests import unittest
+from StringIO import StringIO
+
+import threading
+
+
+class FileConsumerTests(unittest.TestCase):
+
+ @defer.inlineCallbacks
+ def test_pull_consumer(self):
+ string_file = StringIO()
+ consumer = BackgroundFileConsumer(string_file)
+
+ try:
+ producer = DummyPullProducer()
+
+ yield producer.register_with_consumer(consumer)
+
+ yield producer.write_and_wait("Foo")
+
+ self.assertEqual(string_file.getvalue(), "Foo")
+
+ yield producer.write_and_wait("Bar")
+
+ self.assertEqual(string_file.getvalue(), "FooBar")
+ finally:
+ consumer.unregisterProducer()
+
+ yield consumer.wait()
+
+ self.assertTrue(string_file.closed)
+
+ @defer.inlineCallbacks
+ def test_push_consumer(self):
+ string_file = BlockingStringWrite()
+ consumer = BackgroundFileConsumer(string_file)
+
+ try:
+ producer = NonCallableMock(spec_set=[])
+
+ consumer.registerProducer(producer, True)
+
+ consumer.write("Foo")
+ yield string_file.wait_for_n_writes(1)
+
+ self.assertEqual(string_file.buffer, "Foo")
+
+ consumer.write("Bar")
+ yield string_file.wait_for_n_writes(2)
+
+ self.assertEqual(string_file.buffer, "FooBar")
+ finally:
+ consumer.unregisterProducer()
+
+ yield consumer.wait()
+
+ self.assertTrue(string_file.closed)
+
+ @defer.inlineCallbacks
+ def test_push_producer_feedback(self):
+ string_file = BlockingStringWrite()
+ consumer = BackgroundFileConsumer(string_file)
+
+ try:
+ producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"])
+
+ resume_deferred = defer.Deferred()
+ producer.resumeProducing.side_effect = lambda: resume_deferred.callback(None)
+
+ consumer.registerProducer(producer, True)
+
+ number_writes = 0
+ with string_file.write_lock:
+ for _ in range(consumer._PAUSE_ON_QUEUE_SIZE):
+ consumer.write("Foo")
+ number_writes += 1
+
+ producer.pauseProducing.assert_called_once()
+
+ yield string_file.wait_for_n_writes(number_writes)
+
+ yield resume_deferred
+ producer.resumeProducing.assert_called_once()
+ finally:
+ consumer.unregisterProducer()
+
+ yield consumer.wait()
+
+ self.assertTrue(string_file.closed)
+
+
+class DummyPullProducer(object):
+ def __init__(self):
+ self.consumer = None
+ self.deferred = defer.Deferred()
+
+ def resumeProducing(self):
+ d = self.deferred
+ self.deferred = defer.Deferred()
+ d.callback(None)
+
+ def write_and_wait(self, bytes):
+ d = self.deferred
+ self.consumer.write(bytes)
+ return d
+
+ def register_with_consumer(self, consumer):
+ d = self.deferred
+ self.consumer = consumer
+ self.consumer.registerProducer(self, False)
+ return d
+
+
+class BlockingStringWrite(object):
+ def __init__(self):
+ self.buffer = ""
+ self.closed = False
+ self.write_lock = threading.Lock()
+
+ self._notify_write_deferred = None
+ self._number_of_writes = 0
+
+ def write(self, bytes):
+ with self.write_lock:
+ self.buffer += bytes
+ self._number_of_writes += 1
+
+ reactor.callFromThread(self._notify_write)
+
+ def close(self):
+ self.closed = True
+
+ def _notify_write(self):
+ "Called by write to indicate a write happened"
+ with self.write_lock:
+ if not self._notify_write_deferred:
+ return
+ d = self._notify_write_deferred
+ self._notify_write_deferred = None
+ d.callback(None)
+
+ @defer.inlineCallbacks
+ def wait_for_n_writes(self, n):
+ "Wait for n writes to have happened"
+ while True:
+ with self.write_lock:
+ if n <= self._number_of_writes:
+ return
+
+ if not self._notify_write_deferred:
+ self._notify_write_deferred = defer.Deferred()
+
+ d = self._notify_write_deferred
+
+ yield d
|