summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2019-02-26 14:23:40 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2019-02-26 14:23:40 +0000
commit802884d4ee06ca8e42f46f64e6da7c188d43dc69 (patch)
tree6767e6e142d75e5500092a829d488583fcedef51 /synapse/util
parentAdd changelog (diff)
parentMerge pull request #4745 from matrix-org/revert-4736-anoa/public_rooms_federate (diff)
downloadsynapse-802884d4ee06ca8e42f46f64e6da7c188d43dc69.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/public_rooms_federate_develop
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/async_helpers.py14
-rw-r--r--synapse/util/caches/ttlcache.py161
-rw-r--r--synapse/util/logcontext.py5
-rw-r--r--synapse/util/stringutils.py39
4 files changed, 211 insertions, 8 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index ec7b2c9672..f0e4a0e10c 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -201,7 +201,7 @@ class Linearizer(object):
         if entry[0] >= self.max_count:
             res = self._await_lock(key)
         else:
-            logger.info(
+            logger.debug(
                 "Acquired uncontended linearizer lock %r for key %r", self.name, key,
             )
             entry[0] += 1
@@ -215,7 +215,7 @@ class Linearizer(object):
             try:
                 yield
             finally:
-                logger.info("Releasing linearizer lock %r for key %r", self.name, key)
+                logger.debug("Releasing linearizer lock %r for key %r", self.name, key)
 
                 # We've finished executing so check if there are any things
                 # blocked waiting to execute and start one of them
@@ -247,7 +247,7 @@ class Linearizer(object):
         """
         entry = self.key_to_defer[key]
 
-        logger.info(
+        logger.debug(
             "Waiting to acquire linearizer lock %r for key %r", self.name, key,
         )
 
@@ -255,7 +255,7 @@ class Linearizer(object):
         entry[1][new_defer] = 1
 
         def cb(_r):
-            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+            logger.debug("Acquired linearizer lock %r for key %r", self.name, key)
             entry[0] += 1
 
             # if the code holding the lock completes synchronously, then it
@@ -273,7 +273,7 @@ class Linearizer(object):
         def eb(e):
             logger.info("defer %r got err %r", new_defer, e)
             if isinstance(e, CancelledError):
-                logger.info(
+                logger.debug(
                     "Cancelling wait for linearizer lock %r for key %r",
                     self.name, key,
                 )
@@ -387,12 +387,14 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
     deferred that wraps and times out the given deferred, correctly handling
     the case where the given deferred's canceller throws.
 
+    (See https://twistedmatrix.com/trac/ticket/9534)
+
     NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
 
     Args:
         deferred (Deferred)
         timeout (float): Timeout in seconds
-        reactor (twisted.internet.reactor): The twisted reactor to use
+        reactor (twisted.interfaces.IReactorTime): The twisted reactor to use
         on_timeout_cancel (callable): A callable which is called immediately
             after the deferred times out, and not if this deferred is
             otherwise cancelled before the timeout.
diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py
new file mode 100644
index 0000000000..5ba1862506
--- /dev/null
+++ b/synapse/util/caches/ttlcache.py
@@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+
+import attr
+from sortedcontainers import SortedList
+
+from synapse.util.caches import register_cache
+
+logger = logging.getLogger(__name__)
+
+SENTINEL = object()
+
+
+class TTLCache(object):
+    """A key/value cache implementation where each entry has its own TTL"""
+
+    def __init__(self, cache_name, timer=time.time):
+        # map from key to _CacheEntry
+        self._data = {}
+
+        # the _CacheEntries, sorted by expiry time
+        self._expiry_list = SortedList()
+
+        self._timer = timer
+
+        self._metrics = register_cache("ttl", cache_name, self)
+
+    def set(self, key, value, ttl):
+        """Add/update an entry in the cache
+
+        Args:
+            key: key for this entry
+            value: value for this entry
+            ttl (float): TTL for this entry, in seconds
+        """
+        expiry = self._timer() + ttl
+
+        self.expire()
+        e = self._data.pop(key, SENTINEL)
+        if e != SENTINEL:
+            self._expiry_list.remove(e)
+
+        entry = _CacheEntry(expiry_time=expiry, key=key, value=value)
+        self._data[key] = entry
+        self._expiry_list.add(entry)
+
+    def get(self, key, default=SENTINEL):
+        """Get a value from the cache
+
+        Args:
+            key: key to look up
+            default: default value to return, if key is not found. If not set, and the
+                key is not found, a KeyError will be raised
+
+        Returns:
+            value from the cache, or the default
+        """
+        self.expire()
+        e = self._data.get(key, SENTINEL)
+        if e == SENTINEL:
+            self._metrics.inc_misses()
+            if default == SENTINEL:
+                raise KeyError(key)
+            return default
+        self._metrics.inc_hits()
+        return e.value
+
+    def get_with_expiry(self, key):
+        """Get a value, and its expiry time, from the cache
+
+        Args:
+            key: key to look up
+
+        Returns:
+            Tuple[Any, float]: the value from the cache, and the expiry time
+
+        Raises:
+            KeyError if the entry is not found
+        """
+        self.expire()
+        try:
+            e = self._data[key]
+        except KeyError:
+            self._metrics.inc_misses()
+            raise
+        self._metrics.inc_hits()
+        return e.value, e.expiry_time
+
+    def pop(self, key, default=SENTINEL):
+        """Remove a value from the cache
+
+        If key is in the cache, remove it and return its value, else return default.
+        If default is not given and key is not in the cache, a KeyError is raised.
+
+        Args:
+            key: key to look up
+            default: default value to return, if key is not found. If not set, and the
+                key is not found, a KeyError will be raised
+
+        Returns:
+            value from the cache, or the default
+        """
+        self.expire()
+        e = self._data.pop(key, SENTINEL)
+        if e == SENTINEL:
+            self._metrics.inc_misses()
+            if default == SENTINEL:
+                raise KeyError(key)
+            return default
+        self._expiry_list.remove(e)
+        self._metrics.inc_hits()
+        return e.value
+
+    def __getitem__(self, key):
+        return self.get(key)
+
+    def __delitem__(self, key):
+        self.pop(key)
+
+    def __contains__(self, key):
+        return key in self._data
+
+    def __len__(self):
+        self.expire()
+        return len(self._data)
+
+    def expire(self):
+        """Run the expiry on the cache. Any entries whose expiry times are due will
+        be removed
+        """
+        now = self._timer()
+        while self._expiry_list:
+            first_entry = self._expiry_list[0]
+            if first_entry.expiry_time - now > 0.0:
+                break
+            del self._data[first_entry.key]
+            del self._expiry_list[0]
+
+
+@attr.s(frozen=True, slots=True)
+class _CacheEntry(object):
+    """TTLCache entry"""
+    # expiry_time is the first attribute, so that entries are sorted by expiry.
+    expiry_time = attr.ib()
+    key = attr.ib()
+    value = attr.ib()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 4c6e92beb8..311b49e18a 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -285,7 +285,10 @@ class LoggingContext(object):
         self.alive = False
 
         # if we have a parent, pass our CPU usage stats on
-        if self.parent_context is not None:
+        if (
+            self.parent_context is not None
+            and hasattr(self.parent_context, '_resource_usage')
+        ):
             self.parent_context._resource_usage += self._resource_usage
 
             # reset them in case we get entered again
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index 6f318c6a29..fdcb375f95 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -16,7 +16,8 @@
 import random
 import string
 
-from six import PY3
+import six
+from six import PY2, PY3
 from six.moves import range
 
 _string_with_symbols = (
@@ -71,3 +72,39 @@ def to_ascii(s):
         return s.encode("ascii")
     except UnicodeEncodeError:
         return s
+
+
+def exception_to_unicode(e):
+    """Helper function to extract the text of an exception as a unicode string
+
+    Args:
+        e (Exception): exception to be stringified
+
+    Returns:
+        unicode
+    """
+    # urgh, this is a mess. The basic problem here is that psycopg2 constructs its
+    # exceptions with PyErr_SetString, with a (possibly non-ascii) argument. str() will
+    # then produce the raw byte sequence. Under Python 2, this will then cause another
+    # error if it gets mixed with a `unicode` object, as per
+    # https://github.com/matrix-org/synapse/issues/4252
+
+    # First of all, if we're under python3, everything is fine because it will sort this
+    # nonsense out for us.
+    if not PY2:
+        return str(e)
+
+    # otherwise let's have a stab at decoding the exception message. We'll circumvent
+    # Exception.__str__(), which would explode if someone raised Exception(u'non-ascii')
+    # and instead look at what is in the args member.
+
+    if len(e.args) == 0:
+        return u""
+    elif len(e.args) > 1:
+        return six.text_type(repr(e.args))
+
+    msg = e.args[0]
+    if isinstance(msg, bytes):
+        return msg.decode('utf-8', errors='replace')
+    else:
+        return msg