summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py45
1 files changed, 43 insertions, 2 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1374aae490..d35c1f3f02 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -39,6 +39,7 @@ from synapse.api.errors import AuthError
 from synapse.events import EventBase
 from synapse.handlers.presence import format_user_presence_state
 from synapse.logging.context import PreserveLoggingContext
+from synapse.logging.opentracing import log_kv, start_active_span
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
 from synapse.streams.config import PaginationConfig
@@ -136,6 +137,15 @@ class _NotifierUserStream:
         self.last_notified_ms = time_now_ms
         noify_deferred = self.notify_deferred
 
+        log_kv(
+            {
+                "notify": self.user_id,
+                "stream": stream_key,
+                "stream_id": stream_id,
+                "listeners": self.count_listeners(),
+            }
+        )
+
         users_woken_by_stream_counter.labels(stream_key).inc()
 
         with PreserveLoggingContext():
@@ -404,6 +414,13 @@ class Notifier:
         with Measure(self.clock, "on_new_event"):
             user_streams = set()
 
+            log_kv(
+                {
+                    "waking_up_explicit_users": len(users),
+                    "waking_up_explicit_rooms": len(rooms),
+                }
+            )
+
             for user in users:
                 user_stream = self.user_to_user_stream.get(str(user))
                 if user_stream is not None:
@@ -476,12 +493,34 @@ class Notifier:
                         (end_time - now) / 1000.0,
                         self.hs.get_reactor(),
                     )
-                    with PreserveLoggingContext():
-                        await listener.deferred
+
+                    with start_active_span("wait_for_events.deferred"):
+                        log_kv(
+                            {
+                                "wait_for_events": "sleep",
+                                "token": prev_token,
+                            }
+                        )
+
+                        with PreserveLoggingContext():
+                            await listener.deferred
+
+                        log_kv(
+                            {
+                                "wait_for_events": "woken",
+                                "token": user_stream.current_token,
+                            }
+                        )
 
                     current_token = user_stream.current_token
 
                     result = await callback(prev_token, current_token)
+                    log_kv(
+                        {
+                            "wait_for_events": "result",
+                            "result": bool(result),
+                        }
+                    )
                     if result:
                         break
 
@@ -489,8 +528,10 @@ class Notifier:
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
                 except defer.TimeoutError:
+                    log_kv({"wait_for_events": "timeout"})
                     break
                 except defer.CancelledError:
+                    log_kv({"wait_for_events": "cancelled"})
                     break
 
         if result is None: