diff --git a/synapse/notifier.py b/synapse/notifier.py
index 0a5653b8d5..3285487551 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -63,9 +63,9 @@ class _NotifierUserStream(object):
so that it can remove itself from the indexes in the Notifier class.
"""
- def __init__(self, user, rooms, current_token, time_now_ms,
+ def __init__(self, user_id, rooms, current_token, time_now_ms,
appservice=None):
- self.user = str(user)
+ self.user_id = user_id
self.appservice = appservice
self.rooms = set(rooms)
self.current_token = current_token
@@ -98,7 +98,7 @@ class _NotifierUserStream(object):
lst = notifier.room_to_user_streams.get(room, set())
lst.discard(self)
- notifier.user_to_user_stream.pop(self.user)
+ notifier.user_to_user_stream.pop(self.user_id)
if self.appservice:
notifier.appservice_to_user_streams.get(
@@ -271,21 +271,20 @@ class Notifier(object):
logger.exception("Failed to notify listener")
@defer.inlineCallbacks
- def wait_for_events(self, user, timeout, callback, room_ids=None,
+ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
from_token=StreamToken("s0", "0", "0", "0", "0")):
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
- user = str(user)
- user_stream = self.user_to_user_stream.get(user)
+ user_stream = self.user_to_user_stream.get(user_id)
if user_stream is None:
- appservice = yield self.store.get_app_service_by_user_id(user)
+ appservice = yield self.store.get_app_service_by_user_id(user_id)
current_token = yield self.event_sources.get_current_token()
if room_ids is None:
- rooms = yield self.store.get_rooms_for_user(user)
+ rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = [room.room_id for room in rooms]
user_stream = _NotifierUserStream(
- user=user,
+ user_id=user_id,
rooms=room_ids,
appservice=appservice,
current_token=current_token,
@@ -333,12 +332,17 @@ class Notifier(object):
@defer.inlineCallbacks
def get_events_for(self, user, pagination_config, timeout,
only_room_events=False,
- is_guest=False, guest_room_id=None):
+ is_guest=False, explicit_room_id=None):
""" For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any
new events to happen before returning.
If `only_room_events` is `True` only room events will be returned.
+
+ If explicit_room_id is not set, the user's joined rooms will be polled
+ for events.
+ If explicit_room_id is set, that room will be polled for events only if
+ it is world readable or the user has joined the room.
"""
from_token = pagination_config.from_token
if not from_token:
@@ -346,15 +350,8 @@ class Notifier(object):
limit = pagination_config.limit
- room_ids = []
- if is_guest:
- if guest_room_id:
- if not (yield self._is_world_readable(guest_room_id)):
- raise AuthError(403, "Guest access not allowed")
- room_ids = [guest_room_id]
- else:
- rooms = yield self.store.get_rooms_for_user(user.to_string())
- room_ids = [room.room_id for room in rooms]
+ room_ids, is_joined = yield self._get_room_ids(user, explicit_room_id)
+ is_peeking = not is_joined
@defer.inlineCallbacks
def check_for_updates(before_token, after_token):
@@ -376,7 +373,7 @@ class Notifier(object):
user=user,
from_key=getattr(from_token, keyname),
limit=limit,
- is_guest=is_guest,
+ is_guest=is_peeking,
room_ids=room_ids,
)
@@ -385,7 +382,7 @@ class Notifier(object):
new_events = yield room_member_handler._filter_events_for_client(
user.to_string(),
new_events,
- is_guest=is_guest,
+ is_peeking=is_peeking,
)
events.extend(new_events)
@@ -396,8 +393,24 @@ class Notifier(object):
else:
defer.returnValue(None)
+ user_id_for_stream = user.to_string()
+ if is_peeking:
+ # Internally, the notifier keeps an event stream per user_id.
+ # This is used by both /sync and /events.
+ # We want /events to be used for peeking independently of /sync,
+ # without polluting its contents. So we invent an illegal user ID
+ # (which thus cannot clash with any real users) for keying peeking
+ # over /events.
+ #
+ # I am sorry for what I have done.
+ user_id_for_stream = "_PEEKING_" + user_id_for_stream
+
result = yield self.wait_for_events(
- user, timeout, check_for_updates, room_ids=room_ids, from_token=from_token
+ user_id_for_stream,
+ timeout,
+ check_for_updates,
+ room_ids=room_ids,
+ from_token=from_token,
)
if result is None:
@@ -406,6 +419,18 @@ class Notifier(object):
defer.returnValue(result)
@defer.inlineCallbacks
+ def _get_room_ids(self, user, explicit_room_id):
+ joined_rooms = yield self.store.get_rooms_for_user(user.to_string())
+ joined_room_ids = map(lambda r: r.room_id, joined_rooms)
+ if explicit_room_id:
+ if explicit_room_id in joined_room_ids:
+ defer.returnValue(([explicit_room_id], True))
+ if (yield self._is_world_readable(explicit_room_id)):
+ defer.returnValue(([explicit_room_id], False))
+ raise AuthError(403, "Non-joined access not allowed")
+ defer.returnValue((joined_room_ids, True))
+
+ @defer.inlineCallbacks
def _is_world_readable(self, room_id):
state = yield self.hs.get_state_handler().get_current_state(
room_id,
@@ -432,7 +457,7 @@ class Notifier(object):
@log_function
def _register_with_keys(self, user_stream):
- self.user_to_user_stream[user_stream.user] = user_stream
+ self.user_to_user_stream[user_stream.user_id] = user_stream
for room in user_stream.rooms:
s = self.room_to_user_streams.setdefault(room, set())
|