summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/__init__.py3
-rw-r--r--synapse/util/async.py50
-rw-r--r--synapse/util/caches/response_cache.py2
-rw-r--r--synapse/util/distributor.py22
-rw-r--r--synapse/util/ratelimitutils.py14
-rw-r--r--synapse/util/stringutils.py4
6 files changed, 71 insertions, 24 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 3b9da5b34a..b462495eb8 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -49,9 +49,6 @@ class Clock(object):
         l.start(msec / 1000.0, now=False)
         return l
 
-    def stop_looping_call(self, loop):
-        loop.stop()
-
     def call_later(self, delay, callback, *args, **kwargs):
         """Call something later
 
diff --git a/synapse/util/async.py b/synapse/util/async.py
index cd4d90f3cf..0d6f48e2d8 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -16,9 +16,13 @@
 
 from twisted.internet import defer, reactor
 
-from .logcontext import PreserveLoggingContext, preserve_fn
+from .logcontext import (
+    PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+)
 from synapse.util import unwrapFirstError
 
+from contextlib import contextmanager
+
 
 @defer.inlineCallbacks
 def sleep(seconds):
@@ -137,3 +141,47 @@ def concurrently_execute(func, args, limit):
         preserve_fn(_concurrently_execute_inner)()
         for _ in xrange(limit)
     ], consumeErrors=True).addErrback(unwrapFirstError)
+
+
+class Linearizer(object):
+    """Linearizes access to resources based on a key. Useful to ensure only one
+    thing is happening at a time on a given resource.
+
+    Example:
+
+        with (yield linearizer.queue("test_key")):
+            # do some work.
+
+    """
+    def __init__(self):
+        self.key_to_defer = {}
+
+    @defer.inlineCallbacks
+    def queue(self, key):
+        # If there is already a deferred in the queue, we pull it out so that
+        # we can wait on it later.
+        # Then we replace it with a deferred that we resolve *after* the
+        # context manager has exited.
+        # We only return the context manager after the previous deferred has
+        # resolved.
+        # This all has the net effect of creating a chain of deferreds that
+        # wait for the previous deferred before starting their work.
+        current_defer = self.key_to_defer.get(key)
+
+        new_defer = defer.Deferred()
+        self.key_to_defer[key] = new_defer
+
+        if current_defer:
+            yield preserve_context_over_deferred(current_defer)
+
+        @contextmanager
+        def _ctx_manager():
+            try:
+                yield
+            finally:
+                new_defer.callback(None)
+                current_d = self.key_to_defer.get(key)
+                if current_d is new_defer:
+                    self.key_to_defer.pop(key, None)
+
+        defer.returnValue(_ctx_manager())
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index be310ba320..36686b479e 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -35,7 +35,7 @@ class ResponseCache(object):
             return None
 
     def set(self, key, deferred):
-        result = ObservableDeferred(deferred)
+        result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result
 
         def remove(r):
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 8875813de4..d7cccc06b1 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -15,7 +15,9 @@
 
 from twisted.internet import defer
 
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import (
+    PreserveLoggingContext, preserve_context_over_fn
+)
 
 from synapse.util import unwrapFirstError
 
@@ -25,6 +27,24 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+def registered_user(distributor, user):
+    return distributor.fire("registered_user", user)
+
+
+def user_left_room(distributor, user, room_id):
+    return preserve_context_over_fn(
+        distributor.fire,
+        "user_left_room", user=user, room_id=room_id
+    )
+
+
+def user_joined_room(distributor, user, room_id):
+    return preserve_context_over_fn(
+        distributor.fire,
+        "user_joined_room", user=user, room_id=room_id
+    )
+
+
 class Distributor(object):
     """A central dispatch point for loosely-connected pieces of code to
     register, observe, and fire signals.
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 4076eed269..1101881a2d 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -100,20 +100,6 @@ class _PerHostRatelimiter(object):
         self.current_processing = set()
         self.request_times = []
 
-    def is_empty(self):
-        time_now = self.clock.time_msec()
-        self.request_times[:] = [
-            r for r in self.request_times
-            if time_now - r < self.window_size
-        ]
-
-        return not (
-            self.ready_request_queue
-            or self.sleeping_requests
-            or self.current_processing
-            or self.request_times
-        )
-
     @contextlib.contextmanager
     def ratelimit(self):
         # `contextlib.contextmanager` takes a generator and turns it into a
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index b490bb8725..a100f151d4 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -21,10 +21,6 @@ _string_with_symbols = (
 )
 
 
-def origin_from_ucid(ucid):
-    return ucid.split("@", 1)[1]
-
-
 def random_string(length):
     return ''.join(random.choice(string.ascii_letters) for _ in xrange(length))