diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9e15610401..28688d532d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -18,14 +18,15 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError
from synapse.api.constants import PresenceState
-from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logutils import log_function
from synapse.types import UserID
import synapse.metrics
from ._base import BaseHandler
import logging
+from collections import OrderedDict
logger = logging.getLogger(__name__)
@@ -143,7 +144,7 @@ class PresenceHandler(BaseHandler):
self._remote_offline_serials = []
# map any user to a UserPresenceCache
- self._user_cachemap = {}
+ self._user_cachemap = OrderedDict() # keep them sorted by serial
self._user_cachemap_latest_serial = 0
metrics.register_callback(
@@ -165,6 +166,14 @@ class PresenceHandler(BaseHandler):
else:
return UserPresenceCache()
+ def _bump_serial(self, user=None):
+ self._user_cachemap_latest_serial += 1
+
+ if user:
+ # Move to end
+ cache = self._user_cachemap.pop(user)
+ self._user_cachemap[user] = cache
+
def registered_user(self, user):
return self.store.create_presence(user.localpart)
@@ -278,15 +287,14 @@ class PresenceHandler(BaseHandler):
now_online = state["presence"] != PresenceState.OFFLINE
was_polling = target_user in self._user_cachemap
- with PreserveLoggingContext():
- if now_online and not was_polling:
- self.start_polling_presence(target_user, state=state)
- elif not now_online and was_polling:
- self.stop_polling_presence(target_user)
+ if now_online and not was_polling:
+ self.start_polling_presence(target_user, state=state)
+ elif not now_online and was_polling:
+ self.stop_polling_presence(target_user)
- # TODO(paul): perform a presence push as part of start/stop poll so
- # we don't have to do this all the time
- self.changed_presencelike_data(target_user, state)
+ # TODO(paul): perform a presence push as part of start/stop poll so
+ # we don't have to do this all the time
+ self.changed_presencelike_data(target_user, state)
def bump_presence_active_time(self, user, now=None):
if now is None:
@@ -301,7 +309,7 @@ class PresenceHandler(BaseHandler):
def changed_presencelike_data(self, user, state):
statuscache = self._get_or_make_usercache(user)
- self._user_cachemap_latest_serial += 1
+ self._bump_serial(user=user)
statuscache.update(state, serial=self._user_cachemap_latest_serial)
return self.push_presence(user, statuscache=statuscache)
@@ -323,7 +331,7 @@ class PresenceHandler(BaseHandler):
# No actual update but we need to bump the serial anyway for the
# event source
- self._user_cachemap_latest_serial += 1
+ self._bump_serial()
statuscache.update({}, serial=self._user_cachemap_latest_serial)
self.push_update_to_local_and_remote(
@@ -408,10 +416,10 @@ class PresenceHandler(BaseHandler):
yield self.store.set_presence_list_accepted(
observer_user.localpart, observed_user.to_string()
)
- with PreserveLoggingContext():
- self.start_polling_presence(
- observer_user, target_user=observed_user
- )
+
+ self.start_polling_presence(
+ observer_user, target_user=observed_user
+ )
@defer.inlineCallbacks
def deny_presence(self, observed_user, observer_user):
@@ -430,10 +438,9 @@ class PresenceHandler(BaseHandler):
observer_user.localpart, observed_user.to_string()
)
- with PreserveLoggingContext():
- self.stop_polling_presence(
- observer_user, target_user=observed_user
- )
+ self.stop_polling_presence(
+ observer_user, target_user=observed_user
+ )
@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
@@ -706,7 +713,7 @@ class PresenceHandler(BaseHandler):
statuscache = self._get_or_make_usercache(user)
- self._user_cachemap_latest_serial += 1
+ self._bump_serial(user=user)
statuscache.update(state, serial=self._user_cachemap_latest_serial)
if not observers and not room_ids:
@@ -766,8 +773,7 @@ class PresenceHandler(BaseHandler):
if not self._remote_sendmap[user]:
del self._remote_sendmap[user]
- with PreserveLoggingContext():
- yield defer.DeferredList(deferreds, consumeErrors=True)
+ yield defer.DeferredList(deferreds, consumeErrors=True)
@defer.inlineCallbacks
def push_update_to_local_and_remote(self, observed_user, statuscache,
@@ -812,10 +818,11 @@ class PresenceHandler(BaseHandler):
def push_update_to_clients(self, observed_user, users_to_push=[],
room_ids=[], statuscache=None):
- self.notifier.on_new_user_event(
- users_to_push,
- room_ids,
- )
+ with PreserveLoggingContext():
+ self.notifier.on_new_user_event(
+ users_to_push,
+ room_ids,
+ )
class PresenceEventSource(object):
@@ -866,10 +873,15 @@ class PresenceEventSource(object):
updates = []
# TODO(paul): use a DeferredList ? How to limit concurrency.
- for observed_user in cachemap.keys():
+ for observed_user in reversed(cachemap.keys()):
cached = cachemap[observed_user]
- if cached.serial <= from_key or cached.serial > max_serial:
+ # Since this is ordered in descending order of serial, we can just
+ # stop once we've seen enough
+ if cached.serial <= from_key:
+ break
+
+ if cached.serial > max_serial:
continue
if not (yield self.is_visible(observer_user, observed_user)):
|