diff --git a/synapse/notifier.py b/synapse/notifier.py
index ef042681bc..6dce20a284 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -14,33 +14,34 @@
# limitations under the License.
from twisted.internet import defer
+
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
-from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function
-from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.async import (
+ ObservableDeferred, add_timeout_to_deferred,
+ DeferredTimeoutError,
+)
+from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.metrics import Measure
from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client
-import synapse.metrics
+from synapse.metrics import LaterGauge
from collections import namedtuple
+from prometheus_client import Counter
import logging
logger = logging.getLogger(__name__)
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-notified_events_counter = metrics.register_counter("notified_events")
+notified_events_counter = Counter("synapse_notifier_notified_events", "")
-users_woken_by_stream_counter = metrics.register_counter(
- "users_woken_by_stream", labels=["stream"]
-)
+users_woken_by_stream_counter = Counter(
+ "synapse_notifier_users_woken_by_stream", "", ["stream"])
# TODO(paul): Should be shared somewhere
@@ -105,7 +106,7 @@ class _NotifierUserStream(object):
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
- users_woken_by_stream_counter.inc(stream_key)
+ users_woken_by_stream_counter.labels(stream_key).inc()
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
@@ -144,6 +145,7 @@ class _NotifierUserStream(object):
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
def __nonzero__(self):
return bool(self.events)
+ __bool__ = __nonzero__ # python3
class Notifier(object):
@@ -193,14 +195,14 @@ class Notifier(object):
all_user_streams.add(x)
return sum(stream.count_listeners() for stream in all_user_streams)
- metrics.register_callback("listeners", count_listeners)
+ LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
- metrics.register_callback(
- "rooms",
+ LaterGauge(
+ "synapse_notifier_rooms", "", [],
lambda: count(bool, self.room_to_user_streams.values()),
)
- metrics.register_callback(
- "users",
+ LaterGauge(
+ "synapse_notifier_users", "", [],
lambda: len(self.user_to_user_stream),
)
@@ -250,9 +252,7 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
- preserve_fn(self.appservice_handler.notify_interested_services)(
- room_stream_id
- )
+ run_in_background(self._notify_app_services, room_stream_id)
if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id)
@@ -266,6 +266,13 @@ class Notifier(object):
rooms=[event.room_id],
)
+ @defer.inlineCallbacks
+ def _notify_app_services(self, room_stream_id):
+ try:
+ yield self.appservice_handler.notify_interested_services(room_stream_id)
+ except Exception:
+ logger.exception("Error notifying application services of event")
+
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend event wise.
@@ -330,11 +337,12 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
+ add_timeout_to_deferred(
+ listener.deferred,
+ (end_time - now) / 1000.,
+ )
with PreserveLoggingContext():
- yield self.clock.time_bound_deferred(
- listener.deferred,
- time_out=(end_time - now) / 1000.
- )
+ yield listener.deferred
current_token = user_stream.current_token
@@ -345,7 +353,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
- except DeferredTimedOutError:
+ except DeferredTimeoutError:
break
except defer.CancelledError:
break
@@ -550,13 +558,14 @@ class Notifier(object):
if end_time <= now:
break
+ add_timeout_to_deferred(
+ listener.deferred.addTimeout,
+ (end_time - now) / 1000.,
+ )
try:
with PreserveLoggingContext():
- yield self.clock.time_bound_deferred(
- listener.deferred,
- time_out=(end_time - now) / 1000.
- )
- except DeferredTimedOutError:
+ yield listener.deferred
+ except DeferredTimeoutError:
break
except defer.CancelledError:
break
|