diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a5e501897c..26fc0d3ec7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,38 +25,42 @@ The methods that define policy are:
from twisted.internet import defer, reactor
from contextlib import contextmanager
+from six import itervalues, iteritems
+
from synapse.api.errors import SynapseError
from synapse.api.constants import PresenceState
from synapse.storage.presence import UserPresenceState
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.async import Linearizer
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id
-import synapse.metrics
+from synapse.metrics import LaterGauge
import logging
+from prometheus_client import Counter
logger = logging.getLogger(__name__)
-metrics = synapse.metrics.get_metrics_for(__name__)
-notified_presence_counter = metrics.register_counter("notified_presence")
-federation_presence_out_counter = metrics.register_counter("federation_presence_out")
-presence_updates_counter = metrics.register_counter("presence_updates")
-timers_fired_counter = metrics.register_counter("timers_fired")
-federation_presence_counter = metrics.register_counter("federation_presence")
-bump_active_time_counter = metrics.register_counter("bump_active_time")
+notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
+federation_presence_out_counter = Counter(
+ "synapse_handler_presence_federation_presence_out", "")
+presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
+timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
+federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "")
+bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
-get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
+get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
-notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
-state_transition_counter = metrics.register_counter(
- "state_transition", labels=["from", "to"]
+notify_reason_counter = Counter(
+ "synapse_handler_presence_notify_reason", "", ["reason"])
+state_transition_counter = Counter(
+ "synapse_handler_presence_state_transition", "", ["from", "to"]
)
@@ -87,6 +91,11 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class PresenceHandler(object):
def __init__(self, hs):
+ """
+
+ Args:
+ hs (synapse.server.HomeServer):
+ """
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock()
@@ -94,7 +103,6 @@ class PresenceHandler(object):
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
self.federation = hs.get_federation_sender()
-
self.state = hs.get_state_handler()
federation_registry = hs.get_federation_registry()
@@ -137,8 +145,9 @@ class PresenceHandler(object):
for state in active_presence
}
- metrics.register_callback(
- "user_to_current_state_size", lambda: len(self.user_to_current_state)
+ LaterGauge(
+ "synapse_handlers_presence_user_to_current_state_size", "", [],
+ lambda: len(self.user_to_current_state)
)
now = self.clock.time_msec()
@@ -208,7 +217,8 @@ class PresenceHandler(object):
60 * 1000,
)
- metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer))
+ LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
+ lambda: len(self.wheel_timer))
@defer.inlineCallbacks
def _on_shutdown(self):
@@ -255,6 +265,14 @@ class PresenceHandler(object):
logger.info("Finished _persist_unpersisted_changes")
@defer.inlineCallbacks
+ def _update_states_and_catch_exception(self, new_states):
+ try:
+ res = yield self._update_states(new_states)
+ defer.returnValue(res)
+ except Exception:
+ logger.exception("Error updating presence")
+
+ @defer.inlineCallbacks
def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes
the notifier and federation if and only if the changed presence state
@@ -303,10 +321,10 @@ class PresenceHandler(object):
# TODO: We should probably ensure there are no races hereafter
- presence_updates_counter.inc_by(len(new_states))
+ presence_updates_counter.inc(len(new_states))
if to_notify:
- notified_presence_counter.inc_by(len(to_notify))
+ notified_presence_counter.inc(len(to_notify))
yield self._persist_and_notify(to_notify.values())
self.unpersisted_users_changes |= set(s.user_id for s in new_states)
@@ -317,7 +335,7 @@ class PresenceHandler(object):
if user_id not in to_notify
}
if to_federation_ping:
- federation_presence_out_counter.inc_by(len(to_federation_ping))
+ federation_presence_out_counter.inc(len(to_federation_ping))
self._push_to_remotes(to_federation_ping.values())
@@ -355,7 +373,7 @@ class PresenceHandler(object):
for user_id in users_to_check
]
- timers_fired_counter.inc_by(len(states))
+ timers_fired_counter.inc(len(states))
changes = handle_timeouts(
states,
@@ -364,7 +382,7 @@ class PresenceHandler(object):
now=now,
)
- preserve_fn(self._update_states)(changes)
+ run_in_background(self._update_states_and_catch_exception, changes)
except Exception:
logger.exception("Exception in _handle_timeouts loop")
@@ -422,20 +440,23 @@ class PresenceHandler(object):
@defer.inlineCallbacks
def _end():
- if affect_presence:
+ try:
self.user_to_num_current_syncs[user_id] -= 1
prev_state = yield self.current_state_for_user(user_id)
yield self._update_states([prev_state.copy_and_replace(
last_user_sync_ts=self.clock.time_msec(),
)])
+ except Exception:
+ logger.exception("Error updating presence after sync")
@contextmanager
def _user_syncing():
try:
yield
finally:
- preserve_fn(_end)()
+ if affect_presence:
+ run_in_background(_end)
defer.returnValue(_user_syncing())
@@ -453,61 +474,6 @@ class PresenceHandler(object):
return syncing_user_ids
@defer.inlineCallbacks
- def update_external_syncs(self, process_id, syncing_user_ids):
- """Update the syncing users for an external process
-
- Args:
- process_id(str): An identifier for the process the users are
- syncing against. This allows synapse to process updates
- as user start and stop syncing against a given process.
- syncing_user_ids(set(str)): The set of user_ids that are
- currently syncing on that server.
- """
-
- # Grab the previous list of user_ids that were syncing on that process
- prev_syncing_user_ids = (
- self.external_process_to_current_syncs.get(process_id, set())
- )
- # Grab the current presence state for both the users that are syncing
- # now and the users that were syncing before this update.
- prev_states = yield self.current_state_for_users(
- syncing_user_ids | prev_syncing_user_ids
- )
- updates = []
- time_now_ms = self.clock.time_msec()
-
- # For each new user that is syncing check if we need to mark them as
- # being online.
- for new_user_id in syncing_user_ids - prev_syncing_user_ids:
- prev_state = prev_states[new_user_id]
- if prev_state.state == PresenceState.OFFLINE:
- updates.append(prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=time_now_ms,
- last_user_sync_ts=time_now_ms,
- ))
- else:
- updates.append(prev_state.copy_and_replace(
- last_user_sync_ts=time_now_ms,
- ))
-
- # For each user that is still syncing or stopped syncing update the
- # last sync time so that we will correctly apply the grace period when
- # they stop syncing.
- for old_user_id in prev_syncing_user_ids:
- prev_state = prev_states[old_user_id]
- updates.append(prev_state.copy_and_replace(
- last_user_sync_ts=time_now_ms,
- ))
-
- yield self._update_states(updates)
-
- # Update the last updated time for the process. We expire the entries
- # if we don't receive an update in the given timeframe.
- self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
- self.external_process_to_current_syncs[process_id] = syncing_user_ids
-
- @defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
"""Update the syncing users for an external process as a delta.
@@ -570,7 +536,7 @@ class PresenceHandler(object):
prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms,
)
- for prev_state in prev_states.itervalues()
+ for prev_state in itervalues(prev_states)
])
self.external_process_last_updated_ms.pop(process_id, None)
@@ -593,14 +559,14 @@ class PresenceHandler(object):
for user_id in user_ids
}
- missing = [user_id for user_id, state in states.iteritems() if not state]
+ missing = [user_id for user_id, state in iteritems(states) if not state]
if missing:
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = yield self.store.get_presence_for_users(missing)
states.update(res)
- missing = [user_id for user_id, state in states.iteritems() if not state]
+ missing = [user_id for user_id, state in iteritems(states) if not state]
if missing:
new = {
user_id: UserPresenceState.default(user_id)
@@ -696,7 +662,7 @@ class PresenceHandler(object):
updates.append(prev_state.copy_and_replace(**new_fields))
if updates:
- federation_presence_counter.inc_by(len(updates))
+ federation_presence_counter.inc(len(updates))
yield self._update_states(updates)
@defer.inlineCallbacks
@@ -971,28 +937,28 @@ def should_notify(old_state, new_state):
return False
if old_state.status_msg != new_state.status_msg:
- notify_reason_counter.inc("status_msg_change")
+ notify_reason_counter.labels("status_msg_change").inc()
return True
if old_state.state != new_state.state:
- notify_reason_counter.inc("state_change")
- state_transition_counter.inc(old_state.state, new_state.state)
+ notify_reason_counter.labels("state_change").inc()
+ state_transition_counter.labels(old_state.state, new_state.state).inc()
return True
if old_state.state == PresenceState.ONLINE:
if new_state.currently_active != old_state.currently_active:
- notify_reason_counter.inc("current_active_change")
+ notify_reason_counter.labels("current_active_change").inc()
return True
if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Only notify about last active bumps if we're not currently acive
if not new_state.currently_active:
- notify_reason_counter.inc("last_active_change_online")
+ notify_reason_counter.labels("last_active_change_online").inc()
return True
elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Always notify for a transition where last active gets bumped.
- notify_reason_counter.inc("last_active_change_not_online")
+ notify_reason_counter.labels("last_active_change_not_online").inc()
return True
return False
@@ -1066,14 +1032,14 @@ class PresenceEventSource(object):
if changed is not None and len(changed) < 500:
# For small deltas, its quicker to get all changes and then
# work out if we share a room or they're in our presence list
- get_updates_counter.inc("stream")
+ get_updates_counter.labels("stream").inc()
for other_user_id in changed:
if other_user_id in users_interested_in:
user_ids_changed.add(other_user_id)
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
- get_updates_counter.inc("full")
+ get_updates_counter.labels("full").inc()
if from_key:
user_ids_changed = stream_change_cache.get_entities_changed(
@@ -1088,7 +1054,7 @@ class PresenceEventSource(object):
defer.returnValue((updates.values(), max_token))
else:
defer.returnValue(([
- s for s in updates.itervalues()
+ s for s in itervalues(updates)
if s.state != PresenceState.OFFLINE
], max_token))
@@ -1345,11 +1311,11 @@ def get_interested_remotes(store, states, state_handler):
# hosts in those rooms.
room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
- for room_id, states in room_ids_to_states.iteritems():
+ for room_id, states in iteritems(room_ids_to_states):
hosts = yield state_handler.get_current_hosts_in_room(room_id)
hosts_and_states.append((hosts, states))
- for user_id, states in users_to_states.iteritems():
+ for user_id, states in iteritems(users_to_states):
host = get_domain_from_id(user_id)
hosts_and_states.append(([host], states))
|