summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py171
1 files changed, 130 insertions, 41 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index da610e430f..9ed5af3cb4 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -29,6 +29,8 @@ from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
 from synapse.storage.presence import UserPresenceState
 
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.async import Linearizer
 from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -186,6 +188,7 @@ class PresenceHandler(object):
         # process_id to millisecond timestamp last updated.
         self.external_process_to_current_syncs = {}
         self.external_process_last_updated_ms = {}
+        self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
 
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
@@ -508,6 +511,73 @@ class PresenceHandler(object):
         self.external_process_to_current_syncs[process_id] = syncing_user_ids
 
     @defer.inlineCallbacks
+    def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
+        """Update the syncing users for an external process as a delta.
+
+        Args:
+            process_id (str): An identifier for the process the users are
+                syncing against. This allows synapse to process updates
+                as user start and stop syncing against a given process.
+            user_id (str): The user who has started or stopped syncing
+            is_syncing (bool): Whether or not the user is now syncing
+            sync_time_msec(int): Time in ms when the user was last syncing
+        """
+        with (yield self.external_sync_linearizer.queue(process_id)):
+            prev_state = yield self.current_state_for_user(user_id)
+
+            process_presence = self.external_process_to_current_syncs.setdefault(
+                process_id, set()
+            )
+
+            updates = []
+            if is_syncing and user_id not in process_presence:
+                if prev_state.state == PresenceState.OFFLINE:
+                    updates.append(prev_state.copy_and_replace(
+                        state=PresenceState.ONLINE,
+                        last_active_ts=sync_time_msec,
+                        last_user_sync_ts=sync_time_msec,
+                    ))
+                else:
+                    updates.append(prev_state.copy_and_replace(
+                        last_user_sync_ts=sync_time_msec,
+                    ))
+                process_presence.add(user_id)
+            elif user_id in process_presence:
+                updates.append(prev_state.copy_and_replace(
+                    last_user_sync_ts=sync_time_msec,
+                ))
+
+            if not is_syncing:
+                process_presence.discard(user_id)
+
+            if updates:
+                yield self._update_states(updates)
+
+            self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+
+    @defer.inlineCallbacks
+    def update_external_syncs_clear(self, process_id):
+        """Marks all users that had been marked as syncing by a given process
+        as offline.
+
+        Used when the process has stopped/disappeared.
+        """
+        with (yield self.external_sync_linearizer.queue(process_id)):
+            process_presence = self.external_process_to_current_syncs.pop(
+                process_id, set()
+            )
+            prev_states = yield self.current_state_for_users(process_presence)
+            time_now_ms = self.clock.time_msec()
+
+            yield self._update_states([
+                prev_state.copy_and_replace(
+                    last_user_sync_ts=time_now_ms,
+                )
+                for prev_state in prev_states.itervalues()
+            ])
+            self.external_process_last_updated_ms.pop(process_id, None)
+
+    @defer.inlineCallbacks
     def current_state_for_user(self, user_id):
         """Get the current presence state for a user.
         """
@@ -526,14 +596,14 @@ class PresenceHandler(object):
             for user_id in user_ids
         }
 
-        missing = [user_id for user_id, state in states.items() if not state]
+        missing = [user_id for user_id, state in states.iteritems() if not state]
         if missing:
             # There are things not in our in memory cache. Lets pull them out of
             # the database.
             res = yield self.store.get_presence_for_users(missing)
             states.update(res)
 
-            missing = [user_id for user_id, state in states.items() if not state]
+            missing = [user_id for user_id, state in states.iteritems() if not state]
             if missing:
                 new = {
                     user_id: UserPresenceState.default(user_id)
@@ -556,9 +626,9 @@ class PresenceHandler(object):
         room_ids_to_states = {}
         users_to_states = {}
         for state in states:
-            events = yield self.store.get_rooms_for_user(state.user_id)
-            for e in events:
-                room_ids_to_states.setdefault(e.room_id, []).append(state)
+            room_ids = yield self.store.get_rooms_for_user(state.user_id)
+            for room_id in room_ids:
+                room_ids_to_states.setdefault(room_id, []).append(state)
 
             plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
             for u in plist:
@@ -574,8 +644,7 @@ class PresenceHandler(object):
                 if not local_states:
                     continue
 
-                users = yield self.store.get_users_in_room(room_id)
-                hosts = set(get_domain_from_id(u) for u in users)
+                hosts = yield self.store.get_hosts_in_room(room_id)
 
                 for host in hosts:
                     hosts_to_states.setdefault(host, []).extend(local_states)
@@ -719,9 +788,7 @@ class PresenceHandler(object):
                 for state in updates
             ])
         else:
-            defer.returnValue([
-                format_user_presence_state(state, now) for state in updates
-            ])
+            defer.returnValue(updates)
 
     @defer.inlineCallbacks
     def set_state(self, target_user, state, ignore_status_msg=False):
@@ -795,6 +862,9 @@ class PresenceHandler(object):
             as_event=False,
         )
 
