diff options
author | Erik Johnston <erik@matrix.org> | 2019-06-20 11:59:14 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-06-20 11:59:14 +0100 |
commit | 45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3 (patch) | |
tree | 07bb21377c6611db89f64f948a2e27645662ff0e /synapse/handlers/sync.py | |
parent | Add descriptions and remove redundant set(..) (diff) | |
parent | Run Black. (#5482) (diff) | |
download | synapse-45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/histogram_extremities
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r-- | synapse/handlers/sync.py | 694 |
1 files changed, 351 insertions, 343 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 62fda0c664..c5188a1f8e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -64,20 +64,14 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 -SyncConfig = collections.namedtuple("SyncConfig", [ - "user", - "filter_collection", - "is_guest", - "request_key", - "device_id", -]) - - -class TimelineBatch(collections.namedtuple("TimelineBatch", [ - "prev_batch", - "events", - "limited", -])): +SyncConfig = collections.namedtuple( + "SyncConfig", ["user", "filter_collection", "is_guest", "request_key", "device_id"] +) + + +class TimelineBatch( + collections.namedtuple("TimelineBatch", ["prev_batch", "events", "limited"]) +): __slots__ = [] def __nonzero__(self): @@ -85,18 +79,24 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ to tell if room needs to be part of the sync result. """ return bool(self.events) + __bool__ = __nonzero__ # python3 -class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ - "room_id", # str - "timeline", # TimelineBatch - "state", # dict[(str, str), FrozenEvent] - "ephemeral", - "account_data", - "unread_notifications", - "summary", -])): +class JoinedSyncResult( + collections.namedtuple( + "JoinedSyncResult", + [ + "room_id", # str + "timeline", # TimelineBatch + "state", # dict[(str, str), FrozenEvent] + "ephemeral", + "account_data", + "unread_notifications", + "summary", + ], + ) +): __slots__ = [] def __nonzero__(self): @@ -111,77 +111,93 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ # nb the notification count does not, er, count: if there's nothing # else in the result, we don't need to send it. ) + __bool__ = __nonzero__ # python3 -class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ - "room_id", # str - "timeline", # TimelineBatch - "state", # dict[(str, str), FrozenEvent] - "account_data", -])): +class ArchivedSyncResult( + collections.namedtuple( + "ArchivedSyncResult", + [ + "room_id", # str + "timeline", # TimelineBatch + "state", # dict[(str, str), FrozenEvent] + "account_data", + ], + ) +): __slots__ = [] def __nonzero__(self): """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ - return bool( - self.timeline - or self.state - or self.account_data - ) + return bool(self.timeline or self.state or self.account_data) + __bool__ = __nonzero__ # python3 -class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ - "room_id", # str - "invite", # FrozenEvent: the invite event -])): +class InvitedSyncResult( + collections.namedtuple( + "InvitedSyncResult", + ["room_id", "invite"], # str # FrozenEvent: the invite event + ) +): __slots__ = [] def __nonzero__(self): """Invited rooms should always be reported to the client""" return True + __bool__ = __nonzero__ # python3 -class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ - "join", - "invite", - "leave", -])): +class GroupsSyncResult( + collections.namedtuple("GroupsSyncResult", ["join", "invite", "leave"]) +): __slots__ = [] def __nonzero__(self): return bool(self.join or self.invite or self.leave) + __bool__ = __nonzero__ # python3 -class DeviceLists(collections.namedtuple("DeviceLists", [ - "changed", # list of user_ids whose devices may have changed - "left", # list of user_ids whose devices we no longer track -])): +class DeviceLists( + collections.namedtuple( + "DeviceLists", + [ + "changed", # list of user_ids whose devices may have changed + "left", # list of user_ids whose devices we no longer track + ], + ) +): __slots__ = [] def __nonzero__(self): return bool(self.changed or self.left) + __bool__ = __nonzero__ # python3 -class SyncResult(collections.namedtuple("SyncResult", [ - "next_batch", # Token for the next sync - "presence", # List of presence events for the user. - "account_data", # List of account_data events for the user. - "joined", # JoinedSyncResult for each joined room. - "invited", # InvitedSyncResult for each invited room. - "archived", # ArchivedSyncResult for each archived room. - "to_device", # List of direct messages for the device. - "device_lists", # List of user_ids whose devices have changed - "device_one_time_keys_count", # Dict of algorithm to count for one time keys - # for this device - "groups", -])): +class SyncResult( + collections.namedtuple( + "SyncResult", + [ + "next_batch", # Token for the next sync + "presence", # List of presence events for the user. + "account_data", # List of account_data events for the user. + "joined", # JoinedSyncResult for each joined room. + "invited", # InvitedSyncResult for each invited room. + "archived", # ArchivedSyncResult for each archived room. + "to_device", # List of direct messages for the device. + "device_lists", # List of user_ids whose devices have changed + "device_one_time_keys_count", # Dict of algorithm to count for one time keys + # for this device + "groups", + ], + ) +): __slots__ = [] def __nonzero__(self): @@ -190,20 +206,20 @@ class SyncResult(collections.namedtuple("SyncResult", [ events. """ return bool( - self.presence or - self.joined or - self.invited or - self.archived or - self.account_data or - self.to_device or - self.device_lists or - self.groups + self.presence + or self.joined + or self.invited + or self.archived + or self.account_data + or self.to_device + or self.device_lists + or self.groups ) + __bool__ = __nonzero__ # python3 class SyncHandler(object): - def __init__(self, hs): self.hs_config = hs.config self.store = hs.get_datastore() @@ -217,13 +233,16 @@ class SyncHandler(object): # ExpiringCache((User, Device)) -> LruCache(state_key => event_id) self.lazy_loaded_members_cache = ExpiringCache( - "lazy_loaded_members_cache", self.clock, - max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE, + "lazy_loaded_members_cache", + self.clock, + max_len=0, + expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE, ) @defer.inlineCallbacks - def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, - full_state=False): + def wait_for_sync_for_user( + self, sync_config, since_token=None, timeout=0, full_state=False + ): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. @@ -239,13 +258,15 @@ class SyncHandler(object): res = yield self.response_cache.wrap( sync_config.request_key, self._wait_for_sync_for_user, - sync_config, since_token, timeout, full_state, + sync_config, + since_token, + timeout, + full_state, ) defer.returnValue(res) @defer.inlineCallbacks - def _wait_for_sync_for_user(self, sync_config, since_token, timeout, - full_state): + def _wait_for_sync_for_user(self, sync_config, since_token, timeout, full_state): if since_token is None: sync_type = "initial_sync" elif full_state: @@ -261,14 +282,17 @@ class SyncHandler(object): # we are going to return immediately, so don't bother calling # notifier.wait_for_events. result = yield self.current_sync_for_user( - sync_config, since_token, full_state=full_state, + sync_config, since_token, full_state=full_state ) else: + def current_sync_callback(before_token, after_token): return self.current_sync_for_user(sync_config, since_token) result = yield self.notifier.wait_for_events( - sync_config.user.to_string(), timeout, current_sync_callback, + sync_config.user.to_string(), + timeout, + current_sync_callback, from_token=since_token, ) @@ -281,8 +305,7 @@ class SyncHandler(object): defer.returnValue(result) - def current_sync_for_user(self, sync_config, since_token=None, - full_state=False): + def current_sync_for_user(self, sync_config, since_token=None, full_state=False): """Get the sync for client needed to match what the server has now. Returns: A Deferred SyncResult. @@ -334,8 +357,7 @@ class SyncHandler(object): # result returned by the event source is poor form (it might cache # the object) room_id = event["room_id"] - event_copy = {k: v for (k, v) in iteritems(event) - if k != "room_id"} + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) receipt_key = since_token.receipt_key if since_token else "0" @@ -353,22 +375,30 @@ class SyncHandler(object): for event in receipts: room_id = event["room_id"] # exclude room id, as above - event_copy = {k: v for (k, v) in iteritems(event) - if k != "room_id"} + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) defer.returnValue((now_token, ephemeral_by_room)) @defer.inlineCallbacks - def _load_filtered_recents(self, room_id, sync_config, now_token, - since_token=None, recents=None, newly_joined_room=False): + def _load_filtered_recents( + self, + room_id, + sync_config, + now_token, + since_token=None, + recents=None, + newly_joined_room=False, + ): """ Returns: a Deferred TimelineBatch """ with Measure(self.clock, "load_filtered_recents"): timeline_limit = sync_config.filter_collection.timeline_limit() - block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline() + block_all_timeline = ( + sync_config.filter_collection.blocks_all_room_timeline() + ) if recents is None or newly_joined_room or timeline_limit < len(recents): limited = True @@ -396,11 +426,9 @@ class SyncHandler(object): recents = [] if not limited or block_all_timeline: - defer.returnValue(TimelineBatch( - events=recents, - prev_batch=now_token, - limited=False - )) + defer.returnValue( + TimelineBatch(events=recents, prev_batch=now_token, limited=False) + ) filtering_factor = 2 load_limit = max(timeline_limit * filtering_factor, 10) @@ -427,9 +455,7 @@ class SyncHandler(object): ) else: events, end_key = yield self.store.get_recent_events_for_room( - room_id, - limit=load_limit + 1, - end_token=end_key, + room_id, limit=load_limit + 1, end_token=end_key ) loaded_recents = sync_config.filter_collection.filter_room_timeline( events @@ -462,15 +488,15 @@ class SyncHandler(object): recents = recents[-timeline_limit:] room_key = recents[0].internal_metadata.before - prev_batch_token = now_token.copy_and_replace( - "room_key", room_key - ) + prev_batch_token = now_token.copy_and_replace("room_key", room_key) - defer.returnValue(TimelineBatch( - events=recents, - prev_batch=prev_batch_token, - limited=limited or newly_joined_room - )) + defer.returnValue( + TimelineBatch( + events=recents, + prev_batch=prev_batch_token, + limited=limited or newly_joined_room, + ) + ) @defer.inlineCallbacks def get_state_after_event(self, event, state_filter=StateFilter.all()): @@ -486,7 +512,7 @@ class SyncHandler(object): A Deferred map from ((type, state_key)->Event) """ state_ids = yield self.store.get_state_ids_for_event( - event.event_id, state_filter=state_filter, + event.event_id, state_filter=state_filter ) if event.is_state(): state_ids = state_ids.copy() @@ -511,13 +537,13 @@ class SyncHandler(object): # does not reliably give you the state at the given stream position. # (https://github.com/matrix-org/synapse/issues/3305) last_events, _ = yield self.store.get_recent_events_for_room( - room_id, end_token=stream_position.room_key, limit=1, + room_id, end_token=stream_position.room_key, limit=1 ) if last_events: last_event = last_events[-1] state = yield self.get_state_after_event( - last_event, state_filter=state_filter, + last_event, state_filter=state_filter ) else: @@ -549,7 +575,7 @@ class SyncHandler(object): # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305 last_events, _ = yield self.store.get_recent_event_ids_for_room( - room_id, end_token=now_token.room_key, limit=1, + room_id, end_token=now_token.room_key, limit=1 ) if not last_events: @@ -559,28 +585,25 @@ class SyncHandler(object): last_event = last_events[-1] state_ids = yield self.store.get_state_ids_for_event( last_event.event_id, - state_filter=StateFilter.from_types([ - (EventTypes.Name, ''), - (EventTypes.CanonicalAlias, ''), - ]), + state_filter=StateFilter.from_types( + [(EventTypes.Name, ""), (EventTypes.CanonicalAlias, "")] + ), ) # this is heavily cached, thus: fast. details = yield self.store.get_room_summary(room_id) - name_id = state_ids.get((EventTypes.Name, '')) - canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, '')) + name_id = state_ids.get((EventTypes.Name, "")) + canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, "")) summary = {} empty_ms = MemberSummary([], 0) # TODO: only send these when they change. - summary["m.joined_member_count"] = ( - details.get(Membership.JOIN, empty_ms).count - ) - summary["m.invited_member_count"] = ( - details.get(Membership.INVITE, empty_ms).count - ) + summary["m.joined_member_count"] = details.get(Membership.JOIN, empty_ms).count + summary["m.invited_member_count"] = details.get( + Membership.INVITE, empty_ms + ).count # if the room has a name or canonical_alias set, we can skip # calculating heroes. Empty strings are falsey, so we check @@ -592,7 +615,7 @@ class SyncHandler(object): if canonical_alias_id: canonical_alias = yield self.store.get_event( - canonical_alias_id, allow_none=True, + canonical_alias_id, allow_none=True ) if canonical_alias and canonical_alias.content.get("alias"): defer.returnValue(summary) @@ -600,26 +623,14 @@ class SyncHandler(object): me = sync_config.user.to_string() joined_user_ids = [ - r[0] - for r in details.get(Membership.JOIN, empty_ms).members - if r[0] != me + r[0] for r in details.get(Membership.JOIN, empty_ms).members if r[0] != me ] invited_user_ids = [ - r[0] - for r in details.get(Membership.INVITE, empty_ms).members - if r[0] != me + r[0] for r in details.get(Membership.INVITE, empty_ms).members if r[0] != me ] - gone_user_ids = ( - [ - r[0] - for r in details.get(Membership.LEAVE, empty_ms).members - if r[0] != me - ] + [ - r[0] - for r in details.get(Membership.BAN, empty_ms).members - if r[0] != me - ] - ) + gone_user_ids = [ + r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me + ] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me] # FIXME: only build up a member_ids list for our heroes member_ids = {} @@ -627,20 +638,18 @@ class SyncHandler(object): Membership.JOIN, Membership.INVITE, Membership.LEAVE, - Membership.BAN + Membership.BAN, ): for user_id, event_id in details.get(membership, empty_ms).members: member_ids[user_id] = event_id # FIXME: order by stream ordering rather than as returned by SQL - if (joined_user_ids or invited_user_ids): - summary['m.heroes'] = sorted( + if joined_user_ids or invited_user_ids: + summary["m.heroes"] = sorted( [user_id for user_id in (joined_user_ids + invited_user_ids)] )[0:5] else: - summary['m.heroes'] = sorted( - [user_id for user_id in gone_user_ids] - )[0:5] + summary["m.heroes"] = sorted([user_id for user_id in gone_user_ids])[0:5] if not sync_config.filter_collection.lazy_load_members(): defer.returnValue(summary) @@ -652,8 +661,7 @@ class SyncHandler(object): # track which members the client should already know about via LL: # Ones which are already in state... existing_members = set( - user_id for (typ, user_id) in state.keys() - if typ == EventTypes.Member + user_id for (typ, user_id) in state.keys() if typ == EventTypes.Member ) # ...or ones which are in the timeline... @@ -664,10 +672,10 @@ class SyncHandler(object): # ...and then ensure any missing ones get included in state. missing_hero_event_ids = [ member_ids[hero_id] - for hero_id in summary['m.heroes'] + for hero_id in summary["m.heroes"] if ( - cache.get(hero_id) != member_ids[hero_id] and - hero_id not in existing_members + cache.get(hero_id) != member_ids[hero_id] + and hero_id not in existing_members ) ] @@ -691,8 +699,9 @@ class SyncHandler(object): return cache @defer.inlineCallbacks - def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token, - full_state): + def compute_state_delta( + self, room_id, batch, sync_config, since_token, now_token, full_state + ): """ Works out the difference in state between the start of the timeline and the previous sync. @@ -745,23 +754,23 @@ class SyncHandler(object): timeline_state = { (event.type, event.state_key): event.event_id - for event in batch.events if event.is_state() + for event in batch.events + if event.is_state() } if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id, state_filter=state_filter, + batch.events[-1].event_id, state_filter=state_filter ) state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, state_filter=state_filter, + batch.events[0].event_id, state_filter=state_filter ) else: current_state_ids = yield self.get_state_at( - room_id, stream_position=now_token, - state_filter=state_filter, + room_id, stream_position=now_token, state_filter=state_filter ) state_ids = current_state_ids @@ -775,7 +784,7 @@ class SyncHandler(object): ) elif batch.limited: state_at_timeline_start = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, state_filter=state_filter, + batch.events[0].event_id, state_filter=state_filter ) # for now, we disable LL for gappy syncs - see @@ -793,12 +802,11 @@ class SyncHandler(object): state_filter = StateFilter.all() state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token, - state_filter=state_filter, + room_id, stream_position=since_token, state_filter=state_filter ) current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id, state_filter=state_filter, + batch.events[-1].event_id, state_filter=state_filter ) state_ids = _calculate_state( @@ -854,8 +862,7 @@ class SyncHandler(object): # add any member IDs we are about to send into our LruCache for t, event_id in itertools.chain( - state_ids.items(), - timeline_state.items(), + state_ids.items(), timeline_state.items() ): if t[0] == EventTypes.Member: cache.set(t[1], event_id) @@ -864,10 +871,14 @@ class SyncHandler(object): if state_ids: state = yield self.store.get_events(list(state_ids.values())) - defer.returnValue({ - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state(list(state.values())) - }) + defer.returnValue( + { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state( + list(state.values()) + ) + } + ) @defer.inlineCallbacks def unread_notifs_for_room_id(self, room_id, sync_config): @@ -875,7 +886,7 @@ class SyncHandler(object): last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user( user_id=sync_config.user.to_string(), room_id=room_id, - receipt_type="m.read" + receipt_type="m.read", ) notifs = [] @@ -909,7 +920,9 @@ class SyncHandler(object): logger.info( "Calculating sync response for %r between %s and %s", - sync_config.user, since_token, now_token, + sync_config.user, + since_token, + now_token, ) user_id = sync_config.user.to_string() @@ -920,11 +933,12 @@ class SyncHandler(object): raise NotImplementedError() else: joined_room_ids = yield self.get_rooms_for_user_at( - user_id, now_token.room_stream_id, + user_id, now_token.room_stream_id ) sync_result_builder = SyncResultBuilder( - sync_config, full_state, + sync_config, + full_state, since_token=since_token, now_token=now_token, joined_room_ids=joined_room_ids, @@ -941,8 +955,7 @@ class SyncHandler(object): _, _, newly_left_rooms, newly_left_users = res block_all_presence_data = ( - since_token is None and - sync_config.filter_collection.blocks_all_presence() + since_token is None and sync_config.filter_collection.blocks_all_presence() ) if self.hs_config.use_presence and not block_all_presence_data: yield self._generate_sync_entry_for_presence( @@ -973,22 +986,23 @@ class SyncHandler(object): room_id = joined_room.room_id if room_id in newly_joined_rooms: issue4422_logger.debug( - "Sync result for newly joined room %s: %r", - room_id, joined_room, + "Sync result for newly joined room %s: %r", room_id, joined_room ) - defer.returnValue(SyncResult( - presence=sync_result_builder.presence, - account_data=sync_result_builder.account_data, - joined=sync_result_builder.joined, - invited=sync_result_builder.invited, - archived=sync_result_builder.archived, - to_device=sync_result_builder.to_device, - device_lists=device_lists, - groups=sync_result_builder.groups, - device_one_time_keys_count=one_time_key_counts, - next_batch=sync_result_builder.now_token, - )) + defer.returnValue( + SyncResult( + presence=sync_result_builder.presence, + account_data=sync_result_builder.account_data, + joined=sync_result_builder.joined, + invited=sync_result_builder.invited, + archived=sync_result_builder.archived, + to_device=sync_result_builder.to_device, + device_lists=device_lists, + groups=sync_result_builder.groups, + device_one_time_keys_count=one_time_key_counts, + next_batch=sync_result_builder.now_token, + ) + ) @measure_func("_generate_sync_entry_for_groups") @defer.inlineCallbacks @@ -999,11 +1013,11 @@ class SyncHandler(object): if since_token and since_token.groups_key: results = yield self.store.get_groups_changes_for_user( - user_id, since_token.groups_key, now_token.groups_key, + user_id, since_token.groups_key, now_token.groups_key ) else: results = yield self.store.get_all_groups_for_user( - user_id, now_token.groups_key, + user_id, now_token.groups_key ) invited = {} @@ -1031,17 +1045,19 @@ class SyncHandler(object): left[group_id] = content["content"] sync_result_builder.groups = GroupsSyncResult( - join=joined, - invite=invited, - leave=left, + join=joined, invite=invited, leave=left ) @measure_func("_generate_sync_entry_for_device_list") @defer.inlineCallbacks - def _generate_sync_entry_for_device_list(self, sync_result_builder, - newly_joined_rooms, - newly_joined_or_invited_users, - newly_left_rooms, newly_left_users): + def _generate_sync_entry_for_device_list( + self, + sync_result_builder, + newly_joined_rooms, + newly_joined_or_invited_users, + newly_left_rooms, + newly_left_users, + ): user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token @@ -1065,24 +1081,20 @@ class SyncHandler(object): changed.update(newly_joined_or_invited_users) if not changed and not newly_left_users: - defer.returnValue(DeviceLists( - changed=[], - left=newly_left_users, - )) + defer.returnValue(DeviceLists(changed=[], left=newly_left_users)) users_who_share_room = yield self.store.get_users_who_share_room_with_user( user_id ) - defer.returnValue(DeviceLists( - changed=users_who_share_room & changed, - left=set(newly_left_users) - users_who_share_room, - )) + defer.returnValue( + DeviceLists( + changed=users_who_share_room & changed, + left=set(newly_left_users) - users_who_share_room, + ) + ) else: - defer.returnValue(DeviceLists( - changed=[], - left=[], - )) + defer.returnValue(DeviceLists(changed=[], left=[])) @defer.inlineCallbacks def _generate_sync_entry_for_to_device(self, sync_result_builder): @@ -1109,8 +1121,9 @@ class SyncHandler(object): deleted = yield self.store.delete_messages_for_device( user_id, device_id, since_stream_id ) - logger.debug("Deleted %d to-device messages up to %d", - deleted, since_stream_id) + logger.debug( + "Deleted %d to-device messages up to %d", deleted, since_stream_id + ) messages, stream_id = yield self.store.get_new_messages_for_device( user_id, device_id, since_stream_id, now_token.to_device_key @@ -1118,7 +1131,10 @@ class SyncHandler(object): logger.debug( "Returning %d to-device messages between %d and %d (current token: %d)", - len(messages), since_stream_id, stream_id, now_token.to_device_key + len(messages), + since_stream_id, + stream_id, + now_token.to_device_key, ) sync_result_builder.now_token = now_token.copy_and_replace( "to_device_key", stream_id @@ -1145,8 +1161,7 @@ class SyncHandler(object): if since_token and not sync_result_builder.full_state: account_data, account_data_by_room = ( yield self.store.get_updated_account_data_for_user( - user_id, - since_token.account_data_key, + user_id, since_token.account_data_key ) ) @@ -1160,27 +1175,28 @@ class SyncHandler(object): ) else: account_data, account_data_by_room = ( - yield self.store.get_account_data_for_user( - sync_config.user.to_string() - ) + yield self.store.get_account_data_for_user(sync_config.user.to_string()) ) - account_data['m.push_rules'] = yield self.push_rules_for_user( + account_data["m.push_rules"] = yield self.push_rules_for_user( sync_config.user ) - account_data_for_user = sync_config.filter_collection.filter_account_data([ - {"type": account_data_type, "content": content} - for account_data_type, content in account_data.items() - ]) + account_data_for_user = sync_config.filter_collection.filter_account_data( + [ + {"type": account_data_type, "content": content} + for account_data_type, content in account_data.items() + ] + ) sync_result_builder.account_data = account_data_for_user defer.returnValue(account_data_by_room) @defer.inlineCallbacks - def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms, - newly_joined_or_invited_users): + def _generate_sync_entry_for_presence( + self, sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users + ): """Generates the presence portion of the sync response. Populates the `sync_result_builder` with the result. @@ -1223,17 +1239,13 @@ class SyncHandler(object): extra_users_ids.discard(user.to_string()) if extra_users_ids: - states = yield self.presence_handler.get_states( - extra_users_ids, - ) + states = yield self.presence_handler.get_states(extra_users_ids) presence.extend(states) # Deduplicate the presence entries so that there's at most one per user presence = list({p.user_id: p for p in presence}.values()) - presence = sync_config.filter_collection.filter_presence( - presence - ) + presence = sync_config.filter_collection.filter_presence(presence) sync_result_builder.presence = presence @@ -1253,8 +1265,8 @@ class SyncHandler(object): """ user_id = sync_result_builder.sync_config.user.to_string() block_all_room_ephemeral = ( - sync_result_builder.since_token is None and - sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral() + sync_result_builder.since_token is None + and sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral() ) if block_all_room_ephemeral: @@ -1275,15 +1287,14 @@ class SyncHandler(object): have_changed = yield self._have_rooms_changed(sync_result_builder) if not have_changed: tags_by_room = yield self.store.get_updated_tags( - user_id, - since_token.account_data_key, + user_id, since_token.account_data_key ) if not tags_by_room: logger.debug("no-oping sync") defer.returnValue(([], [], [], [])) ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( - "m.ignored_user_list", user_id=user_id, + "m.ignored_user_list", user_id=user_id ) if ignored_account_data: @@ -1296,7 +1307,7 @@ class SyncHandler(object): room_entries, invited, newly_joined_rooms, newly_left_rooms = res tags_by_room = yield self.store.get_updated_tags( - user_id, since_token.account_data_key, + user_id, since_token.account_data_key ) else: res = yield self._get_all_rooms(sync_result_builder, ignored_users) @@ -1331,8 +1342,8 @@ class SyncHandler(object): for event in it: if event.type == EventTypes.Member: if ( - event.membership == Membership.JOIN or - event.membership == Membership.INVITE + event.membership == Membership.JOIN + or event.membership == Membership.INVITE ): newly_joined_or_invited_users.add(event.state_key) else: @@ -1343,12 +1354,14 @@ class SyncHandler(object): newly_left_users -= newly_joined_or_invited_users - defer.returnValue(( - newly_joined_rooms, - newly_joined_or_invited_users, - newly_left_rooms, - newly_left_users, - )) + defer.returnValue( + ( + newly_joined_rooms, + newly_joined_or_invited_users, + newly_left_rooms, + newly_left_users, + ) + ) @defer.inlineCallbacks def _have_rooms_changed(self, sync_result_builder): @@ -1454,7 +1467,9 @@ class SyncHandler(object): prev_membership = old_mem_ev.membership issue4422_logger.debug( "Previous membership for room %s with join: %s (event %s)", - room_id, prev_membership, old_mem_ev_id, + room_id, + prev_membership, + old_mem_ev_id, ) if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: @@ -1476,8 +1491,7 @@ class SyncHandler(object): if not old_state_ids: old_state_ids = yield self.get_state_at(room_id, since_token) old_mem_ev_id = old_state_ids.get( - (EventTypes.Member, user_id), - None, + (EventTypes.Member, user_id), None ) old_mem_ev = None if old_mem_ev_id: @@ -1498,7 +1512,8 @@ class SyncHandler(object): # Always include leave/ban events. Just take the last one. # TODO: How do we handle ban -> leave in same batch? leave_events = [ - e for e in non_joins + e + for e in non_joins if e.membership in (Membership.LEAVE, Membership.BAN) ] @@ -1526,15 +1541,17 @@ class SyncHandler(object): else: batch_events = None - room_entries.append(RoomSyncResultBuilder( - room_id=room_id, - rtype="archived", - events=batch_events, - newly_joined=room_id in newly_joined_rooms, - full_state=False, - since_token=since_token, - upto_token=leave_token, - )) + room_entries.append( + RoomSyncResultBuilder( + room_id=room_id, + rtype="archived", + events=batch_events, + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=since_token, + upto_token=leave_token, + ) + ) timeline_limit = sync_config.filter_collection.timeline_limit() @@ -1581,7 +1598,8 @@ class SyncHandler(object): # debugging for https://github.com/matrix-org/synapse/issues/4422 issue4422_logger.debug( "RoomSyncResultBuilder events for newly joined room %s: %r", - room_id, entry.events, + room_id, + entry.events, ) room_entries.append(entry) @@ -1606,12 +1624,14 @@ class SyncHandler(object): sync_config = sync_result_builder.sync_config membership_list = ( - Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN + Membership.INVITE, + Membership.JOIN, + Membership.LEAVE, + Membership.BAN, ) room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=user_id, - membership_list=membership_list + user_id=user_id, membership_list=membership_list ) room_entries = [] @@ -1619,23 +1639,22 @@ class SyncHandler(object): for event in room_list: if event.membership == Membership.JOIN: - room_entries.append(RoomSyncResultBuilder( - room_id=event.room_id, - rtype="joined", - events=None, - newly_joined=False, - full_state=True, - since_token=since_token, - upto_token=now_token, - )) + room_entries.append( + RoomSyncResultBuilder( + room_id=event.room_id, + rtype="joined", + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=now_token, + ) + ) elif event.membership == Membership.INVITE: if event.sender in ignored_users: continue invite = yield self.store.get_event(event.event_id) - invited.append(InvitedSyncResult( - room_id=event.room_id, - invite=invite, - )) + invited.append(InvitedSyncResult(room_id=event.room_id, invite=invite)) elif event.membership in (Membership.LEAVE, Membership.BAN): # Always send down rooms we were banned or kicked from. if not sync_config.filter_collection.include_leave: @@ -1646,22 +1665,31 @@ class SyncHandler(object): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - room_entries.append(RoomSyncResultBuilder( - room_id=event.room_id, - rtype="archived", - events=None, - newly_joined=False, - full_state=True, - since_token=since_token, - upto_token=leave_token, - )) + room_entries.append( + RoomSyncResultBuilder( + room_id=event.room_id, + rtype="archived", + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=leave_token, + ) + ) defer.returnValue((room_entries, invited, [])) @defer.inlineCallbacks - def _generate_room_entry(self, sync_result_builder, ignored_users, - room_builder, ephemeral, tags, account_data, - always_include=False): + def _generate_room_entry( + self, + sync_result_builder, + ignored_users, + room_builder, + ephemeral, + tags, + account_data, + always_include=False, + ): """Populates the `joined` and `archived` section of `sync_result_builder` based on the `room_builder`. @@ -1678,9 +1706,7 @@ class SyncHandler(object): """ newly_joined = room_builder.newly_joined full_state = ( - room_builder.full_state - or newly_joined - or sync_result_builder.full_state + room_builder.full_state or newly_joined or sync_result_builder.full_state ) events = room_builder.events @@ -1697,7 +1723,8 @@ class SyncHandler(object): upto_token = room_builder.upto_token batch = yield self._load_filtered_recents( - room_id, sync_config, + room_id, + sync_config, now_token=upto_token, since_token=since_token, recents=events, @@ -1708,7 +1735,8 @@ class SyncHandler(object): # debug for https://github.com/matrix-org/synapse/issues/4422 issue4422_logger.debug( "Timeline events after filtering in newly-joined room %s: %r", - room_id, batch, + room_id, + batch, ) # When we join the room (or the client requests full_state), we should @@ -1726,16 +1754,10 @@ class SyncHandler(object): account_data_events = [] if tags is not None: - account_data_events.append({ - "type": "m.tag", - "content": {"tags": tags}, - }) + account_data_events.append({"type": "m.tag", "content": {"tags": tags}}) for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) + account_data_events.append({"type": account_data_type, "content": content}) account_data_events = sync_config.filter_collection.filter_room_account_data( account_data_events @@ -1743,16 +1765,13 @@ class SyncHandler(object): ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) - if not (always_include - or batch - or account_data_events - or ephemeral - or full_state): + if not ( + always_include or batch or account_data_events or ephemeral or full_state + ): return state = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, - full_state=full_state + room_id, batch, sync_config, since_token, now_token, full_state=full_state ) summary = {} @@ -1760,22 +1779,19 @@ class SyncHandler(object): # we include a summary in room responses when we're lazy loading # members (as the client otherwise doesn't have enough info to form # the name itself). - if ( - sync_config.filter_collection.lazy_load_members() and - ( - # we recalulate the summary: - # if there are membership changes in the timeline, or - # if membership has changed during a gappy sync, or - # if this is an initial sync. - any(ev.type == EventTypes.Member for ev in batch.events) or - ( - # XXX: this may include false positives in the form of LL - # members which have snuck into state - batch.limited and - any(t == EventTypes.Member for (t, k) in state) - ) or - since_token is None + if sync_config.filter_collection.lazy_load_members() and ( + # we recalulate the summary: + # if there are membership changes in the timeline, or + # if membership has changed during a gappy sync, or + # if this is an initial sync. + any(ev.type == EventTypes.Member for ev in batch.events) + or ( + # XXX: this may include false positives in the form of LL + # members which have snuck into state + batch.limited + and any(t == EventTypes.Member for (t, k) in state) ) + or since_token is None ): summary = yield self.compute_summary( room_id, sync_config, batch, state, now_token @@ -1794,9 +1810,7 @@ class SyncHandler(object): ) if room_sync or always_include: - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config - ) + notifs = yield self.unread_notifs_for_room_id(room_id, sync_config) if notifs is not None: unread_notifications["notification_count"] = notifs["notify_count"] @@ -1807,11 +1821,8 @@ class SyncHandler(object): if batch.limited and since_token: user_id = sync_result_builder.sync_config.user.to_string() logger.info( - "Incremental gappy sync of %s for user %s with %d state events" % ( - room_id, - user_id, - len(state), - ) + "Incremental gappy sync of %s for user %s with %d state events" + % (room_id, user_id, len(state)) ) elif room_builder.rtype == "archived": room_sync = ArchivedSyncResult( @@ -1841,9 +1852,7 @@ class SyncHandler(object): Deferred[frozenset[str]]: Set of room_ids the user is in at given stream_ordering. """ - joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering( - user_id, - ) + joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(user_id) joined_room_ids = set() @@ -1862,11 +1871,9 @@ class SyncHandler(object): logger.info("User joined room after current token: %s", room_id) extrems = yield self.store.get_forward_extremeties_for_room( - room_id, stream_ordering, - ) - users_in_room = yield self.state.get_current_users_in_room( - room_id, extrems, + room_id, stream_ordering ) + users_in_room = yield self.state.get_current_users_in_room(room_id, extrems) if user_id in users_in_room: joined_room_ids.add(room_id) @@ -1886,7 +1893,7 @@ def _action_has_highlight(actions): def _calculate_state( - timeline_contains, timeline_start, previous, current, lazy_load_members, + timeline_contains, timeline_start, previous, current, lazy_load_members ): """Works out what state to include in a sync response. @@ -1930,15 +1937,12 @@ def _calculate_state( if lazy_load_members: p_ids.difference_update( - e for t, e in iteritems(timeline_start) - if t[0] == EventTypes.Member + e for t, e in iteritems(timeline_start) if t[0] == EventTypes.Member ) state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids - return { - event_id_to_key[e]: e for e in state_ids - } + return {event_id_to_key[e]: e for e in state_ids} class SyncResultBuilder(object): @@ -1961,8 +1965,10 @@ class SyncResultBuilder(object): groups (GroupsSyncResult|None) to_device (list) """ - def __init__(self, sync_config, full_state, since_token, now_token, - joined_room_ids): + + def __init__( + self, sync_config, full_state, since_token, now_token, joined_room_ids + ): """ Args: sync_config (SyncConfig) @@ -1991,8 +1997,10 @@ class RoomSyncResultBuilder(object): """Stores information needed to create either a `JoinedSyncResult` or `ArchivedSyncResult`. """ - def __init__(self, room_id, rtype, events, newly_joined, full_state, - since_token, upto_token): + + def __init__( + self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token + ): """ Args: room_id(str) |