diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6dce20a284..51cbd66f06 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -13,28 +13,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+from collections import namedtuple
+
+from prometheus_client import Counter
+
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
-
-from synapse.util.logutils import log_function
+from synapse.metrics import LaterGauge
+from synapse.types import StreamToken
from synapse.util.async import (
- ObservableDeferred, add_timeout_to_deferred,
DeferredTimeoutError,
+ ObservableDeferred,
+ add_timeout_to_deferred,
)
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
+from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
-from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client
-from synapse.metrics import LaterGauge
-
-from collections import namedtuple
-from prometheus_client import Counter
-
-import logging
-
logger = logging.getLogger(__name__)
@@ -161,6 +160,7 @@ class Notifier(object):
self.user_to_user_stream = {}
self.room_to_user_streams = {}
+ self.hs = hs
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.pending_new_room_events = []
@@ -340,6 +340,7 @@ class Notifier(object):
add_timeout_to_deferred(
listener.deferred,
(end_time - now) / 1000.,
+ self.hs.get_reactor(),
)
with PreserveLoggingContext():
yield listener.deferred
@@ -561,6 +562,7 @@ class Notifier(object):
add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
+ self.hs.get_reactor(),
)
try:
with PreserveLoggingContext():
|