summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py160
1 files changed, 94 insertions, 66 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index fd52578325..560866b26e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
+# Copyright 2014 - 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,10 +18,13 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError
 
 from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, ObservableDeferred
+from synapse.util.async import ObservableDeferred
+from synapse.util.logcontext import PreserveLoggingContext
 from synapse.types import StreamToken
 import synapse.metrics
 
+from collections import namedtuple
+
 import logging
 
 
@@ -63,15 +66,16 @@ class _NotifierUserStream(object):
     so that it can remove itself from the indexes in the Notifier class.
     """
 
-    def __init__(self, user, rooms, current_token, time_now_ms,
+    def __init__(self, user_id, rooms, current_token, time_now_ms,
                  appservice=None):
-        self.user = str(user)
+        self.user_id = user_id
         self.appservice = appservice
         self.rooms = set(rooms)
         self.current_token = current_token
         self.last_notified_ms = time_now_ms
 
-        self.notify_deferred = ObservableDeferred(defer.Deferred())
+        with PreserveLoggingContext():
+            self.notify_deferred = ObservableDeferred(defer.Deferred())
 
     def notify(self, stream_key, stream_id, time_now_ms):
         """Notify any listeners for this user of a new event from an
@@ -86,8 +90,10 @@ class _NotifierUserStream(object):
         )
         self.last_notified_ms = time_now_ms
         noify_deferred = self.notify_deferred
-        self.notify_deferred = ObservableDeferred(defer.Deferred())
-        noify_deferred.callback(self.current_token)
+
+        with PreserveLoggingContext():
+            self.notify_deferred = ObservableDeferred(defer.Deferred())
+            noify_deferred.callback(self.current_token)
 
     def remove(self, notifier):
         """ Remove this listener from all the indexes in the Notifier
@@ -98,7 +104,7 @@ class _NotifierUserStream(object):
             lst = notifier.room_to_user_streams.get(room, set())
             lst.discard(self)
 
-        notifier.user_to_user_stream.pop(self.user)
+        notifier.user_to_user_stream.pop(self.user_id)
 
         if self.appservice:
             notifier.appservice_to_user_streams.get(
@@ -118,6 +124,11 @@ class _NotifierUserStream(object):
             return _NotificationListener(self.notify_deferred.observe())
 
 
+class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
+    def __nonzero__(self):
+        return bool(self.events)
+
+
 class Notifier(object):
     """ This class is responsible for notifying any listeners when there are
     new events available for it.
@@ -177,8 +188,6 @@ class Notifier(object):
             lambda: count(bool, self.appservice_to_user_streams.values()),
         )
 
-    @log_function
-    @defer.inlineCallbacks
     def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
                           extra_users=[]):
         """ Used by handlers to inform the notifier something has happened
@@ -192,12 +201,11 @@ class Notifier(object):
         until all previous events have been persisted before notifying
         the client streams.
         """
-        yield run_on_reactor()
-
-        self.pending_new_room_events.append((
-            room_stream_id, event, extra_users
-        ))
-        self._notify_pending_new_room_events(max_room_stream_id)
+        with PreserveLoggingContext():
+            self.pending_new_room_events.append((
+                room_stream_id, event, extra_users
+            ))
+            self._notify_pending_new_room_events(max_room_stream_id)
 
     def _notify_pending_new_room_events(self, max_room_stream_id):
         """Notify for the room events that were queued waiting for a previous
@@ -244,48 +252,45 @@ class Notifier(object):
             extra_streams=app_streams,
         )
 
-    @defer.inlineCallbacks
-    @log_function
     def on_new_event(self, stream_key, new_token, users=[], rooms=[],
                      extra_streams=set()):
         """ Used to inform listeners that something has happend event wise.
 
         Will wake up all listeners for the given users and rooms.
         """
-        yield run_on_reactor()
-        user_streams = set()
+        with PreserveLoggingContext():
+            user_streams = set()
 
-        for user in users:
-            user_stream = self.user_to_user_stream.get(str(user))
-            if user_stream is not None:
-                user_streams.add(user_stream)
+            for user in users:
+                user_stream = self.user_to_user_stream.get(str(user))
+                if user_stream is not None:
+                    user_streams.add(user_stream)
 
-        for room in rooms:
-            user_streams |= self.room_to_user_streams.get(room, set())
+            for room in rooms:
+                user_streams |= self.room_to_user_streams.get(room, set())
 
-        time_now_ms = self.clock.time_msec()
-        for user_stream in user_streams:
-            try:
-                user_stream.notify(stream_key, new_token, time_now_ms)
-            except:
-                logger.exception("Failed to notify listener")
+            time_now_ms = self.clock.time_msec()
+            for user_stream in user_streams:
+                try:
+                    user_stream.notify(stream_key, new_token, time_now_ms)
+                except:
+                    logger.exception("Failed to notify listener")
 
     @defer.inlineCallbacks
-    def wait_for_events(self, user, timeout, callback, room_ids=None,
+    def wait_for_events(self, user_id, timeout, callback, room_ids=None,
                         from_token=StreamToken("s0", "0", "0", "0", "0")):
         """Wait until the callback returns a non empty response or the
         timeout fires.
         """
-        user = str(user)
-        user_stream = self.user_to_user_stream.get(user)
+        user_stream = self.user_to_user_stream.get(user_id)
         if user_stream is None:
-            appservice = yield self.store.get_app_service_by_user_id(user)
+            appservice = yield self.store.get_app_service_by_user_id(user_id)
             current_token = yield self.event_sources.get_current_token()
             if room_ids is None:
-                rooms = yield self.store.get_rooms_for_user(user)
+                rooms = yield self.store.get_rooms_for_user(user_id)
                 room_ids = [room.room_id for room in rooms]
             user_stream = _NotifierUserStream(
-                user=user,
+                user_id=user_id,
                 rooms=room_ids,
                 appservice=appservice,
                 current_token=current_token,
@@ -302,7 +307,7 @@ class Notifier(object):
             def timed_out():
                 if listener:
                     listener.deferred.cancel()
-            timer = self.clock.call_later(timeout/1000., timed_out)
+            timer = self.clock.call_later(timeout / 1000., timed_out)
 
             prev_token = from_token
             while not result:
@@ -319,7 +324,8 @@ class Notifier(object):
                     # that we don't miss any current_token updates.
                     prev_token = current_token
                     listener = user_stream.new_listener(prev_token)
-                    yield listener.deferred
+                    with PreserveLoggingContext():
+                        yield listener.deferred
                 except defer.CancelledError:
                     break
 
@@ -332,13 +338,18 @@ class Notifier(object):
 
     @defer.inlineCallbacks
     def get_events_for(self, user, pagination_config, timeout,
-                       only_room_events=False,
-                       is_guest=False, guest_room_id=None):
+                       only_keys=None,
+                       is_guest=False, explicit_room_id=None):
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any
         new events to happen before returning.
 
-        If `only_room_events` is `True` only room events will be returned.
+        If `only_keys` is not None, events from keys will be sent down.
+
+        If explicit_room_id is not set, the user's joined rooms will be polled
+        for events.
+        If explicit_room_id is set, that room will be polled for events only if
+        it is world readable or the user has joined the room.
         """
         from_token = pagination_config.from_token
         if not from_token:
@@ -346,20 +357,13 @@ class Notifier(object):
 
         limit = pagination_config.limit
 
-        room_ids = []
-        if is_guest:
-            if guest_room_id:
-                if not (yield self._is_world_readable(guest_room_id)):
-                    raise AuthError(403, "Guest access not allowed")
-                room_ids = [guest_room_id]
-        else:
-            rooms = yield self.store.get_rooms_for_user(user.to_string())
-            room_ids = [room.room_id for room in rooms]
+        room_ids, is_joined = yield self._get_room_ids(user, explicit_room_id)
+        is_peeking = not is_joined
 
         @defer.inlineCallbacks
         def check_for_updates(before_token, after_token):
             if not after_token.is_after(before_token):
-                defer.returnValue(None)
+                defer.returnValue(EventStreamResult([], (from_token, from_token)))
 
             events = []
             end_token = from_token
@@ -370,13 +374,14 @@ class Notifier(object):
                 after_id = getattr(after_token, keyname)
                 if before_id == after_id:
                     continue
-                if only_room_events and name != "room":
+                if only_keys and name not in only_keys:
                     continue
+
                 new_events, new_key = yield source.get_new_events(
                     user=user,
                     from_key=getattr(from_token, keyname),
                     limit=limit,
-                    is_guest=is_guest,
+                    is_guest=is_peeking,
                     room_ids=room_ids,
                 )
 
@@ -385,28 +390,51 @@ class Notifier(object):
                     new_events = yield room_member_handler._filter_events_for_client(
                         user.to_string(),
                         new_events,
-                        is_guest=is_guest,
-                        require_all_visible_for_guests=False
+                        is_peeking=is_peeking,
                     )
 
                 events.extend(new_events)
                 end_token = end_token.copy_and_replace(keyname, new_key)
 
-            if events:
-                defer.returnValue((events, (from_token, end_token)))
-            else:
-                defer.returnValue(None)
+            defer.returnValue(EventStreamResult(events, (from_token, end_token)))
+
+        user_id_for_stream = user.to_string()
+        if is_peeking:
+            # Internally, the notifier keeps an event stream per user_id.
+            # This is used by both /sync and /events.
+            # We want /events to be used for peeking independently of /sync,
+            # without polluting its contents. So we invent an illegal user ID
+            # (which thus cannot clash with any real users) for keying peeking
+            # over /events.
+            #
+            # I am sorry for what I have done.
+            user_id_for_stream = "_PEEKING_%s_%s" % (
+                explicit_room_id, user_id_for_stream
+            )
 
         result = yield self.wait_for_events(
-            user, timeout, check_for_updates, room_ids=room_ids, from_token=from_token
+            user_id_for_stream,
+            timeout,
+            check_for_updates,
+            room_ids=room_ids,
+            from_token=from_token,
         )
 
-        if result is None:
-            result = ([], (from_token, from_token))
-
         defer.returnValue(result)
 
     @defer.inlineCallbacks
+    def _get_room_ids(self, user, explicit_room_id):
+        joined_rooms = yield self.store.get_rooms_for_user(user.to_string())
+        joined_room_ids = map(lambda r: r.room_id, joined_rooms)
+        if explicit_room_id:
+            if explicit_room_id in joined_room_ids:
+                defer.returnValue(([explicit_room_id], True))
+            if (yield self._is_world_readable(explicit_room_id)):
+                defer.returnValue(([explicit_room_id], False))
+            raise AuthError(403, "Non-joined access not allowed")
+        defer.returnValue((joined_room_ids, True))
+
+    @defer.inlineCallbacks
     def _is_world_readable(self, room_id):
         state = yield self.hs.get_state_handler().get_current_state(
             room_id,
@@ -433,7 +461,7 @@ class Notifier(object):
 
     @log_function
     def _register_with_keys(self, user_stream):
-        self.user_to_user_stream[user_stream.user] = user_stream
+        self.user_to_user_stream[user_stream.user_id] = user_stream
 
         for room in user_stream.rooms:
             s = self.room_to_user_streams.setdefault(room, set())