summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/4799.misc1
-rw-r--r--synapse/notifier.py65
2 files changed, 6 insertions, 60 deletions
diff --git a/changelog.d/4799.misc b/changelog.d/4799.misc
new file mode 100644
index 0000000000..5ab11a5c0b
--- /dev/null
+++ b/changelog.d/4799.misc
@@ -0,0 +1 @@
+Clean up some replication code.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index de02b1017e..ff589660da 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -178,8 +178,6 @@ class Notifier(object):
             self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
         )
 
-        self.replication_deferred = ObservableDeferred(defer.Deferred())
-
         # This is not a very cheap test to perform, but it's only executed
         # when rendering the metrics page, which is likely once per minute at
         # most when scraping it.
@@ -205,7 +203,9 @@ class Notifier(object):
 
     def add_replication_callback(self, cb):
         """Add a callback that will be called when some new data is available.
-        Callback is not given any arguments.
+        Callback is not given any arguments. It should *not* return a Deferred - if
+        it needs to do any asynchronous work, a background thread should be started and
+        wrapped with run_as_background_process.
         """
         self.replication_callbacks.append(cb)
 
@@ -517,60 +517,5 @@ class Notifier(object):
 
     def notify_replication(self):
         """Notify the any replication listeners that there's a new event"""
-        with PreserveLoggingContext():
-            deferred = self.replication_deferred
-            self.replication_deferred = ObservableDeferred(defer.Deferred())
-            deferred.callback(None)
-
-            # the callbacks may well outlast the current request, so we run
-            # them in the sentinel logcontext.
-            #
-            # (ideally it would be up to the callbacks to know if they were
-            # starting off background processes and drop the logcontext
-            # accordingly, but that requires more changes)
-            for cb in self.replication_callbacks:
-                cb()
-
-    @defer.inlineCallbacks
-    def wait_for_replication(self, callback, timeout):
-        """Wait for an event to happen.
-
-        Args:
-            callback: Gets called whenever an event happens. If this returns a
-                truthy value then ``wait_for_replication`` returns, otherwise
-                it waits for another event.
-            timeout: How many milliseconds to wait for callback return a truthy
-                value.
-
-        Returns:
-            A deferred that resolves with the value returned by the callback.
-        """
-        listener = _NotificationListener(None)
-
-        end_time = self.clock.time_msec() + timeout
-
-        while True:
-            listener.deferred = self.replication_deferred.observe()
-            result = yield callback()
-            if result:
-                break
-
-            now = self.clock.time_msec()
-            if end_time <= now:
-                break
-
-            listener.deferred = timeout_deferred(
-                listener.deferred,
-                timeout=(end_time - now) / 1000.,
-                reactor=self.hs.get_reactor(),
-            )
-
-            try:
-                with PreserveLoggingContext():
-                    yield listener.deferred
-            except defer.TimeoutError:
-                break
-            except defer.CancelledError:
-                break
-
-        defer.returnValue(result)
+        for cb in self.replication_callbacks:
+            cb()