diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/presence.py | 163 |
1 files changed, 102 insertions, 61 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 40794187b1..670c1d353f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -146,6 +146,10 @@ class PresenceHandler(BaseHandler): self._user_cachemap = {} self._user_cachemap_latest_serial = 0 + # map room_ids to the latest presence serial for a member of that + # room + self._room_serials = {} + metrics.register_callback( "userCachemap:size", lambda: len(self._user_cachemap), @@ -297,13 +301,34 @@ class PresenceHandler(BaseHandler): self.changed_presencelike_data(user, {"last_active": now}) + def get_joined_rooms_for_user(self, user): + """Get the list of rooms a user is joined to. + + Args: + user(UserID): The user. + Returns: + A Deferred of a list of room id strings. + """ + rm_handler = self.homeserver.get_handlers().room_member_handler + return rm_handler.get_joined_rooms_for_user(user) + + def get_joined_users_for_room_id(self, room_id): + rm_handler = self.homeserver.get_handlers().room_member_handler + return rm_handler.get_room_members(room_id) + + @defer.inlineCallbacks def changed_presencelike_data(self, user, state): - statuscache = self._get_or_make_usercache(user) + """Updates the presence state of a local user. + Args: + user(UserID): The user being updated. + state(dict): The new presence state for the user. + Returns: + A Deferred + """ self._user_cachemap_latest_serial += 1 - statuscache.update(state, serial=self._user_cachemap_latest_serial) - - return self.push_presence(user, statuscache=statuscache) + statuscache = yield self.update_presence_cache(user, state) + yield self.push_presence(user, statuscache=statuscache) @log_function def started_user_eventstream(self, user): @@ -326,13 +351,12 @@ class PresenceHandler(BaseHandler): room_id(str): The room id the user joined. """ if self.hs.is_mine(user): - statuscache = self._get_or_make_usercache(user) - # No actual update but we need to bump the serial anyway for the # event source self._user_cachemap_latest_serial += 1 - statuscache.update({}, serial=self._user_cachemap_latest_serial) - + statuscache = yield self.update_presence_cache( + user, room_ids=[room_id] + ) self.push_update_to_local_and_remote( observed_user=user, room_ids=[room_id], @@ -340,16 +364,17 @@ class PresenceHandler(BaseHandler): ) # We also want to tell them about current presence of people. - rm_handler = self.homeserver.get_handlers().room_member_handler - curr_users = yield rm_handler.get_room_members(room_id) + curr_users = yield self.get_joined_users_for_room_id(room_id) for local_user in [c for c in curr_users if self.hs.is_mine(c)]: - statuscache = self._get_or_offline_usercache(local_user) - statuscache.update({}, serial=self._user_cachemap_latest_serial) + statuscache = yield self.update_presence_cache( + local_user, room_ids=[room_id], add_to_cache=False + ) + self.push_update_to_local_and_remote( observed_user=local_user, users_to_push=[user], - statuscache=self._get_or_offline_usercache(local_user), + statuscache=statuscache, ) @defer.inlineCallbacks @@ -546,8 +571,7 @@ class PresenceHandler(BaseHandler): # Also include people in all my rooms - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if state is None: state = yield self.store.get_presence_state(user.localpart) @@ -747,8 +771,7 @@ class PresenceHandler(BaseHandler): # and also user is informed of server-forced pushes localusers.add(user) - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if not localusers and not room_ids: defer.returnValue(None) @@ -793,8 +816,7 @@ class PresenceHandler(BaseHandler): " | %d interested local observers %r", len(observers), observers ) - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if room_ids: logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids) @@ -813,10 +835,8 @@ class PresenceHandler(BaseHandler): self.clock.time_msec() - state.pop("last_active_ago") ) - statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 - statuscache.update(state, serial=self._user_cachemap_latest_serial) + yield self.update_presence_cache(user, state, room_ids=room_ids) if not observers and not room_ids: logger.debug(" | no interested observers or room IDs") @@ -875,6 +895,35 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds, consumeErrors=True) @defer.inlineCallbacks + def update_presence_cache(self, user, state={}, room_ids=None, + add_to_cache=True): + """Update the presence cache for a user with a new state and bump the + serial to the latest value. + + Args: + user(UserID): The user being updated + state(dict): The presence state being updated + room_ids(None or list of str): A list of room_ids to update. If + room_ids is None then fetch the list of room_ids the user is + joined to. + add_to_cache: Whether to add an entry to the presence cache if the + user isn't already in the cache. + Returns: + A Deferred UserPresenceCache for the user being updated. + """ + if room_ids is None: + room_ids = yield self.get_joined_rooms_for_user(user) + + for room_id in room_ids: + self._room_serials[room_id] = self._user_cachemap_latest_serial + if add_to_cache: + statuscache = self._get_or_make_usercache(user) + else: + statuscache = self._get_or_offline_usercache(user) + statuscache.update(state, serial=self._user_cachemap_latest_serial) + defer.returnValue(statuscache) + + @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[], remote_domains=[]): @@ -997,38 +1046,10 @@ class PresenceEventSource(object): self.clock = hs.get_clock() @defer.inlineCallbacks - def is_visible(self, observer_user, observed_user): - if observer_user == observed_user: - defer.returnValue(True) - - presence = self.hs.get_handlers().presence_handler - - if (yield presence.store.user_rooms_intersect( - [u.to_string() for u in observer_user, observed_user])): - defer.returnValue(True) - - if self.hs.is_mine(observed_user): - pushmap = presence._local_pushmap - - defer.returnValue( - observed_user.localpart in pushmap and - observer_user in pushmap[observed_user.localpart] - ) - else: - recvmap = presence._remote_recvmap - - defer.returnValue( - observed_user in recvmap and - observer_user in recvmap[observed_user] - ) - - @defer.inlineCallbacks @log_function def get_new_events_for_user(self, user, from_key, limit): from_key = int(from_key) - observer_user = user - presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap @@ -1037,17 +1058,27 @@ class PresenceEventSource(object): clock = self.clock latest_serial = 0 + user_ids_to_check = {user} + presence_list = yield presence.store.get_presence_list( + user.localpart, accepted=True + ) + if presence_list is not None: + user_ids_to_check |= set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) + room_ids = yield presence.get_joined_rooms_for_user(user) + for room_id in set(room_ids) & set(presence._room_serials): + if presence._room_serials[room_id] > from_key: + joined = yield presence.get_joined_users_for_room_id(room_id) + user_ids_to_check |= set(joined) + updates = [] - # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in cachemap.keys(): + for observed_user in user_ids_to_check & set(cachemap): cached = cachemap[observed_user] if cached.serial <= from_key or cached.serial > max_serial: continue - if not (yield self.is_visible(observer_user, observed_user)): - continue - latest_serial = max(cached.serial, latest_serial) updates.append(cached.make_event(user=observed_user, clock=clock)) @@ -1084,8 +1115,6 @@ class PresenceEventSource(object): def get_pagination_rows(self, user, pagination_config, key): # TODO (erikj): Does this make sense? Ordering? - observer_user = user - from_key = int(pagination_config.from_key) if pagination_config.to_key: @@ -1096,14 +1125,26 @@ class PresenceEventSource(object): presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap + user_ids_to_check = {user} + presence_list = yield presence.store.get_presence_list( + user.localpart, accepted=True + ) + if presence_list is not None: + user_ids_to_check |= set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) + room_ids = yield presence.get_joined_rooms_for_user(user) + for room_id in set(room_ids) & set(presence._room_serials): + if presence._room_serials[room_id] >= from_key: + joined = yield presence.get_joined_users_for_room_id(room_id) + user_ids_to_check |= set(joined) + updates = [] - # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in cachemap.keys(): + for observed_user in user_ids_to_check & set(cachemap): if not (to_key < cachemap[observed_user].serial <= from_key): continue - if (yield self.is_visible(observer_user, observed_user)): - updates.append((observed_user, cachemap[observed_user])) + updates.append((observed_user, cachemap[observed_user])) # TODO(paul): limit |