diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/message.py | 15 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 402 |
2 files changed, 175 insertions, 242 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b73ad62147..82c8cb5f0c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import SynapseError, AuthError, Codes +from synapse.api.errors import AuthError, Codes from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -119,9 +119,12 @@ class MessageHandler(BaseHandler): if source_config.direction == 'b': # if we're going backwards, we might need to backfill. This # requires that we have a topo token. - if room_token.topological is None: - raise SynapseError(400, "Invalid token: cannot paginate " - "backwards from a stream token") + if room_token.topological: + max_topo = room_token.topological + else: + max_topo = yield self.store.get_max_topological_token_for_stream_and_room( + room_id, room_token.stream + ) if membership == Membership.LEAVE: # If they have left the room then clamp the token to be before @@ -131,11 +134,11 @@ class MessageHandler(BaseHandler): member_event_id ) leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < room_token.topological: + if leave_token.topological < max_topo: source_config.from_key = str(leave_token) yield self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, room_token.topological + room_id, max_topo ) events, next_key = yield data_source.get_pagination_rows( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 328c049b03..51ec4702db 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -72,7 +72,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ ) -class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ +class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ "room_id", # str "timeline", # TimelineBatch "state", # dict[(str, str), FrozenEvent] @@ -298,46 +298,19 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token=timeline_since_token ) - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config, ephemeral_by_room - ) - - unread_notifications = {} - if notifs is not None: - unread_notifications["notification_count"] = len(notifs) - unread_notifications["highlight_count"] = len([ - 1 for notif in notifs if _action_has_highlight(notif["actions"]) - ]) - - current_state = yield self.get_state_at(room_id, now_token) - - current_state = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - current_state.values() - ) - } - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data + room_sync = yield self.incremental_sync_with_gap_for_room( + room_id, sync_config, + now_token=now_token, + since_token=timeline_since_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + all_ephemeral_by_room=ephemeral_by_room, + batch=batch, + full_state=True, ) - ephemeral = sync_config.filter_collection.filter_room_ephemeral( - ephemeral_by_room.get(room_id, []) - ) - - defer.returnValue(JoinedSyncResult( - room_id=room_id, - timeline=batch, - state=current_state, - ephemeral=ephemeral, - account_data=account_data, - unread_notifications=unread_notifications, - )) + defer.returnValue(room_sync) def account_data_for_user(self, account_data): account_data_events = [] @@ -429,44 +402,20 @@ class SyncHandler(BaseHandler): defer.returnValue((now_token, ephemeral_by_room)) - @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, timeline_since_token, tags_by_room, account_data_by_room): """Sync a room for a client which is starting without any state Returns: - A Deferred JoinedSyncResult. + A Deferred ArchivedSyncResult. """ - batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, since_token=timeline_since_token - ) - - leave_state = yield self.store.get_state_for_event(leave_event_id) - - leave_state = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - leave_state.values() - ) - } - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data + return self.incremental_sync_for_archived_room( + sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room, + account_data_by_room, full_state=True, leave_token=leave_token, ) - defer.returnValue(ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=leave_state, - account_data=account_data, - )) - @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, since_token): """ Get the incremental delta needed to bring the client up to @@ -512,154 +461,127 @@ class SyncHandler(BaseHandler): sync_config.user ) - timeline_limit = sync_config.filter_collection.timeline_limit() + user_id = sync_config.user.to_string() - room_events, _ = yield self.store.get_room_events_stream( - sync_config.user.to_string(), - from_key=since_token.room_key, - to_key=now_token.room_key, - limit=timeline_limit + 1, - ) + timeline_limit = sync_config.filter_collection.timeline_limit() tags_by_room = yield self.store.get_updated_tags( - sync_config.user.to_string(), + user_id, since_token.account_data_key, ) account_data, account_data_by_room = ( yield self.store.get_updated_account_data_for_user( - sync_config.user.to_string(), + user_id, since_token.account_data_key, ) ) - joined = [] - archived = [] - if len(room_events) <= timeline_limit: - # There is no gap in any of the rooms. Therefore we can just - # partition the new events by room and return them. - logger.debug("Got %i events for incremental sync - not limited", - len(room_events)) - - invite_events = [] - leave_events = [] - events_by_room_id = {} - for event in room_events: - events_by_room_id.setdefault(event.room_id, []).append(event) - if event.room_id not in joined_room_ids: - if (event.type == EventTypes.Member - and event.state_key == sync_config.user.to_string()): - if event.membership == Membership.INVITE: - invite_events.append(event) - elif event.membership in (Membership.LEAVE, Membership.BAN): - leave_events.append(event) - - for room_id in joined_room_ids: - recents = events_by_room_id.get(room_id, []) - logger.debug("Events for room %s: %r", room_id, recents) - state = { - (event.type, event.state_key): event - for event in recents if event.is_state()} - limited = False + # Get a list of membership change events that have happened. + rooms_changed = yield self.store.get_room_changes_for_user( + user_id, since_token.room_key, now_token.room_key + ) - if recents: - prev_batch = now_token.copy_and_replace( - "room_key", recents[0].internal_metadata.before - ) - else: - prev_batch = now_token - - just_joined = yield self.check_joined_room(sync_config, state) - if just_joined: - logger.debug("User has just joined %s: needs full state", - room_id) - state = yield self.get_state_at(room_id, now_token) - # the timeline is inherently limited if we've just joined - limited = True - - recents = sync_config.filter_collection.filter_room_timeline(recents) - - state = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - state.values() - ) - } - - acc_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) + mem_change_events_by_room_id = {} + for event in rooms_changed: + mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - acc_data = sync_config.filter_collection.filter_room_account_data( - acc_data - ) - - ephemeral = sync_config.filter_collection.filter_room_ephemeral( - ephemeral_by_room.get(room_id, []) - ) + newly_joined_rooms = [] + archived = [] + invited = [] + for room_id, events in mem_change_events_by_room_id.items(): + non_joins = [e for e in events if e.membership != Membership.JOIN] + has_join = len(non_joins) != len(events) + + # We want to figure out if we joined the room at some point since + # the last sync (even if we have since left). This is to make sure + # we do send down the room, and with full state, where necessary + if room_id in joined_room_ids or has_join: + old_state = yield self.get_state_at(room_id, since_token) + old_mem_ev = old_state.get((EventTypes.Member, user_id), None) + if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: + newly_joined_rooms.append(room_id) + + if room_id in joined_room_ids: + continue + + if not non_joins: + continue - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=TimelineBatch( - events=recents, - prev_batch=prev_batch, - limited=limited, - ), - state=state, - ephemeral=ephemeral, - account_data=acc_data, - unread_notifications={}, + # Only bother if we're still currently invited + should_invite = non_joins[-1].membership == Membership.INVITE + if should_invite: + room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) + if room_sync: + invited.append(room_sync) + + # Always include leave/ban events. Just take the last one. + # TODO: How do we handle ban -> leave in same batch? + leave_events = [ + e for e in non_joins + if e.membership in (Membership.LEAVE, Membership.BAN) + ] + + if leave_events: + leave_event = leave_events[-1] + room_sync = yield self.incremental_sync_for_archived_room( + sync_config, room_id, leave_event.event_id, since_token, + tags_by_room, account_data_by_room, + full_state=room_id in newly_joined_rooms ) - logger.debug("Result for room %s: %r", room_id, room_sync) - if room_sync: - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config, all_ephemeral_by_room - ) + archived.append(room_sync) - if notifs is not None: - notif_dict = room_sync.unread_notifications - notif_dict["notification_count"] = len(notifs) - notif_dict["highlight_count"] = len([ - 1 for notif in notifs - if _action_has_highlight(notif["actions"]) - ]) + # Get all events for rooms we're currently joined to. + room_to_events = yield self.store.get_room_events_stream_for_rooms( + room_ids=joined_room_ids, + from_key=since_token.room_key, + to_key=now_token.room_key, + limit=timeline_limit + 1, + ) - joined.append(room_sync) + joined = [] + # We loop through all room ids, even if there are no new events, in case + # there are non room events taht we need to notify about. + for room_id in joined_room_ids: + room_entry = room_to_events.get(room_id, None) - else: - logger.debug("Got %i events for incremental sync - hit limit", - len(room_events)) + if room_entry: + events, start_key = room_entry - invite_events = yield self.store.get_invites_for_user( - sync_config.user.to_string() - ) + prev_batch_token = now_token.copy_and_replace("room_key", start_key) - leave_events = yield self.store.get_leave_and_ban_events_for_user( - sync_config.user.to_string() - ) + newly_joined_room = room_id in newly_joined_rooms + full_state = newly_joined_room - for room_id in joined_room_ids: - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room, account_data_by_room, - all_ephemeral_by_room=all_ephemeral_by_room, + batch = yield self.load_filtered_recents( + room_id, sync_config, prev_batch_token, + since_token=since_token, + recents=events, + newly_joined_room=newly_joined_room, ) - if room_sync: - joined.append(room_sync) + else: + batch = TimelineBatch( + events=[], + prev_batch=since_token, + limited=False, + ) + full_state = False - for leave_event in leave_events: - room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token, tags_by_room, - account_data_by_room + room_sync = yield self.incremental_sync_with_gap_for_room( + room_id=room_id, + sync_config=sync_config, + since_token=since_token, + now_token=now_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + all_ephemeral_by_room=all_ephemeral_by_room, + batch=batch, + full_state=full_state, ) if room_sync: - archived.append(room_sync) - - invited = [ - InvitedSyncResult(room_id=event.room_id, invite=event) - for event in invite_events - ] + joined.append(room_sync) account_data_for_user = sync_config.filter_collection.filter_account_data( self.account_data_for_user(account_data) @@ -680,28 +602,40 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, - since_token=None): + since_token=None, recents=None, newly_joined_room=False): """ :returns a Deferred TimelineBatch """ - limited = True - recents = [] filtering_factor = 2 timeline_limit = sync_config.filter_collection.timeline_limit() - load_limit = max(timeline_limit * filtering_factor, 100) - max_repeat = 3 # Only try a few times per room, otherwise + load_limit = max(timeline_limit * filtering_factor, 10) + max_repeat = 5 # Only try a few times per room, otherwise room_key = now_token.room_key end_key = room_key + limited = recents is None or newly_joined_room or timeline_limit < len(recents) + + if recents is not None: + recents = sync_config.filter_collection.filter_room_timeline(recents) + recents = yield self._filter_events_for_client( + sync_config.user.to_string(), + recents, + is_peeking=sync_config.is_guest, + ) + else: + recents = [] + + since_key = None + if since_token and not newly_joined_room: + since_key = since_token.room_key + while limited and len(recents) < timeline_limit and max_repeat: - events, keys = yield self.store.get_recent_events_for_room( + events, end_key = yield self.store.get_room_events_stream_for_room( room_id, limit=load_limit + 1, - from_token=since_token.room_key if since_token else None, - end_token=end_key, + from_key=since_key, + to_key=end_key, ) - room_key, _ = keys - end_key = "s" + room_key.split('-')[-1] loaded_recents = sync_config.filter_collection.filter_room_timeline(events) loaded_recents = yield self._filter_events_for_client( sync_config.user.to_string(), @@ -710,8 +644,10 @@ class SyncHandler(BaseHandler): ) loaded_recents.extend(recents) recents = loaded_recents + if len(events) <= load_limit: limited = False + break max_repeat -= 1 if len(recents) > timeline_limit: @@ -724,7 +660,9 @@ class SyncHandler(BaseHandler): ) defer.returnValue(TimelineBatch( - events=recents, prev_batch=prev_batch_token, limited=limited + events=recents, + prev_batch=prev_batch_token, + limited=limited or newly_joined_room )) @defer.inlineCallbacks @@ -732,25 +670,12 @@ class SyncHandler(BaseHandler): since_token, now_token, ephemeral_by_room, tags_by_room, account_data_by_room, - all_ephemeral_by_room): - """ Get the incremental delta needed to bring the client up to date for - the room. Gives the client the most recent events and the changes to - state. - Returns: - A Deferred JoinedSyncResult - """ - logger.debug("Doing incremental sync for room %s between %s and %s", - room_id, since_token, now_token) - - # TODO(mjark): Check for redactions we might have missed. - - batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, since_token, - ) - - logger.debug("Recents %r", batch) + all_ephemeral_by_room, + batch, full_state=False): + if full_state: + state = yield self.get_state_at(room_id, now_token) - if batch.limited: + elif batch.limited: current_state = yield self.get_state_at(room_id, now_token) state_at_previous_sync = yield self.get_state_at( @@ -814,43 +739,48 @@ class SyncHandler(BaseHandler): defer.returnValue(room_sync) @defer.inlineCallbacks - def incremental_sync_for_archived_room(self, sync_config, leave_event, + def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id, since_token, tags_by_room, - account_data_by_room): + account_data_by_room, full_state, + leave_token=None): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: A Deferred ArchivedSyncResult """ - stream_token = yield self.store.get_stream_token_for_event( - leave_event.event_id - ) + if not leave_token: + stream_token = yield self.store.get_stream_token_for_event( + leave_event_id + ) - leave_token = since_token.copy_and_replace("room_key", stream_token) + leave_token = since_token.copy_and_replace("room_key", stream_token) - if since_token.is_after(leave_token): + if since_token and since_token.is_after(leave_token): defer.returnValue(None) batch = yield self.load_filtered_recents( - leave_event.room_id, sync_config, leave_token, since_token, + room_id, sync_config, leave_token, since_token, ) logger.debug("Recents %r", batch) state_events_at_leave = yield self.store.get_state_for_event( - leave_event.event_id + leave_event_id ) - state_at_previous_sync = yield self.get_state_at( - leave_event.room_id, stream_position=since_token - ) + if not full_state: + state_at_previous_sync = yield self.get_state_at( + room_id, stream_position=since_token + ) - state_events_delta = yield self.compute_state_delta( - since_token=since_token, - previous_state=state_at_previous_sync, - current_state=state_events_at_leave, - ) + state_events_delta = yield self.compute_state_delta( + since_token=since_token, + previous_state=state_at_previous_sync, + current_state=state_events_at_leave, + ) + else: + state_events_delta = state_events_at_leave state_events_delta = { (e.type, e.state_key): e @@ -860,7 +790,7 @@ class SyncHandler(BaseHandler): } account_data = self.account_data_for_room( - leave_event.room_id, tags_by_room, account_data_by_room + room_id, tags_by_room, account_data_by_room ) account_data = sync_config.filter_collection.filter_room_account_data( @@ -868,7 +798,7 @@ class SyncHandler(BaseHandler): ) room_sync = ArchivedSyncResult( - room_id=leave_event.room_id, + room_id=room_id, timeline=batch, state=state_events_delta, account_data=account_data, |