summary refs log tree commit diff
path: root/synapse/handlers/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/events.py')
-rw-r--r--synapse/handlers/events.py113
1 files changed, 35 insertions, 78 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 4933c31c19..f25a252523 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -18,7 +18,8 @@ from twisted.internet import defer
 from synapse.util.logutils import log_function
 from synapse.types import UserID
 from synapse.events.utils import serialize_event
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.api.constants import Membership, EventTypes
+from synapse.events import EventBase
 
 from ._base import BaseHandler
 
@@ -29,20 +30,6 @@ import random
 logger = logging.getLogger(__name__)
 
 
-def started_user_eventstream(distributor, user):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "started_user_eventstream", user
-    )
-
-
-def stopped_user_eventstream(distributor, user):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "stopped_user_eventstream", user
-    )
-
-
 class EventStreamHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -62,61 +49,6 @@ class EventStreamHandler(BaseHandler):
         self.notifier = hs.get_notifier()
 
     @defer.inlineCallbacks
-    def started_stream(self, user):
-        """Tells the presence handler that we have started an eventstream for
-        the user:
-
-        Args:
-            user (User): The user who started a stream.
-        Returns:
-            A deferred that completes once their presence has been updated.
-        """
-        if user not in self._streams_per_user:
-            # Make sure we set the streams per user to 1 here rather than
-            # setting it to zero and incrementing the value below.
-            # Otherwise this may race with stopped_stream causing the
-            # user to be erased from the map before we have a chance
-            # to increment it.
-            self._streams_per_user[user] = 1
-            if user in self._stop_timer_per_user:
-                try:
-                    self.clock.cancel_call_later(
-                        self._stop_timer_per_user.pop(user)
-                    )
-                except:
-                    logger.exception("Failed to cancel event timer")
-            else:
-                yield started_user_eventstream(self.distributor, user)
-        else:
-            self._streams_per_user[user] += 1
-
-    def stopped_stream(self, user):
-        """If there are no streams for a user this starts a timer that will
-        notify the presence handler that we haven't got an event stream for
-        the user unless the user starts a new stream in 30 seconds.
-
-        Args:
-            user (User): The user who stopped a stream.
-        """
-        self._streams_per_user[user] -= 1
-        if not self._streams_per_user[user]:
-            del self._streams_per_user[user]
-
-            # 30 seconds of grace to allow the client to reconnect again
-            #   before we think they're gone
-            def _later():
-                logger.debug("_later stopped_user_eventstream %s", user)
-
-                self._stop_timer_per_user.pop(user, None)
-
-                return stopped_user_eventstream(self.distributor, user)
-
-            logger.debug("Scheduling _later: for %s", user)
-            self._stop_timer_per_user[user] = (
-                self.clock.call_later(30, _later)
-            )
-
-    @defer.inlineCallbacks
     @log_function
     def get_stream(self, auth_user_id, pagin_config, timeout=0,
                    as_client_event=True, affect_presence=True,
@@ -126,11 +58,12 @@ class EventStreamHandler(BaseHandler):
         If `only_keys` is not None, events from keys will be sent down.
         """
         auth_user = UserID.from_string(auth_user_id)
+        presence_handler = self.hs.get_handlers().presence_handler
 
-        try:
-            if affect_presence:
-                yield self.started_stream(auth_user)
-
+        context = yield presence_handler.user_syncing(
+            auth_user_id, affect_presence=affect_presence,
+        )
+        with context:
             if timeout:
                 # If they've set a timeout set a minimum limit.
                 timeout = max(timeout, 500)
@@ -145,6 +78,34 @@ class EventStreamHandler(BaseHandler):
                 is_guest=is_guest, explicit_room_id=room_id
             )
 
+            # When the user joins a new room, or another user joins a currently
+            # joined room, we need to send down presence for those users.
+            to_add = []
+            for event in events:
+                if not isinstance(event, EventBase):
+                    continue
+                if event.type == EventTypes.Member:
+                    if event.membership != Membership.JOIN:
+                        continue
+                    # Send down presence.
+                    if event.state_key == auth_user_id:
+                        # Send down presence for everyone in the room.
+                        users = yield self.store.get_users_in_room(event.room_id)
+                        states = yield presence_handler.get_states(
+                            users,
+                            as_event=True,
+                        )
+                        to_add.extend(states)
+                    else:
+
+                        ev = yield presence_handler.get_state(
+                            UserID.from_string(event.state_key),
+                            as_event=True,
+                        )
+                        to_add.append(ev)
+
+            events.extend(to_add)
+
             time_now = self.clock.time_msec()
 
             chunks = [
@@ -159,10 +120,6 @@ class EventStreamHandler(BaseHandler):
 
             defer.returnValue(chunk)
 
-        finally:
-            if affect_presence:
-                self.stopped_stream(auth_user)
-
 
 class EventHandler(BaseHandler):