diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index beb5aa3a6a..ae4735898e 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -156,7 +156,7 @@ class PresenceHandler(BaseHandler):
defer.returnValue(True)
if (yield self.store.user_rooms_intersect(
- [observer_user, observed_user]
+ [u.to_string() for u in observer_user, observed_user]
)):
defer.returnValue(True)
@@ -772,15 +772,52 @@ class PresenceEventSource(object):
self.hs = hs
self.clock = hs.get_clock()
+ @defer.inlineCallbacks
+ def is_visible(self, observer_user, observed_user):
+ if observer_user == observed_user:
+ defer.returnValue(True)
+
+ presence = self.hs.get_handlers().presence_handler
+
+ if (yield presence.store.user_rooms_intersect(
+ [u.to_string() for u in observer_user, observed_user]
+ )):
+ defer.returnValue(True)
+
+ if observed_user.is_mine:
+ pushmap = presence._local_pushmap
+
+ defer.returnValue(
+ observed_user.localpart in pushmap and
+ observer_user in pushmap[observed_user.localpart]
+ )
+ else:
+ recvmap = presence._remote_recvmap
+
+ defer.returnValue(
+ observed_user in recvmap and
+ observer_user in recvmap[observed_user]
+ )
+
+ @defer.inlineCallbacks
def get_new_events_for_user(self, user, from_key, limit):
from_key = int(from_key)
+ observer_user = user
+
presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap
- # TODO(paul): limit, and filter by visibility
- updates = [(k, cachemap[k]) for k in cachemap
- if from_key < cachemap[k].serial]
+ updates = []
+ # TODO(paul): use a DeferredList ? How to limit concurrency.
+ for observed_user in cachemap.keys():
+ if not (from_key < cachemap[observed_user].serial):
+ continue
+
+ if (yield self.is_visible(observer_user, observed_user)):
+ updates.append((observed_user, cachemap[observed_user]))
+
+ # TODO(paul): limit
if updates:
clock = self.clock
@@ -788,14 +825,15 @@ class PresenceEventSource(object):
latest_serial = max([x[1].serial for x in updates])
data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
- return ((data, latest_serial))
+ defer.returnValue((data, latest_serial))
else:
- return (([], presence._user_cachemap_latest_serial))
+ defer.returnValue(([], presence._user_cachemap_latest_serial))
def get_current_key(self):
presence = self.hs.get_handlers().presence_handler
return presence._user_cachemap_latest_serial
+ @defer.inlineCallbacks
def get_pagination_rows(self, user, pagination_config, key):
# TODO (erikj): Does this make sense? Ordering?
@@ -812,7 +850,17 @@ class PresenceEventSource(object):
presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap
- # TODO(paul): limit, and filter by visibility
+ updates = []
+ # TODO(paul): use a DeferredList ? How to limit concurrency.
+ for observed_user in cachemap.keys():
+ if not (to_key < cachemap[observed_user].serial < from_key):
+ continue
+
+ if (yield self.is_visible(observer_user, observed_user)):
+ updates.append((observed_user, cachemap[observed_user]))
+
+ # TODO(paul): limit
+
updates = [(k, cachemap[k]) for k in cachemap
if to_key < cachemap[k].serial < from_key]
@@ -830,13 +878,13 @@ class PresenceEventSource(object):
next_token = next_token.copy_and_replace(
"presence_key", earliest_serial
)
- return ((data, next_token))
+ defer.returnValue((data, next_token))
else:
if not to_token:
to_token = from_token.copy_and_replace(
"presence_key", 0
)
- return (([], to_token))
+ defer.returnValue(([], to_token))
class UserPresenceCache(object):
|