diff --git a/synapse/notifier.py b/synapse/notifier.py
index 8051a7a842..8355c7d621 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -14,13 +14,17 @@
# limitations under the License.
from twisted.internet import defer
+
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
+from synapse.handlers.presence import format_user_presence_state
-from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function
-from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.async import (
+ ObservableDeferred, add_timeout_to_deferred,
+ DeferredTimeoutError,
+)
+from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.metrics import Measure
from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client
@@ -37,6 +41,10 @@ metrics = synapse.metrics.get_metrics_for(__name__)
notified_events_counter = metrics.register_counter("notified_events")
+users_woken_by_stream_counter = metrics.register_counter(
+ "users_woken_by_stream", labels=["stream"]
+)
+
# TODO(paul): Should be shared somewhere
def count(func, l):
@@ -73,6 +81,13 @@ class _NotifierUserStream(object):
self.user_id = user_id
self.rooms = set(rooms)
self.current_token = current_token
+
+ # The last token for which we should wake up any streams that have a
+ # token that comes before it. This gets updated everytime we get poked.
+ # We start it at the current token since if we get any streams
+ # that have a token from before we have no idea whether they should be
+ # woken up or not, so lets just wake them up.
+ self.last_notified_token = current_token
self.last_notified_ms = time_now_ms
with PreserveLoggingContext():
@@ -89,9 +104,12 @@ class _NotifierUserStream(object):
self.current_token = self.current_token.copy_and_advance(
stream_key, stream_id
)
+ self.last_notified_token = self.current_token
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
+ users_woken_by_stream_counter.inc(stream_key)
+
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
noify_deferred.callback(self.current_token)
@@ -113,8 +131,14 @@ class _NotifierUserStream(object):
def new_listener(self, token):
"""Returns a deferred that is resolved when there is a new token
greater than the given token.
+
+ Args:
+ token: The token from which we are streaming from, i.e. we shouldn't
+ notify for things that happened before this.
"""
- if self.current_token.is_after(token):
+ # Immediately wake up stream if something has already since happened
+ # since their last token.
+ if self.last_notified_token.is_after(token):
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
@@ -123,6 +147,7 @@ class _NotifierUserStream(object):
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
def __nonzero__(self):
return bool(self.events)
+ __bool__ = __nonzero__ # python3
class Notifier(object):
@@ -142,6 +167,8 @@ class Notifier(object):
self.store = hs.get_datastore()
self.pending_new_room_events = []
+ self.replication_callbacks = []
+
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
@@ -181,7 +208,12 @@ class Notifier(object):
lambda: len(self.user_to_user_stream),
)
- @preserve_fn
+ def add_replication_callback(self, cb):
+ """Add a callback that will be called when some new data is available.
+ Callback is not given any arguments.
+ """
+ self.replication_callbacks.append(cb)
+
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
extra_users=[]):
""" Used by handlers to inform the notifier something has happened
@@ -195,15 +227,13 @@ class Notifier(object):
until all previous events have been persisted before notifying
the client streams.
"""
- with PreserveLoggingContext():
- self.pending_new_room_events.append((
- room_stream_id, event, extra_users
- ))
- self._notify_pending_new_room_events(max_room_stream_id)
+ self.pending_new_room_events.append((
+ room_stream_id, event, extra_users
+ ))
+ self._notify_pending_new_room_events(max_room_stream_id)
- self.notify_replication()
+ self.notify_replication()
- @preserve_fn
def _notify_pending_new_room_events(self, max_room_stream_id):
"""Notify for the room events that were queued waiting for a previous
event to be persisted.
@@ -221,11 +251,10 @@ class Notifier(object):
else:
self._on_new_room_event(event, room_stream_id, extra_users)
- @preserve_fn
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
- self.appservice_handler.notify_interested_services(room_stream_id)
+ run_in_background(self._notify_app_services, room_stream_id)
if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id)
@@ -239,7 +268,13 @@ class Notifier(object):
rooms=[event.room_id],
)
- @preserve_fn
+ @defer.inlineCallbacks
+ def _notify_app_services(self, room_stream_id):
+ try:
+ yield self.appservice_handler.notify_interested_services(room_stream_id)
+ except Exception:
+ logger.exception("Error notifying application services of event")
+
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend event wise.
@@ -261,17 +296,15 @@ class Notifier(object):
for user_stream in user_streams:
try:
user_stream.notify(stream_key, new_token, time_now_ms)
- except:
+ except Exception:
logger.exception("Failed to notify listener")
self.notify_replication()
- @preserve_fn
def on_new_replication_data(self):
"""Used to inform replication listeners that something has happend
without waking up any of the normal user event streams"""
- with PreserveLoggingContext():
- self.notify_replication()
+ self.notify_replication()
@defer.inlineCallbacks
def wait_for_events(self, user_id, timeout, callback, room_ids=None,
@@ -283,8 +316,7 @@ class Notifier(object):
if user_stream is None:
current_token = yield self.event_sources.get_current_token()
if room_ids is None:
- rooms = yield self.store.get_rooms_for_user(user_id)
- room_ids = [room.room_id for room in rooms]
+ room_ids = yield self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
user_id=user_id,
rooms=room_ids,
@@ -294,40 +326,45 @@ class Notifier(object):
self._register_with_keys(user_stream)
result = None
+ prev_token = from_token
if timeout:
end_time = self.clock.time_msec() + timeout
- prev_token = from_token
while not result:
try:
- current_token = user_stream.current_token
-
- result = yield callback(prev_token, current_token)
- if result:
- break
-
now = self.clock.time_msec()
if end_time <= now:
break
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
- # We need to supply the token we supplied to callback so
- # that we don't miss any current_token updates.
- prev_token = current_token
listener = user_stream.new_listener(prev_token)
+ add_timeout_to_deferred(
+ listener.deferred,
+ (end_time - now) / 1000.,
+ )
with PreserveLoggingContext():
- yield self.clock.time_bound_deferred(
- listener.deferred,
- time_out=(end_time - now) / 1000.
- )
- except DeferredTimedOutError:
+ yield listener.deferred
+
+ current_token = user_stream.current_token
+
+ result = yield callback(prev_token, current_token)
+ if result:
+ break
+
+ # Update the prev_token to the current_token since nothing
+ # has happened between the old prev_token and the current_token
+ prev_token = current_token
+ except DeferredTimeoutError:
break
except defer.CancelledError:
break
- else:
+
+ if result is None:
+ # This happened if there was no timeout or if the timeout had
+ # already expired.
current_token = user_stream.current_token
- result = yield callback(from_token, current_token)
+ result = yield callback(prev_token, current_token)
defer.returnValue(result)
@@ -388,6 +425,15 @@ class Notifier(object):
new_events,
is_peeking=is_peeking,
)
+ elif name == "presence":
+ now = self.clock.time_msec()
+ new_events[:] = [
+ {
+ "type": "m.presence",
+ "content": format_user_presence_state(event, now),
+ }
+ for event in new_events
+ ]
events.extend(new_events)
end_token = end_token.copy_and_replace(keyname, new_key)
@@ -420,8 +466,7 @@ class Notifier(object):
@defer.inlineCallbacks
def _get_room_ids(self, user, explicit_room_id):
- joined_rooms = yield self.store.get_rooms_for_user(user.to_string())
- joined_room_ids = map(lambda r: r.room_id, joined_rooms)
+ joined_room_ids = yield self.store.get_rooms_for_user(user.to_string())
if explicit_room_id:
if explicit_room_id in joined_room_ids:
defer.returnValue(([explicit_room_id], True))
@@ -478,6 +523,15 @@ class Notifier(object):
self.replication_deferred = ObservableDeferred(defer.Deferred())
deferred.callback(None)
+ # the callbacks may well outlast the current request, so we run
+ # them in the sentinel logcontext.
+ #
+ # (ideally it would be up to the callbacks to know if they were
+ # starting off background processes and drop the logcontext
+ # accordingly, but that requires more changes)
+ for cb in self.replication_callbacks:
+ cb()
+
@defer.inlineCallbacks
def wait_for_replication(self, callback, timeout):
"""Wait for an event to happen.
@@ -506,13 +560,14 @@ class Notifier(object):
if end_time <= now:
break
+ add_timeout_to_deferred(
+ listener.deferred.addTimeout,
+ (end_time - now) / 1000.,
+ )
try:
with PreserveLoggingContext():
- yield self.clock.time_bound_deferred(
- listener.deferred,
- time_out=(end_time - now) / 1000.
- )
- except DeferredTimedOutError:
+ yield listener.deferred
+ except DeferredTimeoutError:
break
except defer.CancelledError:
break
|