summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py214
1 files changed, 159 insertions, 55 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d0c8f1328b..6b70fa3817 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,11 +33,9 @@ from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID
+from synapse.types import UserID, get_domain_from_id
 import synapse.metrics
 
-from ._base import BaseHandler
-
 import logging
 
 
@@ -52,6 +50,8 @@ timers_fired_counter = metrics.register_counter("timers_fired")
 federation_presence_counter = metrics.register_counter("federation_presence")
 bump_active_time_counter = metrics.register_counter("bump_active_time")
 
+get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
+
 
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
@@ -70,14 +70,18 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000
 # How often to resend presence to remote servers
 FEDERATION_PING_INTERVAL = 25 * 60 * 1000
 
+# How long we will wait before assuming that the syncs from an external process
+# are dead.
+EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
+
 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 
 
-class PresenceHandler(BaseHandler):
+class PresenceHandler(object):
 
     def __init__(self, hs):
-        super(PresenceHandler, self).__init__(hs)
-        self.hs = hs
+        self.is_mine = hs.is_mine
+        self.is_mine_id = hs.is_mine_id
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
@@ -138,7 +142,7 @@ class PresenceHandler(BaseHandler):
                 obj=state.user_id,
                 then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
             )
-            if self.hs.is_mine_id(state.user_id):
+            if self.is_mine_id(state.user_id):
                 self.wheel_timer.insert(
                     now=now,
                     obj=state.user_id,
@@ -160,15 +164,26 @@ class PresenceHandler(BaseHandler):
         self.serial_to_user = {}
         self._next_serial = 1
 
-        # Keeps track of the number of *ongoing* syncs. While this is non zero
-        # a user will never go offline.
+        # Keeps track of the number of *ongoing* syncs on this process. While
+        # this is non zero a user will never go offline.
         self.user_to_num_current_syncs = {}
 
+        # Keeps track of the number of *ongoing* syncs on other processes.
+        # While any sync is ongoing on another process the user will never
+        # go offline.
+        # Each process has a unique identifier and an update frequency. If
+        # no update is received from that process within the update period then
+        # we assume that all the sync requests on that process have stopped.
+        # Stored as a dict from process_id to set of user_id, and a dict of
+        # process_id to millisecond timestamp last updated.
+        self.external_process_to_current_syncs = {}
+        self.external_process_last_updated_ms = {}
+
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
         self.clock.call_later(
-            0 * 1000,
+            30,
             self.clock.looping_call,
             self._handle_timeouts,
             5000,
@@ -228,7 +243,7 @@ class PresenceHandler(BaseHandler):
 
                 new_state, should_notify, should_ping = handle_update(
                     prev_state, new_state,
-                    is_mine=self.hs.is_mine_id(user_id),
+                    is_mine=self.is_mine_id(user_id),
                     wheel_timer=self.wheel_timer,
                     now=now
                 )
@@ -268,31 +283,48 @@ class PresenceHandler(BaseHandler):
         """Checks the presence of users that have timed out and updates as
         appropriate.
         """
+        logger.info("Handling presence timeouts")
         now = self.clock.time_msec()
 
-        with Measure(self.clock, "presence_handle_timeouts"):
-            # Fetch the list of users that *may* have timed out. Things may have
-            # changed since the timeout was set, so we won't necessarily have to
-            # take any action.
-            users_to_check = self.wheel_timer.fetch(now)
+        try:
+            with Measure(self.clock, "presence_handle_timeouts"):
+                # Fetch the list of users that *may* have timed out. Things may have
+                # changed since the timeout was set, so we won't necessarily have to
+                # take any action.
+                users_to_check = set(self.wheel_timer.fetch(now))
+
+                # Check whether the lists of syncing processes from an external
+                # process have expired.
+                expired_process_ids = [
+                    process_id for process_id, last_update
+                    in self.external_process_last_updated_ms.items()
+                    if now - last_update > EXTERNAL_PROCESS_EXPIRY
+                ]
+                for process_id in expired_process_ids:
+                    users_to_check.update(
+                        self.external_process_last_updated_ms.pop(process_id, ())
+                    )
+                    self.external_process_last_update.pop(process_id)
 
-            states = [
-                self.user_to_current_state.get(
-                    user_id, UserPresenceState.default(user_id)
-                )
-                for user_id in set(users_to_check)
-            ]
+                states = [
+                    self.user_to_current_state.get(
+                        user_id, UserPresenceState.default(user_id)
+                    )
+                    for user_id in users_to_check
+                ]
 
-            timers_fired_counter.inc_by(len(states))
+                timers_fired_counter.inc_by(len(states))
 
-            changes = handle_timeouts(
-                states,
-                is_mine_fn=self.hs.is_mine_id,
-                user_to_num_current_syncs=self.user_to_num_current_syncs,
-                now=now,
-            )
+                changes = handle_timeouts(
+                    states,
+                    is_mine_fn=self.is_mine_id,
+                    syncing_user_ids=self.get_currently_syncing_users(),
+                    now=now,
+                )
 
-        preserve_fn(self._update_states)(changes)
+            preserve_fn(self._update_states)(changes)
+        except:
+            logger.exception("Exception in _handle_timeouts loop")
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
@@ -365,6 +397,74 @@ class PresenceHandler(BaseHandler):
 
         defer.returnValue(_user_syncing())
 
+    def get_currently_syncing_users(self):
+        """Get the set of user ids that are currently syncing on this HS.
+        Returns:
+            set(str): A set of user_id strings.
+        """
+        syncing_user_ids = {
+            user_id for user_id, count in self.user_to_num_current_syncs.items()
+            if count
+        }
+        for user_ids in self.external_process_to_current_syncs.values():
+            syncing_user_ids.update(user_ids)
+        return syncing_user_ids
+
+    @defer.inlineCallbacks
+    def update_external_syncs(self, process_id, syncing_user_ids):
+        """Update the syncing users for an external process
+
+        Args:
+            process_id(str): An identifier for the process the users are
+                syncing against. This allows synapse to process updates
+                as user start and stop syncing against a given process.
+            syncing_user_ids(set(str)): The set of user_ids that are
+                currently syncing on that server.
+        """
+
+        # Grab the previous list of user_ids that were syncing on that process
+        prev_syncing_user_ids = (
+            self.external_process_to_current_syncs.get(process_id, set())
+        )
+        # Grab the current presence state for both the users that are syncing
+        # now and the users that were syncing before this update.
+        prev_states = yield self.current_state_for_users(
+            syncing_user_ids | prev_syncing_user_ids
+        )
+        updates = []
+        time_now_ms = self.clock.time_msec()
+
+        # For each new user that is syncing check if we need to mark them as
+        # being online.
+        for new_user_id in syncing_user_ids - prev_syncing_user_ids:
+            prev_state = prev_states[new_user_id]
+            if prev_state.state == PresenceState.OFFLINE:
+                updates.append(prev_state.copy_and_replace(
+                    state=PresenceState.ONLINE,
+                    last_active_ts=time_now_ms,
+                    last_user_sync_ts=time_now_ms,
+                ))
+            else:
+                updates.append(prev_state.copy_and_replace(
+                    last_user_sync_ts=time_now_ms,
+                ))
+
+        # For each user that is still syncing or stopped syncing update the
+        # last sync time so that we will correctly apply the grace period when
+        # they stop syncing.
+        for old_user_id in prev_syncing_user_ids:
+            prev_state = prev_states[old_user_id]
+            updates.append(prev_state.copy_and_replace(
+                last_user_sync_ts=time_now_ms,
+            ))
+
+        yield self._update_states(updates)
+
+        # Update the last updated time for the process. We expire the entries
+        # if we don't receive an update in the given timeframe.
+        self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+        self.external_process_to_current_syncs[process_id] = syncing_user_ids
+
     @defer.inlineCallbacks
     def current_state_for_user(self, user_id):
         """Get the current presence state for a user.
@@ -427,7 +527,7 @@ class PresenceHandler(BaseHandler):
 
         hosts_to_states = {}
         for room_id, states in room_ids_to_states.items():
-            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
             if not local_states:
                 continue
 
@@ -436,11 +536,11 @@ class PresenceHandler(BaseHandler):
                 hosts_to_states.setdefault(host, []).extend(local_states)
 
         for user_id, states in users_to_states.items():
-            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
             if not local_states:
                 continue
 
-            host = UserID.from_string(user_id).domain
+            host = get_domain_from_id(user_id)
             hosts_to_states.setdefault(host, []).extend(local_states)
 
         # TODO: de-dup hosts_to_states, as a single host might have multiple
@@ -611,14 +711,14 @@ class PresenceHandler(BaseHandler):
         # don't need to send to local clients here, as that is done as part
         # of the event stream/sync.
         # TODO: Only send to servers not already in the room.
-        if self.hs.is_mine(user):
+        if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())
 
             hosts = yield self.store.get_joined_hosts_for_room(room_id)
             self._push_to_remotes({host: (state,) for host in hosts})
         else:
             user_ids = yield self.store.get_users_in_room(room_id)
-            user_ids = filter(self.hs.is_mine_id, user_ids)
+            user_ids = filter(self.is_mine_id, user_ids)
 
             states = yield self.current_state_for_users(user_ids)
 
@@ -628,7 +728,7 @@ class PresenceHandler(BaseHandler):
     def get_presence_list(self, observer_user, accepted=None):
         """Returns the presence for all users in their presence list.
         """
-        if not self.hs.is_mine(observer_user):
+        if not self.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         presence_list = yield self.store.get_presence_list(
@@ -659,7 +759,7 @@ class PresenceHandler(BaseHandler):
             observer_user.localpart, observed_user.to_string()
         )
 
-        if self.hs.is_mine(observed_user):
+        if self.is_mine(observed_user):
             yield self.invite_presence(observed_user, observer_user)
         else:
             yield self.federation.send_edu(
@@ -675,11 +775,11 @@ class PresenceHandler(BaseHandler):
     def invite_presence(self, observed_user, observer_user):
         """Handles new presence invites.
         """
-        if not self.hs.is_mine(observed_user):
+        if not self.is_mine(observed_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         # TODO: Don't auto accept
-        if self.hs.is_mine(observer_user):
+        if self.is_mine(observer_user):
             yield self.accept_presence(observed_user, observer_user)
         else:
             self.federation.send_edu(
@@ -742,7 +842,7 @@ class PresenceHandler(BaseHandler):
         Returns:
             A Deferred.
         """
-        if not self.hs.is_mine(observer_user):
+        if not self.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         yield self.store.del_presence_list(
@@ -834,7 +934,11 @@ def _format_user_presence_state(state, now):
 
 class PresenceEventSource(object):
     def __init__(self, hs):
-        self.hs = hs
+        # We can't call get_presence_handler here because there's a cycle:
+        #
+        #   Presence -> Notifier -> PresenceEventSource -> Presence
+        #
+        self.get_presence_handler = hs.get_presence_handler
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
 
@@ -860,7 +964,7 @@ class PresenceEventSource(object):
                 from_key = int(from_key)
             room_ids = room_ids or []
 
-            presence = self.hs.get_handlers().presence_handler
+            presence = self.get_presence_handler()
             stream_change_cache = self.store.presence_stream_cache
 
             if not room_ids:
@@ -877,13 +981,13 @@ class PresenceEventSource(object):
 
             user_ids_changed = set()
             changed = None
-            if from_key and max_token - from_key < 100:
-                # For small deltas, its quicker to get all changes and then
-                # work out if we share a room or they're in our presence list
+            if from_key:
                 changed = stream_change_cache.get_all_entities_changed(from_key)
 
-            # get_all_entities_changed can return None
-            if changed is not None:
+            if changed is not None and len(changed) < 500:
+                # For small deltas, its quicker to get all changes and then
+                # work out if we share a room or they're in our presence list
+                get_updates_counter.inc("stream")
                 for other_user_id in changed:
                     if other_user_id in friends:
                         user_ids_changed.add(other_user_id)
@@ -895,6 +999,8 @@ class PresenceEventSource(object):
             else:
                 # Too many possible updates. Find all users we can see and check
                 # if any of them have changed.
+                get_updates_counter.inc("full")
+
                 user_ids_to_check = set()
                 for room_id in room_ids:
                     users = yield self.store.get_users_in_room(room_id)
@@ -933,15 +1039,14 @@ class PresenceEventSource(object):
         return self.get_new_events(user, from_key=None, include_offline=False)
 
 
-def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     """Checks the presence of users that have timed out and updates as
     appropriate.
 
     Args:
         user_states(list): List of UserPresenceState's to check.
         is_mine_fn (fn): Function that returns if a user_id is ours
-        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
-            active syncs.
+        syncing_user_ids (set): Set of user_ids with active syncs.
         now (int): Current time in ms.
 
     Returns:
@@ -952,21 +1057,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
     for state in user_states:
         is_mine = is_mine_fn(state.user_id)
 
-        new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+        new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
         if new_state:
             changes[state.user_id] = new_state
 
     return changes.values()
 
 
-def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+def handle_timeout(state, is_mine, syncing_user_ids, now):
     """Checks the presence of the user to see if any of the timers have elapsed
 
     Args:
         state (UserPresenceState)
         is_mine (bool): Whether the user is ours
-        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
-            active syncs.
+        syncing_user_ids (set): Set of user_ids with active syncs.
         now (int): Current time in ms.
 
     Returns:
@@ -1000,7 +1104,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
 
         # If there are have been no sync for a while (and none ongoing),
         # set presence to offline
-        if not user_to_num_current_syncs.get(user_id, 0):
+        if user_id not in syncing_user_ids:
             if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
                 state = state.copy_and_replace(
                     state=PresenceState.OFFLINE,