diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/_base.py | 7 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 25 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 4 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 4 | ||||
-rw-r--r-- | synapse/notifier.py | 75 | ||||
-rw-r--r-- | synapse/storage/events.py | 3 | ||||
-rw-r--r-- | synapse/types.py | 19 |
7 files changed, 99 insertions, 38 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index ddc5c21e7d..833ff41377 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -105,7 +105,9 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.current_state) - yield self.store.persist_event(event, context=context) + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context + ) federation_handler = self.hs.get_handlers().federation_handler @@ -142,7 +144,8 @@ class BaseHandler(object): with PreserveLoggingContext(): # Don't block waiting on waking up all the listeners. notify_d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, + extra_users=extra_users ) def log_failure(f): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7d9906039e..bc0f7b0ee7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -160,7 +160,7 @@ class FederationHandler(BaseHandler): ) try: - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event, state=state, @@ -203,7 +203,8 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, + extra_users=extra_users ) def log_failure(f): @@ -561,7 +562,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, new_event, state=state, @@ -571,7 +572,8 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - new_event, extra_users=[joinee] + new_event, event_stream_id, max_stream_id, + extra_users=[joinee] ) def log_failure(f): @@ -637,7 +639,9 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(origin, event) + context, event_stream_id, max_stream_id = yield self._handle_new_event( + origin, event + ) logger.debug( "on_send_join_request: After _handle_new_event: %s, sigs: %s", @@ -653,7 +657,7 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, extra_users=extra_users ) def log_failure(f): @@ -727,7 +731,7 @@ class FederationHandler(BaseHandler): context = yield self.state_handler.compute_event_context(event) - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=False, @@ -736,7 +740,8 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(event.state_key) with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=[target_user], + event, event_stream_id, max_stream_id, + extra_users=[target_user], ) def log_failure(f): @@ -914,7 +919,7 @@ class FederationHandler(BaseHandler): ) raise - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=backfilled, @@ -922,7 +927,7 @@ class FederationHandler(BaseHandler): current_state=current_state, ) - defer.returnValue(context) + defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 28688d532d..7db4b062d2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -345,6 +345,8 @@ class PresenceHandler(BaseHandler): curr_users = yield rm_handler.get_room_members(room_id) for local_user in [c for c in curr_users if self.hs.is_mine(c)]: + statuscache = self._get_or_offline_usercache(local_user) + statuscache.update({}, serial=self._user_cachemap_latest_serial) self.push_update_to_local_and_remote( observed_user=local_user, users_to_push=[user], @@ -820,6 +822,8 @@ class PresenceHandler(BaseHandler): room_ids=[], statuscache=None): with PreserveLoggingContext(): self.notifier.on_new_user_event( + "presence_key", + self._user_cachemap_latest_serial, users_to_push, room_ids, ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 64fe51aa3e..a9895292c2 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -218,7 +218,9 @@ class TypingNotificationHandler(BaseHandler): self._room_serials[room_id] = self._latest_room_serial with PreserveLoggingContext(): - self.notifier.on_new_user_event(rooms=[room_id]) + self.notifier.on_new_user_event( + "typing_key", self._latest_room_serial, rooms=[room_id] + ) class TypingNotificationEventSource(object): diff --git a/synapse/notifier.py b/synapse/notifier.py index 214a2b28ca..4d10c05038 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -52,12 +52,11 @@ class _NotificationListener(object): def notified(self): return self.deferred.called - def notify(self): + def notify(self, token): """ Inform whoever is listening about the new events. """ - try: - self.deferred.callback(None) + self.deferred.callback(token) except defer.AlreadyCalledError: pass @@ -73,15 +72,18 @@ class _NotifierUserStream(object): """ def __init__(self, user, rooms, current_token, appservice=None): - self.user = user + self.user = str(user) self.appservice = appservice self.listeners = set() - self.rooms = rooms + self.rooms = set(rooms) self.current_token = current_token - def notify(self, new_token): + def notify(self, stream_key, stream_id): + self.current_token = self.current_token.copy_and_replace( + stream_key, stream_id + ) for listener in self.listeners: - listener.notify(new_token) + listener.notify(self.current_token) self.listeners.clear() def remove(self, notifier): @@ -117,6 +119,7 @@ class Notifier(object): self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() + self.pending_new_room_events = [] self.clock = hs.get_clock() @@ -153,9 +156,21 @@ class Notifier(object): lambda: count(bool, self.appservice_to_user_streams.values()), ) + def notify_pending_new_room_events(self, max_room_stream_id): + pending = sorted(self.pending_new_room_events) + self.pending_new_room_events = [] + for event, room_stream_id, extra_users in pending: + if room_stream_id > max_room_stream_id: + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + else: + self._on_new_room_event(event, room_stream_id, extra_users) + @log_function @defer.inlineCallbacks - def on_new_room_event(self, event, new_token, extra_users=[]): + def on_new_room_event(self, event, room_stream_id, max_room_stream_id, + extra_users=[]): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -163,8 +178,18 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ - assert isinstance(new_token, StreamToken) yield run_on_reactor() + + self.notify_pending_new_room_events(max_room_stream_id) + + if room_stream_id > max_room_stream_id: + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + else: + self._on_new_room_event(event, room_stream_id, extra_users) + + def _on_new_room_event(self, event, room_stream_id, extra_users=[]): # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( event @@ -197,33 +222,32 @@ class Notifier(object): for user_stream in user_streams: try: - user_stream.notify(new_token) + user_stream.notify("room_key", "s%d" % (room_stream_id,)) except: logger.exception("Failed to notify listener") @defer.inlineCallbacks @log_function - def on_new_user_event(self, new_token, users=[], rooms=[]): + def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend presence/user event wise. Will wake up all listeners for the given users and rooms. """ - assert isinstance(new_token, StreamToken) yield run_on_reactor() user_streams = set() for user in users: user_stream = self.user_to_user_stream.get(user) - if user_stream: - user_stream.add(user_stream) + if user_stream is not None: + user_streams.add(user_stream) for room in rooms: user_streams |= self.room_to_user_streams.get(room, set()) for user_stream in user_streams: try: - user_streams.notify(new_token) + user_stream.notify(stream_key, new_token) except: logger.exception("Failed to notify listener") @@ -236,12 +260,12 @@ class Notifier(object): deferred = defer.Deferred() - user_stream = self.user_to_user_streams.get(user) + user = str(user) + user_stream = self.user_to_user_stream.get(user) if user_stream is None: - appservice = yield self.store.get_app_service_by_user_id( - user.to_string() - ) + appservice = yield self.store.get_app_service_by_user_id(user) current_token = yield self.event_sources.get_current_token() + rooms = yield self.store.get_rooms_for_user(user) user_stream = _NotifierUserStream( user=user, rooms=rooms, @@ -252,8 +276,9 @@ class Notifier(object): else: current_token = user_stream.current_token + listener = [_NotificationListener(deferred)] + if timeout and not current_token.is_after(from_token): - listener = [_NotificationListener(deferred)] user_stream.listeners.add(listener[0]) if current_token.is_after(from_token): @@ -334,7 +359,7 @@ class Notifier(object): self.user_to_user_stream[user_stream.user] = user_stream for room in user_stream.rooms: - s = self.room_to_user_stream.setdefault(room, set()) + s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) if user_stream.appservice: @@ -343,10 +368,12 @@ class Notifier(object): ).add(user_stream) def _user_joined_room(self, user, room_id): + user = str(user) new_user_stream = self.user_to_user_stream.get(user) - room_streams = self.room_to_user_streams.setdefault(room_id, set()) - room_streams.add(new_user_stream) - new_user_stream.rooms.add(room_id) + if new_user_stream is not None: + room_streams = self.room_to_user_streams.setdefault(room_id, set()) + room_streams.add(new_user_stream) + new_user_stream.rooms.add(room_id) def _discard_if_notified(listener_set): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a5a6869079..7d6df5f4c6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -64,6 +64,9 @@ class EventsStore(SQLBaseStore): except _RollbackButIsFineException: pass + max_persisted_id = yield self._stream_id_gen.get_max_token(self) + defer.returnValue((stream_ordering, max_persisted_id)) + @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, diff --git a/synapse/types.py b/synapse/types.py index 0f16867d75..d89a04f7c3 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -70,6 +70,8 @@ class DomainSpecificString( """Return a string encoding the fields of the structure object.""" return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain) + __str__ = to_string + @classmethod def create(cls, localpart, domain,): return cls(localpart=localpart, domain=domain) @@ -107,7 +109,6 @@ class StreamToken( def from_string(cls, string): try: keys = string.split(cls._SEPARATOR) - return cls(*keys) except: raise SynapseError(400, "Invalid Token") @@ -115,6 +116,22 @@ class StreamToken( def to_string(self): return self._SEPARATOR.join([str(k) for k in self]) + @property + def room_stream_id(self): + # TODO(markjh): Awful hack to work around hacks in the presence tests + if type(self.room_key) is int: + return self.room_key + else: + return int(self.room_key[1:].split("-")[-1]) + + def is_after(self, other_token): + """Does this token contain events that the other doesn't?""" + return ( + (other_token.room_stream_id < self.room_stream_id) + or (int(other_token.presence_key) < int(self.presence_key)) + or (int(other_token.typing_key) < int(self.typing_key)) + ) + def copy_and_replace(self, key, new_value): d = self._asdict() d[key] = new_value |