summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py154
1 files changed, 60 insertions, 94 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a5e501897c..26fc0d3ec7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,38 +25,42 @@ The methods that define policy are:
 from twisted.internet import defer, reactor
 from contextlib import contextmanager
 
+from six import itervalues, iteritems
+
 from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
 from synapse.storage.presence import UserPresenceState
 
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.async import Linearizer
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 from synapse.types import UserID, get_domain_from_id
-import synapse.metrics
+from synapse.metrics import LaterGauge
 
 import logging
 
+from prometheus_client import Counter
 
 logger = logging.getLogger(__name__)
 
-metrics = synapse.metrics.get_metrics_for(__name__)
 
-notified_presence_counter = metrics.register_counter("notified_presence")
-federation_presence_out_counter = metrics.register_counter("federation_presence_out")
-presence_updates_counter = metrics.register_counter("presence_updates")
-timers_fired_counter = metrics.register_counter("timers_fired")
-federation_presence_counter = metrics.register_counter("federation_presence")
-bump_active_time_counter = metrics.register_counter("bump_active_time")
+notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
+federation_presence_out_counter = Counter(
+    "synapse_handler_presence_federation_presence_out", "")
+presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
+timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
+federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "")
+bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
 
-get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
+get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
 
-notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
-state_transition_counter = metrics.register_counter(
-    "state_transition", labels=["from", "to"]
+notify_reason_counter = Counter(
+    "synapse_handler_presence_notify_reason", "", ["reason"])
+state_transition_counter = Counter(
+    "synapse_handler_presence_state_transition", "", ["from", "to"]
 )
 
 
@@ -87,6 +91,11 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 class PresenceHandler(object):
 
     def __init__(self, hs):
+        """
+
+        Args:
+            hs (synapse.server.HomeServer):
+        """
         self.is_mine = hs.is_mine
         self.is_mine_id = hs.is_mine_id
         self.clock = hs.get_clock()
@@ -94,7 +103,6 @@ class PresenceHandler(object):
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
         self.federation = hs.get_federation_sender()
-
         self.state = hs.get_state_handler()
 
         federation_registry = hs.get_federation_registry()
@@ -137,8 +145,9 @@ class PresenceHandler(object):
             for state in active_presence
         }
 
-        metrics.register_callback(
-            "user_to_current_state_size", lambda: len(self.user_to_current_state)
+        LaterGauge(
+            "synapse_handlers_presence_user_to_current_state_size", "", [],
+            lambda: len(self.user_to_current_state)
         )
 
         now = self.clock.time_msec()
@@ -208,7 +217,8 @@ class PresenceHandler(object):
             60 * 1000,
         )
 
-        metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer))
+        LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
+                   lambda: len(self.wheel_timer))
 
     @defer.inlineCallbacks
     def _on_shutdown(self):
@@ -255,6 +265,14 @@ class PresenceHandler(object):
         logger.info("Finished _persist_unpersisted_changes")
 
     @defer.inlineCallbacks
