summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/async.py82
-rw-r--r--synapse/util/presentable_names.py5
2 files changed, 86 insertions, 1 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 40be7fe7e3..c84b23ff46 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -194,3 +194,85 @@ class Linearizer(object):
                     self.key_to_defer.pop(key, None)
 
         defer.returnValue(_ctx_manager())
+
+
+class ReadWriteLock(object):
+    """A deferred style read write lock.
+
+    Example:
+
+        with (yield read_write_lock.read("test_key")):
+            # do some work
+    """
+
+    # IMPLEMENTATION NOTES
+    #
+    # We track the most recent queued reader and writer deferreds (which get
+    # resolved when they release the lock).
+    #
+    # Read: We know its safe to acquire a read lock when the latest writer has
+    # been resolved. The new reader is appeneded to the list of latest readers.
+    #
+    # Write: We know its safe to acquire the write lock when both the latest
+    # writers and readers have been resolved. The new writer replaces the latest
+    # writer.
+
+    def __init__(self):
+        # Latest readers queued
+        self.key_to_current_readers = {}
+
+        # Latest writer queued
+        self.key_to_current_writer = {}
+
+    @defer.inlineCallbacks
+    def read(self, key):
+        new_defer = defer.Deferred()
+
+        curr_readers = self.key_to_current_readers.setdefault(key, set())
+        curr_writer = self.key_to_current_writer.get(key, None)
+
+        curr_readers.add(new_defer)
+
+        # We wait for the latest writer to finish writing. We can safely ignore
+        # any existing readers... as they're readers.
+        yield curr_writer
+
+        @contextmanager
+        def _ctx_manager():
+            try:
+                yield
+            finally:
+                new_defer.callback(None)
+                self.key_to_current_readers.get(key, set()).discard(new_defer)
+
+        defer.returnValue(_ctx_manager())
+
+    @defer.inlineCallbacks
+    def write(self, key):
+        new_defer = defer.Deferred()
+
+        curr_readers = self.key_to_current_readers.get(key, set())
+        curr_writer = self.key_to_current_writer.get(key, None)
+
+        # We wait on all latest readers and writer.
+        to_wait_on = list(curr_readers)
+        if curr_writer:
+            to_wait_on.append(curr_writer)
+
+        # We can clear the list of current readers since the new writer waits
+        # for them to finish.
+        curr_readers.clear()
+        self.key_to_current_writer[key] = new_defer
+
+        yield defer.gatherResults(to_wait_on)
+
+        @contextmanager
+        def _ctx_manager():
+            try:
+                yield
+            finally:
+                new_defer.callback(None)
+                if self.key_to_current_writer[key] == new_defer:
+                    self.key_to_current_writer.pop(key)
+
+        defer.returnValue(_ctx_manager())
diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py
index a6866f6117..4c54812e6f 100644
--- a/synapse/util/presentable_names.py
+++ b/synapse/util/presentable_names.py
@@ -25,7 +25,8 @@ ALIAS_RE = re.compile(r"^#.*:.+$")
 ALL_ALONE = "Empty Room"
 
 
-def calculate_room_name(room_state, user_id, fallback_to_members=True):
+def calculate_room_name(room_state, user_id, fallback_to_members=True,
+                        fallback_to_single_member=True):
     """
     Works out a user-facing name for the given room as per Matrix
     spec recommendations.
@@ -129,6 +130,8 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True):
                 return name_from_member_event(all_members[0])
         else:
             return ALL_ALONE
+    elif len(other_members) == 1 and not fallback_to_single_member:
+        return None
     else:
         return descriptor_from_member_events(other_members)