diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index ddc5c21e7d..833ff41377 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -105,7 +105,9 @@ class BaseHandler(object):
if not suppress_auth:
self.auth.check(event, auth_events=context.current_state)
- yield self.store.persist_event(event, context=context)
+ (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event, context=context
+ )
federation_handler = self.hs.get_handlers().federation_handler
@@ -142,7 +144,8 @@ class BaseHandler(object):
with PreserveLoggingContext():
# Don't block waiting on waking up all the listeners.
notify_d = self.notifier.on_new_room_event(
- event, extra_users=extra_users
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
)
def log_failure(f):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 880cbd77e7..d35d9f603c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -160,7 +160,7 @@ class FederationHandler(BaseHandler):
)
try:
- yield self._handle_new_event(
+ _, event_stream_id, max_stream_id = yield self._handle_new_event(
origin,
event,
state=state,
@@ -203,7 +203,8 @@ class FederationHandler(BaseHandler):
with PreserveLoggingContext():
d = self.notifier.on_new_room_event(
- event, extra_users=extra_users
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
)
def log_failure(f):
@@ -563,7 +564,7 @@ class FederationHandler(BaseHandler):
if e.event_id in auth_ids
}
- yield self._handle_new_event(
+ _, event_stream_id, max_stream_id = yield self._handle_new_event(
origin,
new_event,
state=state,
@@ -573,7 +574,8 @@ class FederationHandler(BaseHandler):
with PreserveLoggingContext():
d = self.notifier.on_new_room_event(
- new_event, extra_users=[joinee]
+ new_event, event_stream_id, max_stream_id,
+ extra_users=[joinee]
)
def log_failure(f):
@@ -639,7 +641,9 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = False
- context = yield self._handle_new_event(origin, event)
+ context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ origin, event
+ )
logger.debug(
"on_send_join_request: After _handle_new_event: %s, sigs: %s",
@@ -655,7 +659,7 @@ class FederationHandler(BaseHandler):
with PreserveLoggingContext():
d = self.notifier.on_new_room_event(
- event, extra_users=extra_users
+ event, event_stream_id, max_stream_id, extra_users=extra_users
)
def log_failure(f):
@@ -729,7 +733,7 @@ class FederationHandler(BaseHandler):
context = yield self.state_handler.compute_event_context(event)
- yield self.store.persist_event(
+ event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
backfilled=False,
@@ -738,7 +742,8 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(event.state_key)
with PreserveLoggingContext():
d = self.notifier.on_new_room_event(
- event, extra_users=[target_user],
+ event, event_stream_id, max_stream_id,
+ extra_users=[target_user],
)
def log_failure(f):
@@ -916,7 +921,7 @@ class FederationHandler(BaseHandler):
)
raise
- yield self.store.persist_event(
+ event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
backfilled=backfilled,
@@ -924,7 +929,7 @@ class FederationHandler(BaseHandler):
current_state=current_state,
)
- defer.returnValue(context)
+ defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks
def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1edab05492..afde49c004 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -336,6 +336,8 @@ class PresenceHandler(BaseHandler):
curr_users = yield rm_handler.get_room_members(room_id)
for local_user in [c for c in curr_users if self.hs.is_mine(c)]:
+ statuscache = self._get_or_offline_usercache(local_user)
+ statuscache.update({}, serial=self._user_cachemap_latest_serial)
self.push_update_to_local_and_remote(
observed_user=local_user,
users_to_push=[user],
@@ -811,6 +813,8 @@ class PresenceHandler(BaseHandler):
room_ids=[], statuscache=None):
with PreserveLoggingContext():
self.notifier.on_new_user_event(
+ "presence_key",
+ self._user_cachemap_latest_serial,
users_to_push,
room_ids,
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 35a62fda47..bd8c603681 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -92,7 +92,7 @@ class SyncHandler(BaseHandler):
result = yield self.current_sync_for_user(sync_config, since_token)
defer.returnValue(result)
else:
- def current_sync_callback():
+ def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
rm_handler = self.hs.get_handlers().room_member_handler
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 64fe51aa3e..a9895292c2 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -218,7 +218,9 @@ class TypingNotificationHandler(BaseHandler):
self._room_serials[room_id] = self._latest_room_serial
with PreserveLoggingContext():
- self.notifier.on_new_user_event(rooms=[room_id])
+ self.notifier.on_new_user_event(
+ "typing_key", self._latest_room_serial, rooms=[room_id]
+ )
class TypingNotificationEventSource(object):
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 4ebe1d66de..2de7dca8a5 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -42,56 +42,71 @@ def count(func, l):
class _NotificationListener(object):
""" This represents a single client connection to the events stream.
-
The events stream handler will have yielded to the deferred, so to
notify the handler it is sufficient to resolve the deferred.
-
- This listener will also keep track of which rooms it is listening in
- so that it can remove itself from the indexes in the Notifier class.
"""
- def __init__(self, user, rooms, deferred, appservice=None):
- self.user = user
- self.appservice = appservice
+ def __init__(self, deferred):
self.deferred = deferred
- self.rooms = rooms
- self.timer = None
def notified(self):
return self.deferred.called
- def notify(self, notifier):
- """ Inform whoever is listening about the new events. This will
- also remove this listener from all the indexes in the Notifier
- it knows about.
+ def notify(self, token):
+ """ Inform whoever is listening about the new events.
"""
-
try:
- self.deferred.callback(None)
+ self.deferred.callback(token)
except defer.AlreadyCalledError:
pass
- # Should the following be done be using intrusively linked lists?
- # -- erikj
+
+class _NotifierUserStream(object):
+ """This represents a user connected to the event stream.
+ It tracks the most recent stream token for that user.
+ At a given point a user may have a number of streams listening for
+ events.
+
+ This listener will also keep track of which rooms it is listening in
+ so that it can remove itself from the indexes in the Notifier class.
+ """
+
+ def __init__(self, user, rooms, current_token, time_now_ms,
+ appservice=None):
+ self.user = str(user)
+ self.appservice = appservice
+ self.listeners = set()
+ self.rooms = set(rooms)
+ self.current_token = current_token
+ self.last_notified_ms = time_now_ms
+
+ def notify(self, stream_key, stream_id, time_now_ms):
+ self.current_token = self.current_token.copy_and_replace(
+ stream_key, stream_id
+ )
+ if self.listeners:
+ self.last_notified_ms = time_now_ms
+ listeners = self.listeners
+ self.listeners = set()
+ for listener in listeners:
+ listener.notify(self.current_token)
+
+ def remove(self, notifier):
+ """ Remove this listener from all the indexes in the Notifier
+ it knows about.
+ """
for room in self.rooms:
- lst = notifier.room_to_listeners.get(room, set())
+ lst = notifier.room_to_user_streams.get(room, set())
lst.discard(self)
- notifier.user_to_listeners.get(self.user, set()).discard(self)
+ notifier.user_to_user_stream.pop(self.user)
if self.appservice:
- notifier.appservice_to_listeners.get(
+ notifier.appservice_to_user_streams.get(
self.appservice, set()
).discard(self)
- # Cancel the timeout for this notifer if one exists.
- if self.timer is not None:
- try:
- notifier.clock.cancel_call_later(self.timer)
- except:
- logger.warn("Failed to cancel notifier timer")
-
class Notifier(object):
""" This class is responsible for notifying any listeners when there are
@@ -100,14 +115,18 @@ class Notifier(object):
Primarily used from the /events stream.
"""
+ UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
+
def __init__(self, hs):
self.hs = hs
- self.room_to_listeners = {}
- self.user_to_listeners = {}
- self.appservice_to_listeners = {}
+ self.user_to_user_stream = {}
+ self.room_to_user_streams = {}
+ self.appservice_to_user_streams = {}
self.event_sources = hs.get_event_sources()
+ self.store = hs.get_datastore()
+ self.pending_new_room_events = []
self.clock = hs.get_clock()
@@ -115,38 +134,54 @@ class Notifier(object):
"user_joined_room", self._user_joined_room
)
+ self.clock.looping_call(
+ self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
+ )
+
# This is not a very cheap test to perform, but it's only executed
# when rendering the metrics page, which is likely once per minute at
# most when scraping it.
def count_listeners():
- all_listeners = set()
+ all_user_streams = set()
- for x in self.room_to_listeners.values():
- all_listeners |= x
- for x in self.user_to_listeners.values():
- all_listeners |= x
- for x in self.appservice_to_listeners.values():
- all_listeners |= x
+ for x in self.room_to_user_streams.values():
+ all_user_streams |= x
+ for x in self.user_to_user_stream:
+ all_user_streams.add(x)
+ for x in self.appservice_to_user_streams.values():
+ all_user_streams |= x
- return len(all_listeners)
+ return sum(len(stream.listeners) for stream in all_user_streams)
metrics.register_callback("listeners", count_listeners)
metrics.register_callback(
"rooms",
- lambda: count(bool, self.room_to_listeners.values()),
+ lambda: count(bool, self.room_to_user_streams.values()),
)
metrics.register_callback(
"users",
- lambda: count(bool, self.user_to_listeners.values()),
+ lambda: len(self.user_to_user_stream),
)
metrics.register_callback(
"appservices",
- lambda: count(bool, self.appservice_to_listeners.values()),
+ lambda: count(bool, self.appservice_to_user_streams.values()),
)
+ def notify_pending_new_room_events(self, max_room_stream_id):
+ pending = sorted(self.pending_new_room_events)
+ self.pending_new_room_events = []
+ for event, room_stream_id, extra_users in pending:
+ if room_stream_id > max_room_stream_id:
+ self.pending_new_room_events.append((
+ event, room_stream_id, extra_users
+ ))
+ else:
+ self._on_new_room_event(event, room_stream_id, extra_users)
+
@log_function
@defer.inlineCallbacks
- def on_new_room_event(self, event, extra_users=[]):
+ def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
+ extra_users=[]):
""" Used by handlers to inform the notifier something has happened
in the room, room event wise.
@@ -155,6 +190,17 @@ class Notifier(object):
`extra_users` param.
"""
yield run_on_reactor()
+
+ self.notify_pending_new_room_events(max_room_stream_id)
+
+ if room_stream_id > max_room_stream_id:
+ self.pending_new_room_events.append((
+ event, room_stream_id, extra_users
+ ))
+ else:
+ self._on_new_room_event(event, room_stream_id, extra_users)
+
+ def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
# poke any interested application service.
self.hs.get_handlers().appservice_handler.notify_interested_services(
event
@@ -162,70 +208,61 @@ class Notifier(object):
room_id = event.room_id
- room_listeners = self.room_to_listeners.get(room_id, set())
+ room_user_streams = self.room_to_user_streams.get(room_id, set())
- _discard_if_notified(room_listeners)
-
- listeners = room_listeners.copy()
+ user_streams = room_user_streams.copy()
for user in extra_users:
- user_listeners = self.user_to_listeners.get(user, set())
-
- _discard_if_notified(user_listeners)
+ user_stream = self.user_to_user_stream.get(str(user))
+ if user_stream is not None:
+ user_streams.add(user_stream)
- listeners |= user_listeners
-
- for appservice in self.appservice_to_listeners:
+ for appservice in self.appservice_to_user_streams:
# TODO (kegan): Redundant appservice listener checks?
- # App services will already be in the room_to_listeners set, but
+ # App services will already be in the room_to_user_streams set, but
# that isn't enough. They need to be checked here in order to
# receive *invites* for users they are interested in. Does this
- # make the room_to_listeners check somewhat obselete?
+ # make the room_to_user_streams check somewhat obselete?
if appservice.is_interested(event):
- app_listeners = self.appservice_to_listeners.get(
+ app_user_streams = self.appservice_to_user_streams.get(
appservice, set()
)
+ user_streams |= app_user_streams
- _discard_if_notified(app_listeners)
-
- listeners |= app_listeners
-
- logger.debug("on_new_room_event listeners %s", listeners)
+ logger.debug("on_new_room_event listeners %s", user_streams)
- for listener in listeners:
+ time_now_ms = self.clock.time_msec()
+ for user_stream in user_streams:
try:
- listener.notify(self)
+ user_stream.notify(
+ "room_key", "s%d" % (room_stream_id,), time_now_ms
+ )
except:
logger.exception("Failed to notify listener")
@defer.inlineCallbacks
@log_function
- def on_new_user_event(self, users=[], rooms=[]):
+ def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend
presence/user event wise.
Will wake up all listeners for the given users and rooms.
"""
yield run_on_reactor()
- listeners = set()
+ user_streams = set()
for user in users:
- user_listeners = self.user_to_listeners.get(user, set())
-
- _discard_if_notified(user_listeners)
-
- listeners |= user_listeners
+ user_stream = self.user_to_user_stream.get(user)
+ if user_stream is not None:
+ user_streams.add(user_stream)
for room in rooms:
- room_listeners = self.room_to_listeners.get(room, set())
+ user_streams |= self.room_to_user_streams.get(room, set())
- _discard_if_notified(room_listeners)
-
- listeners |= room_listeners
-
- for listener in listeners:
+ time_now_ms = self.clock.time_msec()
+ for user_stream in user_streams:
try:
- listener.notify(self)
+ user_stream.notify(stream_key, new_token, time_now_ms)
except:
logger.exception("Failed to notify listener")
@@ -237,46 +274,62 @@ class Notifier(object):
"""
deferred = defer.Deferred()
- appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
- user.to_string()
- )
+ time_now_ms = self.clock.time_msec()
+
+ user = str(user)
+ user_stream = self.user_to_user_stream.get(user)
+ if user_stream is None:
+ appservice = yield self.store.get_app_service_by_user_id(user)
+ current_token = yield self.event_sources.get_current_token()
+ rooms = yield self.store.get_rooms_for_user(user)
+ user_stream = _NotifierUserStream(
+ user=user,
+ rooms=rooms,
+ appservice=appservice,
+ current_token=current_token,
+ time_now_ms=time_now_ms,
+ )
+ self._register_with_keys(user_stream)
+ else:
+ current_token = user_stream.current_token
+
+ listener = [_NotificationListener(deferred)]
+
+ if timeout and not current_token.is_after(from_token):
+ user_stream.listeners.add(listener[0])
+
+ if current_token.is_after(from_token):
+ result = yield callback(from_token, current_token)
+ else:
+ result = None
- listener = [_NotificationListener(
- user=user,
- rooms=rooms,
- deferred=deferred,
- appservice=appservice,
- )]
-
- if timeout:
- self._register_with_keys(listener[0])
-
- result = yield callback()
timer = [None]
+ if result:
+ user_stream.listeners.discard(listener[0])
+ defer.returnValue(result)
+ return
+
if timeout:
timed_out = [False]
def _timeout_listener():
timed_out[0] = True
timer[0] = None
- listener[0].notify(self)
+ user_stream.listeners.discard(listener[0])
+ listener[0].notify(from_token)
# We create multiple notification listeners so we have to manage
# canceling the timeout ourselves.
timer[0] = self.clock.call_later(timeout/1000., _timeout_listener)
while not result and not timed_out[0]:
- yield deferred
+ new_token = yield deferred
deferred = defer.Deferred()
- listener[0] = _NotificationListener(
- user=user,
- rooms=rooms,
- deferred=deferred,
- appservice=appservice,
- )
- self._register_with_keys(listener[0])
- result = yield callback()
+ listener[0] = _NotificationListener(deferred)
+ user_stream.listeners.add(listener[0])
+ result = yield callback(current_token, new_token)
+ current_token = new_token
if timer[0] is not None:
try:
@@ -299,11 +352,15 @@ class Notifier(object):
limit = pagination_config.limit
@defer.inlineCallbacks
- def check_for_updates():
+ def check_for_updates(before_token, after_token):
events = []
end_token = from_token
for name, source in self.event_sources.sources.items():
keyname = "%s_key" % name
+ before_id = getattr(before_token, keyname)
+ after_id = getattr(after_token, keyname)
+ if before_id == after_id:
+ continue
stuff, new_key = yield source.get_new_events_for_user(
user, getattr(from_token, keyname), limit,
)
@@ -325,34 +382,36 @@ class Notifier(object):
defer.returnValue(result)
@log_function
- def _register_with_keys(self, listener):
- for room in listener.rooms:
- s = self.room_to_listeners.setdefault(room, set())
- s.add(listener)
-
- self.user_to_listeners.setdefault(listener.user, set()).add(listener)
-
- if listener.appservice:
- self.appservice_to_listeners.setdefault(
- listener.appservice, set()
- ).add(listener)
+ def remove_expired_streams(self):
+ time_now_ms = self.clock.time_msec()
+ expired_streams = []
+ expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS
+ for stream in self.user_to_user_stream.values():
+ if stream.listeners:
+ continue
+ if stream.last_notified_ms < expire_before_ts:
+ expired_streams.append(stream)
+
+ for expired_stream in expired_streams:
+ expired_stream.remove(self)
- def _user_joined_room(self, user, room_id):
- new_listeners = self.user_to_listeners.get(user, set())
-
- listeners = self.room_to_listeners.setdefault(room_id, set())
- listeners |= new_listeners
-
- for l in new_listeners:
- l.rooms.add(room_id)
+ @log_function
+ def _register_with_keys(self, user_stream):
+ self.user_to_user_stream[user_stream.user] = user_stream
+ for room in user_stream.rooms:
+ s = self.room_to_user_streams.setdefault(room, set())
+ s.add(user_stream)
-def _discard_if_notified(listener_set):
- """Remove any 'stale' listeners from the given set.
- """
- to_discard = set()
- for l in listener_set:
- if l.notified():
- to_discard.add(l)
+ if user_stream.appservice:
+ self.appservice_to_user_stream.setdefault(
+ user_stream.appservice, set()
+ ).add(user_stream)
- listener_set -= to_discard
+ def _user_joined_room(self, user, room_id):
+ user = str(user)
+ new_user_stream = self.user_to_user_stream.get(user)
+ if new_user_stream is not None:
+ room_streams = self.room_to_user_streams.setdefault(room_id, set())
+ room_streams.add(new_user_stream)
+ new_user_stream.rooms.add(room_id)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 9242b0a84e..971f3211ac 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -65,6 +65,9 @@ class EventsStore(SQLBaseStore):
except _RollbackButIsFineException:
pass
+ max_persisted_id = yield self._stream_id_gen.get_max_token(self)
+ defer.returnValue((stream_ordering, max_persisted_id))
+
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False,
diff --git a/synapse/types.py b/synapse/types.py
index 0f16867d75..d89a04f7c3 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -70,6 +70,8 @@ class DomainSpecificString(
"""Return a string encoding the fields of the structure object."""
return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain)
+ __str__ = to_string
+
@classmethod
def create(cls, localpart, domain,):
return cls(localpart=localpart, domain=domain)
@@ -107,7 +109,6 @@ class StreamToken(
def from_string(cls, string):
try:
keys = string.split(cls._SEPARATOR)
-
return cls(*keys)
except:
raise SynapseError(400, "Invalid Token")
@@ -115,6 +116,22 @@ class StreamToken(
def to_string(self):
return self._SEPARATOR.join([str(k) for k in self])
+ @property
+ def room_stream_id(self):
+ # TODO(markjh): Awful hack to work around hacks in the presence tests
+ if type(self.room_key) is int:
+ return self.room_key
+ else:
+ return int(self.room_key[1:].split("-")[-1])
+
+ def is_after(self, other_token):
+ """Does this token contain events that the other doesn't?"""
+ return (
+ (other_token.room_stream_id < self.room_stream_id)
+ or (int(other_token.presence_key) < int(self.presence_key))
+ or (int(other_token.typing_key) < int(self.typing_key))
+ )
+
def copy_and_replace(self, key, new_value):
d = self._asdict()
d[key] = new_value
|