+    def _update_states_and_catch_exception(self, new_states):
+        try:
+            res = yield self._update_states(new_states)
+            defer.returnValue(res)
+        except Exception:
+            logger.exception("Error updating presence")
+
+    @defer.inlineCallbacks
     def _update_states(self, new_states):
         """Updates presence of users. Sets the appropriate timeouts. Pokes
         the notifier and federation if and only if the changed presence state
@@ -303,10 +321,10 @@ class PresenceHandler(object):
 
             # TODO: We should probably ensure there are no races hereafter
 
-            presence_updates_counter.inc_by(len(new_states))
+            presence_updates_counter.inc(len(new_states))
 
             if to_notify:
-                notified_presence_counter.inc_by(len(to_notify))
+                notified_presence_counter.inc(len(to_notify))
                 yield self._persist_and_notify(to_notify.values())
 
             self.unpersisted_users_changes |= set(s.user_id for s in new_states)
@@ -317,7 +335,7 @@ class PresenceHandler(object):
                 if user_id not in to_notify
             }
             if to_federation_ping:
-                federation_presence_out_counter.inc_by(len(to_federation_ping))
+                federation_presence_out_counter.inc(len(to_federation_ping))
 
                 self._push_to_remotes(to_federation_ping.values())
 
@@ -355,7 +373,7 @@ class PresenceHandler(object):
                     for user_id in users_to_check
                 ]
 
-                timers_fired_counter.inc_by(len(states))
+                timers_fired_counter.inc(len(states))
 
                 changes = handle_timeouts(
                     states,
@@ -364,7 +382,7 @@ class PresenceHandler(object):
                     now=now,
                 )
 
-            preserve_fn(self._update_states)(changes)
+            run_in_background(self._update_states_and_catch_exception, changes)
         except Exception:
             logger.exception("Exception in _handle_timeouts loop")
 
@@ -422,20 +440,23 @@ class PresenceHandler(object):
 
         @defer.inlineCallbacks
         def _end():
-            if affect_presence:
+            try:
                 self.user_to_num_current_syncs[user_id] -= 1
 
                 prev_state = yield self.current_state_for_user(user_id)
                 yield self._update_states([prev_state.copy_and_replace(
                     last_user_sync_ts=self.clock.time_msec(),
                 )])
+            except Exception:
+                logger.exception("Error updating presence after sync")
 
         @contextmanager
         def _user_syncing():
             try:
                 yield
             finally:
-                preserve_fn(_end)()
+                if affect_presence:
+                    run_in_background(_end)
 
         defer.returnValue(_user_syncing())
 
@@ -453,61 +474,6 @@ class PresenceHandler(object):
         return syncing_user_ids
 
     @defer.inlineCallbacks
-    def update_external_syncs(self, process_id, syncing_user_ids):
-        """Update the syncing users for an external process
-
-        Args:
-            process_id(str): An identifier for the process the users are
-                syncing against. This allows synapse to process updates
-                as user start and stop syncing against a given process.
-            syncing_user_ids(set(str)): The set of user_ids that are
-                currently syncing on that server.
-        """
-
-        # Grab the previous list of user_ids that were syncing on that process
-        prev_syncing_user_ids = (
-            self.external_process_to_current_syncs.get(process_id, set())
-        )
-        # Grab the current presence state for both the users that are syncing
-        # now and the users that were syncing before this update.
-        prev_states = yield self.current_state_for_users(
-            syncing_user_ids | prev_syncing_user_ids
-        )
-        updates = []
-        time_now_ms = self.clock.time_msec()
-
-        # For each new user that is syncing check if we need to mark them as
-        # being online.
-        for new_user_id in syncing_user_ids - prev_syncing_user_ids:
-            prev_state = prev_states[new_user_id]
-            if prev_state.state == PresenceState.OFFLINE:
-                updates.append(prev_state.copy_and_replace(
-                    state=PresenceState.ONLINE,
-                    last_active_ts=time_now_ms,
-                    last_user_sync_ts=time_now_ms,
-                ))
-            else:
-                updates.append(prev_state.copy_and_replace(
-                    last_user_sync_ts=time_now_ms,
-                ))
-
-        # For each user that is still syncing or stopped syncing update the
-        # last sync time so that we will correctly apply the grace period when
-        # they stop syncing.
-        for old_user_id in prev_syncing_user_ids:
-            prev_state = prev_states[old_user_id]
-            updates.append(prev_state.copy_and_replace(
-                last_user_sync_ts=time_now_ms,
-            ))
-
-        yield self._update_states(updates)
-
-        # Update the last updated time for the process. We expire the entries
-        # if we don't receive an update in the given timeframe.
-        self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
-        self.external_process_to_current_syncs[process_id] = syncing_user_ids
-
-    @defer.inlineCallbacks
     def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
         """Update the syncing users for an external process as a delta.
 
@@ -570,7 +536,7 @@ class PresenceHandler(object):
                 prev_state.copy_and_replace(
                     last_user_sync_ts=time_now_ms,
                 )
