diff options
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r-- | synapse/handlers/presence.py | 51 |
1 files changed, 44 insertions, 7 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index bbc7a0f200..571eacd343 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -36,6 +36,9 @@ metrics = synapse.metrics.get_metrics_for(__name__) # Don't bother bumping "last active" time if it differs by less than 60 seconds LAST_ACTIVE_GRANULARITY = 60*1000 +# Keep no more than this number of offline serial revisions +MAX_OFFLINE_SERIALS = 1000 + # TODO(paul): Maybe there's one of these I can steal from somewhere def partition(l, func): @@ -135,6 +138,9 @@ class PresenceHandler(BaseHandler): self._remote_sendmap = {} # map remote users to sets of local users who're interested in them self._remote_recvmap = {} + # list of (serial, set of(userids)) tuples, ordered by serial, latest + # first + self._remote_offline_serials = [] # map any user to a UserPresenceCache self._user_cachemap = {} @@ -714,8 +720,24 @@ class PresenceHandler(BaseHandler): statuscache=statuscache, ) + user_id = user.to_string() + if state["presence"] == PresenceState.OFFLINE: + self._remote_offline_serials.insert( + 0, + (self._user_cachemap_latest_serial, set([user_id])) + ) + while len(self._remote_offline_serials) > MAX_OFFLINE_SERIALS: + self._remote_offline_serials.pop() # remove the oldest del self._user_cachemap[user] + else: + # Remove the user from remote_offline_serials now that they're + # no longer offline + for idx, elem in enumerate(self._remote_offline_serials): + (_, user_ids) = elem + user_ids.discard(user_id) + if not user_ids: + self._remote_offline_serials.pop(idx) for poll in content.get("poll", []): user = UserID.from_string(poll) @@ -836,6 +858,8 @@ class PresenceEventSource(object): presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap + clock = self.clock + latest_serial = None updates = [] # TODO(paul): use a DeferredList ? How to limit concurrency. @@ -845,18 +869,31 @@ class PresenceEventSource(object): if cached.serial <= from_key: continue - if (yield self.is_visible(observer_user, observed_user)): - updates.append((observed_user, cached)) + if not (yield self.is_visible(observer_user, observed_user)): + continue + + if latest_serial is None or cached.serial > latest_serial: + latest_serial = cached.serial + updates.append(cached.make_event(user=observed_user, clock=clock)) # TODO(paul): limit - if updates: - clock = self.clock + for serial, user_ids in presence._remote_offline_serials: + if serial < from_key: + break - latest_serial = max([x[1].serial for x in updates]) - data = [x[1].make_event(user=x[0], clock=clock) for x in updates] + for u in user_ids: + updates.append({ + "type": "m.presence", + "content": {"user_id": u, "presence": PresenceState.OFFLINE}, + }) + # TODO(paul): For the v2 API we want to tell the client their from_key + # is too old if we fell off the end of the _remote_offline_serials + # list, and get them to invalidate+resync. In v1 we have no such + # concept so this is a best-effort result. - defer.returnValue((data, latest_serial)) + if updates: + defer.returnValue((updates, latest_serial)) else: defer.returnValue(([], presence._user_cachemap_latest_serial)) |