diff options
Diffstat (limited to 'synapse/handlers/events.py')
-rw-r--r-- | synapse/handlers/events.py | 86 |
1 files changed, 49 insertions, 37 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 103bc67c42..d3297b7292 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -17,10 +17,13 @@ from twisted.internet import defer from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function +from synapse.types import UserID +from synapse.events.utils import serialize_event from ._base import BaseHandler import logging +import random logger = logging.getLogger(__name__) @@ -47,38 +50,46 @@ class EventStreamHandler(BaseHandler): @defer.inlineCallbacks @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0, - as_client_event=True): - auth_user = self.hs.parse_userid(auth_user_id) + as_client_event=True, affect_presence=True): + auth_user = UserID.from_string(auth_user_id) try: - if auth_user not in self._streams_per_user: - self._streams_per_user[auth_user] = 0 - if auth_user in self._stop_timer_per_user: - try: - self.clock.cancel_call_later( - self._stop_timer_per_user.pop(auth_user) + if affect_presence: + if auth_user not in self._streams_per_user: + self._streams_per_user[auth_user] = 0 + if auth_user in self._stop_timer_per_user: + try: + self.clock.cancel_call_later( + self._stop_timer_per_user.pop(auth_user) + ) + except: + logger.exception("Failed to cancel event timer") + else: + yield self.distributor.fire( + "started_user_eventstream", auth_user ) - except: - logger.exception("Failed to cancel event timer") - else: - yield self.distributor.fire( - "started_user_eventstream", auth_user - ) - self._streams_per_user[auth_user] += 1 - - if pagin_config.from_token is None: - pagin_config.from_token = None + self._streams_per_user[auth_user] += 1 rm_handler = self.hs.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(auth_user) + if timeout: + # If they've set a timeout set a minimum limit. + timeout = max(timeout, 500) + + # Add some randomness to this value to try and mitigate against + # thundering herds on restart. + timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) + with PreserveLoggingContext(): events, tokens = yield self.notifier.get_events_for( auth_user, room_ids, pagin_config, timeout ) + time_now = self.clock.time_msec() + chunks = [ - self.hs.serialize_event(e, as_client_event) for e in events + serialize_event(e, time_now, as_client_event) for e in events ] chunk = { @@ -90,27 +101,28 @@ class EventStreamHandler(BaseHandler): defer.returnValue(chunk) finally: - self._streams_per_user[auth_user] -= 1 - if not self._streams_per_user[auth_user]: - del self._streams_per_user[auth_user] - - # 10 seconds of grace to allow the client to reconnect again - # before we think they're gone - def _later(): - logger.debug( - "_later stopped_user_eventstream %s", auth_user - ) + if affect_presence: + self._streams_per_user[auth_user] -= 1 + if not self._streams_per_user[auth_user]: + del self._streams_per_user[auth_user] + + # 10 seconds of grace to allow the client to reconnect again + # before we think they're gone + def _later(): + logger.debug( + "_later stopped_user_eventstream %s", auth_user + ) - self._stop_timer_per_user.pop(auth_user, None) + self._stop_timer_per_user.pop(auth_user, None) - yield self.distributor.fire( - "stopped_user_eventstream", auth_user - ) + return self.distributor.fire( + "stopped_user_eventstream", auth_user + ) - logger.debug("Scheduling _later: for %s", auth_user) - self._stop_timer_per_user[auth_user] = ( - self.clock.call_later(30, _later) - ) + logger.debug("Scheduling _later: for %s", auth_user) + self._stop_timer_per_user[auth_user] = ( + self.clock.call_later(30, _later) + ) class EventHandler(BaseHandler): |