summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-05-12 16:37:50 +0100
committerMark Haines <mark.haines@matrix.org>2015-05-12 16:37:50 +0100
commitcffe6057fb59a4db622132a84b5ef88cf81f6bfd (patch)
tree173ebcd9a8c7c04ee3a4f733995a47c515e9c81a /synapse/handlers/presence.py
parentMerge branch 'notifier_unify' into notifier_performance (diff)
parentMerge branch 'develop' into notifier_unify (diff)
downloadsynapse-cffe6057fb59a4db622132a84b5ef88cf81f6bfd.tar.xz
Merge branch 'notifier_unify' into notifier_performance
Conflicts:
	synapse/notifier.py
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py70
1 files changed, 41 insertions, 29 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9e15610401..28688d532d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -18,14 +18,15 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, AuthError
 from synapse.api.constants import PresenceState
 
-from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logutils import log_function
 from synapse.types import UserID
 import synapse.metrics
 
 from ._base import BaseHandler
 
 import logging
+from collections import OrderedDict
 
 
 logger = logging.getLogger(__name__)
@@ -143,7 +144,7 @@ class PresenceHandler(BaseHandler):
         self._remote_offline_serials = []
 
         # map any user to a UserPresenceCache
-        self._user_cachemap = {}
+        self._user_cachemap = OrderedDict()  # keep them sorted by serial
         self._user_cachemap_latest_serial = 0
 
         metrics.register_callback(
@@ -165,6 +166,14 @@ class PresenceHandler(BaseHandler):
         else:
             return UserPresenceCache()
 
+    def _bump_serial(self, user=None):
+        self._user_cachemap_latest_serial += 1
+
+        if user:
+            # Move to end
+            cache = self._user_cachemap.pop(user)
+            self._user_cachemap[user] = cache
+
     def registered_user(self, user):
         return self.store.create_presence(user.localpart)
 
@@ -278,15 +287,14 @@ class PresenceHandler(BaseHandler):
         now_online = state["presence"] != PresenceState.OFFLINE
         was_polling = target_user in self._user_cachemap
 
-        with PreserveLoggingContext():
-            if now_online and not was_polling:
-                self.start_polling_presence(target_user, state=state)
-            elif not now_online and was_polling:
-                self.stop_polling_presence(target_user)
+        if now_online and not was_polling:
+            self.start_polling_presence(target_user, state=state)
+        elif not now_online and was_polling:
+            self.stop_polling_presence(target_user)
 
-            # TODO(paul): perform a presence push as part of start/stop poll so
-            #   we don't have to do this all the time
-            self.changed_presencelike_data(target_user, state)
+        # TODO(paul): perform a presence push as part of start/stop poll so
+        #   we don't have to do this all the time
+        self.changed_presencelike_data(target_user, state)
 
     def bump_presence_active_time(self, user, now=None):
         if now is None:
@@ -301,7 +309,7 @@ class PresenceHandler(BaseHandler):
     def changed_presencelike_data(self, user, state):
         statuscache = self._get_or_make_usercache(user)
 
-        self._user_cachemap_latest_serial += 1
+        self._bump_serial(user=user)
         statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
         return self.push_presence(user, statuscache=statuscache)
@@ -323,7 +331,7 @@ class PresenceHandler(BaseHandler):
 
             # No actual update but we need to bump the serial anyway for the
             # event source
-            self._user_cachemap_latest_serial += 1
+            self._bump_serial()
             statuscache.update({}, serial=self._user_cachemap_latest_serial)
 
             self.push_update_to_local_and_remote(
@@ -408,10 +416,10 @@ class PresenceHandler(BaseHandler):
         yield self.store.set_presence_list_accepted(
             observer_user.localpart, observed_user.to_string()
         )
-        with PreserveLoggingContext():
-            self.start_polling_presence(
-                observer_user, target_user=observed_user
-            )
+
+        self.start_polling_presence(
+            observer_user, target_user=observed_user
+        )
 
     @defer.inlineCallbacks
     def deny_presence(self, observed_user, observer_user):
@@ -430,10 +438,9 @@ class PresenceHandler(BaseHandler):
             observer_user.localpart, observed_user.to_string()
         )
 
-        with PreserveLoggingContext():
-            self.stop_polling_presence(
-                observer_user, target_user=observed_user
-            )
+        self.stop_polling_presence(
+            observer_user, target_user=observed_user
+        )
 
     @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
@@ -706,7 +713,7 @@ class PresenceHandler(BaseHandler):
 
             statuscache = self._get_or_make_usercache(user)
 
-            self._user_cachemap_latest_serial += 1
+            self._bump_serial(user=user)
             statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
             if not observers and not room_ids:
@@ -766,8 +773,7 @@ class PresenceHandler(BaseHandler):
                 if not self._remote_sendmap[user]:
                     del self._remote_sendmap[user]
 
-        with PreserveLoggingContext():
-            yield defer.DeferredList(deferreds, consumeErrors=True)
+        yield defer.DeferredList(deferreds, consumeErrors=True)
 
     @defer.inlineCallbacks
     def push_update_to_local_and_remote(self, observed_user, statuscache,
@@ -812,10 +818,11 @@ class PresenceHandler(BaseHandler):
 
     def push_update_to_clients(self, observed_user, users_to_push=[],
                                room_ids=[], statuscache=None):
-        self.notifier.on_new_user_event(
-            users_to_push,
-            room_ids,
-        )
+        with PreserveLoggingContext():
+            self.notifier.on_new_user_event(
+                users_to_push,
+                room_ids,
+            )
 
 
 class PresenceEventSource(object):
@@ -866,10 +873,15 @@ class PresenceEventSource(object):
 
         updates = []
         # TODO(paul): use a DeferredList ? How to limit concurrency.
-        for observed_user in cachemap.keys():
+        for observed_user in reversed(cachemap.keys()):
             cached = cachemap[observed_user]
 
-            if cached.serial <= from_key or cached.serial > max_serial:
+            # Since this is ordered in descending order of serial, we can just
+            # stop once we've seen enough
+            if cached.serial <= from_key:
+                break
+
+            if cached.serial > max_serial:
                 continue
 
             if not (yield self.is_visible(observer_user, observed_user)):