diff options
Diffstat (limited to 'synapse/storage/presence.py')
-rw-r--r-- | synapse/storage/presence.py | 148 |
1 files changed, 39 insertions, 109 deletions
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index a0c7a0dc87..42ec8c6bb8 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -19,15 +19,25 @@ from twisted.internet import defer from synapse.api.constants import PresenceState from synapse.util import batch_iter -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList +from synapse.util.caches.descriptors import cached, cachedList from ._base import SQLBaseStore -class UserPresenceState(namedtuple("UserPresenceState", - ("user_id", "state", "last_active_ts", - "last_federation_update_ts", "last_user_sync_ts", - "status_msg", "currently_active"))): +class UserPresenceState( + namedtuple( + "UserPresenceState", + ( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + ) +): """Represents the current presence state of the user. user_id (str) @@ -75,22 +85,21 @@ class PresenceStore(SQLBaseStore): with stream_ordering_manager as stream_orderings: yield self.runInteraction( "update_presence", - self._update_presence_txn, stream_orderings, presence_states, + self._update_presence_txn, + stream_orderings, + presence_states, ) - defer.returnValue(( - stream_orderings[-1], self._presence_id_gen.get_current_token() - )) + defer.returnValue( + (stream_orderings[-1], self._presence_id_gen.get_current_token()) + ) def _update_presence_txn(self, txn, stream_orderings, presence_states): for stream_id, state in zip(stream_orderings, presence_states): txn.call_after( - self.presence_stream_cache.entity_has_changed, - state.user_id, stream_id, - ) - txn.call_after( - self._get_presence_for_user.invalidate, (state.user_id,) + self.presence_stream_cache.entity_has_changed, state.user_id, stream_id ) + txn.call_after(self._get_presence_for_user.invalidate, (state.user_id,)) # Actually insert new rows self._simple_insert_many_txn( @@ -113,18 +122,13 @@ class PresenceStore(SQLBaseStore): # Delete old rows to stop database from getting really big sql = ( - "DELETE FROM presence_stream WHERE" - " stream_id < ?" - " AND user_id IN (%s)" + "DELETE FROM presence_stream WHERE" " stream_id < ?" " AND user_id IN (%s)" ) for states in batch_iter(presence_states, 50): args = [stream_id] args.extend(s.user_id for s in states) - txn.execute( - sql % (",".join("?" for _ in states),), - args - ) + txn.execute(sql % (",".join("?" for _ in states),), args) def get_all_presence_updates(self, last_id, current_id): if last_id == current_id: @@ -149,8 +153,12 @@ class PresenceStore(SQLBaseStore): def _get_presence_for_user(self, user_id): raise NotImplementedError() - @cachedList(cached_method_name="_get_presence_for_user", list_name="user_ids", - num_args=1, inlineCallbacks=True) + @cachedList( + cached_method_name="_get_presence_for_user", + list_name="user_ids", + num_args=1, + inlineCallbacks=True, + ) def get_presence_for_users(self, user_ids): rows = yield self._simple_select_many_batch( table="presence_stream", @@ -180,8 +188,10 @@ class PresenceStore(SQLBaseStore): def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( table="presence_allow_inbound", - values={"observed_user_id": observed_localpart, - "observer_user_id": observer_userid}, + values={ + "observed_user_id": observed_localpart, + "observer_user_id": observer_userid, + }, desc="allow_presence_visible", or_ignore=True, ) @@ -189,89 +199,9 @@ class PresenceStore(SQLBaseStore): def disallow_presence_visible(self, observed_localpart, observer_userid): return self._simple_delete_one( table="presence_allow_inbound", - keyvalues={"observed_user_id": observed_localpart, - "observer_user_id": observer_userid}, + keyvalues={ + "observed_user_id": observed_localpart, + "observer_user_id": observer_userid, + }, desc="disallow_presence_visible", ) - - def add_presence_list_pending(self, observer_localpart, observed_userid): - return self._simple_insert( - table="presence_list", - values={"user_id": observer_localpart, - "observed_user_id": observed_userid, - "accepted": False}, - desc="add_presence_list_pending", - ) - - def set_presence_list_accepted(self, observer_localpart, observed_userid): - def update_presence_list_txn(txn): - result = self._simple_update_one_txn( - txn, - table="presence_list", - keyvalues={ - "user_id": observer_localpart, - "observed_user_id": observed_userid - }, - updatevalues={"accepted": True}, - ) - - self._invalidate_cache_and_stream( - txn, self.get_presence_list_accepted, (observer_localpart,) - ) - self._invalidate_cache_and_stream( - txn, self.get_presence_list_observers_accepted, (observed_userid,) - ) - - return result - - return self.runInteraction( - "set_presence_list_accepted", update_presence_list_txn, - ) - - def get_presence_list(self, observer_localpart, accepted=None): - if accepted: - return self.get_presence_list_accepted(observer_localpart) - else: - keyvalues = {"user_id": observer_localpart} - if accepted is not None: - keyvalues["accepted"] = accepted - - return self._simple_select_list( - table="presence_list", - keyvalues=keyvalues, - retcols=["observed_user_id", "accepted"], - desc="get_presence_list", - ) - - @cached() - def get_presence_list_accepted(self, observer_localpart): - return self._simple_select_list( - table="presence_list", - keyvalues={"user_id": observer_localpart, "accepted": True}, - retcols=["observed_user_id", "accepted"], - desc="get_presence_list_accepted", - ) - - @cachedInlineCallbacks() - def get_presence_list_observers_accepted(self, observed_userid): - user_localparts = yield self._simple_select_onecol( - table="presence_list", - keyvalues={"observed_user_id": observed_userid, "accepted": True}, - retcol="user_id", - desc="get_presence_list_accepted", - ) - - defer.returnValue([ - "@%s:%s" % (u, self.hs.hostname,) for u in user_localparts - ]) - - @defer.inlineCallbacks - def del_presence_list(self, observer_localpart, observed_userid): - yield self._simple_delete_one( - table="presence_list", - keyvalues={"user_id": observer_localpart, - "observed_user_id": observed_userid}, - desc="del_presence_list", - ) - self.get_presence_list_accepted.invalidate((observer_localpart,)) - self.get_presence_list_observers_accepted.invalidate((observed_userid,)) |