diff options
author | Erik Johnston <erik@matrix.org> | 2016-07-05 14:44:25 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-07-05 15:23:17 +0100 |
commit | 7335f0addae9ff473403eaaffd7d2b02a9f1991f (patch) | |
tree | b576266516277c4c9b408b507155b3ae8b757cee /synapse/util | |
parent | Add purge_history API (diff) | |
download | synapse-7335f0addae9ff473403eaaffd7d2b02a9f1991f.tar.xz |
Add ReadWriteLock
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/async.py | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index 40be7fe7e3..c84b23ff46 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -194,3 +194,85 @@ class Linearizer(object): self.key_to_defer.pop(key, None) defer.returnValue(_ctx_manager()) + + +class ReadWriteLock(object): + """A deferred style read write lock. + + Example: + + with (yield read_write_lock.read("test_key")): + # do some work + """ + + # IMPLEMENTATION NOTES + # + # We track the most recent queued reader and writer deferreds (which get + # resolved when they release the lock). + # + # Read: We know its safe to acquire a read lock when the latest writer has + # been resolved. The new reader is appeneded to the list of latest readers. + # + # Write: We know its safe to acquire the write lock when both the latest + # writers and readers have been resolved. The new writer replaces the latest + # writer. + + def __init__(self): + # Latest readers queued + self.key_to_current_readers = {} + + # Latest writer queued + self.key_to_current_writer = {} + + @defer.inlineCallbacks + def read(self, key): + new_defer = defer.Deferred() + + curr_readers = self.key_to_current_readers.setdefault(key, set()) + curr_writer = self.key_to_current_writer.get(key, None) + + curr_readers.add(new_defer) + + # We wait for the latest writer to finish writing. We can safely ignore + # any existing readers... as they're readers. + yield curr_writer + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + new_defer.callback(None) + self.key_to_current_readers.get(key, set()).discard(new_defer) + + defer.returnValue(_ctx_manager()) + + @defer.inlineCallbacks + def write(self, key): + new_defer = defer.Deferred() + + curr_readers = self.key_to_current_readers.get(key, set()) + curr_writer = self.key_to_current_writer.get(key, None) + + # We wait on all latest readers and writer. + to_wait_on = list(curr_readers) + if curr_writer: + to_wait_on.append(curr_writer) + + # We can clear the list of current readers since the new writer waits + # for them to finish. + curr_readers.clear() + self.key_to_current_writer[key] = new_defer + + yield defer.gatherResults(to_wait_on) + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + new_defer.callback(None) + if self.key_to_current_writer[key] == new_defer: + self.key_to_current_writer.pop(key) + + defer.returnValue(_ctx_manager()) |