diff options
author | Mark Haines <mark.haines@matrix.org> | 2014-08-31 16:08:20 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2014-08-31 16:08:20 +0100 |
commit | 3eb45eba0e387dacb67158efe323f19731799c79 (patch) | |
tree | 79d942de91b8c9a940e787da9b4a329047139a80 /synapse/handlers/presence.py | |
parent | Add store for server certificates and keys (diff) | |
parent | Add config tree to synapse. Add support for reading config from a file (diff) | |
download | synapse-3eb45eba0e387dacb67158efe323f19731799c79.tar.xz |
Merge branch 'develop' into server2server_tls
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r-- | synapse/handlers/presence.py | 228 |
1 files changed, 135 insertions, 93 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c479908f61..7731de85c0 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -18,6 +18,8 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState +from synapse.util.logutils import log_function + from ._base import BaseHandler import logging @@ -142,7 +144,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def is_presence_visible(self, observer_user, observed_user): defer.returnValue(True) - return + # return # FIXME (erikj): This code path absolutely kills the database. assert(observed_user.is_mine) @@ -188,8 +190,9 @@ class PresenceHandler(BaseHandler): defer.returnValue(state) @defer.inlineCallbacks + @log_function def set_state(self, target_user, auth_user, state): - return + # return # TODO (erikj): Turn this back on. Why did we end up sending EDUs # everywhere? @@ -245,33 +248,42 @@ class PresenceHandler(BaseHandler): self.push_presence(user, statuscache=statuscache) + @log_function def started_user_eventstream(self, user): # TODO(paul): Use "last online" state self.set_state(user, user, {"state": PresenceState.ONLINE}) + @log_function def stopped_user_eventstream(self, user): # TODO(paul): Save current state as "last online" state self.set_state(user, user, {"state": PresenceState.OFFLINE}) @defer.inlineCallbacks def user_joined_room(self, user, room_id): - localusers = set() - remotedomains = set() - - rm_handler = self.homeserver.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into(room_id, - localusers=localusers, remotedomains=remotedomains, - ignore_user=user) if user.is_mine: - yield self._send_presence_to_distribution(srcuser=user, - localusers=localusers, remotedomains=remotedomains, + self.push_update_to_local_and_remote( + observed_user=user, + room_ids=[room_id], + statuscache=self._get_or_offline_usercache(user), + ) + + else: + self.push_update_to_clients( + observed_user=user, + room_ids=[room_id], statuscache=self._get_or_offline_usercache(user), ) - for srcuser in localusers: - yield self._send_presence(srcuser=srcuser, destuser=user, - statuscache=self._get_or_offline_usercache(srcuser), + # We also want to tell them about current presence of people. + rm_handler = self.homeserver.get_handlers().room_member_handler + curr_users = yield rm_handler.get_room_members(room_id) + + for local_user in [c for c in curr_users if c.is_mine]: + self.push_update_to_local_and_remote( + observed_user=local_user, + users_to_push=[user], + statuscache=self._get_or_offline_usercache(local_user), ) @defer.inlineCallbacks @@ -382,11 +394,13 @@ class PresenceHandler(BaseHandler): defer.returnValue(presence) @defer.inlineCallbacks + @log_function def start_polling_presence(self, user, target_user=None, state=None): logger.debug("Start polling for presence from %s", user) if target_user: target_users = set([target_user]) + room_ids = [] else: presence = yield self.store.get_presence_list( user.localpart, accepted=True @@ -400,23 +414,37 @@ class PresenceHandler(BaseHandler): rm_handler = self.homeserver.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(user) - for room_id in room_ids: - for member in (yield rm_handler.get_room_members(room_id)): - target_users.add(member) - if state is None: state = yield self.store.get_presence_state(user.localpart) - - localusers, remoteusers = partitionbool( - target_users, - lambda u: u.is_mine + else: +# statuscache = self._get_or_make_usercache(user) +# self._user_cachemap_latest_serial += 1 +# statuscache.update(state, self._user_cachemap_latest_serial) + pass + + yield self.push_update_to_local_and_remote( + observed_user=user, + users_to_push=target_users, + room_ids=room_ids, + statuscache=self._get_or_make_usercache(user), ) - for target_user in localusers: - self._start_polling_local(user, target_user) + for target_user in target_users: + if target_user.is_mine: + self._start_polling_local(user, target_user) + + # We want to tell the person that just came online + # presence state of people they are interested in? + self.push_update_to_clients( + observed_user=target_user, + users_to_push=[user], + statuscache=self._get_or_offline_usercache(target_user), + ) deferreds = [] - remoteusers_by_domain = partition(remoteusers, lambda u: u.domain) + remote_users = [u for u in target_users if not u.is_mine] + remoteusers_by_domain = partition(remote_users, lambda u: u.domain) + # Only poll for people in our get_presence_list for domain in remoteusers_by_domain: remoteusers = remoteusers_by_domain[domain] @@ -438,25 +466,26 @@ class PresenceHandler(BaseHandler): self._local_pushmap[target_localpart].add(user) - self.push_update_to_clients( - observer_user=user, - observed_user=target_user, - statuscache=self._get_or_offline_usercache(target_user), - ) - def _start_polling_remote(self, user, domain, remoteusers): + to_poll = set() + for u in remoteusers: if u not in self._remote_recvmap: self._remote_recvmap[u] = set() + to_poll.add(u) self._remote_recvmap[u].add(user) + if not to_poll: + return defer.succeed(None) + return self.federation.send_edu( destination=domain, edu_type="m.presence", - content={"poll": [u.to_string() for u in remoteusers]} + content={"poll": [u.to_string() for u in to_poll]} ) + @log_function def stop_polling_presence(self, user, target_user=None): logger.debug("Stop polling for presence from %s", user) @@ -496,20 +525,28 @@ class PresenceHandler(BaseHandler): if not self._local_pushmap[localpart]: del self._local_pushmap[localpart] + @log_function def _stop_polling_remote(self, user, domain, remoteusers): + to_unpoll = set() + for u in remoteusers: self._remote_recvmap[u].remove(user) if not self._remote_recvmap[u]: del self._remote_recvmap[u] + to_unpoll.add(u) + + if not to_unpoll: + return defer.succeed(None) return self.federation.send_edu( destination=domain, edu_type="m.presence", - content={"unpoll": [u.to_string() for u in remoteusers]} + content={"unpoll": [u.to_string() for u in to_unpoll]} ) @defer.inlineCallbacks + @log_function def push_presence(self, user, statuscache): assert(user.is_mine) @@ -525,53 +562,17 @@ class PresenceHandler(BaseHandler): rm_handler = self.homeserver.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(user) - for room_id in room_ids: - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=localusers, remotedomains=remotedomains, - ignore_user=user, - ) - - if not localusers and not remotedomains: + if not localusers and not room_ids: defer.returnValue(None) - yield self._send_presence_to_distribution(user, - localusers=localusers, remotedomains=remotedomains, - statuscache=statuscache + yield self.push_update_to_local_and_remote( + observed_user=user, + users_to_push=localusers, + remote_domains=remotedomains, + room_ids=room_ids, + statuscache=statuscache, ) - def _send_presence(self, srcuser, destuser, statuscache): - if destuser.is_mine: - self.push_update_to_clients( - observer_user=destuser, - observed_user=srcuser, - statuscache=statuscache) - return defer.succeed(None) - else: - return self._push_presence_remote(srcuser, destuser.domain, - state=statuscache.get_state() - ) - - @defer.inlineCallbacks - def _send_presence_to_distribution(self, srcuser, localusers=set(), - remotedomains=set(), statuscache=None): - - for u in localusers: - logger.debug(" | push to local user %s", u) - self.push_update_to_clients( - observer_user=u, - observed_user=srcuser, - statuscache=statuscache, - ) - - deferreds = [] - for domain in remotedomains: - logger.debug(" | push to remote domain %s", domain) - deferreds.append(self._push_presence_remote(srcuser, domain, - state=statuscache.get_state()) - ) - - yield defer.DeferredList(deferreds) - @defer.inlineCallbacks def _push_presence_remote(self, user, destination, state=None): if state is None: @@ -587,12 +588,17 @@ class PresenceHandler(BaseHandler): self.clock.time_msec() - state.pop("mtime") ) + user_state = { + "user_id": user.to_string(), + } + user_state.update(**state) + yield self.federation.send_edu( destination=destination, edu_type="m.presence", content={ "push": [ - dict(user_id=user.to_string(), **state), + user_state, ], } ) @@ -611,12 +617,7 @@ class PresenceHandler(BaseHandler): rm_handler = self.homeserver.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(user) - for room_id in room_ids: - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=observers, ignore_user=user - ) - - if not observers: + if not observers and not room_ids: break state = dict(push) @@ -632,12 +633,12 @@ class PresenceHandler(BaseHandler): self._user_cachemap_latest_serial += 1 statuscache.update(state, serial=self._user_cachemap_latest_serial) - for observer_user in observers: - self.push_update_to_clients( - observer_user=observer_user, - observed_user=user, - statuscache=statuscache, - ) + self.push_update_to_clients( + observed_user=user, + users_to_push=observers, + room_ids=room_ids, + statuscache=statuscache, + ) if state["state"] == PresenceState.OFFLINE: del self._user_cachemap[user] @@ -671,12 +672,53 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds) - def push_update_to_clients(self, observer_user, observed_user, - statuscache): - statuscache.make_event(user=observed_user, clock=self.clock) + @defer.inlineCallbacks + def push_update_to_local_and_remote(self, observed_user, + users_to_push=[], room_ids=[], + remote_domains=[], + statuscache=None): + + localusers, remoteusers = partitionbool( + users_to_push, + lambda u: u.is_mine + ) + + localusers = set(localusers) + + self.push_update_to_clients( + observed_user=observed_user, + users_to_push=localusers, + room_ids=room_ids, + statuscache=statuscache, + ) + + remote_domains = set(remote_domains) + remote_domains |= set([r.domain for r in remoteusers]) + for room_id in room_ids: + remote_domains.update( + (yield self.store.get_joined_hosts_for_room(room_id)) + ) + + remote_domains.discard(self.hs.hostname) + + deferreds = [] + for domain in remote_domains: + logger.debug(" | push to remote domain %s", domain) + deferreds.append( + self._push_presence_remote( + observed_user, domain, state=statuscache.get_state() + ) + ) + + yield defer.DeferredList(deferreds) + + defer.returnValue((localusers, remote_domains)) + def push_update_to_clients(self, observed_user, users_to_push=[], + room_ids=[], statuscache=None): self.notifier.on_new_user_event( - [observer_user], + users_to_push, + room_ids, ) |