-                for prev_state in prev_states.itervalues()
+                for prev_state in itervalues(prev_states)
             ])
             self.external_process_last_updated_ms.pop(process_id, None)
 
@@ -593,14 +559,14 @@ class PresenceHandler(object):
             for user_id in user_ids
         }
 
-        missing = [user_id for user_id, state in states.iteritems() if not state]
+        missing = [user_id for user_id, state in iteritems(states) if not state]
         if missing:
             # There are things not in our in memory cache. Lets pull them out of
             # the database.
             res = yield self.store.get_presence_for_users(missing)
             states.update(res)
 
-            missing = [user_id for user_id, state in states.iteritems() if not state]
+            missing = [user_id for user_id, state in iteritems(states) if not state]
             if missing:
                 new = {
                     user_id: UserPresenceState.default(user_id)
@@ -696,7 +662,7 @@ class PresenceHandler(object):
             updates.append(prev_state.copy_and_replace(**new_fields))
 
         if updates:
-            federation_presence_counter.inc_by(len(updates))
+            federation_presence_counter.inc(len(updates))
             yield self._update_states(updates)
 
     @defer.inlineCallbacks
@@ -971,28 +937,28 @@ def should_notify(old_state, new_state):
         return False
 
     if old_state.status_msg != new_state.status_msg:
-        notify_reason_counter.inc("status_msg_change")
+        notify_reason_counter.labels("status_msg_change").inc()
         return True
 
     if old_state.state != new_state.state:
-        notify_reason_counter.inc("state_change")
-        state_transition_counter.inc(old_state.state, new_state.state)
+        notify_reason_counter.labels("state_change").inc()
+        state_transition_counter.labels(old_state.state, new_state.state).inc()
         return True
 
     if old_state.state == PresenceState.ONLINE:
         if new_state.currently_active != old_state.currently_active:
-            notify_reason_counter.inc("current_active_change")
+            notify_reason_counter.labels("current_active_change").inc()
             return True
 
         if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
             # Only notify about last active bumps if we're not currently acive
             if not new_state.currently_active:
-                notify_reason_counter.inc("last_active_change_online")
+                notify_reason_counter.labels("last_active_change_online").inc()
                 return True
 
     elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
         # Always notify for a transition where last active gets bumped.
-        notify_reason_counter.inc("last_active_change_not_online")
+        notify_reason_counter.labels("last_active_change_not_online").inc()
         return True
 
     return False
@@ -1066,14 +1032,14 @@ class PresenceEventSource(object):
             if changed is not None and len(changed) < 500:
                 # For small deltas, its quicker to get all changes and then
                 # work out if we share a room or they're in our presence list
-                get_updates_counter.inc("stream")
+                get_updates_counter.labels("stream").inc()
                 for other_user_id in changed:
                     if other_user_id in users_interested_in:
                         user_ids_changed.add(other_user_id)
             else:
                 # Too many possible updates. Find all users we can see and check
                 # if any of them have changed.
-                get_updates_counter.inc("full")
+                get_updates_counter.labels("full").inc()
 
                 if from_key:
                     user_ids_changed = stream_change_cache.get_entities_changed(
@@ -1088,7 +1054,7 @@ class PresenceEventSource(object):
             defer.returnValue((updates.values(), max_token))
         else:
             defer.returnValue(([
-                s for s in updates.itervalues()
+                s for s in itervalues(updates)
                 if s.state != PresenceState.OFFLINE
             ], max_token))
 
@@ -1345,11 +1311,11 @@ def get_interested_remotes(store, states, state_handler):
     # hosts in those rooms.
     room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
 
-    for room_id, states in room_ids_to_states.iteritems():
+    for room_id, states in iteritems(room_ids_to_states):
         hosts = yield state_handler.get_current_hosts_in_room(room_id)
         hosts_and_states.append((hosts, states))
 
-    for user_id, states in users_to_states.iteritems():
+    for user_id, states in iteritems(users_to_states):
         host = get_domain_from_id(user_id)
         hosts_and_states.append(([host], states))