2 files changed, 61 insertions, 19 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7d4fe5aaa5..8134c5185a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -846,35 +846,61 @@ class PresenceEventSource(object):
room_ids = room_ids or []
presence = self.hs.get_handlers().presence_handler
+ stream_change_cache = self.store.presence_stream_cache
if not room_ids:
rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = set(e.room_id for e in rooms)
-
- user_ids_to_check = set()
- for room_id in room_ids:
- users = yield self.store.get_users_in_room(room_id)
- user_ids_to_check.update(users)
+ else:
+ room_ids = set(room_ids)
plist = yield self.store.get_presence_list_accepted(user.localpart)
- user_ids_to_check.update([row["observed_user_id"] for row in plist])
-
- # Always include yourself. Only really matters for when the user is
- # not in any rooms, but still.
- user_ids_to_check.add(user_id)
-
- max_token = self.store.get_current_presence_token()
-
- if from_key:
- user_ids_changed = self.store.presence_stream_cache.get_entities_changed(
- user_ids_to_check, from_key,
- )
+ friends = set(row["observed_user_id"] for row in plist)
+ friends.add(user_id) # So that we receive our own presence
+
+ user_ids_changed = set()
+ changed = None
+ if from_key and from_key < 100:
+ # For small deltas, its quicker to get all changes and then
+ # work out if we share a room or they're in our presence list
+ changed = stream_change_cache.get_all_entities_changed(from_key)
+
+ # get_all_entities_changed can return None
+ if changed is not None:
+ for other_user_id in changed:
+ if other_user_id in friends:
+ user_ids_changed.add(other_user_id)
+ continue
+ other_rooms = yield self.store.get_rooms_for_user(other_user_id)
+ if room_ids.intersection(e.room_id for e in other_rooms):
+ user_ids_changed.add(other_user_id)
+ continue
else:
- user_ids_changed = user_ids_to_check
+ # Too many possible updates. Find all users we can see and check
+ # if any of them have changed.
+ user_ids_to_check = set()
+ for room_id in room_ids:
+ users = yield self.store.get_users_in_room(room_id)
+ user_ids_to_check.update(users)
+
+ plist = yield self.store.get_presence_list_accepted(user.localpart)
+ user_ids_to_check.update([row["observed_user_id"] for row in plist])
+
+ # Always include yourself. Only really matters for when the user is
+ # not in any rooms, but still.
+ user_ids_to_check.add(user_id)
+
+ if from_key:
+ user_ids_changed = stream_change_cache.get_entities_changed(
+ user_ids_to_check, from_key,
+ )
+ else:
+ user_ids_changed = user_ids_to_check
updates = yield presence.current_state_for_users(user_ids_changed)
- now = self.clock.time_msec()
+ max_token = self.store.get_current_presence_token()
+ now = self.clock.time_msec()
defer.returnValue(([
{
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index b37f1c0725..970488a19c 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -85,6 +85,22 @@ class StreamChangeCache(object):
return result
+ def get_all_entities_changed(self, stream_pos):
+ """Returns all entites that have had new things since the given
+ position. If the position is too old it will return None.
+ """
+ assert type(stream_pos) is int
+
+ if stream_pos >= self._earliest_known_stream_pos:
+ keys = self._cache.keys()
+ i = keys.bisect_right(stream_pos)
+
+ return (
+ self._cache[k] for k in keys[i:]
+ )
+ else:
+ return None
+
def entity_has_changed(self, entity, stream_pos):
"""Informs the cache that the entity has been changed at the given
position.
|