From b5605dfecc1f277e03165b374bd6ce81638ccb36 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 May 2016 17:37:01 +0100 Subject: Refactor SyncHandler --- synapse/handlers/sync.py | 984 +++++++++++++++++++++++------------------------ 1 file changed, 484 insertions(+), 500 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9ebfccc8bf..80eccf19ae 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -194,157 +194,7 @@ class SyncHandler(object): Returns: A Deferred SyncResult. """ - if since_token is None or full_state: - return self.full_state_sync(sync_config, since_token) - else: - return self.incremental_sync_with_gap(sync_config, since_token) - - @defer.inlineCallbacks - def full_state_sync(self, sync_config, timeline_since_token): - """Get a sync for a client which is starting without any state. - - If a 'message_since_token' is given, only timeline events which have - happened since that token will be returned. - - Returns: - A Deferred SyncResult. - """ - now_token = yield self.event_sources.get_current_token() - - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token - ) - - presence_stream = self.event_sources.sources["presence"] - # TODO (mjark): This looks wrong, shouldn't we be getting the presence - # UP to the present rather than after the present? - pagination_config = PaginationConfig(from_token=now_token) - presence, _ = yield presence_stream.get_pagination_rows( - user=sync_config.user, - pagination_config=pagination_config.get_source_config("presence"), - key=None - ) - - membership_list = ( - Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN - ) - - room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=sync_config.user.to_string(), - membership_list=membership_list - ) - - account_data, account_data_by_room = ( - yield self.store.get_account_data_for_user( - sync_config.user.to_string() - ) - ) - - account_data['m.push_rules'] = yield self.push_rules_for_user( - sync_config.user - ) - - tags_by_room = yield self.store.get_tags_for_user( - sync_config.user.to_string() - ) - - ignored_users = account_data.get( - "m.ignored_user_list", {} - ).get("ignored_users", {}).keys() - - joined = [] - invited = [] - archived = [] - - user_id = sync_config.user.to_string() - - @defer.inlineCallbacks - def _generate_room_entry(event): - if event.membership == Membership.JOIN: - room_result = yield self.full_state_sync_for_joined_room( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - joined.append(room_result) - elif event.membership == Membership.INVITE: - if event.sender in ignored_users: - return - invite = yield self.store.get_event(event.event_id) - invited.append(InvitedSyncResult( - room_id=event.room_id, - invite=invite, - )) - elif event.membership in (Membership.LEAVE, Membership.BAN): - # Always send down rooms we were banned or kicked from. - if not sync_config.filter_collection.include_leave: - if event.membership == Membership.LEAVE: - if user_id == event.sender: - return - - leave_token = now_token.copy_and_replace( - "room_key", "s%d" % (event.stream_ordering,) - ) - room_result = yield self.full_state_sync_for_archived_room( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - archived.append(room_result) - - yield concurrently_execute(_generate_room_entry, room_list, 10) - - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) - - presence = sync_config.filter_collection.filter_presence( - presence - ) - - defer.returnValue(SyncResult( - presence=presence, - account_data=account_data_for_user, - joined=joined, - invited=invited, - archived=archived, - next_batch=now_token, - )) - - @defer.inlineCallbacks - def full_state_sync_for_joined_room(self, room_id, sync_config, - now_token, timeline_since_token, - ephemeral_by_room, tags_by_room, - account_data_by_room): - """Sync a room for a client which is starting without any state - Returns: - A Deferred JoinedSyncResult. - """ - - batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, since_token=timeline_since_token - ) - - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id, sync_config, - now_token=now_token, - since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - batch=batch, - full_state=True, - ) - - defer.returnValue(room_sync) + return self.generate_sync_result(sync_config, since_token, full_state) @defer.inlineCallbacks def push_rules_for_user(self, user): @@ -365,24 +215,6 @@ class SyncHandler(object): return account_data_events - def account_data_for_room(self, room_id, tags_by_room, account_data_by_room): - account_data_events = [] - tags = tags_by_room.get(room_id) - if tags is not None: - account_data_events.append({ - "type": "m.tag", - "content": {"tags": tags}, - }) - - account_data = account_data_by_room.get(room_id, {}) - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - return account_data_events - @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): """Get the ephemeral events for each room the user is in @@ -445,237 +277,6 @@ class SyncHandler(object): defer.returnValue((now_token, ephemeral_by_room)) - def full_state_sync_for_archived_room(self, room_id, sync_config, - leave_event_id, leave_token, - timeline_since_token, tags_by_room, - account_data_by_room): - """Sync a room for a client which is starting without any state - Returns: - A Deferred ArchivedSyncResult. - """ - - return self.incremental_sync_for_archived_room( - sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room, - account_data_by_room, full_state=True, leave_token=leave_token, - ) - - @defer.inlineCallbacks - def incremental_sync_with_gap(self, sync_config, since_token): - """ Get the incremental delta needed to bring the client up to - date with the server. - Returns: - A Deferred SyncResult. - """ - now_token = yield self.event_sources.get_current_token() - - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] - - presence_source = self.event_sources.sources["presence"] - presence, presence_key = yield presence_source.get_new_events( - user=sync_config.user, - from_key=since_token.presence_key, - limit=sync_config.filter_collection.presence_limit(), - room_ids=room_ids, - is_guest=sync_config.is_guest, - ) - now_token = now_token.copy_and_replace("presence_key", presence_key) - - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token, since_token - ) - - app_service = yield self.store.get_app_service_by_user_id( - sync_config.user.to_string() - ) - if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - joined_room_ids = set(r.room_id for r in rooms) - else: - rooms = yield self.store.get_rooms_for_user( - sync_config.user.to_string() - ) - joined_room_ids = set(r.room_id for r in rooms) - - user_id = sync_config.user.to_string() - - timeline_limit = sync_config.filter_collection.timeline_limit() - - tags_by_room = yield self.store.get_updated_tags( - user_id, - since_token.account_data_key, - ) - - account_data, account_data_by_room = ( - yield self.store.get_updated_account_data_for_user( - user_id, - since_token.account_data_key, - ) - ) - - push_rules_changed = yield self.store.have_push_rules_changed_for_user( - user_id, int(since_token.push_rules_key) - ) - - if push_rules_changed: - account_data["m.push_rules"] = yield self.push_rules_for_user( - sync_config.user - ) - - ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( - "m.ignored_user_list", user_id=user_id, - ) - - if ignored_account_data: - ignored_users = ignored_account_data.get("ignored_users", {}).keys() - else: - ignored_users = frozenset() - - # Get a list of membership change events that have happened. - rooms_changed = yield self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key - ) - - mem_change_events_by_room_id = {} - for event in rooms_changed: - mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - - newly_joined_rooms = [] - archived = [] - invited = [] - for room_id, events in mem_change_events_by_room_id.items(): - non_joins = [e for e in events if e.membership != Membership.JOIN] - has_join = len(non_joins) != len(events) - - # We want to figure out if we joined the room at some point since - # the last sync (even if we have since left). This is to make sure - # we do send down the room, and with full state, where necessary - if room_id in joined_room_ids or has_join: - old_state = yield self.get_state_at(room_id, since_token) - old_mem_ev = old_state.get((EventTypes.Member, user_id), None) - if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: - newly_joined_rooms.append(room_id) - - if room_id in joined_room_ids: - continue - - if not non_joins: - continue - - # Only bother if we're still currently invited - should_invite = non_joins[-1].membership == Membership.INVITE - if should_invite: - if event.sender not in ignored_users: - room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) - if room_sync: - invited.append(room_sync) - - # Always include leave/ban events. Just take the last one. - # TODO: How do we handle ban -> leave in same batch? - leave_events = [ - e for e in non_joins - if e.membership in (Membership.LEAVE, Membership.BAN) - ] - - if leave_events: - leave_event = leave_events[-1] - room_sync = yield self.incremental_sync_for_archived_room( - sync_config, room_id, leave_event.event_id, since_token, - tags_by_room, account_data_by_room, - full_state=room_id in newly_joined_rooms - ) - if room_sync: - archived.append(room_sync) - - # Get all events for rooms we're currently joined to. - room_to_events = yield self.store.get_room_events_stream_for_rooms( - room_ids=joined_room_ids, - from_key=since_token.room_key, - to_key=now_token.room_key, - limit=timeline_limit + 1, - ) - - joined = [] - # We loop through all room ids, even if there are no new events, in case - # there are non room events taht we need to notify about. - for room_id in joined_room_ids: - room_entry = room_to_events.get(room_id, None) - - if room_entry: - events, start_key = room_entry - - prev_batch_token = now_token.copy_and_replace("room_key", start_key) - - newly_joined_room = room_id in newly_joined_rooms - full_state = newly_joined_room - - batch = yield self.load_filtered_recents( - room_id, sync_config, prev_batch_token, - since_token=since_token, - recents=events, - newly_joined_room=newly_joined_room, - ) - else: - batch = TimelineBatch( - events=[], - prev_batch=since_token, - limited=False, - ) - full_state = False - - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id=room_id, - sync_config=sync_config, - since_token=since_token, - now_token=now_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - batch=batch, - full_state=full_state, - ) - if room_sync: - joined.append(room_sync) - - # For each newly joined room, we want to send down presence of - # existing users. - presence_handler = self.presence_handler - extra_presence_users = set() - for room_id in newly_joined_rooms: - users = yield self.store.get_users_in_room(event.room_id) - extra_presence_users.update(users) - - # For each new member, send down presence. - for joined_sync in joined: - it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) - for event in it: - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - extra_presence_users.add(event.state_key) - - states = yield presence_handler.get_states( - [u for u in extra_presence_users if u != user_id], - as_event=True, - ) - presence.extend(states) - - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) - - presence = sync_config.filter_collection.filter_presence( - presence - ) - - defer.returnValue(SyncResult( - presence=presence, - account_data=account_data_for_user, - joined=joined, - invited=invited, - archived=archived, - next_batch=now_token, - )) - @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, since_token=None, recents=None, newly_joined_room=False): @@ -696,6 +297,10 @@ class SyncHandler(object): else: limited = False + if since_token: + if not now_token.is_after(since_token): + limited = False + if recents is not None: recents = sync_config.filter_collection.filter_room_timeline(recents) recents = yield filter_events_for_client( @@ -749,106 +354,9 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def incremental_sync_with_gap_for_room(self, room_id, sync_config, - since_token, now_token, - ephemeral_by_room, tags_by_room, - account_data_by_room, - batch, full_state=False): - state = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, - full_state=full_state - ) - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - ephemeral = sync_config.filter_collection.filter_room_ephemeral( - ephemeral_by_room.get(room_id, []) - ) - - unread_notifications = {} - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=batch, - state=state, - ephemeral=ephemeral, - account_data=account_data, - unread_notifications=unread_notifications, - ) - - if room_sync: - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config - ) - - if notifs is not None: - unread_notifications["notification_count"] = notifs["notify_count"] - unread_notifications["highlight_count"] = notifs["highlight_count"] - - logger.debug("Room sync: %r", room_sync) - - defer.returnValue(room_sync) - - @defer.inlineCallbacks - def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id, - since_token, tags_by_room, - account_data_by_room, full_state, - leave_token=None): - """ Get the incremental delta needed to bring the client up to date for - the archived room. - Returns: - A Deferred ArchivedSyncResult - """ - - if not leave_token: - stream_token = yield self.store.get_stream_token_for_event( - leave_event_id - ) - - leave_token = since_token.copy_and_replace("room_key", stream_token) - - if since_token and since_token.is_after(leave_token): - defer.returnValue(None) - - batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, since_token, - ) - - logger.debug("Recents %r", batch) - - state_events_delta = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, leave_token, - full_state=full_state - ) - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - room_sync = ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=state_events_delta, - account_data=account_data, - ) - - logger.debug("Room sync: %r", room_sync) - - defer.returnValue(room_sync) - - @defer.inlineCallbacks - def get_state_after_event(self, event): - """ - Get the room state after the given event + def get_state_after_event(self, event): + """ + Get the room state after the given event Args: event(synapse.events.EventBase): event of interest @@ -1010,6 +518,457 @@ class SyncHandler(object): # count is whatever it was last time. defer.returnValue(None) + @defer.inlineCallbacks + def generate_sync_result(self, sync_config, since_token=None, full_state=False): + now_token = yield self.event_sources.get_current_token() + + sync_result_builer = SyncResultBuilder( + sync_config, full_state, + since_token=since_token, + now_token=now_token, + ) + + account_data_by_room = yield self.generate_sync_entry_for_account_data( + sync_result_builer + ) + + newly_joined_rooms, newly_joined_users = yield self.generate_sync_entry_for_rooms( + sync_result_builer, account_data_by_room + ) + + yield self.generate_sync_entry_for_presence( + sync_result_builer, newly_joined_rooms, newly_joined_users + ) + + defer.returnValue(SyncResult( + presence=sync_result_builer.presence, + account_data=sync_result_builer.account_data, + joined=sync_result_builer.joined, + invited=sync_result_builer.invited, + archived=sync_result_builer.archived, + next_batch=sync_result_builer.now_token, + )) + + @defer.inlineCallbacks + def generate_sync_entry_for_account_data(self, sync_result_builer): + sync_config = sync_result_builer.sync_config + user_id = sync_result_builer.sync_config.user.to_string() + since_token = sync_result_builer.since_token + + if since_token and not sync_result_builer.full_state: + account_data, account_data_by_room = ( + yield self.store.get_updated_account_data_for_user( + user_id, + since_token.account_data_key, + ) + ) + + push_rules_changed = yield self.store.have_push_rules_changed_for_user( + user_id, int(since_token.push_rules_key) + ) + + if push_rules_changed: + account_data["m.push_rules"] = yield self.push_rules_for_user( + sync_config.user + ) + else: + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user( + sync_config.user.to_string() + ) + ) + + account_data['m.push_rules'] = yield self.push_rules_for_user( + sync_config.user + ) + + account_data_for_user = sync_config.filter_collection.filter_account_data( + self.account_data_for_user(account_data) + ) + + sync_result_builer.account_data = account_data_for_user + + defer.returnValue(account_data_by_room) + + @defer.inlineCallbacks + def generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, + newly_joined_users): + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + user = sync_result_builer.sync_config.user + + presence_source = self.event_sources.sources["presence"] + + since_token = sync_result_builer.since_token + if since_token and not sync_result_builer.full_state: + presence_key = since_token.presence_key + else: + presence_key = None + + presence, presence_key = yield presence_source.get_new_events( + user=user, + from_key=presence_key, + is_guest=sync_config.is_guest, + ) + sync_result_builer.now_token = now_token.copy_and_replace( + "presence_key", presence_key + ) + + extra_users_ids = set(newly_joined_users) + for room_id in newly_joined_rooms: + users = yield self.store.get_users_in_room(room_id) + extra_users_ids.update(users) + extra_users_ids.discard(user.to_string()) + + states = yield self.presence_handler.get_states( + extra_users_ids, + as_event=True, + ) + presence.extend(states) + + presence = sync_config.filter_collection.filter_presence( + presence + ) + + sync_result_builer.presence = presence + + @defer.inlineCallbacks + def generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + user_id = sync_result_builer.sync_config.user.to_string() + + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_result_builer.sync_config, sync_result_builer.now_token + ) + sync_result_builer.now_token = now_token + + ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( + "m.ignored_user_list", user_id=user_id, + ) + + if ignored_account_data: + ignored_users = ignored_account_data.get("ignored_users", {}).keys() + else: + ignored_users = frozenset() + + if sync_result_builer.since_token: + res = yield self._get_rooms_changed(sync_result_builer, ignored_users) + joined, invited, archived, newly_joined_rooms = res + + tags_by_room = yield self.store.get_updated_tags( + user_id, + sync_result_builer.since_token.account_data_key, + ) + else: + res = yield self._get_all_rooms(sync_result_builer, ignored_users) + joined, invited, archived, newly_joined_rooms = res + + tags_by_room = yield self.store.get_tags_for_user(user_id) + + for room_entry in joined: + yield self._generate_room_entry( + "joined", + sync_result_builer, + ignored_users, + room_entry, + ephemeral=ephemeral_by_room.get(room_entry.room_id, []), + tags=tags_by_room.get(room_entry.room_id), + account_data=account_data_by_room.get(room_entry.room_id, {}), + always_include=sync_result_builer.full_state, + ) + for room_entry in archived: + yield self._generate_room_entry( + "archived", + sync_result_builer, + ignored_users, + room_entry, + ephemeral=ephemeral_by_room.get(room_entry.room_id, []), + tags=tags_by_room.get(room_entry.room_id), + account_data=account_data_by_room.get(room_entry.room_id, {}), + always_include=sync_result_builer.full_state, + ) + + sync_result_builer.invited.extend(invited) + + # Now we want to get any newly joined users + newly_joined_users = set() + for joined_sync in sync_result_builer.joined: + it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) + for event in it: + if event.type == EventTypes.Member: + if event.membership == Membership.JOIN: + newly_joined_users.add(event.state_key) + + defer.returnValue((newly_joined_rooms, newly_joined_users)) + + @defer.inlineCallbacks + def _get_rooms_changed(self, sync_result_builer, ignored_users): + user_id = sync_result_builer.sync_config.user.to_string() + since_token = sync_result_builer.since_token + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + + assert since_token + + app_service = yield self.store.get_app_service_by_user_id(user_id) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + joined_room_ids = set(r.room_id for r in rooms) + else: + rooms = yield self.store.get_rooms_for_user(user_id) + joined_room_ids = set(r.room_id for r in rooms) + + # Get a list of membership change events that have happened. + rooms_changed = yield self.store.get_membership_changes_for_user( + user_id, since_token.room_key, now_token.room_key + ) + + mem_change_events_by_room_id = {} + for event in rooms_changed: + mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) + + newly_joined_rooms = [] + archived = [] + invited = [] + for room_id, events in mem_change_events_by_room_id.items(): + non_joins = [e for e in events if e.membership != Membership.JOIN] + has_join = len(non_joins) != len(events) + + # We want to figure out if we joined the room at some point since + # the last sync (even if we have since left). This is to make sure + # we do send down the room, and with full state, where necessary + if room_id in joined_room_ids or has_join: + old_state = yield self.get_state_at(room_id, since_token) + old_mem_ev = old_state.get((EventTypes.Member, user_id), None) + if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: + newly_joined_rooms.append(room_id) + + if room_id in joined_room_ids: + continue + + if not non_joins: + continue + + # Only bother if we're still currently invited + should_invite = non_joins[-1].membership == Membership.INVITE + if should_invite: + if event.sender not in ignored_users: + room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) + if room_sync: + invited.append(room_sync) + + # Always include leave/ban events. Just take the last one. + # TODO: How do we handle ban -> leave in same batch? + leave_events = [ + e for e in non_joins + if e.membership in (Membership.LEAVE, Membership.BAN) + ] + + if leave_events: + leave_event = leave_events[-1] + leave_stream_token = yield self.store.get_stream_token_for_event( + leave_event.event_id + ) + leave_token = since_token.copy_and_replace( + "room_key", leave_stream_token + ) + + if since_token and since_token.is_after(leave_token): + continue + + archived.append(RoomSyncResultBuilder( + room_id=room_id, + events=None, + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=since_token, + upto_token=leave_token, + )) + + timeline_limit = sync_config.filter_collection.timeline_limit() + + # Get all events for rooms we're currently joined to. + room_to_events = yield self.store.get_room_events_stream_for_rooms( + room_ids=joined_room_ids, + from_key=since_token.room_key, + to_key=now_token.room_key, + limit=timeline_limit + 1, + ) + + joined = [] + # We loop through all room ids, even if there are no new events, in case + # there are non room events taht we need to notify about. + for room_id in joined_room_ids: + room_entry = room_to_events.get(room_id, None) + + if room_entry: + events, start_key = room_entry + + prev_batch_token = now_token.copy_and_replace("room_key", start_key) + + joined.append(RoomSyncResultBuilder( + room_id=room_id, + events=events, + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=None if room_id in newly_joined_rooms else since_token, + upto_token=prev_batch_token, + )) + else: + joined.append(RoomSyncResultBuilder( + room_id=room_id, + events=[], + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=since_token, + upto_token=since_token, + )) + + defer.returnValue((joined, invited, archived, newly_joined_rooms)) + + @defer.inlineCallbacks + def _get_all_rooms(self, sync_result_builer, ignored_users): + user_id = sync_result_builer.sync_config.user.to_string() + since_token = sync_result_builer.since_token + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + + membership_list = ( + Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN + ) + + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user_id, + membership_list=membership_list + ) + + joined = [] + invited = [] + archived = [] + + for event in room_list: + if event.membership == Membership.JOIN: + joined.append(RoomSyncResultBuilder( + room_id=event.room_id, + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=now_token, + )) + elif event.membership == Membership.INVITE: + if event.sender in ignored_users: + continue + invite = yield self.store.get_event(event.event_id) + invited.append(InvitedSyncResult( + room_id=event.room_id, + invite=invite, + )) + elif event.membership in (Membership.LEAVE, Membership.BAN): + # Always send down rooms we were banned or kicked from. + if not sync_config.filter_collection.include_leave: + if event.membership == Membership.LEAVE: + if user_id == event.sender: + continue + + leave_token = now_token.copy_and_replace( + "room_key", "s%d" % (event.stream_ordering,) + ) + archived.append(RoomSyncResultBuilder( + room_id=event.room_id, + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=leave_token, + )) + + defer.returnValue((joined, invited, archived, [])) + + @defer.inlineCallbacks + def _generate_room_entry(self, room_type, sync_result_builer, ignored_users, + room_builder, ephemeral, tags, account_data, + always_include=False): + since_token = sync_result_builer.since_token + now_token = sync_result_builer.now_token + sync_config = sync_result_builer.sync_config + + room_id = room_builder.room_id + events = room_builder.events + newly_joined = room_builder.newly_joined + full_state = ( + room_builder.full_state + or newly_joined + or sync_result_builer.full_state + ) + since_token = room_builder.since_token + upto_token = room_builder.upto_token + + batch = yield self.load_filtered_recents( + room_id, sync_config, + now_token=upto_token, + since_token=since_token, + recents=events, + newly_joined_room=newly_joined, # FIXME + ) + + account_data_events = [] + if tags is not None: + account_data_events.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data_events + ) + + ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) + + if not (always_include or batch or account_data or ephemeral or full_state): + return + + state = yield self.compute_state_delta( + room_id, batch, sync_config, since_token, now_token, + full_state=full_state + ) + + if room_type == "joined": + unread_notifications = {} + room_sync = JoinedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + ephemeral=ephemeral, + account_data=account_data_events, + unread_notifications=unread_notifications, + ) + + if room_sync or always_include: + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config + ) + + if notifs is not None: + unread_notifications["notification_count"] = notifs["notify_count"] + unread_notifications["highlight_count"] = notifs["highlight_count"] + + sync_result_builer.joined.append(room_sync) + elif room_type == "archived": + room_sync = ArchivedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + account_data=account_data, + ) + if room_sync or always_include: + sync_result_builer.archived.append(room_sync) + def _action_has_highlight(actions): for action in actions: @@ -1057,3 +1016,28 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): (e.type, e.state_key): e for e in evs } + + +class SyncResultBuilder(object): + def __init__(self, sync_config, full_state, since_token, now_token): + self.sync_config = sync_config + self.full_state = full_state + self.since_token = since_token + self.now_token = now_token + + self.presence = [] + self.account_data = [] + self.joined = [] + self.invited = [] + self.archived = [] + + +class RoomSyncResultBuilder(object): + def __init__(self, room_id, events, newly_joined, full_state, since_token, + upto_token): + self.room_id = room_id + self.events = events + self.newly_joined = newly_joined + self.full_state = full_state + self.since_token = since_token + self.upto_token = upto_token -- cgit 1.4.1 From c0c79ef444ca0f21e8324abc8a813026aaf6cf17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 May 2016 18:21:27 +0100 Subject: Add back concurrently_execute --- synapse/handlers/sync.py | 34 +++++++++------------------------- 1 file changed, 9 insertions(+), 25 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 80eccf19ae..bc6d6af133 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util.async import concurrently_execute from synapse.util.logcontext import LoggingContext @@ -478,26 +477,6 @@ class SyncHandler(object): for e in sync_config.filter_collection.filter_room_state(state.values()) }) - def check_joined_room(self, sync_config, state_delta): - """ - Check if the user has just joined the given room (so should - be given the full state) - - Args: - sync_config(synapse.handlers.sync.SyncConfig): - state_delta(dict[(str,str), synapse.events.FrozenEvent]): the - difference in state since the last sync - - Returns: - A deferred Tuple (state_delta, limited) - """ - join_event = state_delta.get(( - EventTypes.Member, sync_config.user.to_string()), None) - if join_event is not None: - if join_event.content["membership"] == Membership.JOIN: - return True - return False - @defer.inlineCallbacks def unread_notifs_for_room_id(self, room_id, sync_config): with Measure(self.clock, "unread_notifs_for_room_id"): @@ -664,8 +643,8 @@ class SyncHandler(object): tags_by_room = yield self.store.get_tags_for_user(user_id) - for room_entry in joined: - yield self._generate_room_entry( + def handle_joined(room_entry): + return self._generate_room_entry( "joined", sync_result_builer, ignored_users, @@ -675,8 +654,11 @@ class SyncHandler(object): account_data=account_data_by_room.get(room_entry.room_id, {}), always_include=sync_result_builer.full_state, ) - for room_entry in archived: - yield self._generate_room_entry( + + yield concurrently_execute(handle_joined, joined, 10) + + def handle_archived(room_entry): + return self._generate_room_entry( "archived", sync_result_builer, ignored_users, @@ -687,6 +669,8 @@ class SyncHandler(object): always_include=sync_result_builer.full_state, ) + yield concurrently_execute(handle_archived, archived, 10) + sync_result_builer.invited.extend(invited) # Now we want to get any newly joined users -- cgit 1.4.1 From 137e6a45577d5850ef6936670791af12c2fe74d9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 09:43:35 +0100 Subject: Shuffle things room --- synapse/handlers/sync.py | 70 +++++++++++++++++++++++------------------------- 1 file changed, 33 insertions(+), 37 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bc6d6af133..6b7c6a436e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -499,6 +499,10 @@ class SyncHandler(object): @defer.inlineCallbacks def generate_sync_result(self, sync_config, since_token=None, full_state=False): + # NB: The now_token gets changed by some of the generate_sync_* methods, + # this is due to some of the underlying streams not supporting the ability + # to query up to a given point. + # Always use the `now_token` in `SyncResultBuilder` now_token = yield self.event_sources.get_current_token() sync_result_builer = SyncResultBuilder( @@ -511,9 +515,10 @@ class SyncHandler(object): sync_result_builer ) - newly_joined_rooms, newly_joined_users = yield self.generate_sync_entry_for_rooms( + res = yield self.generate_sync_entry_for_rooms( sync_result_builer, account_data_by_room ) + newly_joined_rooms, newly_joined_users = res yield self.generate_sync_entry_for_presence( sync_result_builer, newly_joined_rooms, newly_joined_users @@ -631,7 +636,7 @@ class SyncHandler(object): if sync_result_builer.since_token: res = yield self._get_rooms_changed(sync_result_builer, ignored_users) - joined, invited, archived, newly_joined_rooms = res + room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_updated_tags( user_id, @@ -639,13 +644,12 @@ class SyncHandler(object): ) else: res = yield self._get_all_rooms(sync_result_builer, ignored_users) - joined, invited, archived, newly_joined_rooms = res + room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_tags_for_user(user_id) - def handle_joined(room_entry): + def handle_room_entries(room_entry): return self._generate_room_entry( - "joined", sync_result_builer, ignored_users, room_entry, @@ -655,21 +659,7 @@ class SyncHandler(object): always_include=sync_result_builer.full_state, ) - yield concurrently_execute(handle_joined, joined, 10) - - def handle_archived(room_entry): - return self._generate_room_entry( - "archived", - sync_result_builer, - ignored_users, - room_entry, - ephemeral=ephemeral_by_room.get(room_entry.room_id, []), - tags=tags_by_room.get(room_entry.room_id), - account_data=account_data_by_room.get(room_entry.room_id, {}), - always_include=sync_result_builer.full_state, - ) - - yield concurrently_execute(handle_archived, archived, 10) + yield concurrently_execute(handle_room_entries, room_entries, 10) sync_result_builer.invited.extend(invited) @@ -711,7 +701,7 @@ class SyncHandler(object): mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) newly_joined_rooms = [] - archived = [] + room_entries = [] invited = [] for room_id, events in mem_change_events_by_room_id.items(): non_joins = [e for e in events if e.membership != Membership.JOIN] @@ -759,8 +749,9 @@ class SyncHandler(object): if since_token and since_token.is_after(leave_token): continue - archived.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=room_id, + rtype="archived", events=None, newly_joined=room_id in newly_joined_rooms, full_state=False, @@ -778,7 +769,6 @@ class SyncHandler(object): limit=timeline_limit + 1, ) - joined = [] # We loop through all room ids, even if there are no new events, in case # there are non room events taht we need to notify about. for room_id in joined_room_ids: @@ -789,8 +779,9 @@ class SyncHandler(object): prev_batch_token = now_token.copy_and_replace("room_key", start_key) - joined.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=room_id, + rtype="joined", events=events, newly_joined=room_id in newly_joined_rooms, full_state=False, @@ -798,8 +789,9 @@ class SyncHandler(object): upto_token=prev_batch_token, )) else: - joined.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=room_id, + rtype="joined", events=[], newly_joined=room_id in newly_joined_rooms, full_state=False, @@ -807,7 +799,7 @@ class SyncHandler(object): upto_token=since_token, )) - defer.returnValue((joined, invited, archived, newly_joined_rooms)) + defer.returnValue((room_entries, invited, newly_joined_rooms)) @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builer, ignored_users): @@ -825,14 +817,14 @@ class SyncHandler(object): membership_list=membership_list ) - joined = [] + room_entries = [] invited = [] - archived = [] for event in room_list: if event.membership == Membership.JOIN: - joined.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=event.room_id, + rtype="joined", events=None, newly_joined=False, full_state=True, @@ -857,8 +849,9 @@ class SyncHandler(object): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - archived.append(RoomSyncResultBuilder( + room_entries.append(RoomSyncResultBuilder( room_id=event.room_id, + rtype="archived", events=None, newly_joined=False, full_state=True, @@ -866,10 +859,10 @@ class SyncHandler(object): upto_token=leave_token, )) - defer.returnValue((joined, invited, archived, [])) + defer.returnValue((room_entries, invited, [])) @defer.inlineCallbacks - def _generate_room_entry(self, room_type, sync_result_builer, ignored_users, + def _generate_room_entry(self, sync_result_builer, ignored_users, room_builder, ephemeral, tags, account_data, always_include=False): since_token = sync_result_builer.since_token @@ -892,7 +885,7 @@ class SyncHandler(object): now_token=upto_token, since_token=since_token, recents=events, - newly_joined_room=newly_joined, # FIXME + newly_joined_room=newly_joined, ) account_data_events = [] @@ -922,7 +915,7 @@ class SyncHandler(object): full_state=full_state ) - if room_type == "joined": + if room_builder.rtype == "joined": unread_notifications = {} room_sync = JoinedSyncResult( room_id=room_id, @@ -943,7 +936,7 @@ class SyncHandler(object): unread_notifications["highlight_count"] = notifs["highlight_count"] sync_result_builer.joined.append(room_sync) - elif room_type == "archived": + elif room_builder.rtype == "archived": room_sync = ArchivedSyncResult( room_id=room_id, timeline=batch, @@ -952,6 +945,8 @@ class SyncHandler(object): ) if room_sync or always_include: sync_result_builer.archived.append(room_sync) + else: + raise Exception("Unrecognized rtype: %r", room_builder.rtype) def _action_has_highlight(actions): @@ -1017,9 +1012,10 @@ class SyncResultBuilder(object): class RoomSyncResultBuilder(object): - def __init__(self, room_id, events, newly_joined, full_state, since_token, - upto_token): + def __init__(self, room_id, rtype, events, newly_joined, full_state, + since_token, upto_token): self.room_id = room_id + self.rtype = rtype self.events = events self.newly_joined = newly_joined self.full_state = full_state -- cgit 1.4.1 From 84f94e4cbbd8c79d867503159b564aaf233e46d5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 10:14:53 +0100 Subject: Add comments --- synapse/handlers/sync.py | 112 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 105 insertions(+), 7 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6b7c6a436e..271dd4c147 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -499,6 +499,17 @@ class SyncHandler(object): @defer.inlineCallbacks def generate_sync_result(self, sync_config, since_token=None, full_state=False): + """Generates a sync result. + + Args: + sync_config (SyncConfig) + since_token (StreamToken) + full_state (bool) + + Returns: + Deferred(SyncResult) + """ + # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. @@ -511,16 +522,16 @@ class SyncHandler(object): now_token=now_token, ) - account_data_by_room = yield self.generate_sync_entry_for_account_data( + account_data_by_room = yield self._generate_sync_entry_for_account_data( sync_result_builer ) - res = yield self.generate_sync_entry_for_rooms( + res = yield self._generate_sync_entry_for_rooms( sync_result_builer, account_data_by_room ) newly_joined_rooms, newly_joined_users = res - yield self.generate_sync_entry_for_presence( + yield self._generate_sync_entry_for_presence( sync_result_builer, newly_joined_rooms, newly_joined_users ) @@ -534,7 +545,16 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def generate_sync_entry_for_account_data(self, sync_result_builer): + def _generate_sync_entry_for_account_data(self, sync_result_builer): + """Generates the account data portion of the sync response. Populates + `sync_result_builer` with the result. + + Args: + sync_result_builer(SyncResultBuilder) + + Returns: + Deferred(dict): A dictionary containing the per room account data. + """ sync_config = sync_result_builer.sync_config user_id = sync_result_builer.sync_config.user.to_string() since_token = sync_result_builer.since_token @@ -575,8 +595,18 @@ class SyncHandler(object): defer.returnValue(account_data_by_room) @defer.inlineCallbacks - def generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, - newly_joined_users): + def _generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, + newly_joined_users): + """Generates the presence portion of the sync response. Populates the + `sync_result_builer` with the result. + + Args: + sync_result_builer(SyncResultBuilder) + newly_joined_rooms(list): List of rooms that the user has joined + since the last sync (or empty if an initial sync) + newly_joined_users(list): List of users that have joined rooms + since the last sync (or empty if an initial sync) + """ now_token = sync_result_builer.now_token sync_config = sync_result_builer.sync_config user = sync_result_builer.sync_config.user @@ -617,7 +647,18 @@ class SyncHandler(object): sync_result_builer.presence = presence @defer.inlineCallbacks - def generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + def _generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + """Generates the rooms portion of the sync response. Populates the + `sync_result_builer` with the result. + + Args: + sync_result_builer(SyncResultBuilder) + account_data_by_room(dict): Dictionary of per room account data + + Returns: + Deferred(tuple): Returns a 2-tuple of + `(newly_joined_rooms, newly_joined_users)` + """ user_id = sync_result_builer.sync_config.user.to_string() now_token, ephemeral_by_room = yield self.ephemeral_by_room( @@ -676,6 +717,16 @@ class SyncHandler(object): @defer.inlineCallbacks def _get_rooms_changed(self, sync_result_builer, ignored_users): + """Gets the the changes that have happened since the last sync. + + Args: + sync_result_builer(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + + Returns: + Deferred(tuple): Returns a tuple of the form: + `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)` + """ user_id = sync_result_builer.sync_config.user.to_string() since_token = sync_result_builer.since_token now_token = sync_result_builer.now_token @@ -803,6 +854,17 @@ class SyncHandler(object): @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builer, ignored_users): + """Returns entries for all rooms for the user. + + Args: + sync_result_builer(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + + Returns: + Deferred(tuple): Returns a tuple of the form: + `([RoomSyncResultBuilder], [InvitedSyncResult], [])` + """ + user_id = sync_result_builer.sync_config.user.to_string() since_token = sync_result_builer.since_token now_token = sync_result_builer.now_token @@ -865,6 +927,20 @@ class SyncHandler(object): def _generate_room_entry(self, sync_result_builer, ignored_users, room_builder, ephemeral, tags, account_data, always_include=False): + """Populates the `joined` and `archived` section of `sync_result_builer` + based on the `room_builder`. + + Args: + sync_result_builer(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + room_builder(RoomSyncResultBuilder) + ephemeral(list): List of new ephemeral events for room + tags(list): List of *all* tags for room, or None if there has been + no change. + account_data(list): List of new account data for room + always_include(bool): Always include this room in the sync response, + even if empty. + """ since_token = sync_result_builer.since_token now_token = sync_result_builer.now_token sync_config = sync_result_builer.sync_config @@ -998,7 +1074,15 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): class SyncResultBuilder(object): + "Used to help build up a new SyncResult for a user" def __init__(self, sync_config, full_state, since_token, now_token): + """ + Args: + sync_config(SyncConfig) + full_state(bool): The full_state flag as specified by user + since_token(StreamToken): The token supplied by user, or None. + now_token(StreamToken): The token to sync up to. + """ self.sync_config = sync_config self.full_state = full_state self.since_token = since_token @@ -1012,8 +1096,22 @@ class SyncResultBuilder(object): class RoomSyncResultBuilder(object): + """Stores information needed to create either a `JoinedSyncResult` or + `ArchivedSyncResult`. + """ def __init__(self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token): + """ + Args: + room_id(str) + rtype(str): One of `"joined"` or `"archived"` + events(list): List of events to include in the room, (more events + may be added when generating result). + newly_joined(bool): If the user has newly joined the room + full_state(bool): Whether the full state should be sent in result + since_token(StreamToken): Earliest point to return events from, or None + upto_token(StreamToken): Latest point to return events from. + """ self.room_id = room_id self.rtype = rtype self.events = events -- cgit 1.4.1 From 79bea8ab9a4c4cb4a0907784603698731424c00a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 10:22:24 +0100 Subject: Inline function. Make load_filtered_recents private --- synapse/handlers/sync.py | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 271dd4c147..3aa4432d68 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -203,17 +203,6 @@ class SyncHandler(object): rules = format_push_rules_for_user(user, rawrules, enabled_map) defer.returnValue(rules) - def account_data_for_user(self, account_data): - account_data_events = [] - - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - return account_data_events - @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): """Get the ephemeral events for each room the user is in @@ -277,8 +266,8 @@ class SyncHandler(object): defer.returnValue((now_token, ephemeral_by_room)) @defer.inlineCallbacks - def load_filtered_recents(self, room_id, sync_config, now_token, - since_token=None, recents=None, newly_joined_room=False): + def _load_filtered_recents(self, room_id, sync_config, now_token, + since_token=None, recents=None, newly_joined_room=False): """ Returns: a Deferred TimelineBatch @@ -586,9 +575,10 @@ class SyncHandler(object): sync_config.user ) - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) + account_data_for_user = sync_config.filter_collection.filter_account_data([ + {"type": account_data_type, "content": content} + for account_data_type, content in account_data.items() + ]) sync_result_builer.account_data = account_data_for_user @@ -956,7 +946,7 @@ class SyncHandler(object): since_token = room_builder.since_token upto_token = room_builder.upto_token - batch = yield self.load_filtered_recents( + batch = yield self._load_filtered_recents( room_id, sync_config, now_token=upto_token, since_token=since_token, -- cgit 1.4.1 From be2c67738640ff88fcf63701c9e3d82afc385e47 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 10:53:03 +0100 Subject: Spell builder correctly --- synapse/handlers/sync.py | 126 +++++++++++++++++++++++------------------------ 1 file changed, 63 insertions(+), 63 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3aa4432d68..12a4cc8b57 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -505,50 +505,50 @@ class SyncHandler(object): # Always use the `now_token` in `SyncResultBuilder` now_token = yield self.event_sources.get_current_token() - sync_result_builer = SyncResultBuilder( + sync_result_builder = SyncResultBuilder( sync_config, full_state, since_token=since_token, now_token=now_token, ) account_data_by_room = yield self._generate_sync_entry_for_account_data( - sync_result_builer + sync_result_builder ) res = yield self._generate_sync_entry_for_rooms( - sync_result_builer, account_data_by_room + sync_result_builder, account_data_by_room ) newly_joined_rooms, newly_joined_users = res yield self._generate_sync_entry_for_presence( - sync_result_builer, newly_joined_rooms, newly_joined_users + sync_result_builder, newly_joined_rooms, newly_joined_users ) defer.returnValue(SyncResult( - presence=sync_result_builer.presence, - account_data=sync_result_builer.account_data, - joined=sync_result_builer.joined, - invited=sync_result_builer.invited, - archived=sync_result_builer.archived, - next_batch=sync_result_builer.now_token, + presence=sync_result_builder.presence, + account_data=sync_result_builder.account_data, + joined=sync_result_builder.joined, + invited=sync_result_builder.invited, + archived=sync_result_builder.archived, + next_batch=sync_result_builder.now_token, )) @defer.inlineCallbacks - def _generate_sync_entry_for_account_data(self, sync_result_builer): + def _generate_sync_entry_for_account_data(self, sync_result_builder): """Generates the account data portion of the sync response. Populates - `sync_result_builer` with the result. + `sync_result_builder` with the result. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) Returns: Deferred(dict): A dictionary containing the per room account data. """ - sync_config = sync_result_builer.sync_config - user_id = sync_result_builer.sync_config.user.to_string() - since_token = sync_result_builer.since_token + sync_config = sync_result_builder.sync_config + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token - if since_token and not sync_result_builer.full_state: + if since_token and not sync_result_builder.full_state: account_data, account_data_by_room = ( yield self.store.get_updated_account_data_for_user( user_id, @@ -580,31 +580,31 @@ class SyncHandler(object): for account_data_type, content in account_data.items() ]) - sync_result_builer.account_data = account_data_for_user + sync_result_builder.account_data = account_data_for_user defer.returnValue(account_data_by_room) @defer.inlineCallbacks - def _generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms, + def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms, newly_joined_users): """Generates the presence portion of the sync response. Populates the - `sync_result_builer` with the result. + `sync_result_builder` with the result. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) newly_joined_rooms(list): List of rooms that the user has joined since the last sync (or empty if an initial sync) newly_joined_users(list): List of users that have joined rooms since the last sync (or empty if an initial sync) """ - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config - user = sync_result_builer.sync_config.user + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + user = sync_result_builder.sync_config.user presence_source = self.event_sources.sources["presence"] - since_token = sync_result_builer.since_token - if since_token and not sync_result_builer.full_state: + since_token = sync_result_builder.since_token + if since_token and not sync_result_builder.full_state: presence_key = since_token.presence_key else: presence_key = None @@ -614,7 +614,7 @@ class SyncHandler(object): from_key=presence_key, is_guest=sync_config.is_guest, ) - sync_result_builer.now_token = now_token.copy_and_replace( + sync_result_builder.now_token = now_token.copy_and_replace( "presence_key", presence_key ) @@ -634,27 +634,27 @@ class SyncHandler(object): presence ) - sync_result_builer.presence = presence + sync_result_builder.presence = presence @defer.inlineCallbacks - def _generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room): + def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room): """Generates the rooms portion of the sync response. Populates the - `sync_result_builer` with the result. + `sync_result_builder` with the result. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) account_data_by_room(dict): Dictionary of per room account data Returns: Deferred(tuple): Returns a 2-tuple of `(newly_joined_rooms, newly_joined_users)` """ - user_id = sync_result_builer.sync_config.user.to_string() + user_id = sync_result_builder.sync_config.user.to_string() now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_result_builer.sync_config, sync_result_builer.now_token + sync_result_builder.sync_config, sync_result_builder.now_token ) - sync_result_builer.now_token = now_token + sync_result_builder.now_token = now_token ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id, @@ -665,38 +665,38 @@ class SyncHandler(object): else: ignored_users = frozenset() - if sync_result_builer.since_token: - res = yield self._get_rooms_changed(sync_result_builer, ignored_users) + if sync_result_builder.since_token: + res = yield self._get_rooms_changed(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_updated_tags( user_id, - sync_result_builer.since_token.account_data_key, + sync_result_builder.since_token.account_data_key, ) else: - res = yield self._get_all_rooms(sync_result_builer, ignored_users) + res = yield self._get_all_rooms(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_tags_for_user(user_id) def handle_room_entries(room_entry): return self._generate_room_entry( - sync_result_builer, + sync_result_builder, ignored_users, room_entry, ephemeral=ephemeral_by_room.get(room_entry.room_id, []), tags=tags_by_room.get(room_entry.room_id), account_data=account_data_by_room.get(room_entry.room_id, {}), - always_include=sync_result_builer.full_state, + always_include=sync_result_builder.full_state, ) yield concurrently_execute(handle_room_entries, room_entries, 10) - sync_result_builer.invited.extend(invited) + sync_result_builder.invited.extend(invited) # Now we want to get any newly joined users newly_joined_users = set() - for joined_sync in sync_result_builer.joined: + for joined_sync in sync_result_builder.joined: it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) for event in it: if event.type == EventTypes.Member: @@ -706,21 +706,21 @@ class SyncHandler(object): defer.returnValue((newly_joined_rooms, newly_joined_users)) @defer.inlineCallbacks - def _get_rooms_changed(self, sync_result_builer, ignored_users): + def _get_rooms_changed(self, sync_result_builder, ignored_users): """Gets the the changes that have happened since the last sync. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) ignored_users(set(str)): Set of users ignored by user. Returns: Deferred(tuple): Returns a tuple of the form: `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)` """ - user_id = sync_result_builer.sync_config.user.to_string() - since_token = sync_result_builer.since_token - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config assert since_token @@ -843,11 +843,11 @@ class SyncHandler(object): defer.returnValue((room_entries, invited, newly_joined_rooms)) @defer.inlineCallbacks - def _get_all_rooms(self, sync_result_builer, ignored_users): + def _get_all_rooms(self, sync_result_builder, ignored_users): """Returns entries for all rooms for the user. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) ignored_users(set(str)): Set of users ignored by user. Returns: @@ -855,10 +855,10 @@ class SyncHandler(object): `([RoomSyncResultBuilder], [InvitedSyncResult], [])` """ - user_id = sync_result_builer.sync_config.user.to_string() - since_token = sync_result_builer.since_token - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config membership_list = ( Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN @@ -914,14 +914,14 @@ class SyncHandler(object): defer.returnValue((room_entries, invited, [])) @defer.inlineCallbacks - def _generate_room_entry(self, sync_result_builer, ignored_users, + def _generate_room_entry(self, sync_result_builder, ignored_users, room_builder, ephemeral, tags, account_data, always_include=False): - """Populates the `joined` and `archived` section of `sync_result_builer` + """Populates the `joined` and `archived` section of `sync_result_builder` based on the `room_builder`. Args: - sync_result_builer(SyncResultBuilder) + sync_result_builder(SyncResultBuilder) ignored_users(set(str)): Set of users ignored by user. room_builder(RoomSyncResultBuilder) ephemeral(list): List of new ephemeral events for room @@ -931,9 +931,9 @@ class SyncHandler(object): always_include(bool): Always include this room in the sync response, even if empty. """ - since_token = sync_result_builer.since_token - now_token = sync_result_builer.now_token - sync_config = sync_result_builer.sync_config + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config room_id = room_builder.room_id events = room_builder.events @@ -941,7 +941,7 @@ class SyncHandler(object): full_state = ( room_builder.full_state or newly_joined - or sync_result_builer.full_state + or sync_result_builder.full_state ) since_token = room_builder.since_token upto_token = room_builder.upto_token @@ -1001,7 +1001,7 @@ class SyncHandler(object): unread_notifications["notification_count"] = notifs["notify_count"] unread_notifications["highlight_count"] = notifs["highlight_count"] - sync_result_builer.joined.append(room_sync) + sync_result_builder.joined.append(room_sync) elif room_builder.rtype == "archived": room_sync = ArchivedSyncResult( room_id=room_id, @@ -1010,7 +1010,7 @@ class SyncHandler(object): account_data=account_data, ) if room_sync or always_include: - sync_result_builer.archived.append(room_sync) + sync_result_builder.archived.append(room_sync) else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) -- cgit 1.4.1 From b08ad0389e0e412798f32d9f5656db483238173d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 11:04:35 +0100 Subject: Only include non-offline presence in initial sync --- synapse/handlers/sync.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 12a4cc8b57..8143171c11 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -606,13 +606,16 @@ class SyncHandler(object): since_token = sync_result_builder.since_token if since_token and not sync_result_builder.full_state: presence_key = since_token.presence_key + include_offline = True else: presence_key = None + include_offline = False presence, presence_key = yield presence_source.get_new_events( user=user, from_key=presence_key, is_guest=sync_config.is_guest, + include_offline=include_offline, ) sync_result_builder.now_token = now_token.copy_and_replace( "presence_key", presence_key -- cgit 1.4.1 From 1c5ed2a19ba6da4478c54df072e623598c0ea60d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 11:21:34 +0100 Subject: Only work out newly_joined_users for incremental sync --- synapse/handlers/sync.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8143171c11..476dcf38e2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -699,12 +699,15 @@ class SyncHandler(object): # Now we want to get any newly joined users newly_joined_users = set() - for joined_sync in sync_result_builder.joined: - it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) - for event in it: - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - newly_joined_users.add(event.state_key) + if sync_result_builder.since_token: + for joined_sync in sync_result_builder.joined: + it = itertools.chain( + joined_sync.timeline.events, joined_sync.state.values() + ) + for event in it: + if event.type == EventTypes.Member: + if event.membership == Membership.JOIN: + newly_joined_users.add(event.state_key) defer.returnValue((newly_joined_rooms, newly_joined_users)) -- cgit 1.4.1 From 69003039973169097592359f3f34fc32c5bbaeb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 11:44:55 +0100 Subject: Don't send down all ephemeral events --- synapse/handlers/sync.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 476dcf38e2..6f7dd45ef3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -655,7 +655,9 @@ class SyncHandler(object): user_id = sync_result_builder.sync_config.user.to_string() now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_result_builder.sync_config, sync_result_builder.now_token + sync_result_builder.sync_config, + now_token=sync_result_builder.now_token, + since_token=sync_result_builder.since_token, ) sync_result_builder.now_token = now_token -- cgit 1.4.1 From faad233ea61cfff2c377609fa1d3c64d39f8a039 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 14:00:43 +0100 Subject: Change short circuit path --- synapse/handlers/sync.py | 42 ++++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 16 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6f7dd45ef3..3b89582d79 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -273,23 +273,14 @@ class SyncHandler(object): a Deferred TimelineBatch """ with Measure(self.clock, "load_filtered_recents"): - filtering_factor = 2 timeline_limit = sync_config.filter_collection.timeline_limit() - load_limit = max(timeline_limit * filtering_factor, 10) - max_repeat = 5 # Only try a few times per room, otherwise - room_key = now_token.room_key - end_key = room_key if recents is None or newly_joined_room or timeline_limit < len(recents): limited = True else: limited = False - if since_token: - if not now_token.is_after(since_token): - limited = False - - if recents is not None: + if recents: recents = sync_config.filter_collection.filter_room_timeline(recents) recents = yield filter_events_for_client( self.store, @@ -299,6 +290,19 @@ class SyncHandler(object): else: recents = [] + if not limited: + defer.returnValue(TimelineBatch( + events=recents, + prev_batch=now_token, + limited=False + )) + + filtering_factor = 2 + load_limit = max(timeline_limit * filtering_factor, 10) + max_repeat = 5 # Only try a few times per room, otherwise + room_key = now_token.room_key + end_key = room_key + since_key = None if since_token and not newly_joined_room: since_key = since_token.room_key @@ -939,18 +943,24 @@ class SyncHandler(object): always_include(bool): Always include this room in the sync response, even if empty. """ - since_token = sync_result_builder.since_token - now_token = sync_result_builder.now_token - sync_config = sync_result_builder.sync_config - - room_id = room_builder.room_id - events = room_builder.events newly_joined = room_builder.newly_joined full_state = ( room_builder.full_state or newly_joined or sync_result_builder.full_state ) + events = room_builder.events + + # We want to shortcut out as early as possible. + if not (always_include or account_data or ephemeral or full_state): + if events == [] and tags is None: + return + + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + + room_id = room_builder.room_id since_token = room_builder.since_token upto_token = room_builder.upto_token -- cgit 1.4.1 From 661a540dd1de89a3ab3a8f6ca0f780ea7d264176 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 2 Jun 2016 15:20:28 +0100 Subject: Deduplicate presence entries in sync (#818) --- synapse/handlers/sync.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3b89582d79..5307b62b85 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -637,6 +637,9 @@ class SyncHandler(object): ) presence.extend(states) + # Deduplicate the presence entries so that there's at most one per user + presence = {p["content"]["user_id"]: p for p in presence}.values() + presence = sync_config.filter_collection.filter_presence( presence ) -- cgit 1.4.1 From 6a0afa582aa5bf816e082af31ac44e2a8fee28c0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Jun 2016 14:27:07 +0100 Subject: Load push rules in storage layer, so that they get cached --- synapse/handlers/sync.py | 5 ++--- synapse/push/bulk_push_rule_evaluator.py | 28 ----------------------- synapse/push/clientformat.py | 30 ++++++++++++++++++------- synapse/rest/client/v1/push_rule.py | 6 ++--- synapse/storage/push_rule.py | 38 +++++++++++++++++++++++++++++++- 5 files changed, 63 insertions(+), 44 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5307b62b85..be26a491ff 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -198,9 +198,8 @@ class SyncHandler(object): @defer.inlineCallbacks def push_rules_for_user(self, user): user_id = user.to_string() - rawrules = yield self.store.get_push_rules_for_user(user_id) - enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id) - rules = format_push_rules_for_user(user, rawrules, enabled_map) + rules = yield self.store.get_push_rules_for_user(user_id) + rules = format_push_rules_for_user(user, rules) defer.returnValue(rules) @defer.inlineCallbacks diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index af5212a5d1..6e42121b1d 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -18,7 +18,6 @@ import ujson as json from twisted.internet import defer -from .baserules import list_with_base_rules from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.api.constants import EventTypes, Membership @@ -38,36 +37,9 @@ def decode_rule_json(rule): @defer.inlineCallbacks def _get_rules(room_id, user_ids, store): rules_by_user = yield store.bulk_get_push_rules(user_ids) - rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids) rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} - rules_by_user = { - uid: list_with_base_rules([ - decode_rule_json(rule_list) - for rule_list in rules_by_user.get(uid, []) - ]) - for uid in user_ids - } - - # We apply the rules-enabled map here: bulk_get_push_rules doesn't - # fetch disabled rules, but this won't account for any server default - # rules the user has disabled, so we need to do this too. - for uid in user_ids: - user_enabled_map = rules_enabled_by_user.get(uid) - if not user_enabled_map: - continue - - for i, rule in enumerate(rules_by_user[uid]): - rule_id = rule['rule_id'] - - if rule_id in user_enabled_map: - if rule.get('enabled', True) != bool(user_enabled_map[rule_id]): - # Rules are cached across users. - rule = dict(rule) - rule['enabled'] = bool(user_enabled_map[rule_id]) - rules_by_user[uid][i] = rule - defer.returnValue(rules_by_user) diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py index ae9db9ec2f..b3983f7940 100644 --- a/synapse/push/clientformat.py +++ b/synapse/push/clientformat.py @@ -23,10 +23,7 @@ import copy import simplejson as json -def format_push_rules_for_user(user, rawrules, enabled_map): - """Converts a list of rawrules and a enabled map into nested dictionaries - to match the Matrix client-server format for push rules""" - +def load_rules_for_user(user, rawrules, enabled_map): ruleslist = [] for rawrule in rawrules: rule = dict(rawrule) @@ -35,7 +32,26 @@ def format_push_rules_for_user(user, rawrules, enabled_map): ruleslist.append(rule) # We're going to be mutating this a lot, so do a deep copy - ruleslist = copy.deepcopy(list_with_base_rules(ruleslist)) + rules = list(list_with_base_rules(ruleslist)) + + for i, rule in enumerate(rules): + rule_id = rule['rule_id'] + if rule_id in enabled_map: + if rule.get('enabled', True) != bool(enabled_map[rule_id]): + # Rules are cached across users. + rule = dict(rule) + rule['enabled'] = bool(enabled_map[rule_id]) + rules[i] = rule + + return rules + + +def format_push_rules_for_user(user, ruleslist): + """Converts a list of rawrules and a enabled map into nested dictionaries + to match the Matrix client-server format for push rules""" + + # We're going to be mutating this a lot, so do a deep copy + ruleslist = copy.deepcopy(ruleslist) rules = {'global': {}, 'device': {}} @@ -60,9 +76,7 @@ def format_push_rules_for_user(user, rawrules, enabled_map): template_rule = _rule_to_template(r) if template_rule: - if r['rule_id'] in enabled_map: - template_rule['enabled'] = enabled_map[r['rule_id']] - elif 'enabled' in r: + if 'enabled' in r: template_rule['enabled'] = r['enabled'] else: template_rule['enabled'] = True diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 02d837ee6a..6bb4821ec6 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -128,11 +128,9 @@ class PushRuleRestServlet(ClientV1RestServlet): # we build up the full structure and then decide which bits of it # to send which means doing unnecessary work sometimes but is # is probably not going to make a whole lot of difference - rawrules = yield self.store.get_push_rules_for_user(user_id) + rules = yield self.store.get_push_rules_for_user(user_id) - enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id) - - rules = format_push_rules_for_user(requester.user, rawrules, enabled_map) + rules = format_push_rules_for_user(requester.user, rules) path = request.postpath[1:] diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index ebb97c8474..786d6f6d67 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -15,6 +15,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList +from synapse.push.baserules import list_with_base_rules from twisted.internet import defer import logging @@ -23,6 +24,29 @@ import simplejson as json logger = logging.getLogger(__name__) +def _load_rules(rawrules, enabled_map): + ruleslist = [] + for rawrule in rawrules: + rule = dict(rawrule) + rule["conditions"] = json.loads(rawrule["conditions"]) + rule["actions"] = json.loads(rawrule["actions"]) + ruleslist.append(rule) + + # We're going to be mutating this a lot, so do a deep copy + rules = list(list_with_base_rules(ruleslist)) + + for i, rule in enumerate(rules): + rule_id = rule['rule_id'] + if rule_id in enabled_map: + if rule.get('enabled', True) != bool(enabled_map[rule_id]): + # Rules are cached across users. + rule = dict(rule) + rule['enabled'] = bool(enabled_map[rule_id]) + rules[i] = rule + + return rules + + class PushRuleStore(SQLBaseStore): @cachedInlineCallbacks(lru=True) def get_push_rules_for_user(self, user_id): @@ -42,7 +66,11 @@ class PushRuleStore(SQLBaseStore): key=lambda row: (-int(row["priority_class"]), -int(row["priority"])) ) - defer.returnValue(rows) + enabled_map = yield self.get_push_rules_enabled_for_user(user_id) + + rules = _load_rules(rows, enabled_map) + + defer.returnValue(rules) @cachedInlineCallbacks(lru=True) def get_push_rules_enabled_for_user(self, user_id): @@ -85,6 +113,14 @@ class PushRuleStore(SQLBaseStore): for row in rows: results.setdefault(row['user_name'], []).append(row) + + enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids) + + for user_id, rules in results.items(): + results[user_id] = _load_rules( + rules, enabled_map_by_user.get(user_id, {}) + ) + defer.returnValue(results) @cachedList(cached_method_name="get_push_rules_enabled_for_user", -- cgit 1.4.1 From 248e6770ca0faadf574cfd62f72d8e200cb5b57a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 10:30:12 +0100 Subject: Cache federation state responses --- synapse/federation/federation_server.py | 66 ++++++++++++++++++++++----------- synapse/handlers/federation.py | 7 +--- synapse/handlers/room.py | 4 +- synapse/handlers/sync.py | 2 +- synapse/util/caches/response_cache.py | 13 ++++++- 5 files changed, 60 insertions(+), 32 deletions(-) (limited to 'synapse/handlers/sync.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 85f5e752fe..d15c7e1b40 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -21,10 +21,11 @@ from .units import Transaction, Edu from synapse.util.async import Linearizer from synapse.util.logutils import log_function +from synapse.util.caches.response_cache import ResponseCache from synapse.events import FrozenEvent import synapse.metrics -from synapse.api.errors import FederationError, SynapseError +from synapse.api.errors import AuthError, FederationError, SynapseError from synapse.crypto.event_signing import compute_event_signature @@ -48,9 +49,15 @@ class FederationServer(FederationBase): def __init__(self, hs): super(FederationServer, self).__init__(hs) + self.auth = hs.get_auth() + self._room_pdu_linearizer = Linearizer() self._server_linearizer = Linearizer() + # We cache responses to state queries, as they take a while and often + # come in waves. + self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) + def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -188,28 +195,45 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_context_state_request(self, origin, room_id, event_id): - with (yield self._server_linearizer.queue((origin, room_id))): - if event_id: - pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event_id, - ) - auth_chain = yield self.store.get_auth_chain( - [pdu.event_id for pdu in pdus] + if not event_id: + raise NotImplementedError("Specify an event") + + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + result = self._state_resp_cache.get((room_id, event_id)) + if not result: + with (yield self._server_linearizer.queue((origin, room_id))): + resp = yield self.response_cache.set( + (room_id, event_id), + self._on_context_state_request_compute(room_id, event_id) ) + else: + resp = yield result - for event in auth_chain: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.hs.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - else: - raise NotImplementedError("Specify an event") + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def _on_context_state_request_compute(self, room_id, event_id): + pdus = yield self.handler.get_state_for_pdu( + room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + + for event in auth_chain: + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) defer.returnValue((200, { "pdus": [pdu.get_pdu_json() for pdu in pdus], diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3f138daf17..fcad41d7b6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -991,14 +991,9 @@ class FederationHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True): + def get_state_for_pdu(self, room_id, event_id): yield run_on_reactor() - if do_auth: - in_room = yield self.auth.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") - state_groups = yield self.store.get_state_groups( room_id, [event_id] ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ae44c7a556..bf6b1c1535 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -345,8 +345,8 @@ class RoomCreationHandler(BaseHandler): class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache() - self.remote_list_request_cache = ResponseCache() + self.response_cache = ResponseCache(hs) + self.remote_list_request_cache = ResponseCache(hs) self.remote_list_cache = {} self.fetch_looping_call = hs.get_clock().looping_call( self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index be26a491ff..0ee4ebe504 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -138,7 +138,7 @@ class SyncHandler(object): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache() + self.response_cache = ResponseCache(hs) def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 36686b479e..00af539880 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -24,9 +24,12 @@ class ResponseCache(object): used rather than trying to compute a new response. """ - def __init__(self): + def __init__(self, hs, timeout_ms=0): self.pending_result_cache = {} # Requests that haven't finished yet. + self.clock = hs.get_clock() + self.timeout_sec = timeout_ms / 1000. + def get(self, key): result = self.pending_result_cache.get(key) if result is not None: @@ -39,7 +42,13 @@ class ResponseCache(object): self.pending_result_cache[key] = result def remove(r): - self.pending_result_cache.pop(key, None) + if self.timeout_sec: + self.clock.call_later( + self.timeout_sec, + self.pending_result_cache.pop, key, None, + ) + else: + self.pending_result_cache.pop(key, None) return r result.addBoth(remove) -- cgit 1.4.1