diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 891502c04f..0e4c0d4d06 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -47,10 +47,60 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier()
@defer.inlineCallbacks
+ def started_stream(self, user):
+ """Tells the presence handler that we have started an eventstream for
+ the user:
+
+ Args:
+ user (User): The user who started a stream.
+ Returns:
+ A deferred that completes once their presence has been updated.
+ """
+ if user not in self._streams_per_user:
+ self._streams_per_user[user] = 0
+ if user in self._stop_timer_per_user:
+ try:
+ self.clock.cancel_call_later(
+ self._stop_timer_per_user.pop(user)
+ )
+ except:
+ logger.exception("Failed to cancel event timer")
+ else:
+ yield self.distributor.fire("started_user_eventstream", user)
+
+ self._streams_per_user[user] += 1
+
+ def stopped_stream(self, user):
+ """If there are no streams for a user this starts a timer that will
+ notify the presence handler that we haven't got an event stream for
+ the user unless the user starts a new stream in 30 seconds.
+
+ Args:
+ user (User): The user who stopped a stream.
+ """
+ self._streams_per_user[user] -= 1
+ if not self._streams_per_user[user]:
+ del self._streams_per_user[user]
+
+ # 30 seconds of grace to allow the client to reconnect again
+ # before we think they're gone
+ def _later():
+ logger.debug("_later stopped_user_eventstream %s", user)
+
+ self._stop_timer_per_user.pop(user, None)
+
+ return self.distributor.fire("stopped_user_eventstream", user)
+
+ logger.debug("Scheduling _later: for %s", user)
+ self._stop_timer_per_user[user] = (
+ self.clock.call_later(30, _later)
+ )
+
+ @defer.inlineCallbacks
@log_function
def get_stream(self, auth_user_id, pagin_config, timeout=0,
as_client_event=True, affect_presence=True,
- only_room_events=False):
+ only_room_events=False, room_id=None, is_guest=False):
"""Fetches the events stream for a given user.
If `only_room_events` is `True` only room events will be returned.
@@ -59,31 +109,7 @@ class EventStreamHandler(BaseHandler):
try:
if affect_presence:
- if auth_user not in self._streams_per_user:
- self._streams_per_user[auth_user] = 0
- if auth_user in self._stop_timer_per_user:
- try:
- self.clock.cancel_call_later(
- self._stop_timer_per_user.pop(auth_user)
- )
- except:
- logger.exception("Failed to cancel event timer")
- else:
- yield self.distributor.fire(
- "started_user_eventstream", auth_user
- )
- self._streams_per_user[auth_user] += 1
-
- rm_handler = self.hs.get_handlers().room_member_handler
-
- app_service = yield self.store.get_app_service_by_user_id(
- auth_user.to_string()
- )
- if app_service:
- rooms = yield self.store.get_app_service_rooms(app_service)
- room_ids = set(r.room_id for r in rooms)
- else:
- room_ids = yield rm_handler.get_joined_rooms_for_user(auth_user)
+ yield self.started_stream(auth_user)
if timeout:
# If they've set a timeout set a minimum limit.
@@ -93,9 +119,15 @@ class EventStreamHandler(BaseHandler):
# thundering herds on restart.
timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
+ if is_guest:
+ yield self.distributor.fire(
+ "user_joined_room", user=auth_user, room_id=room_id
+ )
+
events, tokens = yield self.notifier.get_events_for(
- auth_user, room_ids, pagin_config, timeout,
- only_room_events=only_room_events
+ auth_user, pagin_config, timeout,
+ only_room_events=only_room_events,
+ is_guest=is_guest, guest_room_id=room_id
)
time_now = self.clock.time_msec()
@@ -114,27 +146,7 @@ class EventStreamHandler(BaseHandler):
finally:
if affect_presence:
- self._streams_per_user[auth_user] -= 1
- if not self._streams_per_user[auth_user]:
- del self._streams_per_user[auth_user]
-
- # 10 seconds of grace to allow the client to reconnect again
- # before we think they're gone
- def _later():
- logger.debug(
- "_later stopped_user_eventstream %s", auth_user
- )
-
- self._stop_timer_per_user.pop(auth_user, None)
-
- return self.distributor.fire(
- "stopped_user_eventstream", auth_user
- )
-
- logger.debug("Scheduling _later: for %s", auth_user)
- self._stop_timer_per_user[auth_user] = (
- self.clock.call_later(30, _later)
- )
+ self.stopped_stream(auth_user)
class EventHandler(BaseHandler):
|