diff options
author | Mark Haines <mark.haines@matrix.org> | 2015-05-12 16:37:50 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2015-05-12 16:37:50 +0100 |
commit | cffe6057fb59a4db622132a84b5ef88cf81f6bfd (patch) | |
tree | 173ebcd9a8c7c04ee3a4f733995a47c515e9c81a /synapse/handlers/presence.py | |
parent | Merge branch 'notifier_unify' into notifier_performance (diff) | |
parent | Merge branch 'develop' into notifier_unify (diff) | |
download | synapse-cffe6057fb59a4db622132a84b5ef88cf81f6bfd.tar.xz |
Merge branch 'notifier_unify' into notifier_performance
Conflicts: synapse/notifier.py
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r-- | synapse/handlers/presence.py | 70 |
1 files changed, 41 insertions, 29 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9e15610401..28688d532d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -18,14 +18,15 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState -from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logutils import log_function from synapse.types import UserID import synapse.metrics from ._base import BaseHandler import logging +from collections import OrderedDict logger = logging.getLogger(__name__) @@ -143,7 +144,7 @@ class PresenceHandler(BaseHandler): self._remote_offline_serials = [] # map any user to a UserPresenceCache - self._user_cachemap = {} + self._user_cachemap = OrderedDict() # keep them sorted by serial self._user_cachemap_latest_serial = 0 metrics.register_callback( @@ -165,6 +166,14 @@ class PresenceHandler(BaseHandler): else: return UserPresenceCache() + def _bump_serial(self, user=None): + self._user_cachemap_latest_serial += 1 + + if user: + # Move to end + cache = self._user_cachemap.pop(user) + self._user_cachemap[user] = cache + def registered_user(self, user): return self.store.create_presence(user.localpart) @@ -278,15 +287,14 @@ class PresenceHandler(BaseHandler): now_online = state["presence"] != PresenceState.OFFLINE was_polling = target_user in self._user_cachemap - with PreserveLoggingContext(): - if now_online and not was_polling: - self.start_polling_presence(target_user, state=state) - elif not now_online and was_polling: - self.stop_polling_presence(target_user) + if now_online and not was_polling: + self.start_polling_presence(target_user, state=state) + elif not now_online and was_polling: + self.stop_polling_presence(target_user) - # TODO(paul): perform a presence push as part of start/stop poll so - # we don't have to do this all the time - self.changed_presencelike_data(target_user, state) + # TODO(paul): perform a presence push as part of start/stop poll so + # we don't have to do this all the time + self.changed_presencelike_data(target_user, state) def bump_presence_active_time(self, user, now=None): if now is None: @@ -301,7 +309,7 @@ class PresenceHandler(BaseHandler): def changed_presencelike_data(self, user, state): statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 + self._bump_serial(user=user) statuscache.update(state, serial=self._user_cachemap_latest_serial) return self.push_presence(user, statuscache=statuscache) @@ -323,7 +331,7 @@ class PresenceHandler(BaseHandler): # No actual update but we need to bump the serial anyway for the # event source - self._user_cachemap_latest_serial += 1 + self._bump_serial() statuscache.update({}, serial=self._user_cachemap_latest_serial) self.push_update_to_local_and_remote( @@ -408,10 +416,10 @@ class PresenceHandler(BaseHandler): yield self.store.set_presence_list_accepted( observer_user.localpart, observed_user.to_string() ) - with PreserveLoggingContext(): - self.start_polling_presence( - observer_user, target_user=observed_user - ) + + self.start_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def deny_presence(self, observed_user, observer_user): @@ -430,10 +438,9 @@ class PresenceHandler(BaseHandler): observer_user.localpart, observed_user.to_string() ) - with PreserveLoggingContext(): - self.stop_polling_presence( - observer_user, target_user=observed_user - ) + self.stop_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): @@ -706,7 +713,7 @@ class PresenceHandler(BaseHandler): statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 + self._bump_serial(user=user) statuscache.update(state, serial=self._user_cachemap_latest_serial) if not observers and not room_ids: @@ -766,8 +773,7 @@ class PresenceHandler(BaseHandler): if not self._remote_sendmap[user]: del self._remote_sendmap[user] - with PreserveLoggingContext(): - yield defer.DeferredList(deferreds, consumeErrors=True) + yield defer.DeferredList(deferreds, consumeErrors=True) @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, @@ -812,10 +818,11 @@ class PresenceHandler(BaseHandler): def push_update_to_clients(self, observed_user, users_to_push=[], room_ids=[], statuscache=None): - self.notifier.on_new_user_event( - users_to_push, - room_ids, - ) + with PreserveLoggingContext(): + self.notifier.on_new_user_event( + users_to_push, + room_ids, + ) class PresenceEventSource(object): @@ -866,10 +873,15 @@ class PresenceEventSource(object): updates = [] # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in cachemap.keys(): + for observed_user in reversed(cachemap.keys()): cached = cachemap[observed_user] - if cached.serial <= from_key or cached.serial > max_serial: + # Since this is ordered in descending order of serial, we can just + # stop once we've seen enough + if cached.serial <= from_key: + break + + if cached.serial > max_serial: continue if not (yield self.is_visible(observer_user, observed_user)): |