+        now = self.clock.time_msec()
+        results[:] = [format_user_presence_state(r, now) for r in results]
+
         is_accepted = {
             row["observed_user_id"]: row["accepted"] for row in presence_list
         }
@@ -847,6 +917,7 @@ class PresenceHandler(object):
             )
 
             state_dict = yield self.get_state(observed_user, as_event=False)
+            state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
 
             self.federation.send_edu(
                 destination=observer_user.domain,
@@ -910,11 +981,12 @@ class PresenceHandler(object):
     def is_visible(self, observed_user, observer_user):
         """Returns whether a user can see another user's presence.
         """
-        observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string())
-        observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string())
-
-        observer_room_ids = set(r.room_id for r in observer_rooms)
-        observed_room_ids = set(r.room_id for r in observed_rooms)
+        observer_room_ids = yield self.store.get_rooms_for_user(
+            observer_user.to_string()
+        )
+        observed_room_ids = yield self.store.get_rooms_for_user(
+            observed_user.to_string()
+        )
 
         if observer_room_ids & observed_room_ids:
             defer.returnValue(True)
@@ -979,14 +1051,18 @@ def should_notify(old_state, new_state):
     return False
 
 
-def format_user_presence_state(state, now):
+def format_user_presence_state(state, now, include_user_id=True):
     """Convert UserPresenceState to a format that can be sent down to clients
     and to other servers.
+
+    The "user_id" is optional so that this function can be used to format presence
+    updates for client /sync responses and for federation /send requests.
     """
     content = {
         "presence": state.state,
-        "user_id": state.user_id,
     }
+    if include_user_id:
+        content["user_id"] = state.user_id
     if state.last_active_ts:
         content["last_active_ago"] = now - state.last_active_ts
     if state.status_msg and state.state != PresenceState.OFFLINE:
@@ -1025,7 +1101,6 @@ class PresenceEventSource(object):
         # sending down the rare duplicate is not a concern.
 
         with Measure(self.clock, "presence.get_new_events"):
-            user_id = user.to_string()
             if from_key is not None:
                 from_key = int(from_key)
 
@@ -1034,18 +1109,7 @@ class PresenceEventSource(object):
 
             max_token = self.store.get_current_presence_token()
 
-            plist = yield self.store.get_presence_list_accepted(user.localpart)
-            users_interested_in = set(row["observed_user_id"] for row in plist)
-            users_interested_in.add(user_id)  # So that we receive our own presence
-
-            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-                user_id
-            )
-            users_interested_in.update(users_who_share_room)
-
-            if explicit_room_id:
-                user_ids = yield self.store.get_users_in_room(explicit_room_id)
-                users_interested_in.update(user_ids)
+            users_interested_in = yield self._get_interested_in(user, explicit_room_id)
 
             user_ids_changed = set()
             changed = None
@@ -1073,16 +1137,13 @@ class PresenceEventSource(object):
 
             updates = yield presence.current_state_for_users(user_ids_changed)
 
-        now = self.clock.time_msec()
-
-        defer.returnValue(([
-            {
-                "type": "m.presence",
-                "content": format_user_presence_state(s, now),
-            }
-            for s in updates.values()
-            if include_offline or s.state != PresenceState.OFFLINE
-        ], max_token))
+        if include_offline:
+            defer.returnValue((updates.values(), max_token))
+        else:
+            defer.returnValue(([
+                s for s in updates.itervalues()
+                if s.state != PresenceState.OFFLINE
+            ], max_token))
 
     def get_current_key(self):
         return self.store.get_current_presence_token()
@@ -1090,6 +1151,31 @@ class PresenceEventSource(object):
     def get_pagination_rows(self, user, pagination_config, key):
         return self.get_new_events(user, from_key=None, include_offline=False)
 
+    @cachedInlineCallbacks(num_args=2, cache_context=True)
+    def _get_interested_in(self, user, explicit_room_id, cache_context):
+        """Returns the set of users that the given user should see presence
+        updates for
+        """
+        user_id = user.to_string()
+        plist = yield self.store.get_presence_list_accepted(
+            user.localpart, on_invalidate=cache_context.invalidate,
+        )
+        users_interested_in = set(row["observed_user_id"] for row in plist)
+        users_interested_in.add(user_id)  # So that we receive our own presence
+
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id, on_invalidate=cache_context.invalidate,
+        )
+        users_interested_in.update(users_who_share_room)
+
+        if explicit_room_id:
+            user_ids = yield self.store.get_users_in_room(
+                explicit_room_id, on_invalidate=cache_context.invalidate,
+            )
+            users_interested_in.update(user_ids)
+
+        defer.returnValue(users_interested_in)
+
 
 def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     """Checks the presence of users that have timed out and updates as
@@ -1157,7 +1243,10 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
         # If there are have been no sync for a while (and none ongoing),
         # set presence to offline
         if user_id not in syncing_user_ids:
-            if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
+            # If the user has done something recently but hasn't synced,
+            # don't set them as offline.
+            sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
+            if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
                 state = state.copy_and_replace(
                     state=PresenceState.OFFLINE,
                     status_msg=None,