diff options
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 28 |
1 files changed, 15 insertions, 13 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 6dce20a284..82f391481c 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -13,28 +13,27 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging +from collections import namedtuple + +from prometheus_client import Counter + from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state - -from synapse.util.logutils import log_function -from synapse.util.async import ( - ObservableDeferred, add_timeout_to_deferred, +from synapse.metrics import LaterGauge +from synapse.types import StreamToken +from synapse.util.async_helpers import ( DeferredTimeoutError, + ObservableDeferred, + add_timeout_to_deferred, ) from synapse.util.logcontext import PreserveLoggingContext, run_in_background +from synapse.util.logutils import log_function from synapse.util.metrics import Measure -from synapse.types import StreamToken from synapse.visibility import filter_events_for_client -from synapse.metrics import LaterGauge - -from collections import namedtuple -from prometheus_client import Counter - -import logging - logger = logging.getLogger(__name__) @@ -161,6 +160,7 @@ class Notifier(object): self.user_to_user_stream = {} self.room_to_user_streams = {} + self.hs = hs self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() self.pending_new_room_events = [] @@ -274,7 +274,7 @@ class Notifier(object): logger.exception("Error notifying application services of event") def on_new_event(self, stream_key, new_token, users=[], rooms=[]): - """ Used to inform listeners that something has happend event wise. + """ Used to inform listeners that something has happened event wise. Will wake up all listeners for the given users and rooms. """ @@ -340,6 +340,7 @@ class Notifier(object): add_timeout_to_deferred( listener.deferred, (end_time - now) / 1000., + self.hs.get_reactor(), ) with PreserveLoggingContext(): yield listener.deferred @@ -561,6 +562,7 @@ class Notifier(object): add_timeout_to_deferred( listener.deferred.addTimeout, (end_time - now) / 1000., + self.hs.get_reactor(), ) try: with PreserveLoggingContext(): |