diff --git a/synapse/notifier.py b/synapse/notifier.py
index 385208b574..e650c3e494 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -13,34 +13,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+from collections import namedtuple
+
+from prometheus_client import Counter
+
from twisted.internet import defer
+
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
-
-from synapse.util import DeferredTimedOutError
+from synapse.metrics import LaterGauge
+from synapse.types import StreamToken
+from synapse.util.async import (
+ DeferredTimeoutError,
+ ObservableDeferred,
+ add_timeout_to_deferred,
+)
+from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logutils import log_function
-from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.metrics import Measure
-from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client
-import synapse.metrics
-
-from collections import namedtuple
-
-import logging
-
logger = logging.getLogger(__name__)
-metrics = synapse.metrics.get_metrics_for(__name__)
+notified_events_counter = Counter("synapse_notifier_notified_events", "")
-notified_events_counter = metrics.register_counter("notified_events")
-
-users_woken_by_stream_counter = metrics.register_counter(
- "users_woken_by_stream", labels=["stream"]
-)
+users_woken_by_stream_counter = Counter(
+ "synapse_notifier_users_woken_by_stream", "", ["stream"])
# TODO(paul): Should be shared somewhere
@@ -105,7 +105,7 @@ class _NotifierUserStream(object):
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
- users_woken_by_stream_counter.inc(stream_key)
+ users_woken_by_stream_counter.labels(stream_key).inc()
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
@@ -144,6 +144,7 @@ class _NotifierUserStream(object):
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
def __nonzero__(self):
return bool(self.events)
+ __bool__ = __nonzero__ # python3
class Notifier(object):
@@ -159,6 +160,7 @@ class Notifier(object):
self.user_to_user_stream = {}
self.room_to_user_streams = {}
+ self.hs = hs
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.pending_new_room_events = []
@@ -193,14 +195,14 @@ class Notifier(object):
all_user_streams.add(x)
return sum(stream.count_listeners() for stream in all_user_streams)
- metrics.register_callback("listeners", count_listeners)
+ LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
- metrics.register_callback(
- "rooms",
+ LaterGauge(
+ "synapse_notifier_rooms", "", [],
lambda: count(bool, self.room_to_user_streams.values()),
)
- metrics.register_callback(
- "users",
+ LaterGauge(
+ "synapse_notifier_users", "", [],
lambda: len(self.user_to_user_stream),
)
@@ -250,14 +252,10 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
- preserve_fn(self.appservice_handler.notify_interested_services)(
- room_stream_id
- )
+ run_in_background(self._notify_app_services, room_stream_id)
if self.federation_sender:
- preserve_fn(self.federation_sender.notify_new_events)(
- room_stream_id
- )
+ self.federation_sender.notify_new_events(room_stream_id)
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
self._user_joined_room(event.state_key, event.room_id)
@@ -268,8 +266,15 @@ class Notifier(object):
rooms=[event.room_id],
)
+ @defer.inlineCallbacks
+ def _notify_app_services(self, room_stream_id):
+ try:
+ yield self.appservice_handler.notify_interested_services(room_stream_id)
+ except Exception:
+ logger.exception("Error notifying application services of event")
+
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
- """ Used to inform listeners that something has happend event wise.
+ """ Used to inform listeners that something has happened event wise.
Will wake up all listeners for the given users and rooms.
"""
@@ -289,7 +294,7 @@ class Notifier(object):
for user_stream in user_streams:
try:
user_stream.notify(stream_key, new_token, time_now_ms)
- except:
+ except Exception:
logger.exception("Failed to notify listener")
self.notify_replication()
@@ -297,8 +302,7 @@ class Notifier(object):
def on_new_replication_data(self):
"""Used to inform replication listeners that something has happend
without waking up any of the normal user event streams"""
- with PreserveLoggingContext():
- self.notify_replication()
+ self.notify_replication()
@defer.inlineCallbacks
def wait_for_events(self, user_id, timeout, callback, room_ids=None,
@@ -333,11 +337,13 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
+ add_timeout_to_deferred(
+ listener.deferred,
+ (end_time - now) / 1000.,
+ self.hs.get_reactor(),
+ )
with PreserveLoggingContext():
- yield self.clock.time_bound_deferred(
- listener.deferred,
- time_out=(end_time - now) / 1000.
- )
+ yield listener.deferred
current_token = user_stream.current_token
@@ -348,7 +354,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
- except DeferredTimedOutError:
+ except DeferredTimeoutError:
break
except defer.CancelledError:
break
@@ -516,8 +522,14 @@ class Notifier(object):
self.replication_deferred = ObservableDeferred(defer.Deferred())
deferred.callback(None)
- for cb in self.replication_callbacks:
- preserve_fn(cb)()
+ # the callbacks may well outlast the current request, so we run
+ # them in the sentinel logcontext.
+ #
+ # (ideally it would be up to the callbacks to know if they were
+ # starting off background processes and drop the logcontext
+ # accordingly, but that requires more changes)
+ for cb in self.replication_callbacks:
+ cb()
@defer.inlineCallbacks
def wait_for_replication(self, callback, timeout):
@@ -547,13 +559,15 @@ class Notifier(object):
if end_time <= now:
break
+ add_timeout_to_deferred(
+ listener.deferred.addTimeout,
+ (end_time - now) / 1000.,
+ self.hs.get_reactor(),
+ )
try:
with PreserveLoggingContext():
- yield self.clock.time_bound_deferred(
- listener.deferred,
- time_out=(end_time - now) / 1000.
- )
- except DeferredTimedOutError:
+ yield listener.deferred
+ except DeferredTimeoutError:
break
except defer.CancelledError:
break
|