diff options
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r-- | synapse/handlers/sync.py | 792 |
1 files changed, 379 insertions, 413 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 328c049b03..ddeed27965 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -18,11 +18,14 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util import unwrapFirstError +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.metrics import Measure from twisted.internet import defer import collections import logging +import itertools logger = logging.getLogger(__name__) @@ -72,7 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ ) -class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ +class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ "room_id", # str "timeline", # TimelineBatch "state", # dict[(str, str), FrozenEvent] @@ -139,6 +142,15 @@ class SyncHandler(BaseHandler): A Deferred SyncResult. """ + context = LoggingContext.current_context() + if context: + if since_token is None: + context.tag = "initial_sync" + elif full_state: + context.tag = "full_state_sync" + else: + context.tag = "incremental_sync" + if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. @@ -167,18 +179,6 @@ class SyncHandler(BaseHandler): else: return self.incremental_sync_with_gap(sync_config, since_token) - def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room): - if room_id not in ephemeral_by_room: - return None - for e in ephemeral_by_room[room_id]: - if e['type'] != 'm.receipt': - continue - for receipt_event_id, val in e['content'].items(): - if 'm.read' in val: - if user_id in val['m.read']: - return receipt_event_id - return None - @defer.inlineCallbacks def full_state_sync(self, sync_config, timeline_since_token): """Get a sync for a client which is starting without any state. @@ -230,15 +230,16 @@ class SyncHandler(BaseHandler): deferreds = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync_deferred = self.full_state_sync_for_joined_room( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) + with PreserveLoggingContext(LoggingContext.current_context()): + room_sync_deferred = self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) room_sync_deferred.addCallback(joined.append) deferreds.append(room_sync_deferred) elif event.membership == Membership.INVITE: @@ -251,15 +252,16 @@ class SyncHandler(BaseHandler): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - room_sync_deferred = self.full_state_sync_for_archived_room( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) + with PreserveLoggingContext(LoggingContext.current_context()): + room_sync_deferred = self.full_state_sync_for_archived_room( + sync_config=sync_config, + room_id=event.room_id, + leave_event_id=event.event_id, + leave_token=leave_token, + timeline_since_token=timeline_since_token, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) room_sync_deferred.addCallback(archived.append) deferreds.append(room_sync_deferred) @@ -298,46 +300,18 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token=timeline_since_token ) - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config, ephemeral_by_room + room_sync = yield self.incremental_sync_with_gap_for_room( + room_id, sync_config, + now_token=now_token, + since_token=timeline_since_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + batch=batch, + full_state=True, ) - unread_notifications = {} - if notifs is not None: - unread_notifications["notification_count"] = len(notifs) - unread_notifications["highlight_count"] = len([ - 1 for notif in notifs if _action_has_highlight(notif["actions"]) - ]) - - current_state = yield self.get_state_at(room_id, now_token) - - current_state = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - current_state.values() - ) - } - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - ephemeral = sync_config.filter_collection.filter_room_ephemeral( - ephemeral_by_room.get(room_id, []) - ) - - defer.returnValue(JoinedSyncResult( - room_id=room_id, - timeline=batch, - state=current_state, - ephemeral=ephemeral, - account_data=account_data, - unread_notifications=unread_notifications, - )) + defer.returnValue(room_sync) def account_data_for_user(self, account_data): account_data_events = [] @@ -382,91 +356,68 @@ class SyncHandler(BaseHandler): typing events for that room. """ - typing_key = since_token.typing_key if since_token else "0" - - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] - - typing_source = self.event_sources.sources["typing"] - typing, typing_key = yield typing_source.get_new_events( - user=sync_config.user, - from_key=typing_key, - limit=sync_config.filter_collection.ephemeral_limit(), - room_ids=room_ids, - is_guest=sync_config.is_guest, - ) - now_token = now_token.copy_and_replace("typing_key", typing_key) - - ephemeral_by_room = {} + with Measure(self.clock, "ephemeral_by_room"): + typing_key = since_token.typing_key if since_token else "0" - for event in typing: - # we want to exclude the room_id from the event, but modifying the - # result returned by the event source is poor form (it might cache - # the object) - room_id = event["room_id"] - event_copy = {k: v for (k, v) in event.iteritems() - if k != "room_id"} - ephemeral_by_room.setdefault(room_id, []).append(event_copy) + rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) + room_ids = [room.room_id for room in rooms] - receipt_key = since_token.receipt_key if since_token else "0" - - receipt_source = self.event_sources.sources["receipt"] - receipts, receipt_key = yield receipt_source.get_new_events( - user=sync_config.user, - from_key=receipt_key, - limit=sync_config.filter_collection.ephemeral_limit(), - room_ids=room_ids, - is_guest=sync_config.is_guest, - ) - now_token = now_token.copy_and_replace("receipt_key", receipt_key) + typing_source = self.event_sources.sources["typing"] + typing, typing_key = yield typing_source.get_new_events( + user=sync_config.user, + from_key=typing_key, + limit=sync_config.filter_collection.ephemeral_limit(), + room_ids=room_ids, + is_guest=sync_config.is_guest, + ) + now_token = now_token.copy_and_replace("typing_key", typing_key) + + ephemeral_by_room = {} + + for event in typing: + # we want to exclude the room_id from the event, but modifying the + # result returned by the event source is poor form (it might cache + # the object) + room_id = event["room_id"] + event_copy = {k: v for (k, v) in event.iteritems() + if k != "room_id"} + ephemeral_by_room.setdefault(room_id, []).append(event_copy) + + receipt_key = since_token.receipt_key if since_token else "0" + + receipt_source = self.event_sources.sources["receipt"] + receipts, receipt_key = yield receipt_source.get_new_events( + user=sync_config.user, + from_key=receipt_key, + limit=sync_config.filter_collection.ephemeral_limit(), + room_ids=room_ids, + is_guest=sync_config.is_guest, + ) + now_token = now_token.copy_and_replace("receipt_key", receipt_key) - for event in receipts: - room_id = event["room_id"] - # exclude room id, as above - event_copy = {k: v for (k, v) in event.iteritems() - if k != "room_id"} - ephemeral_by_room.setdefault(room_id, []).append(event_copy) + for event in receipts: + room_id = event["room_id"] + # exclude room id, as above + event_copy = {k: v for (k, v) in event.iteritems() + if k != "room_id"} + ephemeral_by_room.setdefault(room_id, []).append(event_copy) defer.returnValue((now_token, ephemeral_by_room)) - @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, timeline_since_token, tags_by_room, account_data_by_room): """Sync a room for a client which is starting without any state Returns: - A Deferred JoinedSyncResult. + A Deferred ArchivedSyncResult. """ - batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, since_token=timeline_since_token - ) - - leave_state = yield self.store.get_state_for_event(leave_event_id) - - leave_state = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - leave_state.values() - ) - } - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data + return self.incremental_sync_for_archived_room( + sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room, + account_data_by_room, full_state=True, leave_token=leave_token, ) - defer.returnValue(ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=leave_state, - account_data=account_data, - )) - @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, since_token): """ Get the incremental delta needed to bring the client up to @@ -489,13 +440,6 @@ class SyncHandler(BaseHandler): ) now_token = now_token.copy_and_replace("presence_key", presence_key) - # We now fetch all ephemeral events for this room in order to get - # this users current read receipt. This could almost certainly be - # optimised. - _, all_ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token - ) - now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token, since_token ) @@ -512,154 +456,126 @@ class SyncHandler(BaseHandler): sync_config.user ) - timeline_limit = sync_config.filter_collection.timeline_limit() + user_id = sync_config.user.to_string() - room_events, _ = yield self.store.get_room_events_stream( - sync_config.user.to_string(), - from_key=since_token.room_key, - to_key=now_token.room_key, - limit=timeline_limit + 1, - ) + timeline_limit = sync_config.filter_collection.timeline_limit() tags_by_room = yield self.store.get_updated_tags( - sync_config.user.to_string(), + user_id, since_token.account_data_key, ) account_data, account_data_by_room = ( yield self.store.get_updated_account_data_for_user( - sync_config.user.to_string(), + user_id, since_token.account_data_key, ) ) - joined = [] - archived = [] - if len(room_events) <= timeline_limit: - # There is no gap in any of the rooms. Therefore we can just - # partition the new events by room and return them. - logger.debug("Got %i events for incremental sync - not limited", - len(room_events)) - - invite_events = [] - leave_events = [] - events_by_room_id = {} - for event in room_events: - events_by_room_id.setdefault(event.room_id, []).append(event) - if event.room_id not in joined_room_ids: - if (event.type == EventTypes.Member - and event.state_key == sync_config.user.to_string()): - if event.membership == Membership.INVITE: - invite_events.append(event) - elif event.membership in (Membership.LEAVE, Membership.BAN): - leave_events.append(event) - - for room_id in joined_room_ids: - recents = events_by_room_id.get(room_id, []) - logger.debug("Events for room %s: %r", room_id, recents) - state = { - (event.type, event.state_key): event - for event in recents if event.is_state()} - limited = False - - if recents: - prev_batch = now_token.copy_and_replace( - "room_key", recents[0].internal_metadata.before - ) - else: - prev_batch = now_token - - just_joined = yield self.check_joined_room(sync_config, state) - if just_joined: - logger.debug("User has just joined %s: needs full state", - room_id) - state = yield self.get_state_at(room_id, now_token) - # the timeline is inherently limited if we've just joined - limited = True - - recents = sync_config.filter_collection.filter_room_timeline(recents) - - state = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - state.values() - ) - } - - acc_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) + # Get a list of membership change events that have happened. + rooms_changed = yield self.store.get_membership_changes_for_user( + user_id, since_token.room_key, now_token.room_key + ) - acc_data = sync_config.filter_collection.filter_room_account_data( - acc_data - ) + mem_change_events_by_room_id = {} + for event in rooms_changed: + mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - ephemeral = sync_config.filter_collection.filter_room_ephemeral( - ephemeral_by_room.get(room_id, []) - ) + newly_joined_rooms = [] + archived = [] + invited = [] + for room_id, events in mem_change_events_by_room_id.items(): + non_joins = [e for e in events if e.membership != Membership.JOIN] + has_join = len(non_joins) != len(events) + + # We want to figure out if we joined the room at some point since + # the last sync (even if we have since left). This is to make sure + # we do send down the room, and with full state, where necessary + if room_id in joined_room_ids or has_join: + old_state = yield self.get_state_at(room_id, since_token) + old_mem_ev = old_state.get((EventTypes.Member, user_id), None) + if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: + newly_joined_rooms.append(room_id) + + if room_id in joined_room_ids: + continue + + if not non_joins: + continue - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=TimelineBatch( - events=recents, - prev_batch=prev_batch, - limited=limited, - ), - state=state, - ephemeral=ephemeral, - account_data=acc_data, - unread_notifications={}, + # Only bother if we're still currently invited + should_invite = non_joins[-1].membership == Membership.INVITE + if should_invite: + room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) + if room_sync: + invited.append(room_sync) + + # Always include leave/ban events. Just take the last one. + # TODO: How do we handle ban -> leave in same batch? + leave_events = [ + e for e in non_joins + if e.membership in (Membership.LEAVE, Membership.BAN) + ] + + if leave_events: + leave_event = leave_events[-1] + room_sync = yield self.incremental_sync_for_archived_room( + sync_config, room_id, leave_event.event_id, since_token, + tags_by_room, account_data_by_room, + full_state=room_id in newly_joined_rooms ) - logger.debug("Result for room %s: %r", room_id, room_sync) - if room_sync: - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config, all_ephemeral_by_room - ) + archived.append(room_sync) - if notifs is not None: - notif_dict = room_sync.unread_notifications - notif_dict["notification_count"] = len(notifs) - notif_dict["highlight_count"] = len([ - 1 for notif in notifs - if _action_has_highlight(notif["actions"]) - ]) + # Get all events for rooms we're currently joined to. + room_to_events = yield self.store.get_room_events_stream_for_rooms( + room_ids=joined_room_ids, + from_key=since_token.room_key, + to_key=now_token.room_key, + limit=timeline_limit + 1, + ) - joined.append(room_sync) + joined = [] + # We loop through all room ids, even if there are no new events, in case + # there are non room events taht we need to notify about. + for room_id in joined_room_ids: + room_entry = room_to_events.get(room_id, None) - else: - logger.debug("Got %i events for incremental sync - hit limit", - len(room_events)) + if room_entry: + events, start_key = room_entry - invite_events = yield self.store.get_invites_for_user( - sync_config.user.to_string() - ) + prev_batch_token = now_token.copy_and_replace("room_key", start_key) - leave_events = yield self.store.get_leave_and_ban_events_for_user( - sync_config.user.to_string() - ) + newly_joined_room = room_id in newly_joined_rooms + full_state = newly_joined_room - for room_id in joined_room_ids: - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room, account_data_by_room, - all_ephemeral_by_room=all_ephemeral_by_room, + batch = yield self.load_filtered_recents( + room_id, sync_config, prev_batch_token, + since_token=since_token, + recents=events, + newly_joined_room=newly_joined_room, ) - if room_sync: - joined.append(room_sync) + else: + batch = TimelineBatch( + events=[], + prev_batch=since_token, + limited=False, + ) + full_state = False - for leave_event in leave_events: - room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token, tags_by_room, - account_data_by_room + room_sync = yield self.incremental_sync_with_gap_for_room( + room_id=room_id, + sync_config=sync_config, + since_token=since_token, + now_token=now_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + batch=batch, + full_state=full_state, ) if room_sync: - archived.append(room_sync) - - invited = [ - InvitedSyncResult(room_id=event.room_id, invite=event) - for event in invite_events - ] + joined.append(room_sync) account_data_for_user = sync_config.filter_collection.filter_account_data( self.account_data_for_user(account_data) @@ -680,51 +596,73 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, - since_token=None): + since_token=None, recents=None, newly_joined_room=False): """ :returns a Deferred TimelineBatch """ - limited = True - recents = [] - filtering_factor = 2 - timeline_limit = sync_config.filter_collection.timeline_limit() - load_limit = max(timeline_limit * filtering_factor, 100) - max_repeat = 3 # Only try a few times per room, otherwise - room_key = now_token.room_key - end_key = room_key - - while limited and len(recents) < timeline_limit and max_repeat: - events, keys = yield self.store.get_recent_events_for_room( - room_id, - limit=load_limit + 1, - from_token=since_token.room_key if since_token else None, - end_token=end_key, - ) - room_key, _ = keys - end_key = "s" + room_key.split('-')[-1] - loaded_recents = sync_config.filter_collection.filter_room_timeline(events) - loaded_recents = yield self._filter_events_for_client( - sync_config.user.to_string(), - loaded_recents, - is_peeking=sync_config.is_guest, - ) - loaded_recents.extend(recents) - recents = loaded_recents - if len(events) <= load_limit: + with Measure(self.clock, "load_filtered_recents"): + filtering_factor = 2 + timeline_limit = sync_config.filter_collection.timeline_limit() + load_limit = max(timeline_limit * filtering_factor, 10) + max_repeat = 5 # Only try a few times per room, otherwise + room_key = now_token.room_key + end_key = room_key + + if recents is None or newly_joined_room or timeline_limit < len(recents): + limited = True + else: limited = False - max_repeat -= 1 - if len(recents) > timeline_limit: - limited = True - recents = recents[-timeline_limit:] - room_key = recents[0].internal_metadata.before + if recents is not None: + recents = sync_config.filter_collection.filter_room_timeline(recents) + recents = yield self._filter_events_for_client( + sync_config.user.to_string(), + recents, + is_peeking=sync_config.is_guest, + ) + else: + recents = [] + + since_key = None + if since_token and not newly_joined_room: + since_key = since_token.room_key + + while limited and len(recents) < timeline_limit and max_repeat: + events, end_key = yield self.store.get_room_events_stream_for_room( + room_id, + limit=load_limit + 1, + from_key=since_key, + to_key=end_key, + ) + loaded_recents = sync_config.filter_collection.filter_room_timeline( + events + ) + loaded_recents = yield self._filter_events_for_client( + sync_config.user.to_string(), + loaded_recents, + is_peeking=sync_config.is_guest, + ) + loaded_recents.extend(recents) + recents = loaded_recents + + if len(events) <= load_limit: + limited = False + break + max_repeat -= 1 - prev_batch_token = now_token.copy_and_replace( - "room_key", room_key - ) + if len(recents) > timeline_limit: + limited = True + recents = recents[-timeline_limit:] + room_key = recents[0].internal_metadata.before + + prev_batch_token = now_token.copy_and_replace( + "room_key", room_key + ) defer.returnValue(TimelineBatch( - events=recents, prev_batch=prev_batch_token, limited=limited + events=recents, + prev_batch=prev_batch_token, + limited=limited or newly_joined_room )) @defer.inlineCallbacks @@ -732,62 +670,12 @@ class SyncHandler(BaseHandler): since_token, now_token, ephemeral_by_room, tags_by_room, account_data_by_room, - all_ephemeral_by_room): - """ Get the incremental delta needed to bring the client up to date for - the room. Gives the client the most recent events and the changes to - state. - Returns: - A Deferred JoinedSyncResult - """ - logger.debug("Doing incremental sync for room %s between %s and %s", - room_id, since_token, now_token) - - # TODO(mjark): Check for redactions we might have missed. - - batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, since_token, - ) - - logger.debug("Recents %r", batch) - - if batch.limited: - current_state = yield self.get_state_at(room_id, now_token) - - state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token - ) - - state = yield self.compute_state_delta( - since_token=since_token, - previous_state=state_at_previous_sync, - current_state=current_state, - ) - else: - state = { - (event.type, event.state_key): event - for event in batch.events if event.is_state() - } - - just_joined = yield self.check_joined_room(sync_config, state) - if just_joined: - state = yield self.get_state_at(room_id, now_token) - - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config, all_ephemeral_by_room + batch, full_state=False): + state = yield self.compute_state_delta( + room_id, batch, sync_config, since_token, now_token, + full_state=full_state ) - unread_notifications = {} - if notifs is not None: - unread_notifications["notification_count"] = len(notifs) - unread_notifications["highlight_count"] = len([ - 1 for notif in notifs if _action_has_highlight(notif["actions"]) - ]) - - state = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state(state.values()) - } - account_data = self.account_data_for_room( room_id, tags_by_room, account_data_by_room ) @@ -800,6 +688,7 @@ class SyncHandler(BaseHandler): ephemeral_by_room.get(room_id, []) ) + unread_notifications = {} room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, @@ -809,58 +698,53 @@ class SyncHandler(BaseHandler): unread_notifications=unread_notifications, ) + if room_sync: + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config + ) + + if notifs is not None: + unread_notifications["notification_count"] = notifs["notify_count"] + unread_notifications["highlight_count"] = notifs["highlight_count"] + logger.debug("Room sync: %r", room_sync) defer.returnValue(room_sync) @defer.inlineCallbacks - def incremental_sync_for_archived_room(self, sync_config, leave_event, + def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id, since_token, tags_by_room, - account_data_by_room): + account_data_by_room, full_state, + leave_token=None): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: A Deferred ArchivedSyncResult """ - stream_token = yield self.store.get_stream_token_for_event( - leave_event.event_id - ) + if not leave_token: + stream_token = yield self.store.get_stream_token_for_event( + leave_event_id + ) - leave_token = since_token.copy_and_replace("room_key", stream_token) + leave_token = since_token.copy_and_replace("room_key", stream_token) - if since_token.is_after(leave_token): + if since_token and since_token.is_after(leave_token): defer.returnValue(None) batch = yield self.load_filtered_recents( - leave_event.room_id, sync_config, leave_token, since_token, + room_id, sync_config, leave_token, since_token, ) logger.debug("Recents %r", batch) - state_events_at_leave = yield self.store.get_state_for_event( - leave_event.event_id - ) - - state_at_previous_sync = yield self.get_state_at( - leave_event.room_id, stream_position=since_token - ) - state_events_delta = yield self.compute_state_delta( - since_token=since_token, - previous_state=state_at_previous_sync, - current_state=state_events_at_leave, + room_id, batch, sync_config, since_token, leave_token, + full_state=full_state ) - state_events_delta = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - state_events_delta.values() - ) - } - account_data = self.account_data_for_room( - leave_event.room_id, tags_by_room, account_data_by_room + room_id, tags_by_room, account_data_by_room ) account_data = sync_config.filter_collection.filter_room_account_data( @@ -868,7 +752,7 @@ class SyncHandler(BaseHandler): ) room_sync = ArchivedSyncResult( - room_id=leave_event.room_id, + room_id=room_id, timeline=batch, state=state_events_delta, account_data=account_data, @@ -912,15 +796,19 @@ class SyncHandler(BaseHandler): state = {} defer.returnValue(state) - def compute_state_delta(self, since_token, previous_state, current_state): - """ Works out the differnce in state between the current state and the - state the client got when it last performed a sync. - - :param str since_token: the point we are comparing against - :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the - state to compare to - :param dict[(str,str), synapse.events.FrozenEvent] current_state: the - new state + @defer.inlineCallbacks + def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token, + full_state): + """ Works out the differnce in state between the start of the timeline + and the previous sync. + + :param str room_id + :param TimelineBatch batch: The timeline batch for the room that will + be sent to the user. + :param sync_config + :param str since_token: Token of the end of the previous batch. May be None. + :param str now_token: Token of the end of the current batch. + :param bool full_state: Whether to force returning the full state. :returns A new event dictionary """ @@ -929,12 +817,53 @@ class SyncHandler(BaseHandler): # updates even if they occured logically before the previous event. # TODO(mjark) Check for new redactions in the state events. - state_delta = {} - for key, event in current_state.iteritems(): - if (key not in previous_state or - previous_state[key].event_id != event.event_id): - state_delta[key] = event - return state_delta + with Measure(self.clock, "compute_state_delta"): + if full_state: + if batch: + state = yield self.store.get_state_for_event( + batch.events[0].event_id + ) + else: + state = yield self.get_state_at( + room_id, stream_position=now_token + ) + + timeline_state = { + (event.type, event.state_key): event + for event in batch.events if event.is_state() + } + + state = _calculate_state( + timeline_contains=timeline_state, + timeline_start=state, + previous={}, + ) + elif batch.limited: + state_at_previous_sync = yield self.get_state_at( + room_id, stream_position=since_token + ) + + state_at_timeline_start = yield self.store.get_state_for_event( + batch.events[0].event_id + ) + + timeline_state = { + (event.type, event.state_key): event + for event in batch.events if event.is_state() + } + + state = _calculate_state( + timeline_contains=timeline_state, + timeline_start=state_at_timeline_start, + previous=state_at_previous_sync, + ) + else: + state = {} + + defer.returnValue({ + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state(state.values()) + }) def check_joined_room(self, sync_config, state_delta): """ @@ -955,21 +884,24 @@ class SyncHandler(BaseHandler): return False @defer.inlineCallbacks - def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room): - last_unread_event_id = self.last_read_event_id_for_room_and_user( - room_id, sync_config.user.to_string(), ephemeral_by_room - ) - - notifs = [] - if last_unread_event_id: - notifs = yield self.store.get_unread_event_push_actions_by_room_for_user( - room_id, sync_config.user.to_string(), last_unread_event_id + def unread_notifs_for_room_id(self, room_id, sync_config): + with Measure(self.clock, "unread_notifs_for_room_id"): + last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user( + user_id=sync_config.user.to_string(), + room_id=room_id, + receipt_type="m.read" ) - defer.returnValue(notifs) - # There is no new information in this period, so your notification - # count is whatever it was last time. - defer.returnValue(None) + notifs = [] + if last_unread_event_id: + notifs = yield self.store.get_unread_event_push_actions_by_room_for_user( + room_id, sync_config.user.to_string(), last_unread_event_id + ) + defer.returnValue(notifs) + + # There is no new information in this period, so your notification + # count is whatever it was last time. + defer.returnValue(None) def _action_has_highlight(actions): @@ -981,3 +913,37 @@ def _action_has_highlight(actions): pass return False + + +def _calculate_state(timeline_contains, timeline_start, previous): + """Works out what state to include in a sync response. + + Args: + timeline_contains (dict): state in the timeline + timeline_start (dict): state at the start of the timeline + previous (dict): state at the end of the previous sync (or empty dict + if this is an initial sync) + + Returns: + dict + """ + event_id_to_state = { + e.event_id: e + for e in itertools.chain( + timeline_contains.values(), + previous.values(), + timeline_start.values(), + ) + } + + tc_ids = set(e.event_id for e in timeline_contains.values()) + p_ids = set(e.event_id for e in previous.values()) + ts_ids = set(e.event_id for e in timeline_start.values()) + + state_ids = (ts_ids - p_ids) - tc_ids + + evs = (event_id_to_state[e] for e in state_ids) + return { + (e.type, e.state_key): e + for e in evs + } |