diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1374aae490..7ce34380af 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -39,6 +39,7 @@ from synapse.api.errors import AuthError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import PreserveLoggingContext
+from synapse.logging.opentracing import log_kv, start_active_span
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
@@ -136,6 +137,15 @@ class _NotifierUserStream:
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
+ log_kv(
+ {
+ "notify": self.user_id,
+ "stream": stream_key,
+ "stream_id": stream_id,
+ "listeners": self.count_listeners(),
+ }
+ )
+
users_woken_by_stream_counter.labels(stream_key).inc()
with PreserveLoggingContext():
@@ -266,7 +276,7 @@ class Notifier:
event: EventBase,
event_pos: PersistedEventPosition,
max_room_stream_token: RoomStreamToken,
- extra_users: Collection[UserID] = [],
+ extra_users: Optional[Collection[UserID]] = None,
):
"""Unwraps event and calls `on_new_room_event_args`."""
self.on_new_room_event_args(
@@ -276,7 +286,7 @@ class Notifier:
state_key=event.get("state_key"),
membership=event.content.get("membership"),
max_room_stream_token=max_room_stream_token,
- extra_users=extra_users,
+ extra_users=extra_users or [],
)
def on_new_room_event_args(
@@ -287,7 +297,7 @@ class Notifier:
membership: Optional[str],
event_pos: PersistedEventPosition,
max_room_stream_token: RoomStreamToken,
- extra_users: Collection[UserID] = [],
+ extra_users: Optional[Collection[UserID]] = None,
):
"""Used by handlers to inform the notifier something has happened
in the room, room event wise.
@@ -303,7 +313,7 @@ class Notifier:
self.pending_new_room_events.append(
_PendingRoomEventEntry(
event_pos=event_pos,
- extra_users=extra_users,
+ extra_users=extra_users or [],
room_id=room_id,
type=event_type,
state_key=state_key,
@@ -372,14 +382,14 @@ class Notifier:
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
- users: Collection[Union[str, UserID]] = [],
+ users: Optional[Collection[Union[str, UserID]]] = None,
):
try:
stream_token = None
if isinstance(new_token, int):
stream_token = new_token
self.appservice_handler.notify_interested_services_ephemeral(
- stream_key, stream_token, users
+ stream_key, stream_token, users or []
)
except Exception:
logger.exception("Error notifying application services of event")
@@ -394,16 +404,26 @@ class Notifier:
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
- users: Collection[Union[str, UserID]] = [],
- rooms: Collection[str] = [],
+ users: Optional[Collection[Union[str, UserID]]] = None,
+ rooms: Optional[Collection[str]] = None,
):
"""Used to inform listeners that something has happened event wise.
Will wake up all listeners for the given users and rooms.
"""
+ users = users or []
+ rooms = rooms or []
+
with Measure(self.clock, "on_new_event"):
user_streams = set()
+ log_kv(
+ {
+ "waking_up_explicit_users": len(users),
+ "waking_up_explicit_rooms": len(rooms),
+ }
+ )
+
for user in users:
user_stream = self.user_to_user_stream.get(str(user))
if user_stream is not None:
@@ -476,12 +496,34 @@ class Notifier:
(end_time - now) / 1000.0,
self.hs.get_reactor(),
)
- with PreserveLoggingContext():
- await listener.deferred
+
+ with start_active_span("wait_for_events.deferred"):
+ log_kv(
+ {
+ "wait_for_events": "sleep",
+ "token": prev_token,
+ }
+ )
+
+ with PreserveLoggingContext():
+ await listener.deferred
+
+ log_kv(
+ {
+ "wait_for_events": "woken",
+ "token": user_stream.current_token,
+ }
+ )
current_token = user_stream.current_token
result = await callback(prev_token, current_token)
+ log_kv(
+ {
+ "wait_for_events": "result",
+ "result": bool(result),
+ }
+ )
if result:
break
@@ -489,8 +531,10 @@ class Notifier:
# has happened between the old prev_token and the current_token
prev_token = current_token
except defer.TimeoutError:
+ log_kv({"wait_for_events": "timeout"})
break
except defer.CancelledError:
+ log_kv({"wait_for_events": "cancelled"})
break
if result is None:
@@ -507,7 +551,7 @@ class Notifier:
pagination_config: PaginationConfig,
timeout: int,
is_guest: bool = False,
- explicit_room_id: str = None,
+ explicit_room_id: Optional[str] = None,
) -> EventStreamResult:
"""For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any
|