diff --git a/synapse/__init__.py b/synapse/__init__.py
index ec83e6adb7..bee4aba625 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.23.0-rc2"
+__version__ = "0.23.1"
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index cd0e815919..cf4730730d 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -12,10 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import gc
import logging
+import sys
+
+try:
+ import affinity
+except Exception:
+ affinity = None
-import affinity
from daemonize import Daemonize
from synapse.util import PreserveLoggingContext
from synapse.util.rlimit import change_resource_limit
@@ -78,6 +84,13 @@ def start_reactor(
with PreserveLoggingContext():
logger.info("Running")
if cpu_affinity is not None:
+ if not affinity:
+ quit_with_error(
+                        "Missing package 'affinity' required for the\n"
+                        "cpu_affinity option\n\n"
+ "Install by running:\n\n"
+ " pip install affinity\n\n"
+ )
logger.info("Setting CPU affinity to %s" % cpu_affinity)
affinity.set_process_affinity_mask(0, cpu_affinity)
change_resource_limit(soft_file_limit)
@@ -97,3 +110,13 @@ def start_reactor(
daemon.start()
else:
run()
+
+
+def quit_with_error(error_string):
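+    """Write error_string to stderr, framed by lines of asterisks, and exit
+    with a non-zero status.
+    """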
+ message_lines = error_string.split("\n")
+ line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
+ sys.stderr.write("*" * line_length + '\n')
+ for line in message_lines:
+ sys.stderr.write(" %s\n" % (line.rstrip(),))
+ sys.stderr.write("*" * line_length + '\n')
+ sys.exit(1)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 84ad8f04a0..3adf72e141 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -25,6 +25,7 @@ from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
STATIC_PREFIX, WEB_CLIENT_PREFIX
from synapse.app import _base
+from synapse.app._base import quit_with_error
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
@@ -249,16 +250,6 @@ class SynapseHomeServer(HomeServer):
return db_conn
-def quit_with_error(error_string):
- message_lines = error_string.split("\n")
- line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
- sys.stderr.write("*" * line_length + '\n')
- for line in message_lines:
- sys.stderr.write(" %s\n" % (line.rstrip(),))
- sys.stderr.write("*" * line_length + '\n')
- sys.exit(1)
-
-
def setup(config_options):
"""
Args:
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index 241b17f2cb..a97532162f 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -354,16 +354,28 @@ def _get_hosts_for_srv_record(dns_client, host):
return res[0]
- def eb(res):
- res.trap(DNSNameError)
- return []
+ def eb(res, record_type):
+ if res.check(DNSNameError):
+ return []
+        logger.warn("Error looking up %s for %s: %s %s",
+                    record_type, host, res, res.value)
+ return res
# no logcontexts here, so we can safely fire these off and gatherResults
-    d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb)
-    d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb)
+    d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb, errbackArgs=("A",))
+    d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb, errbackArgs=("AAAA",))
- results = yield defer.gatherResults([d1, d2], consumeErrors=True)
+ results = yield defer.DeferredList(
+ [d1, d2], consumeErrors=True)
+
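+    # DeferredList gives us a list of (success, result) pairs rather than
+    # failing the whole lookup on the first error; consumeErrors stops the
+    # individual failures being logged as unhandled errors.
+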
+ # if all of the lookups failed, raise an exception rather than blowing out
+ # the cache with an empty result.
+ if results and all(s == defer.FAILURE for (s, _) in results):
+ defer.returnValue(results[0][1])
+
+ for (success, result) in results:
+ if success == defer.FAILURE:
+ continue
- for result in results:
for answer in result:
if not answer.payload:
continue
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 747a791f83..6fc3a41c29 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -204,18 +204,15 @@ class MatrixFederationHttpClient(object):
raise
logger.warn(
- "{%s} Sending request failed to %s: %s %s: %s - %s",
+ "{%s} Sending request failed to %s: %s %s: %s",
txn_id,
destination,
method,
url_bytes,
- type(e).__name__,
_flatten_response_never_received(e),
)
- log_result = "%s - %s" % (
- type(e).__name__, _flatten_response_never_received(e),
- )
+ log_result = _flatten_response_never_received(e)
if retries_left and not timeout:
if long_retries:
@@ -578,12 +575,14 @@ class _JsonProducer(object):
def _flatten_response_never_received(e):
if hasattr(e, "reasons"):
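+        # 'e' is probably a twisted.web ResponseNeverReceived / ResponseFailed,
+        # which wraps the underlying errors in a list of Failures.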
- return ", ".join(
+ reasons = ", ".join(
_flatten_response_never_received(f.value)
for f in e.reasons
)
+
+ return "%s:[%s]" % (type(e).__name__, reasons)
else:
- return "%s: %s" % (type(e).__name__, e.message,)
+ return repr(e)
def check_content_type_is_json(headers):
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 630e92c90e..7052333c19 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -40,7 +40,6 @@ REQUIREMENTS = {
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
- "affinity": ["affinity"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
@@ -59,6 +58,9 @@ CONDITIONAL_REQUIREMENTS = {
"psutil": {
"psutil>=2.0.0": ["psutil>=2.0.0"],
},
+ "affinity": {
+ "affinity": ["affinity"],
+ },
}
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index d92b7ff337..d5cec10127 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -14,6 +14,9 @@
# limitations under the License.
import os
+import re
+
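+# New-style url cache media ids start with an ISO date prefix, e.g.
+# "2017-09-28_<random string>"; see PreviewUrlResource for where they are
+# generated.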
+NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
class MediaFilePaths(object):
@@ -73,19 +76,105 @@ class MediaFilePaths(object):
)
def url_cache_filepath(self, media_id):
- return os.path.join(
- self.base_path, "url_cache",
- media_id[0:2], media_id[2:4], media_id[4:]
- )
+ if NEW_FORMAT_ID_RE.match(media_id):
+            # Media id is of the form <DATE>_<RANDOM_STRING>
+            # E.g.: 2017-09-28_fsdRDt24DS234dsf
+ return os.path.join(
+ self.base_path, "url_cache",
+ media_id[:10], media_id[11:]
+ )
+ else:
+ return os.path.join(
+ self.base_path, "url_cache",
+ media_id[0:2], media_id[2:4], media_id[4:],
+ )
+
+ def url_cache_filepath_dirs_to_delete(self, media_id):
+        "The directories to try to remove if we delete the media_id file"
+ if NEW_FORMAT_ID_RE.match(media_id):
+ return [
+ os.path.join(
+ self.base_path, "url_cache",
+ media_id[:10],
+ ),
+ ]
+ else:
+ return [
+ os.path.join(
+ self.base_path, "url_cache",
+ media_id[0:2], media_id[2:4],
+ ),
+ os.path.join(
+ self.base_path, "url_cache",
+ media_id[0:2],
+ ),
+ ]
def url_cache_thumbnail(self, media_id, width, height, content_type,
method):
+        # New-format media ids are of the form <DATE>_<RANDOM_STRING>
+        # E.g.: 2017-09-28_fsdRDt24DS234dsf
+
top_level_type, sub_type = content_type.split("/")
file_name = "%i-%i-%s-%s-%s" % (
width, height, top_level_type, sub_type, method
)
- return os.path.join(
- self.base_path, "url_cache_thumbnails",
- media_id[0:2], media_id[2:4], media_id[4:],
- file_name
- )
+
+ if NEW_FORMAT_ID_RE.match(media_id):
+ return os.path.join(
+ self.base_path, "url_cache_thumbnails",
+ media_id[:10], media_id[11:],
+ file_name
+ )
+ else:
+ return os.path.join(
+ self.base_path, "url_cache_thumbnails",
+ media_id[0:2], media_id[2:4], media_id[4:],
+ file_name
+ )
+
+ def url_cache_thumbnail_directory(self, media_id):
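+        """The directory that thumbnails for a url cache media id live in."""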
+        # New-format media ids are of the form <DATE>_<RANDOM_STRING>
+        # E.g.: 2017-09-28_fsdRDt24DS234dsf
+
+ if NEW_FORMAT_ID_RE.match(media_id):
+ return os.path.join(
+ self.base_path, "url_cache_thumbnails",
+ media_id[:10], media_id[11:],
+ )
+ else:
+ return os.path.join(
+ self.base_path, "url_cache_thumbnails",
+ media_id[0:2], media_id[2:4], media_id[4:],
+ )
+
+ def url_cache_thumbnail_dirs_to_delete(self, media_id):
+        "The directories to try to remove if we delete the media_id thumbnails"
+        # New-format media ids are of the form <DATE>_<RANDOM_STRING>
+        # E.g.: 2017-09-28_fsdRDt24DS234dsf
+ if NEW_FORMAT_ID_RE.match(media_id):
+ return [
+ os.path.join(
+ self.base_path, "url_cache_thumbnails",
+ media_id[:10], media_id[11:],
+ ),
+ os.path.join(
+ self.base_path, "url_cache_thumbnails",
+ media_id[:10],
+ ),
+ ]
+ else:
+ return [
+ os.path.join(
+ self.base_path, "url_cache_thumbnails",
+ media_id[0:2], media_id[2:4], media_id[4:],
+ ),
+ os.path.join(
+ self.base_path, "url_cache_thumbnails",
+ media_id[0:2], media_id[2:4],
+ ),
+ os.path.join(
+ self.base_path, "url_cache_thumbnails",
+ media_id[0:2],
+ ),
+ ]
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index b81a336c5d..895b480d5c 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -36,6 +36,9 @@ import cgi
import ujson as json
import urlparse
import itertools
+import datetime
+import errno
+import shutil
import logging
logger = logging.getLogger(__name__)
@@ -70,6 +73,10 @@ class PreviewUrlResource(Resource):
self.downloads = {}
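+        # Clean up expired url cache entries (and their media and thumbnails
+        # on disk) every 10 seconds.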
+ self._cleaner_loop = self.clock.looping_call(
+ self._expire_url_cache_data, 10 * 1000
+ )
+
def render_GET(self, request):
self._async_render_GET(request)
return NOT_DONE_YET
@@ -130,7 +137,7 @@ class PreviewUrlResource(Resource):
cache_result = yield self.store.get_url_cache(url, ts)
if (
cache_result and
- cache_result["download_ts"] + cache_result["expires"] > ts and
+ cache_result["expires_ts"] > ts and
cache_result["response_code"] / 100 == 2
):
respond_with_json_bytes(
@@ -239,7 +246,7 @@ class PreviewUrlResource(Resource):
url,
media_info["response_code"],
media_info["etag"],
- media_info["expires"],
+ media_info["expires"] + media_info["created_ts"],
json.dumps(og),
media_info["filesystem_id"],
media_info["created_ts"],
@@ -253,8 +260,7 @@ class PreviewUrlResource(Resource):
# we're most likely being explicitly triggered by a human rather than a
# bot, so are we really a robot?
- # XXX: horrible duplication with base_resource's _download_remote_file()
- file_id = random_string(24)
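+        # Use a date-prefixed file_id so that url cache files can be grouped
+        # by day on disk and cleaned up in bulk (see MediaFilePaths).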
+ file_id = datetime.date.today().isoformat() + '_' + random_string(16)
fname = self.filepaths.url_cache_filepath(file_id)
self.media_repo._makedirs(fname)
@@ -328,6 +334,88 @@ class PreviewUrlResource(Resource):
"etag": headers["ETag"][0] if "ETag" in headers else None,
})
+ @defer.inlineCallbacks
+ def _expire_url_cache_data(self):
+ """Clean up expired url cache content, media and thumbnails.
+ """
+ now = self.clock.time_msec()
+
+ # First we delete expired url cache entries
+ media_ids = yield self.store.get_expired_url_cache(now)
+
+ removed_media = []
+ for media_id in media_ids:
+ fname = self.filepaths.url_cache_filepath(media_id)
+ try:
+ os.remove(fname)
+ except OSError as e:
+ # If the path doesn't exist, meh
+ if e.errno != errno.ENOENT:
+ logger.warn("Failed to remove media: %r: %s", media_id, e)
+ continue
+
+ removed_media.append(media_id)
+
+ try:
+ dirs = self.filepaths.url_cache_filepath_dirs_to_delete(media_id)
+ for dir in dirs:
+ os.rmdir(dir)
+            except OSError:
+                pass
+
+ yield self.store.delete_url_cache(removed_media)
+
+ if removed_media:
+ logger.info("Deleted %d entries from url cache", len(removed_media))
+
+ # Now we delete old images associated with the url cache.
+        # These may still be referenced by clients for a while (e.g. a user may
+        # have a room open that is showing the url preview), so we wait a
+        # couple of days before deleting them, just in case.
+ expire_before = now - 2 * 24 * 60 * 60 * 1000
+ media_ids = yield self.store.get_url_cache_media_before(expire_before)
+
+ removed_media = []
+ for media_id in media_ids:
+ fname = self.filepaths.url_cache_filepath(media_id)
+ try:
+ os.remove(fname)
+ except OSError as e:
+ # If the path doesn't exist, meh
+ if e.errno != errno.ENOENT:
+ logger.warn("Failed to remove media: %r: %s", media_id, e)
+ continue
+
+ try:
+ dirs = self.filepaths.url_cache_filepath_dirs_to_delete(media_id)
+ for dir in dirs:
+ os.rmdir(dir)
+            except OSError:
+                pass
+
+ thumbnail_dir = self.filepaths.url_cache_thumbnail_directory(media_id)
+ try:
+ shutil.rmtree(thumbnail_dir)
+ except OSError as e:
+ # If the path doesn't exist, meh
+ if e.errno != errno.ENOENT:
+ logger.warn("Failed to remove media: %r: %s", media_id, e)
+ continue
+
+ removed_media.append(media_id)
+
+ try:
+ dirs = self.filepaths.url_cache_thumbnail_dirs_to_delete(media_id)
+ for dir in dirs:
+ os.rmdir(dir)
+            except OSError:
+                pass
+
+ yield self.store.delete_url_cache_media(removed_media)
+
+ if removed_media:
+ logger.info("Deleted %d media from url cache", len(removed_media))
+
def decode_and_calc_og(body, media_uri, request_encoding=None):
from lxml import etree
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 82bb61b811..7110a71279 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -62,7 +62,7 @@ class MediaRepositoryStore(SQLBaseStore):
def get_url_cache_txn(txn):
# get the most recently cached result (relative to the given ts)
sql = (
- "SELECT response_code, etag, expires, og, media_id, download_ts"
+ "SELECT response_code, etag, expires_ts, og, media_id, download_ts"
" FROM local_media_repository_url_cache"
" WHERE url = ? AND download_ts <= ?"
" ORDER BY download_ts DESC LIMIT 1"
@@ -74,7 +74,7 @@ class MediaRepositoryStore(SQLBaseStore):
# ...or if we've requested a timestamp older than the oldest
# copy in the cache, return the oldest copy (if any)
sql = (
- "SELECT response_code, etag, expires, og, media_id, download_ts"
+ "SELECT response_code, etag, expires_ts, og, media_id, download_ts"
" FROM local_media_repository_url_cache"
" WHERE url = ? AND download_ts > ?"
" ORDER BY download_ts ASC LIMIT 1"
@@ -86,14 +86,14 @@ class MediaRepositoryStore(SQLBaseStore):
return None
return dict(zip((
- 'response_code', 'etag', 'expires', 'og', 'media_id', 'download_ts'
+ 'response_code', 'etag', 'expires_ts', 'og', 'media_id', 'download_ts'
), row))
return self.runInteraction(
"get_url_cache", get_url_cache_txn
)
- def store_url_cache(self, url, response_code, etag, expires, og, media_id,
+ def store_url_cache(self, url, response_code, etag, expires_ts, og, media_id,
download_ts):
return self._simple_insert(
"local_media_repository_url_cache",
@@ -101,7 +101,7 @@ class MediaRepositoryStore(SQLBaseStore):
"url": url,
"response_code": response_code,
"etag": etag,
- "expires": expires,
+ "expires_ts": expires_ts,
"og": og,
"media_id": media_id,
"download_ts": download_ts,
@@ -238,3 +238,64 @@ class MediaRepositoryStore(SQLBaseStore):
},
)
return self.runInteraction("delete_remote_media", delete_remote_media_txn)
+
+ def get_expired_url_cache(self, now_ts):
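+        """Get up to 500 media_ids from the url cache whose expires_ts is
+        before now_ts, oldest first.
+
+        Returns:
+            Deferred[list[str]]
+        """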
+ sql = (
+ "SELECT media_id FROM local_media_repository_url_cache"
+ " WHERE expires_ts < ?"
+ " ORDER BY expires_ts ASC"
+ " LIMIT 500"
+ )
+
+ def _get_expired_url_cache_txn(txn):
+ txn.execute(sql, (now_ts,))
+ return [row[0] for row in txn]
+
+ return self.runInteraction("get_expired_url_cache", _get_expired_url_cache_txn)
+
+ def delete_url_cache(self, media_ids):
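+        """Delete the url cache rows for the given media_ids."""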
+ sql = (
+ "DELETE FROM local_media_repository_url_cache"
+ " WHERE media_id = ?"
+ )
+
+ def _delete_url_cache_txn(txn):
+ txn.executemany(sql, [(media_id,) for media_id in media_ids])
+
+ return self.runInteraction("delete_url_cache", _delete_url_cache_txn)
+
+ def get_url_cache_media_before(self, before_ts):
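+        """Get up to 500 media_ids of url cache media created before
+        before_ts, oldest first.
+
+        Returns:
+            Deferred[list[str]]
+        """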
+ sql = (
+ "SELECT media_id FROM local_media_repository"
+ " WHERE created_ts < ? AND url_cache IS NOT NULL"
+ " ORDER BY created_ts ASC"
+ " LIMIT 500"
+ )
+
+ def _get_url_cache_media_before_txn(txn):
+ txn.execute(sql, (before_ts,))
+ return [row[0] for row in txn]
+
+ return self.runInteraction(
+ "get_url_cache_media_before", _get_url_cache_media_before_txn,
+ )
+
+ def delete_url_cache_media(self, media_ids):
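+        """Delete the local media and thumbnail rows for the given url cache
+        media_ids.
+        """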
+ def _delete_url_cache_media_txn(txn):
+ sql = (
+ "DELETE FROM local_media_repository"
+ " WHERE media_id = ?"
+ )
+
+ txn.executemany(sql, [(media_id,) for media_id in media_ids])
+
+ sql = (
+ "DELETE FROM local_media_repository_thumbnails"
+ " WHERE media_id = ?"
+ )
+
+ txn.executemany(sql, [(media_id,) for media_id in media_ids])
+
+ return self.runInteraction(
+ "delete_url_cache_media", _delete_url_cache_media_txn,
+ )
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 72b670b83b..a0af8456f5 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 43
+SCHEMA_VERSION = 44
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql
new file mode 100644
index 0000000000..e2b775f038
--- /dev/null
+++ b/synapse/storage/schema/delta/44/expire_url_cache.sql
@@ -0,0 +1,38 @@
+/* Copyright 2017 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
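+-- Used when expiring old url cache media: we look up entries in
+-- local_media_repository by created_ts where url_cache is set.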
+CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
+
+-- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support
+-- indices on expressions until 3.9.
+CREATE TABLE local_media_repository_url_cache_new(
+ url TEXT,
+ response_code INTEGER,
+ etag TEXT,
+ expires_ts BIGINT,
+ og TEXT,
+ media_id TEXT,
+ download_ts BIGINT
+);
+
+INSERT INTO local_media_repository_url_cache_new
+ SELECT url, response_code, etag, expires + download_ts, og, media_id, download_ts FROM local_media_repository_url_cache;
+
+DROP TABLE local_media_repository_url_cache;
+ALTER TABLE local_media_repository_url_cache_new RENAME TO local_media_repository_url_cache;
+
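+-- expires_ts is used when expiring entries, (url, download_ts) when looking up
+-- cached previews, and media_id when deleting cache rows.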
+CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(expires_ts);
+CREATE INDEX local_media_repository_url_cache_by_url_download_ts ON local_media_repository_url_cache(url, download_ts);
+CREATE INDEX local_media_repository_url_cache_media_idx ON local_media_repository_url_cache(media_id);
|