summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/notifier.py67
1 files changed, 38 insertions, 29 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index ef042681bc..6dce20a284 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -14,33 +14,34 @@
 # limitations under the License.
 
 from twisted.internet import defer
+
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 
-from synapse.util import DeferredTimedOutError
 from synapse.util.logutils import log_function
-from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.async import (
+    ObservableDeferred, add_timeout_to_deferred,
+    DeferredTimeoutError,
+)
+from synapse.util.logcontext import PreserveLoggingContext, run_in_background
 from synapse.util.metrics import Measure
 from synapse.types import StreamToken
 from synapse.visibility import filter_events_for_client
-import synapse.metrics
+from synapse.metrics import LaterGauge
 
 from collections import namedtuple
+from prometheus_client import Counter
 
 import logging
 
 
 logger = logging.getLogger(__name__)
 
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-notified_events_counter = metrics.register_counter("notified_events")
+notified_events_counter = Counter("synapse_notifier_notified_events", "")
 
-users_woken_by_stream_counter = metrics.register_counter(
-    "users_woken_by_stream", labels=["stream"]
-)
+users_woken_by_stream_counter = Counter(
+    "synapse_notifier_users_woken_by_stream", "", ["stream"])
 
 
 # TODO(paul): Should be shared somewhere
@@ -105,7 +106,7 @@ class _NotifierUserStream(object):
         self.last_notified_ms = time_now_ms
         noify_deferred = self.notify_deferred
 
-        users_woken_by_stream_counter.inc(stream_key)
+        users_woken_by_stream_counter.labels(stream_key).inc()
 
         with PreserveLoggingContext():
             self.notify_deferred = ObservableDeferred(defer.Deferred())
@@ -144,6 +145,7 @@ class _NotifierUserStream(object):
 class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
     def __nonzero__(self):
         return bool(self.events)
+    __bool__ = __nonzero__  # python3
 
 
 class Notifier(object):
@@ -193,14 +195,14 @@ class Notifier(object):
                 all_user_streams.add(x)
 
             return sum(stream.count_listeners() for stream in all_user_streams)
-        metrics.register_callback("listeners", count_listeners)
+        LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
 
-        metrics.register_callback(
-            "rooms",
+        LaterGauge(
+            "synapse_notifier_rooms", "", [],
             lambda: count(bool, self.room_to_user_streams.values()),
         )
-        metrics.register_callback(
-            "users",
+        LaterGauge(
+            "synapse_notifier_users", "", [],
             lambda: len(self.user_to_user_stream),
         )
 
@@ -250,9 +252,7 @@ class Notifier(object):
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
-        preserve_fn(self.appservice_handler.notify_interested_services)(
-            room_stream_id
-        )
+        run_in_background(self._notify_app_services, room_stream_id)
 
         if self.federation_sender:
             self.federation_sender.notify_new_events(room_stream_id)
@@ -266,6 +266,13 @@ class Notifier(object):
             rooms=[event.room_id],
         )
 
+    @defer.inlineCallbacks
+    def _notify_app_services(self, room_stream_id):
+        try:
+            yield self.appservice_handler.notify_interested_services(room_stream_id)
+        except Exception:
+            logger.exception("Error notifying application services of event")
+
     def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
         """ Used to inform listeners that something has happend event wise.
 
@@ -330,11 +337,12 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
+                    add_timeout_to_deferred(
+                        listener.deferred,
+                        (end_time - now) / 1000.,
+                    )
                     with PreserveLoggingContext():
-                        yield self.clock.time_bound_deferred(
-                            listener.deferred,
-                            time_out=(end_time - now) / 1000.
-                        )
+                        yield listener.deferred
 
                     current_token = user_stream.current_token
 
@@ -345,7 +353,7 @@ class Notifier(object):
                     # Update the prev_token to the current_token since nothing
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
-                except DeferredTimedOutError:
+                except DeferredTimeoutError:
                     break
                 except defer.CancelledError:
                     break
@@ -550,13 +558,14 @@ class Notifier(object):
             if end_time <= now:
                 break
 
+            add_timeout_to_deferred(
+                listener.deferred.addTimeout,
+                (end_time - now) / 1000.,
+            )
             try:
                 with PreserveLoggingContext():
-                    yield self.clock.time_bound_deferred(
-                        listener.deferred,
-                        time_out=(end_time - now) / 1000.
-                    )
-            except DeferredTimedOutError:
+                    yield listener.deferred
+            except DeferredTimeoutError:
                 break
             except defer.CancelledError:
                 break