diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 4933c31c19..f25a252523 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -18,7 +18,8 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.types import UserID
from synapse.events.utils import serialize_event
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.api.constants import Membership, EventTypes
+from synapse.events import EventBase
from ._base import BaseHandler
@@ -29,20 +30,6 @@ import random
logger = logging.getLogger(__name__)
-def started_user_eventstream(distributor, user):
- return preserve_context_over_fn(
- distributor.fire,
- "started_user_eventstream", user
- )
-
-
-def stopped_user_eventstream(distributor, user):
- return preserve_context_over_fn(
- distributor.fire,
- "stopped_user_eventstream", user
- )
-
-
class EventStreamHandler(BaseHandler):
def __init__(self, hs):
@@ -62,61 +49,6 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier()
@defer.inlineCallbacks
- def started_stream(self, user):
- """Tells the presence handler that we have started an eventstream for
- the user:
-
- Args:
- user (User): The user who started a stream.
- Returns:
- A deferred that completes once their presence has been updated.
- """
- if user not in self._streams_per_user:
- # Make sure we set the streams per user to 1 here rather than
- # setting it to zero and incrementing the value below.
- # Otherwise this may race with stopped_stream causing the
- # user to be erased from the map before we have a chance
- # to increment it.
- self._streams_per_user[user] = 1
- if user in self._stop_timer_per_user:
- try:
- self.clock.cancel_call_later(
- self._stop_timer_per_user.pop(user)
- )
- except:
- logger.exception("Failed to cancel event timer")
- else:
- yield started_user_eventstream(self.distributor, user)
- else:
- self._streams_per_user[user] += 1
-
- def stopped_stream(self, user):
- """If there are no streams for a user this starts a timer that will
- notify the presence handler that we haven't got an event stream for
- the user unless the user starts a new stream in 30 seconds.
-
- Args:
- user (User): The user who stopped a stream.
- """
- self._streams_per_user[user] -= 1
- if not self._streams_per_user[user]:
- del self._streams_per_user[user]
-
- # 30 seconds of grace to allow the client to reconnect again
- # before we think they're gone
- def _later():
- logger.debug("_later stopped_user_eventstream %s", user)
-
- self._stop_timer_per_user.pop(user, None)
-
- return stopped_user_eventstream(self.distributor, user)
-
- logger.debug("Scheduling _later: for %s", user)
- self._stop_timer_per_user[user] = (
- self.clock.call_later(30, _later)
- )
-
- @defer.inlineCallbacks
@log_function
def get_stream(self, auth_user_id, pagin_config, timeout=0,
as_client_event=True, affect_presence=True,
@@ -126,11 +58,12 @@ class EventStreamHandler(BaseHandler):
If `only_keys` is not None, events from keys will be sent down.
"""
auth_user = UserID.from_string(auth_user_id)
+ presence_handler = self.hs.get_handlers().presence_handler
- try:
- if affect_presence:
- yield self.started_stream(auth_user)
-
+ context = yield presence_handler.user_syncing(
+ auth_user_id, affect_presence=affect_presence,
+ )
+ with context:
if timeout:
# If they've set a timeout set a minimum limit.
timeout = max(timeout, 500)
@@ -145,6 +78,34 @@ class EventStreamHandler(BaseHandler):
is_guest=is_guest, explicit_room_id=room_id
)
+ # When the user joins a new room, or another user joins a currently
+ # joined room, we need to send down presence for those users.
+ to_add = []
+ for event in events:
+ if not isinstance(event, EventBase):
+ continue
+ if event.type == EventTypes.Member:
+ if event.membership != Membership.JOIN:
+ continue
+ # Send down presence.
+ if event.state_key == auth_user_id:
+ # Send down presence for everyone in the room.
+ users = yield self.store.get_users_in_room(event.room_id)
+ states = yield presence_handler.get_states(
+ users,
+ as_event=True,
+ )
+ to_add.extend(states)
+ else:
+
+ ev = yield presence_handler.get_state(
+ UserID.from_string(event.state_key),
+ as_event=True,
+ )
+ to_add.append(ev)
+
+ events.extend(to_add)
+
time_now = self.clock.time_msec()
chunks = [
@@ -159,10 +120,6 @@ class EventStreamHandler(BaseHandler):
defer.returnValue(chunk)
- finally:
- if affect_presence:
- self.stopped_stream(auth_user)
-
class EventHandler(BaseHandler):
|