summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/notifier.py176
1 files changed, 101 insertions, 75 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py

index 7a2b54036c..6190432b87 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py
@@ -41,6 +41,7 @@ import attr from prometheus_client import Counter from twisted.internet import defer +from twisted.internet.defer import Deferred from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership from synapse.api.errors import AuthError @@ -52,6 +53,7 @@ from synapse.logging.opentracing import log_kv, start_active_span from synapse.metrics import LaterGauge from synapse.streams.config import PaginationConfig from synapse.types import ( + ISynapseReactor, JsonDict, MultiWriterStreamToken, PersistedEventPosition, @@ -61,8 +63,10 @@ from synapse.types import ( StreamToken, UserID, ) -from synapse.util.async_helpers import ObservableDeferred, timeout_deferred -from synapse.util.metrics import Measure +from synapse.util.async_helpers import ( + timeout_deferred, +) +from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -89,18 +93,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int: return n -class _NotificationListener: - """This represents a single client connection to the events stream. - The events stream handler will have yielded to the deferred, so to - notify the handler it is sufficient to resolve the deferred. - """ - - __slots__ = ["deferred"] - - def __init__(self, deferred: "defer.Deferred"): - self.deferred = deferred - - class _NotifierUserStream: """This represents a user connected to the event stream. It tracks the most recent stream token for that user. @@ -113,59 +105,49 @@ class _NotifierUserStream: def __init__( self, + reactor: ISynapseReactor, user_id: str, rooms: StrCollection, current_token: StreamToken, time_now_ms: int, ): + self.reactor = reactor self.user_id = user_id self.rooms = set(rooms) - self.current_token = current_token # The last token for which we should wake up any streams that have a # token that comes before it. This gets updated every time we get poked. # We start it at the current token since if we get any streams # that have a token from before we have no idea whether they should be # woken up or not, so lets just wake them up. - self.last_notified_token = current_token + self.current_token = current_token self.last_notified_ms = time_now_ms - self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred( - defer.Deferred() - ) + # Set of listeners that we need to wake up when there has been a change. + self.listeners: Set[Deferred[StreamToken]] = set() - def notify( + def update_and_fetch_deferreds( self, - stream_key: StreamKeyType, - stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken], + current_token: StreamToken, time_now_ms: int, - ) -> None: - """Notify any listeners for this user of a new event from an - event source. + ) -> Collection["Deferred[StreamToken]"]: + """Update the stream for this user because of a new event from an + event source, and return the set of deferreds to wake up. + Args: - stream_key: The stream the event came from. - stream_id: The new id for the stream the event came from. + current_token: The new current token. time_now_ms: The current time in milliseconds. + + Returns: + The set of deferreds that need to be called. """ - self.current_token = self.current_token.copy_and_advance(stream_key, stream_id) - self.last_notified_token = self.current_token + self.current_token = current_token self.last_notified_ms = time_now_ms - notify_deferred = self.notify_deferred - - log_kv( - { - "notify": self.user_id, - "stream": stream_key, - "stream_id": stream_id, - "listeners": self.count_listeners(), - } - ) - users_woken_by_stream_counter.labels(stream_key).inc() + listeners = self.listeners + self.listeners = set() - with PreserveLoggingContext(): - self.notify_deferred = ObservableDeferred(defer.Deferred()) - notify_deferred.callback(self.current_token) + return listeners def remove(self, notifier: "Notifier") -> None: """Remove this listener from all the indexes in the Notifier @@ -176,12 +158,15 @@ class _NotifierUserStream: lst = notifier.room_to_user_streams.get(room, set()) lst.discard(self) + if not lst: + notifier.room_to_user_streams.pop(room, None) + notifier.user_to_user_stream.pop(self.user_id) def count_listeners(self) -> int: - return len(self.notify_deferred.observers()) + return len(self.listeners) - def new_listener(self, token: StreamToken) -> _NotificationListener: + def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]": """Returns a deferred that is resolved when there is a new token greater than the given token. @@ -191,10 +176,17 @@ class _NotifierUserStream: """ # Immediately wake up stream if something has already since happened # since their last token. - if self.last_notified_token != token: - return _NotificationListener(defer.succeed(self.current_token)) - else: - return _NotificationListener(self.notify_deferred.observe()) + if token != self.current_token: + return defer.succeed(self.current_token) + + # Create a new deferred and add it to the set of listeners. We add a + # cancel handler to remove it from the set again, to handle timeouts. + deferred: "Deferred[StreamToken]" = Deferred( + canceller=lambda d: self.listeners.discard(d) + ) + self.listeners.add(deferred) + + return deferred @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -247,6 +239,7 @@ class Notifier: # List of callbacks to be notified when a lock is released self._lock_released_callback: List[Callable[[str, str, str], None]] = [] + self.reactor = hs.get_reactor() self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() self._pusher_pool = hs.get_pusherpool() @@ -342,14 +335,25 @@ class Notifier: # Wake up all related user stream notifiers user_streams = self.room_to_user_streams.get(room_id, set()) time_now_ms = self.clock.time_msec() + current_token = self.event_sources.get_current_token() + + listeners: List["Deferred[StreamToken]"] = [] for user_stream in user_streams: try: - user_stream.notify( - StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms + listeners.extend( + user_stream.update_and_fetch_deferreds(current_token, time_now_ms) ) except Exception: logger.exception("Failed to notify listener") + with PreserveLoggingContext(): + for listener in listeners: + listener.callback(current_token) + + users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc( + len(user_streams) + ) + # Poke the replication so that other workers also see the write to # the un-partial-stated rooms stream. self.notify_replication() @@ -518,16 +522,22 @@ class Notifier: users = users or [] rooms = rooms or [] - with Measure(self.clock, "on_new_event"): - user_streams = set() + user_streams: Set[_NotifierUserStream] = set() - log_kv( - { - "waking_up_explicit_users": len(users), - "waking_up_explicit_rooms": len(rooms), - } - ) + log_kv( + { + "waking_up_explicit_users": len(users), + "waking_up_explicit_rooms": len(rooms), + "users": shortstr(users), + "rooms": shortstr(rooms), + "stream": stream_key, + "stream_id": new_token, + } + ) + # Only calculate which user streams to wake up if there are, in fact, + # any user streams registered. + if self.user_to_user_stream or self.room_to_user_streams: for user in users: user_stream = self.user_to_user_stream.get(str(user)) if user_stream is not None: @@ -544,25 +554,40 @@ class Notifier: ) time_now_ms = self.clock.time_msec() + current_token = self.event_sources.get_current_token() + listeners: List["Deferred[StreamToken]"] = [] for user_stream in user_streams: try: - user_stream.notify(stream_key, new_token, time_now_ms) + listeners.extend( + user_stream.update_and_fetch_deferreds( + current_token, time_now_ms + ) + ) except Exception: logger.exception("Failed to notify listener") - self.notify_replication() + # We resolve all these deferreds in one go so that we only need to + # call `PreserveLoggingContext` once, as it has a bunch of overhead + # (to calculate performance stats) + if listeners: + with PreserveLoggingContext(): + for listener in listeners: + listener.callback(current_token) - # Notify appservices. - try: - self.appservice_handler.notify_interested_services_ephemeral( - stream_key, - new_token, - users, - ) - except Exception: - logger.exception( - "Error notifying application services of ephemeral events" - ) + if user_streams: + users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams)) + + self.notify_replication() + + # Notify appservices. + try: + self.appservice_handler.notify_interested_services_ephemeral( + stream_key, + new_token, + users, + ) + except Exception: + logger.exception("Error notifying application services of ephemeral events") def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happened @@ -586,6 +611,7 @@ class Notifier: if room_ids is None: room_ids = await self.store.get_rooms_for_user(user_id) user_stream = _NotifierUserStream( + reactor=self.reactor, user_id=user_id, rooms=room_ids, current_token=current_token, @@ -608,8 +634,8 @@ class Notifier: # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) - listener.deferred = timeout_deferred( - listener.deferred, + listener = timeout_deferred( + listener, (end_time - now) / 1000.0, self.hs.get_reactor(), ) @@ -622,7 +648,7 @@ class Notifier: ) with PreserveLoggingContext(): - await listener.deferred + await listener log_kv( {