summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py28
1 files changed, 15 insertions, 13 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6dce20a284..82f391481c 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -13,28 +13,27 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+from collections import namedtuple
+
+from prometheus_client import Counter
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
-
-from synapse.util.logutils import log_function
-from synapse.util.async import (
-    ObservableDeferred, add_timeout_to_deferred,
+from synapse.metrics import LaterGauge
+from synapse.types import StreamToken
+from synapse.util.async_helpers import (
     DeferredTimeoutError,
+    ObservableDeferred,
+    add_timeout_to_deferred,
 )
 from synapse.util.logcontext import PreserveLoggingContext, run_in_background
+from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
-from synapse.types import StreamToken
 from synapse.visibility import filter_events_for_client
-from synapse.metrics import LaterGauge
-
-from collections import namedtuple
-from prometheus_client import Counter
-
-import logging
-
 
 logger = logging.getLogger(__name__)
 
@@ -161,6 +160,7 @@ class Notifier(object):
         self.user_to_user_stream = {}
         self.room_to_user_streams = {}
 
+        self.hs = hs
         self.event_sources = hs.get_event_sources()
         self.store = hs.get_datastore()
         self.pending_new_room_events = []
@@ -274,7 +274,7 @@ class Notifier(object):
             logger.exception("Error notifying application services of event")
 
     def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
-        """ Used to inform listeners that something has happend event wise.
+        """ Used to inform listeners that something has happened event wise.
 
         Will wake up all listeners for the given users and rooms.
         """
@@ -340,6 +340,7 @@ class Notifier(object):
                     add_timeout_to_deferred(
                         listener.deferred,
                         (end_time - now) / 1000.,
+                        self.hs.get_reactor(),
                     )
                     with PreserveLoggingContext():
                         yield listener.deferred
@@ -561,6 +562,7 @@ class Notifier(object):
             add_timeout_to_deferred(
                 listener.deferred.addTimeout,
                 (end_time - now) / 1000.,
+                self.hs.get_reactor(),
             )
             try:
                 with PreserveLoggingContext():