diff --git a/synapse/notifier.py b/synapse/notifier.py
index fd52578325..560866b26e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
+# Copyright 2014 - 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,10 +18,13 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, ObservableDeferred
+from synapse.util.async import ObservableDeferred
+from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import StreamToken
import synapse.metrics
+from collections import namedtuple
+
import logging
@@ -63,15 +66,16 @@ class _NotifierUserStream(object):
so that it can remove itself from the indexes in the Notifier class.
"""
- def __init__(self, user, rooms, current_token, time_now_ms,
+ def __init__(self, user_id, rooms, current_token, time_now_ms,
appservice=None):
- self.user = str(user)
+ self.user_id = user_id
self.appservice = appservice
self.rooms = set(rooms)
self.current_token = current_token
self.last_notified_ms = time_now_ms
- self.notify_deferred = ObservableDeferred(defer.Deferred())
+ with PreserveLoggingContext():
+ self.notify_deferred = ObservableDeferred(defer.Deferred())
def notify(self, stream_key, stream_id, time_now_ms):
"""Notify any listeners for this user of a new event from an
@@ -86,8 +90,10 @@ class _NotifierUserStream(object):
)
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
- self.notify_deferred = ObservableDeferred(defer.Deferred())
- noify_deferred.callback(self.current_token)
+
+ with PreserveLoggingContext():
+ self.notify_deferred = ObservableDeferred(defer.Deferred())
+ noify_deferred.callback(self.current_token)
def remove(self, notifier):
""" Remove this listener from all the indexes in the Notifier
@@ -98,7 +104,7 @@ class _NotifierUserStream(object):
lst = notifier.room_to_user_streams.get(room, set())
lst.discard(self)
- notifier.user_to_user_stream.pop(self.user)
+ notifier.user_to_user_stream.pop(self.user_id)
if self.appservice:
notifier.appservice_to_user_streams.get(
@@ -118,6 +124,11 @@ class _NotifierUserStream(object):
return _NotificationListener(self.notify_deferred.observe())
+class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
+ def __nonzero__(self):
+ return bool(self.events)
+
+
class Notifier(object):
""" This class is responsible for notifying any listeners when there are
new events available for it.
@@ -177,8 +188,6 @@ class Notifier(object):
lambda: count(bool, self.appservice_to_user_streams.values()),
)
- @log_function
- @defer.inlineCallbacks
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
extra_users=[]):
""" Used by handlers to inform the notifier something has happened
@@ -192,12 +201,11 @@ class Notifier(object):
until all previous events have been persisted before notifying
the client streams.
"""
- yield run_on_reactor()
-
- self.pending_new_room_events.append((
- room_stream_id, event, extra_users
- ))
- self._notify_pending_new_room_events(max_room_stream_id)
+ with PreserveLoggingContext():
+ self.pending_new_room_events.append((
+ room_stream_id, event, extra_users
+ ))
+ self._notify_pending_new_room_events(max_room_stream_id)
def _notify_pending_new_room_events(self, max_room_stream_id):
"""Notify for the room events that were queued waiting for a previous
@@ -244,48 +252,45 @@ class Notifier(object):
extra_streams=app_streams,
)
- @defer.inlineCallbacks
- @log_function
def on_new_event(self, stream_key, new_token, users=[], rooms=[],
extra_streams=set()):
""" Used to inform listeners that something has happend event wise.
Will wake up all listeners for the given users and rooms.
"""
- yield run_on_reactor()
- user_streams = set()
+ with PreserveLoggingContext():
+ user_streams = set()
- for user in users:
- user_stream = self.user_to_user_stream.get(str(user))
- if user_stream is not None:
- user_streams.add(user_stream)
+ for user in users:
+ user_stream = self.user_to_user_stream.get(str(user))
+ if user_stream is not None:
+ user_streams.add(user_stream)
- for room in rooms:
- user_streams |= self.room_to_user_streams.get(room, set())
+ for room in rooms:
+ user_streams |= self.room_to_user_streams.get(room, set())
- time_now_ms = self.clock.time_msec()
- for user_stream in user_streams:
- try:
- user_stream.notify(stream_key, new_token, time_now_ms)
- except:
- logger.exception("Failed to notify listener")
+ time_now_ms = self.clock.time_msec()
+ for user_stream in user_streams:
+ try:
+ user_stream.notify(stream_key, new_token, time_now_ms)
+ except:
+ logger.exception("Failed to notify listener")
@defer.inlineCallbacks
- def wait_for_events(self, user, timeout, callback, room_ids=None,
+ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
from_token=StreamToken("s0", "0", "0", "0", "0")):
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
- user = str(user)
- user_stream = self.user_to_user_stream.get(user)
+ user_stream = self.user_to_user_stream.get(user_id)
if user_stream is None:
- appservice = yield self.store.get_app_service_by_user_id(user)
+ appservice = yield self.store.get_app_service_by_user_id(user_id)
current_token = yield self.event_sources.get_current_token()
if room_ids is None:
- rooms = yield self.store.get_rooms_for_user(user)
+ rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = [room.room_id for room in rooms]
user_stream = _NotifierUserStream(
- user=user,
+ user_id=user_id,
rooms=room_ids,
appservice=appservice,
current_token=current_token,
@@ -302,7 +307,7 @@ class Notifier(object):
def timed_out():
if listener:
listener.deferred.cancel()
- timer = self.clock.call_later(timeout/1000., timed_out)
+ timer = self.clock.call_later(timeout / 1000., timed_out)
prev_token = from_token
while not result:
@@ -319,7 +324,8 @@ class Notifier(object):
# that we don't miss any current_token updates.
prev_token = current_token
listener = user_stream.new_listener(prev_token)
- yield listener.deferred
+ with PreserveLoggingContext():
+ yield listener.deferred
except defer.CancelledError:
break
@@ -332,13 +338,18 @@ class Notifier(object):
@defer.inlineCallbacks
def get_events_for(self, user, pagination_config, timeout,
- only_room_events=False,
- is_guest=False, guest_room_id=None):
+ only_keys=None,
+ is_guest=False, explicit_room_id=None):
""" For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any
new events to happen before returning.
- If `only_room_events` is `True` only room events will be returned.
+ If `only_keys` is not None, events from keys will be sent down.
+
+ If explicit_room_id is not set, the user's joined rooms will be polled
+ for events.
+ If explicit_room_id is set, that room will be polled for events only if
+ it is world readable or the user has joined the room.
"""
from_token = pagination_config.from_token
if not from_token:
@@ -346,20 +357,13 @@ class Notifier(object):
limit = pagination_config.limit
- room_ids = []
- if is_guest:
- if guest_room_id:
- if not (yield self._is_world_readable(guest_room_id)):
- raise AuthError(403, "Guest access not allowed")
- room_ids = [guest_room_id]
- else:
- rooms = yield self.store.get_rooms_for_user(user.to_string())
- room_ids = [room.room_id for room in rooms]
+ room_ids, is_joined = yield self._get_room_ids(user, explicit_room_id)
+ is_peeking = not is_joined
@defer.inlineCallbacks
def check_for_updates(before_token, after_token):
if not after_token.is_after(before_token):
- defer.returnValue(None)
+ defer.returnValue(EventStreamResult([], (from_token, from_token)))
events = []
end_token = from_token
@@ -370,13 +374,14 @@ class Notifier(object):
after_id = getattr(after_token, keyname)
if before_id == after_id:
continue
- if only_room_events and name != "room":
+ if only_keys and name not in only_keys:
continue
+
new_events, new_key = yield source.get_new_events(
user=user,
from_key=getattr(from_token, keyname),
limit=limit,
- is_guest=is_guest,
+ is_guest=is_peeking,
room_ids=room_ids,
)
@@ -385,28 +390,51 @@ class Notifier(object):
new_events = yield room_member_handler._filter_events_for_client(
user.to_string(),
new_events,
- is_guest=is_guest,
- require_all_visible_for_guests=False
+ is_peeking=is_peeking,
)
events.extend(new_events)
end_token = end_token.copy_and_replace(keyname, new_key)
- if events:
- defer.returnValue((events, (from_token, end_token)))
- else:
- defer.returnValue(None)
+ defer.returnValue(EventStreamResult(events, (from_token, end_token)))
+
+ user_id_for_stream = user.to_string()
+ if is_peeking:
+ # Internally, the notifier keeps an event stream per user_id.
+ # This is used by both /sync and /events.
+ # We want /events to be used for peeking independently of /sync,
+ # without polluting its contents. So we invent an illegal user ID
+ # (which thus cannot clash with any real users) for keying peeking
+ # over /events.
+ #
+ # I am sorry for what I have done.
+ user_id_for_stream = "_PEEKING_%s_%s" % (
+ explicit_room_id, user_id_for_stream
+ )
result = yield self.wait_for_events(
- user, timeout, check_for_updates, room_ids=room_ids, from_token=from_token
+ user_id_for_stream,
+ timeout,
+ check_for_updates,
+ room_ids=room_ids,
+ from_token=from_token,
)
- if result is None:
- result = ([], (from_token, from_token))
-
defer.returnValue(result)
@defer.inlineCallbacks
+ def _get_room_ids(self, user, explicit_room_id):
+ joined_rooms = yield self.store.get_rooms_for_user(user.to_string())
+ joined_room_ids = map(lambda r: r.room_id, joined_rooms)
+ if explicit_room_id:
+ if explicit_room_id in joined_room_ids:
+ defer.returnValue(([explicit_room_id], True))
+ if (yield self._is_world_readable(explicit_room_id)):
+ defer.returnValue(([explicit_room_id], False))
+ raise AuthError(403, "Non-joined access not allowed")
+ defer.returnValue((joined_room_ids, True))
+
+ @defer.inlineCallbacks
def _is_world_readable(self, room_id):
state = yield self.hs.get_state_handler().get_current_state(
room_id,
@@ -433,7 +461,7 @@ class Notifier(object):
@log_function
def _register_with_keys(self, user_stream):
- self.user_to_user_stream[user_stream.user] = user_stream
+ self.user_to_user_stream[user_stream.user_id] = user_stream
for room in user_stream.rooms:
s = self.room_to_user_streams.setdefault(room, set())
|