diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/notifier.py | 45 |
1 files changed, 43 insertions, 2 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 1374aae490..d35c1f3f02 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -39,6 +39,7 @@ from synapse.api.errors import AuthError from synapse.events import EventBase from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import PreserveLoggingContext +from synapse.logging.opentracing import log_kv, start_active_span from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.streams.config import PaginationConfig @@ -136,6 +137,15 @@ class _NotifierUserStream: self.last_notified_ms = time_now_ms noify_deferred = self.notify_deferred + log_kv( + { + "notify": self.user_id, + "stream": stream_key, + "stream_id": stream_id, + "listeners": self.count_listeners(), + } + ) + users_woken_by_stream_counter.labels(stream_key).inc() with PreserveLoggingContext(): @@ -404,6 +414,13 @@ class Notifier: with Measure(self.clock, "on_new_event"): user_streams = set() + log_kv( + { + "waking_up_explicit_users": len(users), + "waking_up_explicit_rooms": len(rooms), + } + ) + for user in users: user_stream = self.user_to_user_stream.get(str(user)) if user_stream is not None: @@ -476,12 +493,34 @@ class Notifier: (end_time - now) / 1000.0, self.hs.get_reactor(), ) - with PreserveLoggingContext(): - await listener.deferred + + with start_active_span("wait_for_events.deferred"): + log_kv( + { + "wait_for_events": "sleep", + "token": prev_token, + } + ) + + with PreserveLoggingContext(): + await listener.deferred + + log_kv( + { + "wait_for_events": "woken", + "token": user_stream.current_token, + } + ) current_token = user_stream.current_token result = await callback(prev_token, current_token) + log_kv( + { + "wait_for_events": "result", + "result": bool(result), + } + ) if result: break @@ -489,8 +528,10 @@ class Notifier: # has happened between the old prev_token and the current_token prev_token = current_token except defer.TimeoutError: + log_kv({"wait_for_events": "timeout"}) break except defer.CancelledError: + log_kv({"wait_for_events": "cancelled"}) break if result is None: |