summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/notifier.py71
1 files changed, 48 insertions, 23 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 0a5653b8d5..3285487551 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -63,9 +63,9 @@ class _NotifierUserStream(object):
     so that it can remove itself from the indexes in the Notifier class.
     """
 
-    def __init__(self, user, rooms, current_token, time_now_ms,
+    def __init__(self, user_id, rooms, current_token, time_now_ms,
                  appservice=None):
-        self.user = str(user)
+        self.user_id = user_id
         self.appservice = appservice
         self.rooms = set(rooms)
         self.current_token = current_token
@@ -98,7 +98,7 @@ class _NotifierUserStream(object):
             lst = notifier.room_to_user_streams.get(room, set())
             lst.discard(self)
 
-        notifier.user_to_user_stream.pop(self.user)
+        notifier.user_to_user_stream.pop(self.user_id)
 
         if self.appservice:
             notifier.appservice_to_user_streams.get(
@@ -271,21 +271,20 @@ class Notifier(object):
                 logger.exception("Failed to notify listener")
 
     @defer.inlineCallbacks
-    def wait_for_events(self, user, timeout, callback, room_ids=None,
+    def wait_for_events(self, user_id, timeout, callback, room_ids=None,
                         from_token=StreamToken("s0", "0", "0", "0", "0")):
         """Wait until the callback returns a non empty response or the
         timeout fires.
         """
-        user = str(user)
-        user_stream = self.user_to_user_stream.get(user)
+        user_stream = self.user_to_user_stream.get(user_id)
         if user_stream is None:
-            appservice = yield self.store.get_app_service_by_user_id(user)
+            appservice = yield self.store.get_app_service_by_user_id(user_id)
             current_token = yield self.event_sources.get_current_token()
             if room_ids is None:
-                rooms = yield self.store.get_rooms_for_user(user)
+                rooms = yield self.store.get_rooms_for_user(user_id)
                 room_ids = [room.room_id for room in rooms]
             user_stream = _NotifierUserStream(
-                user=user,
+                user_id=user_id,
                 rooms=room_ids,
                 appservice=appservice,
                 current_token=current_token,
@@ -333,12 +332,17 @@ class Notifier(object):
     @defer.inlineCallbacks
     def get_events_for(self, user, pagination_config, timeout,
                        only_room_events=False,
-                       is_guest=False, guest_room_id=None):
+                       is_guest=False, explicit_room_id=None):
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any
         new events to happen before returning.
 
         If `only_room_events` is `True` only room events will be returned.
+
+        If explicit_room_id is not set, the user's joined rooms will be polled
+        for events.
+        If explicit_room_id is set, that room will be polled for events only if
+        it is world readable or the user has joined the room.
         """
         from_token = pagination_config.from_token
         if not from_token:
@@ -346,15 +350,8 @@ class Notifier(object):
 
         limit = pagination_config.limit
 
-        room_ids = []
-        if is_guest:
-            if guest_room_id:
-                if not (yield self._is_world_readable(guest_room_id)):
-                    raise AuthError(403, "Guest access not allowed")
-                room_ids = [guest_room_id]
-        else:
-            rooms = yield self.store.get_rooms_for_user(user.to_string())
-            room_ids = [room.room_id for room in rooms]
+        room_ids, is_joined = yield self._get_room_ids(user, explicit_room_id)
+        is_peeking = not is_joined
 
         @defer.inlineCallbacks
         def check_for_updates(before_token, after_token):
@@ -376,7 +373,7 @@ class Notifier(object):
                     user=user,
                     from_key=getattr(from_token, keyname),
                     limit=limit,
-                    is_guest=is_guest,
+                    is_guest=is_peeking,
                     room_ids=room_ids,
                 )
 
@@ -385,7 +382,7 @@ class Notifier(object):
                     new_events = yield room_member_handler._filter_events_for_client(
                         user.to_string(),
                         new_events,
-                        is_guest=is_guest,
+                        is_peeking=is_peeking,
                     )
 
                 events.extend(new_events)
@@ -396,8 +393,24 @@ class Notifier(object):
             else:
                 defer.returnValue(None)
 
+        user_id_for_stream = user.to_string()
+        if is_peeking:
+            # Internally, the notifier keeps an event stream per user_id.
+            # This is used by both /sync and /events.
+            # We want /events to be used for peeking independently of /sync,
+            # without polluting its contents. So we invent an illegal user ID
+            # (which thus cannot clash with any real users) for keying peeking
+            # over /events.
+            #
+            # I am sorry for what I have done.
+            user_id_for_stream = "_PEEKING_" + user_id_for_stream
+
         result = yield self.wait_for_events(
-            user, timeout, check_for_updates, room_ids=room_ids, from_token=from_token
+            user_id_for_stream,
+            timeout,
+            check_for_updates,
+            room_ids=room_ids,
+            from_token=from_token,
         )
 
         if result is None:
@@ -406,6 +419,18 @@ class Notifier(object):
         defer.returnValue(result)
 
     @defer.inlineCallbacks
+    def _get_room_ids(self, user, explicit_room_id):
+        joined_rooms = yield self.store.get_rooms_for_user(user.to_string())
+        joined_room_ids = map(lambda r: r.room_id, joined_rooms)
+        if explicit_room_id:
+            if explicit_room_id in joined_room_ids:
+                defer.returnValue(([explicit_room_id], True))
+            if (yield self._is_world_readable(explicit_room_id)):
+                defer.returnValue(([explicit_room_id], False))
+            raise AuthError(403, "Non-joined access not allowed")
+        defer.returnValue((joined_room_ids, True))
+
+    @defer.inlineCallbacks
     def _is_world_readable(self, room_id):
         state = yield self.hs.get_state_handler().get_current_state(
             room_id,
@@ -432,7 +457,7 @@ class Notifier(object):
 
     @log_function
     def _register_with_keys(self, user_stream):
-        self.user_to_user_stream[user_stream.user] = user_stream
+        self.user_to_user_stream[user_stream.user_id] = user_stream
 
         for room in user_stream.rooms:
             s = self.room_to_user_streams.setdefault(room, set())