diff options
Diffstat (limited to 'synapse/handlers/events.py')
-rw-r--r-- | synapse/handlers/events.py | 113 |
1 files changed, 35 insertions, 78 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 4933c31c19..f25a252523 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -18,7 +18,8 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.types import UserID from synapse.events.utils import serialize_event -from synapse.util.logcontext import preserve_context_over_fn +from synapse.api.constants import Membership, EventTypes +from synapse.events import EventBase from ._base import BaseHandler @@ -29,20 +30,6 @@ import random logger = logging.getLogger(__name__) -def started_user_eventstream(distributor, user): - return preserve_context_over_fn( - distributor.fire, - "started_user_eventstream", user - ) - - -def stopped_user_eventstream(distributor, user): - return preserve_context_over_fn( - distributor.fire, - "stopped_user_eventstream", user - ) - - class EventStreamHandler(BaseHandler): def __init__(self, hs): @@ -62,61 +49,6 @@ class EventStreamHandler(BaseHandler): self.notifier = hs.get_notifier() @defer.inlineCallbacks - def started_stream(self, user): - """Tells the presence handler that we have started an eventstream for - the user: - - Args: - user (User): The user who started a stream. - Returns: - A deferred that completes once their presence has been updated. - """ - if user not in self._streams_per_user: - # Make sure we set the streams per user to 1 here rather than - # setting it to zero and incrementing the value below. - # Otherwise this may race with stopped_stream causing the - # user to be erased from the map before we have a chance - # to increment it. - self._streams_per_user[user] = 1 - if user in self._stop_timer_per_user: - try: - self.clock.cancel_call_later( - self._stop_timer_per_user.pop(user) - ) - except: - logger.exception("Failed to cancel event timer") - else: - yield started_user_eventstream(self.distributor, user) - else: - self._streams_per_user[user] += 1 - - def stopped_stream(self, user): - """If there are no streams for a user this starts a timer that will - notify the presence handler that we haven't got an event stream for - the user unless the user starts a new stream in 30 seconds. - - Args: - user (User): The user who stopped a stream. - """ - self._streams_per_user[user] -= 1 - if not self._streams_per_user[user]: - del self._streams_per_user[user] - - # 30 seconds of grace to allow the client to reconnect again - # before we think they're gone - def _later(): - logger.debug("_later stopped_user_eventstream %s", user) - - self._stop_timer_per_user.pop(user, None) - - return stopped_user_eventstream(self.distributor, user) - - logger.debug("Scheduling _later: for %s", user) - self._stop_timer_per_user[user] = ( - self.clock.call_later(30, _later) - ) - - @defer.inlineCallbacks @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0, as_client_event=True, affect_presence=True, @@ -126,11 +58,12 @@ class EventStreamHandler(BaseHandler): If `only_keys` is not None, events from keys will be sent down. """ auth_user = UserID.from_string(auth_user_id) + presence_handler = self.hs.get_handlers().presence_handler - try: - if affect_presence: - yield self.started_stream(auth_user) - + context = yield presence_handler.user_syncing( + auth_user_id, affect_presence=affect_presence, + ) + with context: if timeout: # If they've set a timeout set a minimum limit. timeout = max(timeout, 500) @@ -145,6 +78,34 @@ class EventStreamHandler(BaseHandler): is_guest=is_guest, explicit_room_id=room_id ) + # When the user joins a new room, or another user joins a currently + # joined room, we need to send down presence for those users. + to_add = [] + for event in events: + if not isinstance(event, EventBase): + continue + if event.type == EventTypes.Member: + if event.membership != Membership.JOIN: + continue + # Send down presence. + if event.state_key == auth_user_id: + # Send down presence for everyone in the room. + users = yield self.store.get_users_in_room(event.room_id) + states = yield presence_handler.get_states( + users, + as_event=True, + ) + to_add.extend(states) + else: + + ev = yield presence_handler.get_state( + UserID.from_string(event.state_key), + as_event=True, + ) + to_add.append(ev) + + events.extend(to_add) + time_now = self.clock.time_msec() chunks = [ @@ -159,10 +120,6 @@ class EventStreamHandler(BaseHandler): defer.returnValue(chunk) - finally: - if affect_presence: - self.stopped_stream(auth_user) - class EventHandler(BaseHandler): |