diff options
Diffstat (limited to 'synapse/storage/presence.py')
-rw-r--r-- | synapse/storage/presence.py | 86 |
1 files changed, 50 insertions, 36 deletions
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index a0c7a0dc87..089ea8c048 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -24,10 +24,20 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cache from ._base import SQLBaseStore -class UserPresenceState(namedtuple("UserPresenceState", - ("user_id", "state", "last_active_ts", - "last_federation_update_ts", "last_user_sync_ts", - "status_msg", "currently_active"))): +class UserPresenceState( + namedtuple( + "UserPresenceState", + ( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + ) +): """Represents the current presence state of the user. user_id (str) @@ -75,22 +85,21 @@ class PresenceStore(SQLBaseStore): with stream_ordering_manager as stream_orderings: yield self.runInteraction( "update_presence", - self._update_presence_txn, stream_orderings, presence_states, + self._update_presence_txn, + stream_orderings, + presence_states, ) - defer.returnValue(( - stream_orderings[-1], self._presence_id_gen.get_current_token() - )) + defer.returnValue( + (stream_orderings[-1], self._presence_id_gen.get_current_token()) + ) def _update_presence_txn(self, txn, stream_orderings, presence_states): for stream_id, state in zip(stream_orderings, presence_states): txn.call_after( - self.presence_stream_cache.entity_has_changed, - state.user_id, stream_id, - ) - txn.call_after( - self._get_presence_for_user.invalidate, (state.user_id,) + self.presence_stream_cache.entity_has_changed, state.user_id, stream_id ) + txn.call_after(self._get_presence_for_user.invalidate, (state.user_id,)) # Actually insert new rows self._simple_insert_many_txn( @@ -113,18 +122,13 @@ class PresenceStore(SQLBaseStore): # Delete old rows to stop database from getting really big sql = ( - "DELETE FROM presence_stream WHERE" - " stream_id < ?" - " AND user_id IN (%s)" + "DELETE FROM presence_stream WHERE" " stream_id < ?" " AND user_id IN (%s)" ) for states in batch_iter(presence_states, 50): args = [stream_id] args.extend(s.user_id for s in states) - txn.execute( - sql % (",".join("?" for _ in states),), - args - ) + txn.execute(sql % (",".join("?" for _ in states),), args) def get_all_presence_updates(self, last_id, current_id): if last_id == current_id: @@ -149,8 +153,12 @@ class PresenceStore(SQLBaseStore): def _get_presence_for_user(self, user_id): raise NotImplementedError() - @cachedList(cached_method_name="_get_presence_for_user", list_name="user_ids", - num_args=1, inlineCallbacks=True) + @cachedList( + cached_method_name="_get_presence_for_user", + list_name="user_ids", + num_args=1, + inlineCallbacks=True, + ) def get_presence_for_users(self, user_ids): rows = yield self._simple_select_many_batch( table="presence_stream", @@ -180,8 +188,10 @@ class PresenceStore(SQLBaseStore): def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( table="presence_allow_inbound", - values={"observed_user_id": observed_localpart, - "observer_user_id": observer_userid}, + values={ + "observed_user_id": observed_localpart, + "observer_user_id": observer_userid, + }, desc="allow_presence_visible", or_ignore=True, ) @@ -189,17 +199,21 @@ class PresenceStore(SQLBaseStore): def disallow_presence_visible(self, observed_localpart, observer_userid): return self._simple_delete_one( table="presence_allow_inbound", - keyvalues={"observed_user_id": observed_localpart, - "observer_user_id": observer_userid}, + keyvalues={ + "observed_user_id": observed_localpart, + "observer_user_id": observer_userid, + }, desc="disallow_presence_visible", ) def add_presence_list_pending(self, observer_localpart, observed_userid): return self._simple_insert( table="presence_list", - values={"user_id": observer_localpart, - "observed_user_id": observed_userid, - "accepted": False}, + values={ + "user_id": observer_localpart, + "observed_user_id": observed_userid, + "accepted": False, + }, desc="add_presence_list_pending", ) @@ -210,7 +224,7 @@ class PresenceStore(SQLBaseStore): table="presence_list", keyvalues={ "user_id": observer_localpart, - "observed_user_id": observed_userid + "observed_user_id": observed_userid, }, updatevalues={"accepted": True}, ) @@ -225,7 +239,7 @@ class PresenceStore(SQLBaseStore): return result return self.runInteraction( - "set_presence_list_accepted", update_presence_list_txn, + "set_presence_list_accepted", update_presence_list_txn ) def get_presence_list(self, observer_localpart, accepted=None): @@ -261,16 +275,16 @@ class PresenceStore(SQLBaseStore): desc="get_presence_list_accepted", ) - defer.returnValue([ - "@%s:%s" % (u, self.hs.hostname,) for u in user_localparts - ]) + defer.returnValue(["@%s:%s" % (u, self.hs.hostname) for u in user_localparts]) @defer.inlineCallbacks def del_presence_list(self, observer_localpart, observed_userid): yield self._simple_delete_one( table="presence_list", - keyvalues={"user_id": observer_localpart, - "observed_user_id": observed_userid}, + keyvalues={ + "user_id": observer_localpart, + "observed_user_id": observed_userid, + }, desc="del_presence_list", ) self.get_presence_list_accepted.invalidate((observer_localpart,)) |