From e9aec001f463a4704836e7f02645afc641238d28 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Jul 2017 10:30:10 +0100 Subject: Use less DB for device list handling in sync --- synapse/handlers/sync.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 91c6c6be3c..e6df1819b9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -579,18 +579,17 @@ class SyncHandler(object): since_token = sync_result_builder.since_token if since_token and since_token.device_list_key: - room_ids = yield self.store.get_rooms_for_user(user_id) - - user_ids_changed = set() changed = yield self.store.get_user_whose_devices_changed( since_token.device_list_key ) - for other_user_id in changed: - other_room_ids = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(other_room_ids): - user_ids_changed.add(other_user_id) + if not changed: + defer.returnValue([]) + + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) - defer.returnValue(user_ids_changed) + defer.returnValue(users_who_share_room & changed) else: defer.returnValue([]) -- cgit 1.4.1 From 53cc8ad35a269723478a1ee1a9a96d510a7b044f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Sep 2017 15:08:39 +0100 Subject: Send down device list change notif when member leaves/rejoins room --- synapse/handlers/device.py | 2 +- synapse/handlers/sync.py | 64 +++++++++++++++++++++++++++++------- synapse/rest/client/v2_alpha/sync.py | 3 +- 3 files changed, 55 insertions(+), 14 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ed60d494ff..be120b2f34 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -320,7 +320,7 @@ class DeviceHandler(BaseHandler): # check if this member has changed since any of the extremities # at the stream_ordering, and add them to the list if so. - for state_dict in prev_state_ids.values(): + for state_dict in prev_state_ids.itervalues(): prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: possibly_changed.add(state_key) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e6df1819b9..4ee6109cf8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -108,6 +108,16 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ return True +class DeviceLists(collections.namedtuple("DeviceLists", [ + "changed", # list of user_ids whose devices may have changed + "left", # list of user_ids whose devices we no longer track +])): + __slots__ = [] + + def __nonzero__(self): + return bool(self.changed or self.left) + + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. @@ -535,7 +545,7 @@ class SyncHandler(object): res = yield self._generate_sync_entry_for_rooms( sync_result_builder, account_data_by_room ) - newly_joined_rooms, newly_joined_users = res + newly_joined_rooms, newly_joined_users, _, newly_left_users = res block_all_presence_data = ( since_token is None and @@ -549,7 +559,11 @@ class SyncHandler(object): yield self._generate_sync_entry_for_to_device(sync_result_builder) device_lists = yield self._generate_sync_entry_for_device_list( - sync_result_builder + sync_result_builder, + newly_joined_rooms=newly_joined_rooms, + newly_joined_users=newly_joined_users, + newly_left_rooms=[], + newly_left_users=newly_left_users, ) device_id = sync_config.device_id @@ -574,7 +588,9 @@ class SyncHandler(object): @measure_func("_generate_sync_entry_for_device_list") @defer.inlineCallbacks - def _generate_sync_entry_for_device_list(self, sync_result_builder): + def _generate_sync_entry_for_device_list(self, sync_result_builder, + newly_joined_rooms, newly_joined_users, + newly_left_rooms, newly_left_users): user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token @@ -582,16 +598,32 @@ class SyncHandler(object): changed = yield self.store.get_user_whose_devices_changed( since_token.device_list_key ) - if not changed: - defer.returnValue([]) + + # TODO: Check that these users are actually new, i.e. either they + # weren't in the previous sync *or* they left and rejoined. + changed.update(newly_joined_users) + + # TODO: Add the members from newly_*_rooms + + if not changed and not newly_left_users: + defer.returnValue(DeviceLists( + changed=[], + left=newly_left_users, + )) users_who_share_room = yield self.store.get_users_who_share_room_with_user( user_id ) - defer.returnValue(users_who_share_room & changed) + defer.returnValue(DeviceLists( + changed=users_who_share_room & changed, + left=set(newly_left_users) - users_who_share_room, + )) else: - defer.returnValue([]) + defer.returnValue(DeviceLists( + changed=[], + left=[], + )) @defer.inlineCallbacks def _generate_sync_entry_for_to_device(self, sync_result_builder): @@ -755,8 +787,8 @@ class SyncHandler(object): account_data_by_room(dict): Dictionary of per room account data Returns: - Deferred(tuple): Returns a 2-tuple of - `(newly_joined_rooms, newly_joined_users)` + Deferred(tuple): Returns a 4-tuple of + `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)` """ user_id = sync_result_builder.sync_config.user.to_string() block_all_room_ephemeral = ( @@ -787,7 +819,7 @@ class SyncHandler(object): ) if not tags_by_room: logger.debug("no-oping sync") - defer.returnValue(([], [])) + defer.returnValue(([], [], [], [])) ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id, @@ -828,17 +860,24 @@ class SyncHandler(object): # Now we want to get any newly joined users newly_joined_users = set() + newly_left_users = set() if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( - joined_sync.timeline.events, joined_sync.state.values() + joined_sync.timeline.events, joined_sync.state.itervalues() ) for event in it: if event.type == EventTypes.Member: if event.membership == Membership.JOIN: newly_joined_users.add(event.state_key) + else: + prev_content = event.unsigned.get("prev_content", {}) + prev_membership = prev_content.get("membership", None) + if prev_membership == Membership.JOIN: + newly_left_users.add(event.state_key) - defer.returnValue((newly_joined_rooms, newly_joined_users)) + newly_left_users -= newly_joined_users + defer.returnValue((newly_joined_rooms, newly_joined_users, [], newly_left_users)) @defer.inlineCallbacks def _have_rooms_changed(self, sync_result_builder): @@ -1259,6 +1298,7 @@ class SyncResultBuilder(object): self.invited = [] self.archived = [] self.device = [] + self.to_device = [] class RoomSyncResultBuilder(object): diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 2939896f44..978af9c280 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -189,7 +189,8 @@ class SyncRestServlet(RestServlet): "account_data": {"events": sync_result.account_data}, "to_device": {"events": sync_result.to_device}, "device_lists": { - "changed": list(sync_result.device_lists), + "changed": list(sync_result.device_lists.changed), + "left": list(sync_result.device_lists.left), }, "presence": SyncRestServlet.encode_presence( sync_result.presence, time_now -- cgit 1.4.1 From 69ef4987a68d66093007ca11886e25139ea0c970 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 Sep 2017 14:44:36 +0100 Subject: Add left section to /keys/changes --- synapse/handlers/device.py | 22 ++++++++++++++++------ synapse/handlers/sync.py | 2 +- synapse/rest/client/v2_alpha/keys.py | 6 ++---- 3 files changed, 19 insertions(+), 11 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index be120b2f34..ef8753b1ff 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -326,13 +326,23 @@ class DeviceHandler(BaseHandler): possibly_changed.add(state_key) break - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) + if possibly_changed: + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) - # Take the intersection of the users whose devices may have changed - # and those that actually still share a room with the user - defer.returnValue(users_who_share_room & possibly_changed) + # Take the intersection of the users whose devices may have changed + # and those that actually still share a room with the user + possibly_joined = possibly_changed & users_who_share_room + possibly_left = possibly_changed - users_who_share_room + else: + possibly_joined = [] + possibly_left = [] + + defer.returnValue({ + "changed": list(possibly_joined), + "left": list(possibly_left), + }) @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4ee6109cf8..9ae7fbc797 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -949,7 +949,7 @@ class SyncHandler(object): newly_joined_rooms = [] room_entries = [] invited = [] - for room_id, events in mem_change_events_by_room_id.items(): + for room_id, events in mem_change_events_by_room_id.iteritems(): non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 6a3cfe84f8..943e87e7fd 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -188,13 +188,11 @@ class KeyChangesServlet(RestServlet): user_id = requester.user.to_string() - changed = yield self.device_handler.get_user_ids_changed( + results = yield self.device_handler.get_user_ids_changed( user_id, from_token, ) - defer.returnValue((200, { - "changed": list(changed), - })) + defer.returnValue((200, results)) class OneTimeKeyServlet(RestServlet): -- cgit 1.4.1 From 9ce866ed4f68450d8a2eab84be759c0056b6b992 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 Sep 2017 16:44:26 +0100 Subject: In sync handle device lists for newly joined/left rooms --- synapse/handlers/sync.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9ae7fbc797..d1ba75dbda 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -599,12 +599,20 @@ class SyncHandler(object): since_token.device_list_key ) + # TODO: Be more clever than this, i.e. remove users who we already + # share a room with? + for room_id in newly_joined_rooms: + joined_users = yield self.state.get_current_user_in_room(room_id) + newly_joined_users.update(joined_users) + + for room_id in newly_left_rooms: + left_users = yield self.state.get_current_user_in_room(room_id) + newly_left_users.update(left_users) + # TODO: Check that these users are actually new, i.e. either they # weren't in the previous sync *or* they left and rejoined. changed.update(newly_joined_users) - # TODO: Add the members from newly_*_rooms - if not changed and not newly_left_users: defer.returnValue(DeviceLists( changed=[], -- cgit 1.4.1 From 473700f0162482e7bb57cad922de99ff29b9b216 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2017 15:13:41 +0100 Subject: Get left rooms --- synapse/handlers/sync.py | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d1ba75dbda..9aae4c344b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -840,7 +840,7 @@ class SyncHandler(object): if since_token: res = yield self._get_rooms_changed(sync_result_builder, ignored_users) - room_entries, invited, newly_joined_rooms = res + room_entries, invited, newly_joined_rooms, newly_left_rooms = res tags_by_room = yield self.store.get_updated_tags( user_id, since_token.account_data_key, @@ -848,6 +848,7 @@ class SyncHandler(object): else: res = yield self._get_all_rooms(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res + newly_left_rooms = [] tags_by_room = yield self.store.get_tags_for_user(user_id) @@ -885,7 +886,13 @@ class SyncHandler(object): newly_left_users.add(event.state_key) newly_left_users -= newly_joined_users - defer.returnValue((newly_joined_rooms, newly_joined_users, [], newly_left_users)) + + defer.returnValue(( + newly_joined_rooms, + newly_joined_users, + newly_left_rooms, + newly_left_users, + )) @defer.inlineCallbacks def _have_rooms_changed(self, sync_result_builder): @@ -955,6 +962,7 @@ class SyncHandler(object): mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) newly_joined_rooms = [] + newly_left_rooms = [] room_entries = [] invited = [] for room_id, events in mem_change_events_by_room_id.iteritems(): @@ -964,6 +972,7 @@ class SyncHandler(object): # We want to figure out if we joined the room at some point since # the last sync (even if we have since left). This is to make sure # we do send down the room, and with full state, where necessary + old_state_ids = None if room_id in joined_room_ids or has_join: old_state_ids = yield self.get_state_at(room_id, since_token) old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) @@ -981,6 +990,26 @@ class SyncHandler(object): if not non_joins: continue + # Check if we have left the room. This can either be because we were + # joined before *or* that we since joined and then left. + if events[-1].membership != Membership.JOIN: + if has_join: + newly_left_rooms.append(room_id) + else: + if not old_state_ids: + old_state_ids = yield self.get_state_at(room_id, since_token) + old_mem_ev_id = old_state_ids.get( + (EventTypes.Member, user_id), + None, + ) + old_mem_ev = None + if old_mem_ev_id: + old_mem_ev = yield self.store.get_event( + old_mem_ev_id, allow_none=True + ) + if old_mem_ev and old_mem_ev.membership == Membership.JOIN: + newly_left_rooms.append(room_id) + # Only bother if we're still currently invited should_invite = non_joins[-1].membership == Membership.INVITE if should_invite: @@ -1058,7 +1087,7 @@ class SyncHandler(object): upto_token=since_token, )) - defer.returnValue((room_entries, invited, newly_joined_rooms)) + defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms)) @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builder, ignored_users): -- cgit 1.4.1 From 3a0cee28d6457b812123f6bad6deee476bef4984 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Sep 2017 11:49:37 +0100 Subject: Actually hook leave notifs up --- synapse/handlers/sync.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9aae4c344b..c6b04a1683 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -545,7 +545,8 @@ class SyncHandler(object): res = yield self._generate_sync_entry_for_rooms( sync_result_builder, account_data_by_room ) - newly_joined_rooms, newly_joined_users, _, newly_left_users = res + newly_joined_rooms, newly_joined_users, _, _ = res + _, _, newly_left_rooms, newly_left_users = res block_all_presence_data = ( since_token is None and @@ -562,7 +563,7 @@ class SyncHandler(object): sync_result_builder, newly_joined_rooms=newly_joined_rooms, newly_joined_users=newly_joined_users, - newly_left_rooms=[], + newly_left_rooms=newly_left_rooms, newly_left_users=newly_left_users, ) -- cgit 1.4.1 From 2d1b7955aec60a2a5dabc7882b4081b794968d7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Sep 2017 17:13:03 +0100 Subject: Don't filter out current state events from timeline --- synapse/handlers/sync.py | 7 +++++++ synapse/visibility.py | 14 +++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c6b04a1683..bb78c25ee5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -293,6 +293,11 @@ class SyncHandler(object): timeline_limit = sync_config.filter_collection.timeline_limit() block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline() + # Pull out the current state, as we always want to include those events + # in the timeline if they're there. + current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = frozenset(current_state_ids.itervalues()) + if recents is None or newly_joined_room or timeline_limit < len(recents): limited = True else: @@ -304,6 +309,7 @@ class SyncHandler(object): self.store, sync_config.user.to_string(), recents, + always_include_ids=current_state_ids, ) else: recents = [] @@ -339,6 +345,7 @@ class SyncHandler(object): self.store, sync_config.user.to_string(), loaded_recents, + always_include_ids=current_state_ids, ) loaded_recents.extend(recents) recents = loaded_recents diff --git a/synapse/visibility.py b/synapse/visibility.py index 5590b866ed..d7dbdc77ff 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -43,7 +43,8 @@ MEMBERSHIP_PRIORITY = ( @defer.inlineCallbacks -def filter_events_for_clients(store, user_tuples, events, event_id_to_state): +def filter_events_for_clients(store, user_tuples, events, event_id_to_state, + always_include_ids=frozenset()): """ Returns dict of user_id -> list of events that user is allowed to see. @@ -54,6 +55,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): * the user has not been a member of the room since the given events events ([synapse.events.EventBase]): list of events to filter + always_include_ids (set(event_id)): set of event ids to specifically + include (unless sender is ignored) """ forgotten = yield preserve_context_over_deferred(defer.gatherResults([ defer.maybeDeferred( @@ -91,6 +94,9 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): if not event.is_state() and event.sender in ignore_list: return False + if event.event_id in always_include_ids: + return True + state = event_id_to_state[event.event_id] # get the room_visibility at the time of the event. @@ -189,7 +195,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): @defer.inlineCallbacks -def filter_events_for_client(store, user_id, events, is_peeking=False): +def filter_events_for_client(store, user_id, events, is_peeking=False, + always_include_ids=frozenset()): """ Check which events a user is allowed to see @@ -213,6 +220,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False): types=types ) res = yield filter_events_for_clients( - store, [(user_id, is_peeking)], events, event_id_to_state + store, [(user_id, is_peeking)], events, event_id_to_state, + always_include_ids=always_include_ids, ) defer.returnValue(res.get(user_id, [])) -- cgit 1.4.1