-rw-r--r--  synapse/http/servlet.py                        |  3
-rw-r--r--  synapse/rest/media/v1/preview_url_resource.py  | 88
-rw-r--r--  synapse/storage/_base.py                       |  6
-rw-r--r--  synapse/storage/account_data.py                |  2
-rw-r--r--  synapse/storage/state.py                       | 19
5 files changed, 63 insertions, 55 deletions
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 8118ee7cc2..71420e54db 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -167,7 +167,8 @@ def parse_json_value_from_request(request):
 
     try:
         content = simplejson.loads(content_bytes)
-    except simplejson.JSONDecodeError:
+    except Exception as e:
+        logger.warn("Unable to parse JSON: %s", e)
         raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
 
     return content
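The servlet change is small but worth spelling out: simplejson.loads can fail with exceptions other than simplejson.JSONDecodeError (for example a UnicodeDecodeError on undecodable bytes), and anything other than JSONDecodeError would previously have escaped the handler as a 500. Catching Exception turns every parse failure into a logged warning plus a clean 400. A minimal sketch of the new behaviour outside the servlet plumbing, where parse_json and BadJsonError are illustrative stand-ins rather than Synapse APIs:

    import logging
    import simplejson

    logger = logging.getLogger(__name__)

    class BadJsonError(Exception):
        """Stand-in for SynapseError(400, "Content not JSON.")."""

    def parse_json(content_bytes):
        try:
            return simplejson.loads(content_bytes)
        except Exception as e:
            # every decode failure - whatever its type - becomes a
            # warning in the logs and a 400-style error to the caller
            logger.warning("Unable to parse JSON: %s", e)
            raise BadJsonError("Content not JSON.")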
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 7907a9d17a..723f7043f4 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -20,6 +20,7 @@ from twisted.web.resource import Resource
 from synapse.api.errors import (
     SynapseError, Codes,
 )
+from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
 from synapse.util.stringutils import random_string
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.http.client import SpiderHttpClient
@@ -63,16 +64,15 @@ class PreviewUrlResource(Resource):
 
         self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist
 
-        # simple memory cache mapping urls to OG metadata
-        self.cache = ExpiringCache(
+        # memory cache mapping urls to an ObservableDeferred returning
+        # JSON-encoded OG metadata
+        self._cache = ExpiringCache(
             cache_name="url_previews",
             clock=self.clock,
             # don't spider URLs more often than once an hour
             expiry_ms=60 * 60 * 1000,
         )
-        self.cache.start()
-
-        self.downloads = {}
+        self._cache.start()
 
         self._cleaner_loop = self.clock.looping_call(
             self._expire_url_cache_data, 10 * 1000
@@ -94,6 +94,7 @@ class PreviewUrlResource(Resource):
         else:
             ts = self.clock.time_msec()
 
+        # XXX: we could move this into _do_preview if we wanted.
         url_tuple = urlparse.urlsplit(url)
         for entry in self.url_preview_url_blacklist:
             match = True
@@ -126,14 +127,42 @@ class PreviewUrlResource(Resource):
                     Codes.UNKNOWN
                 )
 
-        # first check the memory cache - good to handle all the clients on this
-        # HS thundering away to preview the same URL at the same time.
-        og = self.cache.get(url)
-        if og:
-            respond_with_json_bytes(request, 200, json.dumps(og), send_cors=True)
-            return
+        # the in-memory cache:
+        # * ensures that only one request is active at a time
+        # * takes load off the DB for the thundering herds
+        # * also caches any failures (unlike the DB) so we don't keep
+        #   requesting the same endpoint
+
+        observable = self._cache.get(url)
+
+        if not observable:
+            download = preserve_fn(self._do_preview)(
+                url, requester.user, ts,
+            )
+            observable = ObservableDeferred(
+                download,
+                consumeErrors=True
+            )
+            self._cache[url] = observable
+        else:
+            logger.info("Returning cached response")
+
+        og = yield make_deferred_yieldable(observable.observe())
+        respond_with_json_bytes(request, 200, og, send_cors=True)
 
-        # then check the URL cache in the DB (which will also provide us with
+    @defer.inlineCallbacks
+    def _do_preview(self, url, user, ts):
+        """Check the db, and download the URL and build a preview
+
+        Args:
+            url (str):
+            user (str):
+            ts (int):
+
+        Returns:
+            Deferred[str]: json-encoded og data
+        """
+        # check the URL cache in the DB (which will also provide us with
         # historical previews, if we have any)
         cache_result = yield self.store.get_url_cache(url, ts)
         if (
@@ -141,32 +170,10 @@ class PreviewUrlResource(Resource):
             cache_result["expires_ts"] > ts and
             cache_result["response_code"] / 100 == 2
         ):
-            respond_with_json_bytes(
-                request, 200, cache_result["og"].encode('utf-8'),
-                send_cors=True
-            )
+            defer.returnValue(cache_result["og"])
             return
 
-        # Ensure only one download for a given URL is active at a time
-        download = self.downloads.get(url)
-        if download is None:
-            download = self._download_url(url, requester.user)
-            download = ObservableDeferred(
-                download,
-                consumeErrors=True
-            )
-            self.downloads[url] = download
-
-            @download.addBoth
-            def callback(media_info):
-                del self.downloads[url]
-                return media_info
-        media_info = yield download.observe()
-
-        # FIXME: we should probably update our cache now anyway, so that
-        # even if the OG calculation raises, we don't keep hammering on the
-        # remote server. For now, leave it uncached to aid debugging OG
-        # calculation problems
+        media_info = yield self._download_url(url, user)
 
         logger.debug("got media_info of '%s'" % media_info)
 
@@ -212,7 +219,7 @@ class PreviewUrlResource(Resource):
             # just rely on the caching on the master request to speed things up.
             if 'og:image' in og and og['og:image']:
                 image_info = yield self._download_url(
-                    _rebase_url(og['og:image'], media_info['uri']), requester.user
+                    _rebase_url(og['og:image'], media_info['uri']), user
                 )
 
                 if _is_media(image_info['media_type']):
@@ -239,8 +246,7 @@ class PreviewUrlResource(Resource):
 
         logger.debug("Calculated OG for %s as %s" % (url, og))
 
-        # store OG in ephemeral in-memory cache
-        self.cache[url] = og
+        jsonog = json.dumps(og)
 
         # store OG in history-aware DB cache
         yield self.store.store_url_cache(
@@ -248,12 +254,12 @@ class PreviewUrlResource(Resource):
             media_info["response_code"],
             media_info["etag"],
             media_info["expires"] + media_info["created_ts"],
-            json.dumps(og),
+            jsonog,
             media_info["filesystem_id"],
             media_info["created_ts"],
         )
 
-        respond_with_json_bytes(request, 200, json.dumps(og), send_cors=True)
+        defer.returnValue(jsonog)
 
     @defer.inlineCallbacks
     def _download_url(self, url, user):
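The rewrite above is the "cache the Deferred, not the result" pattern: the first request for a URL kicks off _do_preview and stores an ObservableDeferred wrapping it in the ExpiringCache; every concurrent or later request inside the expiry window observes the same deferred, so with consumeErrors=True even a failure is computed once and delivered to all observers rather than re-fetched. A stripped-down sketch of what ObservableDeferred (from synapse.util.async) does, not the real implementation:

    from twisted.internet import defer

    class ObservableDeferred(object):
        """Sketch: wrap a Deferred so any number of callers can wait
        on its eventual result via observe()."""

        def __init__(self, deferred, consumeErrors=False):
            self._result = None      # becomes (success, value) on firing
            self._observers = []

            def callback(r):
                self._result = (True, r)
                for observer in self._observers:
                    observer.callback(r)
                return r

            def errback(f):
                self._result = (False, f)
                for observer in self._observers:
                    observer.errback(f)
                # optionally swallow the failure so Twisted doesn't
                # complain about an unhandled error on the shared deferred
                return None if consumeErrors else f

            deferred.addCallbacks(callback, errback)

        def observe(self):
            if self._result is None:
                d = defer.Deferred()
                self._observers.append(d)
                return d
            success, value = self._result
            return defer.succeed(value) if success else defer.fail(value)

Note also the logcontext hygiene: the shared work is started under preserve_fn and awaited via make_deferred_yieldable, the usual Synapse idiom for handing one piece of background work to multiple request contexts.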
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index a37d1934ec..6caf7b3356 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -16,6 +16,8 @@ import logging
 
 from synapse.api.errors import StoreError
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches.dictionary_cache import DictionaryCache
 from synapse.util.caches.descriptors import Cache
 from synapse.storage.engines import PostgresEngine
 import synapse.metrics
@@ -178,6 +180,10 @@ class SQLBaseStore(object):
         self._get_event_cache = Cache("*getEvent*", keylen=3,
                                       max_entries=hs.config.event_cache_size)
 
+        self._state_group_cache = DictionaryCache(
+            "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
+        )
+
         self._event_fetch_lock = threading.Condition()
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index ff14e54c11..c8a1eb016b 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -63,7 +63,7 @@ class AccountDataStore(SQLBaseStore):
             "get_account_data_for_user", get_account_data_for_user_txn
         )
 
-    @cachedInlineCallbacks(num_args=2)
+    @cachedInlineCallbacks(num_args=2, max_entries=5000)
     def get_global_account_data_by_type_for_user(self, data_type, user_id):
         """
         Returns:
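Two storage tweaks ride along. _state_group_cache moves from StateStore (below) into the SQLBaseStore constructor, presumably so every store that subclasses the base gets the state-group DictionaryCache without constructing it itself; and the per-user account-data cache gains an explicit max_entries. The cache budget is scaled by CACHE_SIZE_FACTOR, which Synapse derives from an environment variable, roughly like this (the 0.5 default matches this vintage of synapse/util/caches/__init__.py, but treat it as an assumption):

    import os

    # approximate derivation of the global cache multiplier
    CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))

    # so the DictionaryCache added above is budgeted at
    # 100000 * 0.5 = 50000 entries unless SYNAPSE_CACHE_FACTOR is set
    state_group_cache_size = int(100000 * CACHE_SIZE_FACTOR)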
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a1da3ad7a5..5673e4aa96 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -13,17 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from collections import namedtuple
-import logging
+from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches import intern_string
+from synapse.util.stringutils import to_ascii
+from synapse.storage.engines import PostgresEngine
 
 from twisted.internet import defer
+from collections import namedtuple
 
-from synapse.storage.engines import PostgresEngine
-from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
-from synapse.util.caches.descriptors import cached, cachedList
-from synapse.util.caches.dictionary_cache import DictionaryCache
-from synapse.util.stringutils import to_ascii
-from ._base import SQLBaseStore
+import logging
 
 logger = logging.getLogger(__name__)
 
@@ -82,10 +81,6 @@ class StateStore(SQLBaseStore):
             where_clause="type='m.room.member'",
         )
 
-        self._state_group_cache = DictionaryCache(
-            "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
-        )
-
     @cached(max_entries=100000, iterable=True)
     def get_current_state_ids(self, room_id):
         """Get the current state event ids for a room based on the
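Beyond the cache removal, the state.py hunk is import reshuffling (and dropping the now-unused DictionaryCache and CACHE_SIZE_FACTOR imports). The net shape after both storage hunks, reduced to a cut-down sketch (lookup is an illustrative method, not real StateStore code):

    from synapse.util.caches import CACHE_SIZE_FACTOR
    from synapse.util.caches.dictionary_cache import DictionaryCache

    class SQLBaseStore(object):
        def __init__(self):
            # constructed once here, inherited by every concrete store
            self._state_group_cache = DictionaryCache(
                "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
            )

    class StateStore(SQLBaseStore):
        def lookup(self, state_group):
            # existing call sites keep working unchanged via inheritance
            return self._state_group_cache.get(state_group)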