diff --git a/synapse/notifier.py b/synapse/notifier.py
index 7a2b54036c..6190432b87 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -41,6 +41,7 @@ import attr
from prometheus_client import Counter
from twisted.internet import defer
+from twisted.internet.defer import Deferred
from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
from synapse.api.errors import AuthError
@@ -52,6 +53,7 @@ from synapse.logging.opentracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
+ ISynapseReactor,
JsonDict,
MultiWriterStreamToken,
PersistedEventPosition,
@@ -61,8 +63,10 @@ from synapse.types import (
StreamToken,
UserID,
)
-from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
-from synapse.util.metrics import Measure
+from synapse.util.async_helpers import (
+ timeout_deferred,
+)
+from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -89,18 +93,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
return n
-class _NotificationListener:
- """This represents a single client connection to the events stream.
- The events stream handler will have yielded to the deferred, so to
- notify the handler it is sufficient to resolve the deferred.
- """
-
- __slots__ = ["deferred"]
-
- def __init__(self, deferred: "defer.Deferred"):
- self.deferred = deferred
-
-
class _NotifierUserStream:
"""This represents a user connected to the event stream.
It tracks the most recent stream token for that user.
@@ -113,59 +105,49 @@ class _NotifierUserStream:
def __init__(
self,
+ reactor: ISynapseReactor,
user_id: str,
rooms: StrCollection,
current_token: StreamToken,
time_now_ms: int,
):
+ self.reactor = reactor
self.user_id = user_id
self.rooms = set(rooms)
- self.current_token = current_token
# The last token for which we should wake up any streams that have a
# token that comes before it. This gets updated every time we get poked.
# We start it at the current token since if we get any streams
# that have a token from before we have no idea whether they should be
# woken up or not, so lets just wake them up.
- self.last_notified_token = current_token
+ self.current_token = current_token
self.last_notified_ms = time_now_ms
- self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
- defer.Deferred()
- )
+ # Set of listeners that we need to wake up when there has been a change.
+ self.listeners: Set[Deferred[StreamToken]] = set()
- def notify(
+ def update_and_fetch_deferreds(
self,
- stream_key: StreamKeyType,
- stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
+ current_token: StreamToken,
time_now_ms: int,
- ) -> None:
- """Notify any listeners for this user of a new event from an
- event source.
+ ) -> Collection["Deferred[StreamToken]"]:
+ """Update the stream for this user because of a new event from an
+ event source, and return the set of deferreds to wake up.
+
Args:
- stream_key: The stream the event came from.
- stream_id: The new id for the stream the event came from.
+ current_token: The new current token.
time_now_ms: The current time in milliseconds.
+
+ Returns:
+ The set of deferreds that need to be called.
"""
- self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
- self.last_notified_token = self.current_token
+ self.current_token = current_token
self.last_notified_ms = time_now_ms
- notify_deferred = self.notify_deferred
-
- log_kv(
- {
- "notify": self.user_id,
- "stream": stream_key,
- "stream_id": stream_id,
- "listeners": self.count_listeners(),
- }
- )
- users_woken_by_stream_counter.labels(stream_key).inc()
+ listeners = self.listeners
+ self.listeners = set()
- with PreserveLoggingContext():
- self.notify_deferred = ObservableDeferred(defer.Deferred())
- notify_deferred.callback(self.current_token)
+ return listeners
def remove(self, notifier: "Notifier") -> None:
"""Remove this listener from all the indexes in the Notifier
@@ -176,12 +158,15 @@ class _NotifierUserStream:
lst = notifier.room_to_user_streams.get(room, set())
lst.discard(self)
+ if not lst:
+ notifier.room_to_user_streams.pop(room, None)
+
notifier.user_to_user_stream.pop(self.user_id)
def count_listeners(self) -> int:
- return len(self.notify_deferred.observers())
+ return len(self.listeners)
- def new_listener(self, token: StreamToken) -> _NotificationListener:
+ def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]":
"""Returns a deferred that is resolved when there is a new token
greater than the given token.
@@ -191,10 +176,17 @@ class _NotifierUserStream:
"""
# Immediately wake up stream if something has already since happened
# since their last token.
- if self.last_notified_token != token:
- return _NotificationListener(defer.succeed(self.current_token))
- else:
- return _NotificationListener(self.notify_deferred.observe())
+ if token != self.current_token:
+ return defer.succeed(self.current_token)
+
+ # Create a new deferred and add it to the set of listeners. We add a
+ # cancel handler to remove it from the set again, to handle timeouts.
+ deferred: "Deferred[StreamToken]" = Deferred(
+ canceller=lambda d: self.listeners.discard(d)
+ )
+ self.listeners.add(deferred)
+
+ return deferred
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -247,6 +239,7 @@ class Notifier:
# List of callbacks to be notified when a lock is released
self._lock_released_callback: List[Callable[[str, str, str], None]] = []
+ self.reactor = hs.get_reactor()
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()
@@ -342,14 +335,25 @@ class Notifier:
# Wake up all related user stream notifiers
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
+ current_token = self.event_sources.get_current_token()
+
+ listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
- user_stream.notify(
- StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
+ listeners.extend(
+ user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
)
except Exception:
logger.exception("Failed to notify listener")
+ with PreserveLoggingContext():
+ for listener in listeners:
+ listener.callback(current_token)
+
+ users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
+ len(user_streams)
+ )
+
# Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream.
self.notify_replication()
@@ -518,16 +522,22 @@ class Notifier:
users = users or []
rooms = rooms or []
- with Measure(self.clock, "on_new_event"):
- user_streams = set()
+ user_streams: Set[_NotifierUserStream] = set()
- log_kv(
- {
- "waking_up_explicit_users": len(users),
- "waking_up_explicit_rooms": len(rooms),
- }
- )
+ log_kv(
+ {
+ "waking_up_explicit_users": len(users),
+ "waking_up_explicit_rooms": len(rooms),
+ "users": shortstr(users),
+ "rooms": shortstr(rooms),
+ "stream": stream_key,
+ "stream_id": new_token,
+ }
+ )
+ # Only calculate which user streams to wake up if there are, in fact,
+ # any user streams registered.
+ if self.user_to_user_stream or self.room_to_user_streams:
for user in users:
user_stream = self.user_to_user_stream.get(str(user))
if user_stream is not None:
@@ -544,25 +554,40 @@ class Notifier:
)
time_now_ms = self.clock.time_msec()
+ current_token = self.event_sources.get_current_token()
+ listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
- user_stream.notify(stream_key, new_token, time_now_ms)
+ listeners.extend(
+ user_stream.update_and_fetch_deferreds(
+ current_token, time_now_ms
+ )
+ )
except Exception:
logger.exception("Failed to notify listener")
- self.notify_replication()
+ # We resolve all these deferreds in one go so that we only need to
+ # call `PreserveLoggingContext` once, as it has a bunch of overhead
+ # (to calculate performance stats)
+ if listeners:
+ with PreserveLoggingContext():
+ for listener in listeners:
+ listener.callback(current_token)
- # Notify appservices.
- try:
- self.appservice_handler.notify_interested_services_ephemeral(
- stream_key,
- new_token,
- users,
- )
- except Exception:
- logger.exception(
- "Error notifying application services of ephemeral events"
- )
+ if user_streams:
+ users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
+
+ self.notify_replication()
+
+ # Notify appservices.
+ try:
+ self.appservice_handler.notify_interested_services_ephemeral(
+ stream_key,
+ new_token,
+ users,
+ )
+ except Exception:
+ logger.exception("Error notifying application services of ephemeral events")
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened
@@ -586,6 +611,7 @@ class Notifier:
if room_ids is None:
room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
+ reactor=self.reactor,
user_id=user_id,
rooms=room_ids,
current_token=current_token,
@@ -608,8 +634,8 @@ class Notifier:
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
- listener.deferred = timeout_deferred(
- listener.deferred,
+ listener = timeout_deferred(
+ listener,
(end_time - now) / 1000.0,
self.hs.get_reactor(),
)
@@ -622,7 +648,7 @@ class Notifier:
)
with PreserveLoggingContext():
- await listener.deferred
+ await listener
log_kv(
{
|