diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9982ae0fed..fdfce2a88c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1011,7 +1011,7 @@ class PresenceEventSource(object):
@defer.inlineCallbacks
@log_function
def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
- **kwargs):
+ explicit_room_id=None, **kwargs):
# The process for getting presence events are:
# 1. Get the rooms the user is in.
# 2. Get the list of user in the rooms.
@@ -1028,22 +1028,24 @@ class PresenceEventSource(object):
user_id = user.to_string()
if from_key is not None:
from_key = int(from_key)
- room_ids = room_ids or []
presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache
- if not room_ids:
- rooms = yield self.store.get_rooms_for_user(user_id)
- room_ids = set(e.room_id for e in rooms)
- else:
- room_ids = set(room_ids)
-
max_token = self.store.get_current_presence_token()
plist = yield self.store.get_presence_list_accepted(user.localpart)
- friends = set(row["observed_user_id"] for row in plist)
- friends.add(user_id) # So that we receive our own presence
+ users_interested_in = set(row["observed_user_id"] for row in plist)
+ users_interested_in.add(user_id) # So that we receive our own presence
+
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id
+ )
+ users_interested_in.update(users_who_share_room)
+
+ if explicit_room_id:
+ user_ids = yield self.store.get_users_in_room(explicit_room_id)
+ users_interested_in.update(user_ids)
user_ids_changed = set()
changed = None
@@ -1055,35 +1057,19 @@ class PresenceEventSource(object):
# work out if we share a room or they're in our presence list
get_updates_counter.inc("stream")
for other_user_id in changed:
- if other_user_id in friends:
+ if other_user_id in users_interested_in:
user_ids_changed.add(other_user_id)
- continue
- other_rooms = yield self.store.get_rooms_for_user(other_user_id)
- if room_ids.intersection(e.room_id for e in other_rooms):
- user_ids_changed.add(other_user_id)
- continue
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
get_updates_counter.inc("full")
- user_ids_to_check = set()
- for room_id in room_ids:
- users = yield self.store.get_users_in_room(room_id)
- user_ids_to_check.update(users)
-
- user_ids_to_check.update(friends)
-
- # Always include yourself. Only really matters for when the user is
- # not in any rooms, but still.
- user_ids_to_check.add(user_id)
-
if from_key:
user_ids_changed = stream_change_cache.get_entities_changed(
- user_ids_to_check, from_key,
+ users_interested_in, from_key,
)
else:
- user_ids_changed = user_ids_to_check
+ user_ids_changed = users_interested_in
updates = yield presence.current_state_for_users(user_ids_changed)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5f18007e90..7e7671c9a2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -437,6 +437,7 @@ class RoomEventSource(object):
limit,
room_ids,
is_guest,
+ explicit_room_id=None,
):
# We just ignore the key for now.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index acbd4bb5ae..8051a7a842 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -378,6 +378,7 @@ class Notifier(object):
limit=limit,
is_guest=is_peeking,
room_ids=room_ids,
+ explicit_room_id=explicit_room_id,
)
if name == "room":
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index ee800d074f..70718f41ed 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -280,6 +280,22 @@ class RoomMemberStore(SQLBaseStore):
user_id, membership_list=[Membership.JOIN],
)
+ @cachedInlineCallbacks(max_entries=50000, cache_context=True, iterable=True)
+ def get_users_who_share_room_with_user(self, user_id, cache_context):
+ rooms = yield self.get_rooms_for_user(
+ user_id, on_invalidate=cache_context.invalidate,
+ )
+
+ user_who_share_room = set()
+ for room in rooms:
+ user_ids = yield self.get_users_in_room(
+ room.room_id, on_invalidate=cache_context.invalidate,
+ )
+ logger.info("Users in room: %r %r", room.room_id, user_ids)
+ user_who_share_room.update(user_ids)
+
+ defer.returnValue(user_who_share_room)
+
def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id."""
def f(txn):
|