summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/_base.py7
-rw-r--r--synapse/handlers/federation.py25
-rw-r--r--synapse/handlers/presence.py4
-rw-r--r--synapse/handlers/sync.py2
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/notifier.py315
-rw-r--r--synapse/storage/events.py3
-rw-r--r--synapse/types.py19
8 files changed, 236 insertions, 143 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index ddc5c21e7d..833ff41377 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -105,7 +105,9 @@ class BaseHandler(object):
         if not suppress_auth:
             self.auth.check(event, auth_events=context.current_state)
 
-        yield self.store.persist_event(event, context=context)
+        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+            event, context=context
+        )
 
         federation_handler = self.hs.get_handlers().federation_handler
 
@@ -142,7 +144,8 @@ class BaseHandler(object):
         with PreserveLoggingContext():
             # Don't block waiting on waking up all the listeners.
             notify_d = self.notifier.on_new_room_event(
-                event, extra_users=extra_users
+                event, event_stream_id, max_stream_id,
+                extra_users=extra_users
             )
 
         def log_failure(f):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 880cbd77e7..d35d9f603c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -160,7 +160,7 @@ class FederationHandler(BaseHandler):
                     )
 
         try:
-            yield self._handle_new_event(
+            _, event_stream_id, max_stream_id = yield self._handle_new_event(
                 origin,
                 event,
                 state=state,
@@ -203,7 +203,8 @@ class FederationHandler(BaseHandler):
 
             with PreserveLoggingContext():
                 d = self.notifier.on_new_room_event(
-                    event, extra_users=extra_users
+                    event, event_stream_id, max_stream_id,
+                    extra_users=extra_users
                 )
 
             def log_failure(f):
@@ -563,7 +564,7 @@ class FederationHandler(BaseHandler):
                 if e.event_id in auth_ids
             }
 
-            yield self._handle_new_event(
+            _, event_stream_id, max_stream_id = yield self._handle_new_event(
                 origin,
                 new_event,
                 state=state,
@@ -573,7 +574,8 @@ class FederationHandler(BaseHandler):
 
             with PreserveLoggingContext():
                 d = self.notifier.on_new_room_event(
-                    new_event, extra_users=[joinee]
+                    new_event, event_stream_id, max_stream_id,
+                    extra_users=[joinee]
                 )
 
             def log_failure(f):
@@ -639,7 +641,9 @@ class FederationHandler(BaseHandler):
 
         event.internal_metadata.outlier = False
 
-        context = yield self._handle_new_event(origin, event)
+        context, event_stream_id, max_stream_id = yield self._handle_new_event(
+            origin, event
+        )
 
         logger.debug(
             "on_send_join_request: After _handle_new_event: %s, sigs: %s",
@@ -655,7 +659,7 @@ class FederationHandler(BaseHandler):
 
         with PreserveLoggingContext():
             d = self.notifier.on_new_room_event(
-                event, extra_users=extra_users
+                event, event_stream_id, max_stream_id, extra_users=extra_users
             )
 
         def log_failure(f):
@@ -729,7 +733,7 @@ class FederationHandler(BaseHandler):
 
         context = yield self.state_handler.compute_event_context(event)
 
-        yield self.store.persist_event(
+        event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
             backfilled=False,
@@ -738,7 +742,8 @@ class FederationHandler(BaseHandler):
         target_user = UserID.from_string(event.state_key)
         with PreserveLoggingContext():
             d = self.notifier.on_new_room_event(
-                event, extra_users=[target_user],
+                event, event_stream_id, max_stream_id,
+                extra_users=[target_user],
             )
 
         def log_failure(f):
@@ -916,7 +921,7 @@ class FederationHandler(BaseHandler):
             )
             raise
 
-        yield self.store.persist_event(
+        event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
             backfilled=backfilled,
@@ -924,7 +929,7 @@ class FederationHandler(BaseHandler):
             current_state=current_state,
         )
 
-        defer.returnValue(context)
+        defer.returnValue((context, event_stream_id, max_stream_id))
 
     @defer.inlineCallbacks
     def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1edab05492..afde49c004 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -336,6 +336,8 @@ class PresenceHandler(BaseHandler):
         curr_users = yield rm_handler.get_room_members(room_id)
 
         for local_user in [c for c in curr_users if self.hs.is_mine(c)]:
+            statuscache = self._get_or_offline_usercache(local_user)
+            statuscache.update({}, serial=self._user_cachemap_latest_serial)
             self.push_update_to_local_and_remote(
                 observed_user=local_user,
                 users_to_push=[user],
@@ -811,6 +813,8 @@ class PresenceHandler(BaseHandler):
                                room_ids=[], statuscache=None):
         with PreserveLoggingContext():
             self.notifier.on_new_user_event(
+                "presence_key",
+                self._user_cachemap_latest_serial,
                 users_to_push,
                 room_ids,
             )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 35a62fda47..bd8c603681 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -92,7 +92,7 @@ class SyncHandler(BaseHandler):
             result = yield self.current_sync_for_user(sync_config, since_token)
             defer.returnValue(result)
         else:
-            def current_sync_callback():
+            def current_sync_callback(before_token, after_token):
                 return self.current_sync_for_user(sync_config, since_token)
 
             rm_handler = self.hs.get_handlers().room_member_handler
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 64fe51aa3e..a9895292c2 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -218,7 +218,9 @@ class TypingNotificationHandler(BaseHandler):
         self._room_serials[room_id] = self._latest_room_serial
 
         with PreserveLoggingContext():
-            self.notifier.on_new_user_event(rooms=[room_id])
+            self.notifier.on_new_user_event(
+                "typing_key", self._latest_room_serial, rooms=[room_id]
+            )
 
 
 class TypingNotificationEventSource(object):
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 4ebe1d66de..2de7dca8a5 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -42,56 +42,71 @@ def count(func, l):
 
 class _NotificationListener(object):
     """ This represents a single client connection to the events stream.
-
     The events stream handler will have yielded to the deferred, so to
     notify the handler it is sufficient to resolve the deferred.
-
-    This listener will also keep track of which rooms it is listening in
-    so that it can remove itself from the indexes in the Notifier class.
     """
 
-    def __init__(self, user, rooms, deferred, appservice=None):
-        self.user = user
-        self.appservice = appservice
+    def __init__(self, deferred):
         self.deferred = deferred
-        self.rooms = rooms
-        self.timer = None
 
     def notified(self):
         return self.deferred.called
 
-    def notify(self, notifier):
-        """ Inform whoever is listening about the new events. This will
-        also remove this listener from all the indexes in the Notifier
-        it knows about.
+    def notify(self, token):
+        """ Inform whoever is listening about the new events.
         """
-
         try:
-            self.deferred.callback(None)
+            self.deferred.callback(token)
         except defer.AlreadyCalledError:
             pass
 
-        # Should the following be done be using intrusively linked lists?
-        # -- erikj
+
+class _NotifierUserStream(object):
+    """This represents a user connected to the event stream.
+    It tracks the most recent stream token for that user.
+    At a given point a user may have a number of streams listening for
+    events.
+
+    This listener will also keep track of which rooms it is listening in
+    so that it can remove itself from the indexes in the Notifier class.
+    """
+
+    def __init__(self, user, rooms, current_token, time_now_ms,
+                 appservice=None):
+        self.user = str(user)
+        self.appservice = appservice
+        self.listeners = set()
+        self.rooms = set(rooms)
+        self.current_token = current_token
+        self.last_notified_ms = time_now_ms
+
+    def notify(self, stream_key, stream_id, time_now_ms):
+        self.current_token = self.current_token.copy_and_replace(
+            stream_key, stream_id
+        )
+        if self.listeners:
+            self.last_notified_ms = time_now_ms
+            listeners = self.listeners
+            self.listeners = set()
+            for listener in listeners:
+                listener.notify(self.current_token)
+
+    def remove(self, notifier):
+        """ Remove this listener from all the indexes in the Notifier
+        it knows about.
+        """
 
         for room in self.rooms:
-            lst = notifier.room_to_listeners.get(room, set())
+            lst = notifier.room_to_user_streams.get(room, set())
             lst.discard(self)
 
-        notifier.user_to_listeners.get(self.user, set()).discard(self)
+        notifier.user_to_user_stream.pop(self.user)
 
         if self.appservice:
-            notifier.appservice_to_listeners.get(
+            notifier.appservice_to_user_streams.get(
                 self.appservice, set()
             ).discard(self)
 
-        # Cancel the timeout for this notifer if one exists.
-        if self.timer is not None:
-            try:
-                notifier.clock.cancel_call_later(self.timer)
-            except:
-                logger.warn("Failed to cancel notifier timer")
-
 
 class Notifier(object):
     """ This class is responsible for notifying any listeners when there are
@@ -100,14 +115,18 @@ class Notifier(object):
     Primarily used from the /events stream.
     """
 
+    UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
+
     def __init__(self, hs):
         self.hs = hs
 
-        self.room_to_listeners = {}
-        self.user_to_listeners = {}
-        self.appservice_to_listeners = {}
+        self.user_to_user_stream = {}
+        self.room_to_user_streams = {}
+        self.appservice_to_user_streams = {}
 
         self.event_sources = hs.get_event_sources()
+        self.store = hs.get_datastore()
+        self.pending_new_room_events = []
 
         self.clock = hs.get_clock()
 
@@ -115,38 +134,54 @@ class Notifier(object):
             "user_joined_room", self._user_joined_room
         )
 
+        self.clock.looping_call(
+            self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
+        )
+
         # This is not a very cheap test to perform, but it's only executed
         # when rendering the metrics page, which is likely once per minute at
         # most when scraping it.
         def count_listeners():
-            all_listeners = set()
+            all_user_streams = set()
 
-            for x in self.room_to_listeners.values():
-                all_listeners |= x
-            for x in self.user_to_listeners.values():
-                all_listeners |= x
-            for x in self.appservice_to_listeners.values():
-                all_listeners |= x
+            for x in self.room_to_user_streams.values():
+                all_user_streams |= x
+            for x in self.user_to_user_stream:
+                all_user_streams.add(x)
+            for x in self.appservice_to_user_streams.values():
+                all_user_streams |= x
 
-            return len(all_listeners)
+            return sum(len(stream.listeners) for stream in all_user_streams)
         metrics.register_callback("listeners", count_listeners)
 
         metrics.register_callback(
             "rooms",
-            lambda: count(bool, self.room_to_listeners.values()),
+            lambda: count(bool, self.room_to_user_streams.values()),
         )
         metrics.register_callback(
             "users",
-            lambda: count(bool, self.user_to_listeners.values()),
+            lambda: len(self.user_to_user_stream),
         )
         metrics.register_callback(
             "appservices",
-            lambda: count(bool, self.appservice_to_listeners.values()),
+            lambda: count(bool, self.appservice_to_user_streams.values()),
         )
 
+    def notify_pending_new_room_events(self, max_room_stream_id):
+        pending = sorted(self.pending_new_room_events)
+        self.pending_new_room_events = []
+        for event, room_stream_id, extra_users in pending:
+            if room_stream_id > max_room_stream_id:
+                self.pending_new_room_events.append((
+                    event, room_stream_id, extra_users
+                ))
+            else:
+                self._on_new_room_event(event, room_stream_id, extra_users)
+
     @log_function
     @defer.inlineCallbacks
-    def on_new_room_event(self, event, extra_users=[]):
+    def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
+                          extra_users=[]):
         """ Used by handlers to inform the notifier something has happened
         in the room, room event wise.
 
@@ -155,6 +190,17 @@ class Notifier(object):
         `extra_users` param.
         """
         yield run_on_reactor()
+
+        self.notify_pending_new_room_events(max_room_stream_id)
+
+        if room_stream_id > max_room_stream_id:
+            self.pending_new_room_events.append((
+                event, room_stream_id, extra_users
+            ))
+        else:
+            self._on_new_room_event(event, room_stream_id, extra_users)
+
+    def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         # poke any interested application service.
         self.hs.get_handlers().appservice_handler.notify_interested_services(
             event
@@ -162,70 +208,61 @@ class Notifier(object):
 
         room_id = event.room_id
 
-        room_listeners = self.room_to_listeners.get(room_id, set())
+        room_user_streams = self.room_to_user_streams.get(room_id, set())
 
-        _discard_if_notified(room_listeners)
-
-        listeners = room_listeners.copy()
+        user_streams = room_user_streams.copy()
 
         for user in extra_users:
-            user_listeners = self.user_to_listeners.get(user, set())
-
-            _discard_if_notified(user_listeners)
+            user_stream = self.user_to_user_stream.get(str(user))
+            if user_stream is not None:
+                user_streams.add(user_stream)
 
-            listeners |= user_listeners
-
-        for appservice in self.appservice_to_listeners:
+        for appservice in self.appservice_to_user_streams:
             # TODO (kegan): Redundant appservice listener checks?
-            # App services will already be in the room_to_listeners set, but
+            # App services will already be in the room_to_user_streams set, but
             # that isn't enough. They need to be checked here in order to
             # receive *invites* for users they are interested in. Does this
-            # make the room_to_listeners check somewhat obselete?
+            # make the room_to_user_streams check somewhat obselete?
             if appservice.is_interested(event):
-                app_listeners = self.appservice_to_listeners.get(
+                app_user_streams = self.appservice_to_user_streams.get(
                     appservice, set()
                 )
+                user_streams |= app_user_streams
 
-                _discard_if_notified(app_listeners)
-
-                listeners |= app_listeners
-
-        logger.debug("on_new_room_event listeners %s", listeners)
+        logger.debug("on_new_room_event listeners %s", user_streams)
 
-        for listener in listeners:
+        time_now_ms = self.clock.time_msec()
+        for user_stream in user_streams:
             try:
-                listener.notify(self)
+                user_stream.notify(
+                    "room_key", "s%d" % (room_stream_id,), time_now_ms
+                )
             except:
                 logger.exception("Failed to notify listener")
 
     @defer.inlineCallbacks
     @log_function
-    def on_new_user_event(self, users=[], rooms=[]):
+    def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]):
         """ Used to inform listeners that something has happend
         presence/user event wise.
 
         Will wake up all listeners for the given users and rooms.
         """
         yield run_on_reactor()
-        listeners = set()
+        user_streams = set()
 
         for user in users:
-            user_listeners = self.user_to_listeners.get(user, set())
-
-            _discard_if_notified(user_listeners)
-
-            listeners |= user_listeners
+            user_stream = self.user_to_user_stream.get(user)
+            if user_stream is not None:
+                user_streams.add(user_stream)
 
         for room in rooms:
-            room_listeners = self.room_to_listeners.get(room, set())
+            user_streams |= self.room_to_user_streams.get(room, set())
 
-            _discard_if_notified(room_listeners)
-
-            listeners |= room_listeners
-
-        for listener in listeners:
+        time_now_ms = self.clock.time_msec()
+        for user_stream in user_streams:
             try:
-                listener.notify(self)
+                user_stream.notify(stream_key, new_token, time_now_ms)
             except:
                 logger.exception("Failed to notify listener")
 
@@ -237,46 +274,62 @@ class Notifier(object):
         """
 
         deferred = defer.Deferred()
-        appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
-            user.to_string()
-        )
+        time_now_ms = self.clock.time_msec()
+
+        user = str(user)
+        user_stream = self.user_to_user_stream.get(user)
+        if user_stream is None:
+            appservice = yield self.store.get_app_service_by_user_id(user)
+            current_token = yield self.event_sources.get_current_token()
+            rooms = yield self.store.get_rooms_for_user(user)
+            user_stream = _NotifierUserStream(
+                user=user,
+                rooms=rooms,
+                appservice=appservice,
+                current_token=current_token,
+                time_now_ms=time_now_ms,
+            )
+            self._register_with_keys(user_stream)
+        else:
+            current_token = user_stream.current_token
+
+        listener = [_NotificationListener(deferred)]
+
+        if timeout and not current_token.is_after(from_token):
+            user_stream.listeners.add(listener[0])
+
+        if current_token.is_after(from_token):
+            result = yield callback(from_token, current_token)
+        else:
+            result = None
 
-        listener = [_NotificationListener(
-            user=user,
-            rooms=rooms,
-            deferred=deferred,
-            appservice=appservice,
-        )]
-
-        if timeout:
-            self._register_with_keys(listener[0])
-
-        result = yield callback()
         timer = [None]
 
+        if result:
+            user_stream.listeners.discard(listener[0])
+            defer.returnValue(result)
+            return
+
         if timeout:
             timed_out = [False]
 
             def _timeout_listener():
                 timed_out[0] = True
                 timer[0] = None
-                listener[0].notify(self)
+                user_stream.listeners.discard(listener[0])
+                listener[0].notify(from_token)
 
             # We create multiple notification listeners so we have to manage
             # canceling the timeout ourselves.
             timer[0] = self.clock.call_later(timeout/1000., _timeout_listener)
 
             while not result and not timed_out[0]:
-                yield deferred
+                new_token = yield deferred
                 deferred = defer.Deferred()
-                listener[0] = _NotificationListener(
-                    user=user,
-                    rooms=rooms,
-                    deferred=deferred,
-                    appservice=appservice,
-                )
-                self._register_with_keys(listener[0])
-                result = yield callback()
+                listener[0] = _NotificationListener(deferred)
+                user_stream.listeners.add(listener[0])
+                result = yield callback(current_token, new_token)
+                current_token = new_token
 
         if timer[0] is not None:
             try:
@@ -299,11 +352,15 @@ class Notifier(object):
         limit = pagination_config.limit
 
         @defer.inlineCallbacks
-        def check_for_updates():
+        def check_for_updates(before_token, after_token):
             events = []
             end_token = from_token
             for name, source in self.event_sources.sources.items():
                 keyname = "%s_key" % name
+                before_id = getattr(before_token, keyname)
+                after_id = getattr(after_token, keyname)
+                if before_id == after_id:
+                    continue
                 stuff, new_key = yield source.get_new_events_for_user(
                     user, getattr(from_token, keyname), limit,
                 )
@@ -325,34 +382,36 @@ class Notifier(object):
         defer.returnValue(result)
 
     @log_function
-    def _register_with_keys(self, listener):
-        for room in listener.rooms:
-            s = self.room_to_listeners.setdefault(room, set())
-            s.add(listener)
-
-        self.user_to_listeners.setdefault(listener.user, set()).add(listener)
-
-        if listener.appservice:
-            self.appservice_to_listeners.setdefault(
-                listener.appservice, set()
-            ).add(listener)
+    def remove_expired_streams(self):
+        time_now_ms = self.clock.time_msec()
+        expired_streams = []
+        expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS
+        for stream in self.user_to_user_stream.values():
+            if stream.listeners:
+                continue
+            if stream.last_notified_ms < expire_before_ts:
+                expired_streams.append(stream)
+
+        for expired_stream in expired_streams:
+            expired_stream.remove(self)
 
-    def _user_joined_room(self, user, room_id):
-        new_listeners = self.user_to_listeners.get(user, set())
-
-        listeners = self.room_to_listeners.setdefault(room_id, set())
-        listeners |= new_listeners
-
-        for l in new_listeners:
-            l.rooms.add(room_id)
+    @log_function
+    def _register_with_keys(self, user_stream):
+        self.user_to_user_stream[user_stream.user] = user_stream
 
+        for room in user_stream.rooms:
+            s = self.room_to_user_streams.setdefault(room, set())
+            s.add(user_stream)
 
-def _discard_if_notified(listener_set):
-    """Remove any 'stale' listeners from the given set.
-    """
-    to_discard = set()
-    for l in listener_set:
-        if l.notified():
-            to_discard.add(l)
+        if user_stream.appservice:
+            self.appservice_to_user_stream.setdefault(
+                user_stream.appservice, set()
+            ).add(user_stream)
 
-    listener_set -= to_discard
+    def _user_joined_room(self, user, room_id):
+        user = str(user)
+        new_user_stream = self.user_to_user_stream.get(user)
+        if new_user_stream is not None:
+            room_streams = self.room_to_user_streams.setdefault(room_id, set())
+            room_streams.add(new_user_stream)
+            new_user_stream.rooms.add(room_id)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 9242b0a84e..971f3211ac 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -65,6 +65,9 @@ class EventsStore(SQLBaseStore):
         except _RollbackButIsFineException:
             pass
 
+        max_persisted_id = yield self._stream_id_gen.get_max_token(self)
+        defer.returnValue((stream_ordering, max_persisted_id))
+
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
                   get_prev_content=False, allow_rejected=False,
diff --git a/synapse/types.py b/synapse/types.py
index 0f16867d75..d89a04f7c3 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -70,6 +70,8 @@ class DomainSpecificString(
         """Return a string encoding the fields of the structure object."""
         return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain)
 
+    __str__ = to_string
+
     @classmethod
     def create(cls, localpart, domain,):
         return cls(localpart=localpart, domain=domain)
@@ -107,7 +109,6 @@ class StreamToken(
     def from_string(cls, string):
         try:
             keys = string.split(cls._SEPARATOR)
-
             return cls(*keys)
         except:
             raise SynapseError(400, "Invalid Token")
@@ -115,6 +116,22 @@ class StreamToken(
     def to_string(self):
         return self._SEPARATOR.join([str(k) for k in self])
 
+    @property
+    def room_stream_id(self):
+        # TODO(markjh): Awful hack to work around hacks in the presence tests
+        if type(self.room_key) is int:
+            return self.room_key
+        else:
+            return int(self.room_key[1:].split("-")[-1])
+
+    def is_after(self, other_token):
+        """Does this token contain events that the other doesn't?"""
+        return (
+            (other_token.room_stream_id < self.room_stream_id)
+            or (int(other_token.presence_key) < int(self.presence_key))
+            or (int(other_token.typing_key) < int(self.typing_key))
+        )
+
     def copy_and_replace(self, key, new_value):
         d = self._asdict()
         d[key] = new_value