diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 40794187b1..670c1d353f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -146,6 +146,10 @@ class PresenceHandler(BaseHandler):
self._user_cachemap = {}
self._user_cachemap_latest_serial = 0
+ # map room_ids to the latest presence serial for a member of that
+ # room
+ self._room_serials = {}
+
metrics.register_callback(
"userCachemap:size",
lambda: len(self._user_cachemap),
@@ -297,13 +301,34 @@ class PresenceHandler(BaseHandler):
self.changed_presencelike_data(user, {"last_active": now})
+ def get_joined_rooms_for_user(self, user):
+ """Get the list of rooms a user is joined to.
+
+ Args:
+ user(UserID): The user.
+ Returns:
+ A Deferred of a list of room id strings.
+ """
+ rm_handler = self.homeserver.get_handlers().room_member_handler
+ return rm_handler.get_joined_rooms_for_user(user)
+
+ def get_joined_users_for_room_id(self, room_id):
+ rm_handler = self.homeserver.get_handlers().room_member_handler
+ return rm_handler.get_room_members(room_id)
+
+ @defer.inlineCallbacks
def changed_presencelike_data(self, user, state):
- statuscache = self._get_or_make_usercache(user)
+ """Updates the presence state of a local user.
+ Args:
+ user(UserID): The user being updated.
+ state(dict): The new presence state for the user.
+ Returns:
+ A Deferred
+ """
self._user_cachemap_latest_serial += 1
- statuscache.update(state, serial=self._user_cachemap_latest_serial)
-
- return self.push_presence(user, statuscache=statuscache)
+ statuscache = yield self.update_presence_cache(user, state)
+ yield self.push_presence(user, statuscache=statuscache)
@log_function
def started_user_eventstream(self, user):
@@ -326,13 +351,12 @@ class PresenceHandler(BaseHandler):
room_id(str): The room id the user joined.
"""
if self.hs.is_mine(user):
- statuscache = self._get_or_make_usercache(user)
-
# No actual update but we need to bump the serial anyway for the
# event source
self._user_cachemap_latest_serial += 1
- statuscache.update({}, serial=self._user_cachemap_latest_serial)
-
+ statuscache = yield self.update_presence_cache(
+ user, room_ids=[room_id]
+ )
self.push_update_to_local_and_remote(
observed_user=user,
room_ids=[room_id],
@@ -340,16 +364,17 @@ class PresenceHandler(BaseHandler):
)
# We also want to tell them about current presence of people.
- rm_handler = self.homeserver.get_handlers().room_member_handler
- curr_users = yield rm_handler.get_room_members(room_id)
+ curr_users = yield self.get_joined_users_for_room_id(room_id)
for local_user in [c for c in curr_users if self.hs.is_mine(c)]:
- statuscache = self._get_or_offline_usercache(local_user)
- statuscache.update({}, serial=self._user_cachemap_latest_serial)
+ statuscache = yield self.update_presence_cache(
+ local_user, room_ids=[room_id], add_to_cache=False
+ )
+
self.push_update_to_local_and_remote(
observed_user=local_user,
users_to_push=[user],
- statuscache=self._get_or_offline_usercache(local_user),
+ statuscache=statuscache,
)
@defer.inlineCallbacks
@@ -546,8 +571,7 @@ class PresenceHandler(BaseHandler):
# Also include people in all my rooms
- rm_handler = self.homeserver.get_handlers().room_member_handler
- room_ids = yield rm_handler.get_joined_rooms_for_user(user)
+ room_ids = yield self.get_joined_rooms_for_user(user)
if state is None:
state = yield self.store.get_presence_state(user.localpart)
@@ -747,8 +771,7 @@ class PresenceHandler(BaseHandler):
# and also user is informed of server-forced pushes
localusers.add(user)
- rm_handler = self.homeserver.get_handlers().room_member_handler
- room_ids = yield rm_handler.get_joined_rooms_for_user(user)
+ room_ids = yield self.get_joined_rooms_for_user(user)
if not localusers and not room_ids:
defer.returnValue(None)
@@ -793,8 +816,7 @@ class PresenceHandler(BaseHandler):
" | %d interested local observers %r", len(observers), observers
)
- rm_handler = self.homeserver.get_handlers().room_member_handler
- room_ids = yield rm_handler.get_joined_rooms_for_user(user)
+ room_ids = yield self.get_joined_rooms_for_user(user)
if room_ids:
logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids)
@@ -813,10 +835,8 @@ class PresenceHandler(BaseHandler):
self.clock.time_msec() - state.pop("last_active_ago")
)
- statuscache = self._get_or_make_usercache(user)
-
self._user_cachemap_latest_serial += 1
- statuscache.update(state, serial=self._user_cachemap_latest_serial)
+ yield self.update_presence_cache(user, state, room_ids=room_ids)
if not observers and not room_ids:
logger.debug(" | no interested observers or room IDs")
@@ -875,6 +895,35 @@ class PresenceHandler(BaseHandler):
yield defer.DeferredList(deferreds, consumeErrors=True)
@defer.inlineCallbacks
+ def update_presence_cache(self, user, state={}, room_ids=None,
+ add_to_cache=True):
+ """Update the presence cache for a user with a new state and bump the
+ serial to the latest value.
+
+ Args:
+ user(UserID): The user being updated
+ state(dict): The presence state being updated
+ room_ids(None or list of str): A list of room_ids to update. If
+ room_ids is None then fetch the list of room_ids the user is
+ joined to.
+ add_to_cache: Whether to add an entry to the presence cache if the
+ user isn't already in the cache.
+ Returns:
+ A Deferred UserPresenceCache for the user being updated.
+ """
+ if room_ids is None:
+ room_ids = yield self.get_joined_rooms_for_user(user)
+
+ for room_id in room_ids:
+ self._room_serials[room_id] = self._user_cachemap_latest_serial
+ if add_to_cache:
+ statuscache = self._get_or_make_usercache(user)
+ else:
+ statuscache = self._get_or_offline_usercache(user)
+ statuscache.update(state, serial=self._user_cachemap_latest_serial)
+ defer.returnValue(statuscache)
+
+ @defer.inlineCallbacks
def push_update_to_local_and_remote(self, observed_user, statuscache,
users_to_push=[], room_ids=[],
remote_domains=[]):
@@ -997,38 +1046,10 @@ class PresenceEventSource(object):
self.clock = hs.get_clock()
@defer.inlineCallbacks
- def is_visible(self, observer_user, observed_user):
- if observer_user == observed_user:
- defer.returnValue(True)
-
- presence = self.hs.get_handlers().presence_handler
-
- if (yield presence.store.user_rooms_intersect(
- [u.to_string() for u in observer_user, observed_user])):
- defer.returnValue(True)
-
- if self.hs.is_mine(observed_user):
- pushmap = presence._local_pushmap
-
- defer.returnValue(
- observed_user.localpart in pushmap and
- observer_user in pushmap[observed_user.localpart]
- )
- else:
- recvmap = presence._remote_recvmap
-
- defer.returnValue(
- observed_user in recvmap and
- observer_user in recvmap[observed_user]
- )
-
- @defer.inlineCallbacks
@log_function
def get_new_events_for_user(self, user, from_key, limit):
from_key = int(from_key)
- observer_user = user
-
presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap
@@ -1037,17 +1058,27 @@ class PresenceEventSource(object):
clock = self.clock
latest_serial = 0
+ user_ids_to_check = {user}
+ presence_list = yield presence.store.get_presence_list(
+ user.localpart, accepted=True
+ )
+ if presence_list is not None:
+ user_ids_to_check |= set(
+ UserID.from_string(p["observed_user_id"]) for p in presence_list
+ )
+ room_ids = yield presence.get_joined_rooms_for_user(user)
+ for room_id in set(room_ids) & set(presence._room_serials):
+ if presence._room_serials[room_id] > from_key:
+ joined = yield presence.get_joined_users_for_room_id(room_id)
+ user_ids_to_check |= set(joined)
+
updates = []
- # TODO(paul): use a DeferredList ? How to limit concurrency.
- for observed_user in cachemap.keys():
+ for observed_user in user_ids_to_check & set(cachemap):
cached = cachemap[observed_user]
if cached.serial <= from_key or cached.serial > max_serial:
continue
- if not (yield self.is_visible(observer_user, observed_user)):
- continue
-
latest_serial = max(cached.serial, latest_serial)
updates.append(cached.make_event(user=observed_user, clock=clock))
@@ -1084,8 +1115,6 @@ class PresenceEventSource(object):
def get_pagination_rows(self, user, pagination_config, key):
# TODO (erikj): Does this make sense? Ordering?
- observer_user = user
-
from_key = int(pagination_config.from_key)
if pagination_config.to_key:
@@ -1096,14 +1125,26 @@ class PresenceEventSource(object):
presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap
+ user_ids_to_check = {user}
+ presence_list = yield presence.store.get_presence_list(
+ user.localpart, accepted=True
+ )
+ if presence_list is not None:
+ user_ids_to_check |= set(
+ UserID.from_string(p["observed_user_id"]) for p in presence_list
+ )
+ room_ids = yield presence.get_joined_rooms_for_user(user)
+ for room_id in set(room_ids) & set(presence._room_serials):
+ if presence._room_serials[room_id] >= from_key:
+ joined = yield presence.get_joined_users_for_room_id(room_id)
+ user_ids_to_check |= set(joined)
+
updates = []
- # TODO(paul): use a DeferredList ? How to limit concurrency.
- for observed_user in cachemap.keys():
+ for observed_user in user_ids_to_check & set(cachemap):
if not (to_key < cachemap[observed_user].serial <= from_key):
continue
- if (yield self.is_visible(observer_user, observed_user)):
- updates.append((observed_user, cachemap[observed_user]))
+ updates.append((observed_user, cachemap[observed_user]))
# TODO(paul): limit
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index ee773797e7..12cf5747a2 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -624,6 +624,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase):
"""
PRESENCE_LIST = {
'apple': [ "@banana:test", "@clementine:test" ],
+ 'banana': [ "@apple:test" ],
}
@defer.inlineCallbacks
@@ -836,12 +837,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase):
@defer.inlineCallbacks
def test_recv_remote(self):
- # TODO(paul): Gut-wrenching
- potato_set = self.handler._remote_recvmap.setdefault(self.u_potato,
- set())
- potato_set.add(self.u_apple)
-
- self.room_members = [self.u_banana, self.u_potato]
+ self.room_members = [self.u_apple, self.u_banana, self.u_potato]
self.assertEquals(self.event_source.get_current_key(), 0)
@@ -886,11 +882,8 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase):
@defer.inlineCallbacks
def test_recv_remote_offline(self):
""" Various tests relating to SYN-261 """
- potato_set = self.handler._remote_recvmap.setdefault(self.u_potato,
- set())
- potato_set.add(self.u_apple)
- self.room_members = [self.u_banana, self.u_potato]
+ self.room_members = [self.u_apple, self.u_banana, self.u_potato]
self.assertEquals(self.event_source.get_current_key(), 0)
diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py
index 21f42b3d3e..523b30cf8a 100644
--- a/tests/rest/client/v1/test_presence.py
+++ b/tests/rest/client/v1/test_presence.py
@@ -297,6 +297,9 @@ class PresenceEventStreamTestCase(unittest.TestCase):
else:
return []
hs.handlers.room_member_handler.get_joined_rooms_for_user = get_rooms_for_user
+ hs.handlers.room_member_handler.get_room_members = (
+ lambda r: self.room_members if r == "a-room" else []
+ )
self.mock_datastore = hs.get_datastore()
self.mock_datastore.get_app_service_by_token = Mock(return_value=None)
|