diff --git a/synapse/util/async.py b/synapse/util/async.py
index 40be7fe7e3..c84b23ff46 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -194,3 +194,85 @@ class Linearizer(object):
self.key_to_defer.pop(key, None)
defer.returnValue(_ctx_manager())
+
+
+class ReadWriteLock(object):
+ """A deferred style read write lock.
+
+ Example:
+
+ with (yield read_write_lock.read("test_key")):
+ # do some work
+ """
+
+ # IMPLEMENTATION NOTES
+ #
+ # We track the most recent queued reader and writer deferreds (which get
+ # resolved when they release the lock).
+ #
+ # Read: We know its safe to acquire a read lock when the latest writer has
+ # been resolved. The new reader is appeneded to the list of latest readers.
+ #
+ # Write: We know its safe to acquire the write lock when both the latest
+ # writers and readers have been resolved. The new writer replaces the latest
+ # writer.
+
+ def __init__(self):
+ # Latest readers queued
+ self.key_to_current_readers = {}
+
+ # Latest writer queued
+ self.key_to_current_writer = {}
+
+ @defer.inlineCallbacks
+ def read(self, key):
+ new_defer = defer.Deferred()
+
+ curr_readers = self.key_to_current_readers.setdefault(key, set())
+ curr_writer = self.key_to_current_writer.get(key, None)
+
+ curr_readers.add(new_defer)
+
+ # We wait for the latest writer to finish writing. We can safely ignore
+ # any existing readers... as they're readers.
+ yield curr_writer
+
+ @contextmanager
+ def _ctx_manager():
+ try:
+ yield
+ finally:
+ new_defer.callback(None)
+ self.key_to_current_readers.get(key, set()).discard(new_defer)
+
+ defer.returnValue(_ctx_manager())
+
+ @defer.inlineCallbacks
+ def write(self, key):
+ new_defer = defer.Deferred()
+
+ curr_readers = self.key_to_current_readers.get(key, set())
+ curr_writer = self.key_to_current_writer.get(key, None)
+
+ # We wait on all latest readers and writer.
+ to_wait_on = list(curr_readers)
+ if curr_writer:
+ to_wait_on.append(curr_writer)
+
+ # We can clear the list of current readers since the new writer waits
+ # for them to finish.
+ curr_readers.clear()
+ self.key_to_current_writer[key] = new_defer
+
+ yield defer.gatherResults(to_wait_on)
+
+ @contextmanager
+ def _ctx_manager():
+ try:
+ yield
+ finally:
+ new_defer.callback(None)
+ if self.key_to_current_writer[key] == new_defer:
+ self.key_to_current_writer.pop(key)
+
+ defer.returnValue(_ctx_manager())
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 36686b479e..00af539880 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -24,9 +24,12 @@ class ResponseCache(object):
used rather than trying to compute a new response.
"""
- def __init__(self):
+ def __init__(self, hs, timeout_ms=0):
self.pending_result_cache = {} # Requests that haven't finished yet.
+ self.clock = hs.get_clock()
+ self.timeout_sec = timeout_ms / 1000.
+
def get(self, key):
result = self.pending_result_cache.get(key)
if result is not None:
@@ -39,7 +42,13 @@ class ResponseCache(object):
self.pending_result_cache[key] = result
def remove(r):
- self.pending_result_cache.pop(key, None)
+ if self.timeout_sec:
+ self.clock.call_later(
+ self.timeout_sec,
+ self.pending_result_cache.pop, key, None,
+ )
+ else:
+ self.pending_result_cache.pop(key, None)
return r
result.addBoth(remove)
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index e1f374807e..0b944d3e63 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -84,7 +84,7 @@ class Measure(object):
if context != self.start_context:
logger.warn(
- "Context have unexpectedly changed from '%s' to '%s'. (%r)",
+ "Context has unexpectedly changed from '%s' to '%s'. (%r)",
context, self.start_context, self.name
)
return
diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py
index a6866f6117..f68676e9e7 100644
--- a/synapse/util/presentable_names.py
+++ b/synapse/util/presentable_names.py
@@ -25,7 +25,8 @@ ALIAS_RE = re.compile(r"^#.*:.+$")
ALL_ALONE = "Empty Room"
-def calculate_room_name(room_state, user_id, fallback_to_members=True):
+def calculate_room_name(room_state, user_id, fallback_to_members=True,
+ fallback_to_single_member=True):
"""
Works out a user-facing name for the given room as per Matrix
spec recommendations.
@@ -82,7 +83,10 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True):
):
if ("m.room.member", my_member_event.sender) in room_state:
inviter_member_event = room_state[("m.room.member", my_member_event.sender)]
- return "Invite from %s" % (name_from_member_event(inviter_member_event),)
+ if fallback_to_single_member:
+ return "Invite from %s" % (name_from_member_event(inviter_member_event),)
+ else:
+ return None
else:
return "Room Invite"
@@ -129,6 +133,8 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True):
return name_from_member_event(all_members[0])
else:
return ALL_ALONE
+ elif len(other_members) == 1 and not fallback_to_single_member:
+ return None
else:
return descriptor_from_member_events(other_members)
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 43cf11f3f6..49527f4d21 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -128,7 +128,7 @@ class RetryDestinationLimiter(object):
)
valid_err_code = False
- if exc_type is CodeMessageException:
+ if exc_type is not None and issubclass(exc_type, CodeMessageException):
valid_err_code = 0 <= exc_val.code < 500
if exc_type is None or valid_err_code:
diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py
index a4f156cb3b..52086df465 100644
--- a/synapse/util/versionstring.py
+++ b/synapse/util/versionstring.py
@@ -21,7 +21,7 @@ import logging
logger = logging.getLogger(__name__)
-def get_version_string(name, module):
+def get_version_string(module):
try:
null = open(os.devnull, 'w')
cwd = os.path.dirname(os.path.abspath(module.__file__))
@@ -74,11 +74,11 @@ def get_version_string(name, module):
)
return (
- "%s/%s (%s)" % (
- name, module.__version__, git_version,
+ "%s (%s)" % (
+ module.__version__, git_version,
)
).encode("ascii")
except Exception as e:
logger.info("Failed to check for git repository: %s", e)
- return ("%s/%s" % (name, module.__version__,)).encode("ascii")
+ return module.__version__.encode("ascii")
|