summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py792
1 files changed, 379 insertions, 413 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 328c049b03..ddeed27965 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -18,11 +18,14 @@ from ._base import BaseHandler
 from synapse.streams.config import PaginationConfig
 from synapse.api.constants import Membership, EventTypes
 from synapse.util import unwrapFirstError
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.metrics import Measure
 
 from twisted.internet import defer
 
 import collections
 import logging
+import itertools
 
 logger = logging.getLogger(__name__)
 
@@ -72,7 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
         )
 
 
-class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
+class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
     "room_id",            # str
     "timeline",           # TimelineBatch
     "state",              # dict[(str, str), FrozenEvent]
@@ -139,6 +142,15 @@ class SyncHandler(BaseHandler):
             A Deferred SyncResult.
         """
 
+        context = LoggingContext.current_context()
+        if context:
+            if since_token is None:
+                context.tag = "initial_sync"
+            elif full_state:
+                context.tag = "full_state_sync"
+            else:
+                context.tag = "incremental_sync"
+
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
@@ -167,18 +179,6 @@ class SyncHandler(BaseHandler):
         else:
             return self.incremental_sync_with_gap(sync_config, since_token)
 
-    def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room):
-        if room_id not in ephemeral_by_room:
-            return None
-        for e in ephemeral_by_room[room_id]:
-            if e['type'] != 'm.receipt':
-                continue
-            for receipt_event_id, val in e['content'].items():
-                if 'm.read' in val:
-                    if user_id in val['m.read']:
-                        return receipt_event_id
-        return None
-
     @defer.inlineCallbacks
     def full_state_sync(self, sync_config, timeline_since_token):
         """Get a sync for a client which is starting without any state.
@@ -230,15 +230,16 @@ class SyncHandler(BaseHandler):
         deferreds = []
         for event in room_list:
             if event.membership == Membership.JOIN:
-                room_sync_deferred = self.full_state_sync_for_joined_room(
-                    room_id=event.room_id,
-                    sync_config=sync_config,
-                    now_token=now_token,
-                    timeline_since_token=timeline_since_token,
-                    ephemeral_by_room=ephemeral_by_room,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
+                with PreserveLoggingContext(LoggingContext.current_context()):
+                    room_sync_deferred = self.full_state_sync_for_joined_room(
+                        room_id=event.room_id,
+                        sync_config=sync_config,
+                        now_token=now_token,
+                        timeline_since_token=timeline_since_token,
+                        ephemeral_by_room=ephemeral_by_room,
+                        tags_by_room=tags_by_room,
+                        account_data_by_room=account_data_by_room,
+                    )
                 room_sync_deferred.addCallback(joined.append)
                 deferreds.append(room_sync_deferred)
             elif event.membership == Membership.INVITE:
@@ -251,15 +252,16 @@ class SyncHandler(BaseHandler):
                 leave_token = now_token.copy_and_replace(
                     "room_key", "s%d" % (event.stream_ordering,)
                 )
-                room_sync_deferred = self.full_state_sync_for_archived_room(
-                    sync_config=sync_config,
-                    room_id=event.room_id,
-                    leave_event_id=event.event_id,
-                    leave_token=leave_token,
-                    timeline_since_token=timeline_since_token,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
+                with PreserveLoggingContext(LoggingContext.current_context()):
+                    room_sync_deferred = self.full_state_sync_for_archived_room(
+                        sync_config=sync_config,
+                        room_id=event.room_id,
+                        leave_event_id=event.event_id,
+                        leave_token=leave_token,
+                        timeline_since_token=timeline_since_token,
+                        tags_by_room=tags_by_room,
+                        account_data_by_room=account_data_by_room,
+                    )
                 room_sync_deferred.addCallback(archived.append)
                 deferreds.append(room_sync_deferred)
 
@@ -298,46 +300,18 @@ class SyncHandler(BaseHandler):
             room_id, sync_config, now_token, since_token=timeline_since_token
         )
 
-        notifs = yield self.unread_notifs_for_room_id(
-            room_id, sync_config, ephemeral_by_room
+        room_sync = yield self.incremental_sync_with_gap_for_room(
+            room_id, sync_config,
+            now_token=now_token,
+            since_token=timeline_since_token,
+            ephemeral_by_room=ephemeral_by_room,
+            tags_by_room=tags_by_room,
+            account_data_by_room=account_data_by_room,
+            batch=batch,
+            full_state=True,
         )
 
-        unread_notifications = {}
-        if notifs is not None:
-            unread_notifications["notification_count"] = len(notifs)
-            unread_notifications["highlight_count"] = len([
-                1 for notif in notifs if _action_has_highlight(notif["actions"])
-            ])
-
-        current_state = yield self.get_state_at(room_id, now_token)
-
-        current_state = {
-            (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(
-                current_state.values()
-            )
-        }
-
-        account_data = self.account_data_for_room(
-            room_id, tags_by_room, account_data_by_room
-        )
-
-        account_data = sync_config.filter_collection.filter_room_account_data(
-            account_data
-        )
-
-        ephemeral = sync_config.filter_collection.filter_room_ephemeral(
-            ephemeral_by_room.get(room_id, [])
-        )
-
-        defer.returnValue(JoinedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=current_state,
-            ephemeral=ephemeral,
-            account_data=account_data,
-            unread_notifications=unread_notifications,
-        ))
+        defer.returnValue(room_sync)
 
     def account_data_for_user(self, account_data):
         account_data_events = []
@@ -382,91 +356,68 @@ class SyncHandler(BaseHandler):
             typing events for that room.
         """
 
-        typing_key = since_token.typing_key if since_token else "0"
-
-        rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
-        room_ids = [room.room_id for room in rooms]
-
-        typing_source = self.event_sources.sources["typing"]
-        typing, typing_key = yield typing_source.get_new_events(
-            user=sync_config.user,
-            from_key=typing_key,
-            limit=sync_config.filter_collection.ephemeral_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("typing_key", typing_key)
-
-        ephemeral_by_room = {}
+        with Measure(self.clock, "ephemeral_by_room"):
+            typing_key = since_token.typing_key if since_token else "0"
 
-        for event in typing:
-            # we want to exclude the room_id from the event, but modifying the
-            # result returned by the event source is poor form (it might cache
-            # the object)
-            room_id = event["room_id"]
-            event_copy = {k: v for (k, v) in event.iteritems()
-                          if k != "room_id"}
-            ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+            rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+            room_ids = [room.room_id for room in rooms]
 
-        receipt_key = since_token.receipt_key if since_token else "0"
-
-        receipt_source = self.event_sources.sources["receipt"]
-        receipts, receipt_key = yield receipt_source.get_new_events(
-            user=sync_config.user,
-            from_key=receipt_key,
-            limit=sync_config.filter_collection.ephemeral_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+            typing_source = self.event_sources.sources["typing"]
+            typing, typing_key = yield typing_source.get_new_events(
+                user=sync_config.user,
+                from_key=typing_key,
+                limit=sync_config.filter_collection.ephemeral_limit(),
+                room_ids=room_ids,
+                is_guest=sync_config.is_guest,
+            )
+            now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+            ephemeral_by_room = {}
+
+            for event in typing:
+                # we want to exclude the room_id from the event, but modifying the
+                # result returned by the event source is poor form (it might cache
+                # the object)
+                room_id = event["room_id"]
+                event_copy = {k: v for (k, v) in event.iteritems()
+                              if k != "room_id"}
+                ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+            receipt_key = since_token.receipt_key if since_token else "0"
+
+            receipt_source = self.event_sources.sources["receipt"]
+            receipts, receipt_key = yield receipt_source.get_new_events(
+                user=sync_config.user,
+                from_key=receipt_key,
+                limit=sync_config.filter_collection.ephemeral_limit(),
+                room_ids=room_ids,
+                is_guest=sync_config.is_guest,
+            )
+            now_token = now_token.copy_and_replace("receipt_key", receipt_key)
 
-        for event in receipts:
-            room_id = event["room_id"]
-            # exclude room id, as above
-            event_copy = {k: v for (k, v) in event.iteritems()
-                          if k != "room_id"}
-            ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+            for event in receipts:
+                room_id = event["room_id"]
+                # exclude room id, as above
+                event_copy = {k: v for (k, v) in event.iteritems()
+                              if k != "room_id"}
+                ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
         defer.returnValue((now_token, ephemeral_by_room))
 
-    @defer.inlineCallbacks
     def full_state_sync_for_archived_room(self, room_id, sync_config,
                                           leave_event_id, leave_token,
                                           timeline_since_token, tags_by_room,
                                           account_data_by_room):
         """Sync a room for a client which is starting without any state
         Returns:
-            A Deferred JoinedSyncResult.
+            A Deferred ArchivedSyncResult.
         """
 
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, leave_token, since_token=timeline_since_token
-        )
-
-        leave_state = yield self.store.get_state_for_event(leave_event_id)
-
-        leave_state = {
-            (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(
-                leave_state.values()
-            )
-        }
-
-        account_data = self.account_data_for_room(
-            room_id, tags_by_room, account_data_by_room
-        )
-
-        account_data = sync_config.filter_collection.filter_room_account_data(
-            account_data
+        return self.incremental_sync_for_archived_room(
+            sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room,
+            account_data_by_room, full_state=True, leave_token=leave_token,
         )
 
-        defer.returnValue(ArchivedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=leave_state,
-            account_data=account_data,
-        ))
-
     @defer.inlineCallbacks
     def incremental_sync_with_gap(self, sync_config, since_token):
         """ Get the incremental delta needed to bring the client up to
@@ -489,13 +440,6 @@ class SyncHandler(BaseHandler):
         )
         now_token = now_token.copy_and_replace("presence_key", presence_key)
 
-        # We now fetch all ephemeral events for this room in order to get
-        # this users current read receipt. This could almost certainly be
-        # optimised.
-        _, all_ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token
-        )
-
         now_token, ephemeral_by_room = yield self.ephemeral_by_room(
             sync_config, now_token, since_token
         )
@@ -512,154 +456,126 @@ class SyncHandler(BaseHandler):
                 sync_config.user
             )
 
-        timeline_limit = sync_config.filter_collection.timeline_limit()
+        user_id = sync_config.user.to_string()
 
-        room_events, _ = yield self.store.get_room_events_stream(
-            sync_config.user.to_string(),
-            from_key=since_token.room_key,
-            to_key=now_token.room_key,
-            limit=timeline_limit + 1,
-        )
+        timeline_limit = sync_config.filter_collection.timeline_limit()
 
         tags_by_room = yield self.store.get_updated_tags(
-            sync_config.user.to_string(),
+            user_id,
             since_token.account_data_key,
         )
 
         account_data, account_data_by_room = (
             yield self.store.get_updated_account_data_for_user(
-                sync_config.user.to_string(),
+                user_id,
                 since_token.account_data_key,
             )
         )
 
-        joined = []
-        archived = []
-        if len(room_events) <= timeline_limit:
-            # There is no gap in any of the rooms. Therefore we can just
-            # partition the new events by room and return them.
-            logger.debug("Got %i events for incremental sync - not limited",
-                         len(room_events))
-
-            invite_events = []
-            leave_events = []
-            events_by_room_id = {}
-            for event in room_events:
-                events_by_room_id.setdefault(event.room_id, []).append(event)
-                if event.room_id not in joined_room_ids:
-                    if (event.type == EventTypes.Member
-                            and event.state_key == sync_config.user.to_string()):
-                        if event.membership == Membership.INVITE:
-                            invite_events.append(event)
-                        elif event.membership in (Membership.LEAVE, Membership.BAN):
-                            leave_events.append(event)
-
-            for room_id in joined_room_ids:
-                recents = events_by_room_id.get(room_id, [])
-                logger.debug("Events for room %s: %r", room_id, recents)
-                state = {
-                    (event.type, event.state_key): event
-                    for event in recents if event.is_state()}
-                limited = False
-
-                if recents:
-                    prev_batch = now_token.copy_and_replace(
-                        "room_key", recents[0].internal_metadata.before
-                    )
-                else:
-                    prev_batch = now_token
-
-                just_joined = yield self.check_joined_room(sync_config, state)
-                if just_joined:
-                    logger.debug("User has just joined %s: needs full state",
-                                 room_id)
-                    state = yield self.get_state_at(room_id, now_token)
-                    # the timeline is inherently limited if we've just joined
-                    limited = True
-
-                recents = sync_config.filter_collection.filter_room_timeline(recents)
-
-                state = {
-                    (e.type, e.state_key): e
-                    for e in sync_config.filter_collection.filter_room_state(
-                        state.values()
-                    )
-                }
-
-                acc_data = self.account_data_for_room(
-                    room_id, tags_by_room, account_data_by_room
-                )
+        # Get a list of membership change events that have happened.
+        rooms_changed = yield self.store.get_membership_changes_for_user(
+            user_id, since_token.room_key, now_token.room_key
+        )
 
-                acc_data = sync_config.filter_collection.filter_room_account_data(
-                    acc_data
-                )
+        mem_change_events_by_room_id = {}
+        for event in rooms_changed:
+            mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
-                ephemeral = sync_config.filter_collection.filter_room_ephemeral(
-                    ephemeral_by_room.get(room_id, [])
-                )
+        newly_joined_rooms = []
+        archived = []
+        invited = []
+        for room_id, events in mem_change_events_by_room_id.items():
+            non_joins = [e for e in events if e.membership != Membership.JOIN]
+            has_join = len(non_joins) != len(events)
+
+            # We want to figure out if we joined the room at some point since
+            # the last sync (even if we have since left). This is to make sure
+            # we do send down the room, and with full state, where necessary
+            if room_id in joined_room_ids or has_join:
+                old_state = yield self.get_state_at(room_id, since_token)
+                old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
+                if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
+                        newly_joined_rooms.append(room_id)
+
+                if room_id in joined_room_ids:
+                    continue
+
+            if not non_joins:
+                continue
 
-                room_sync = JoinedSyncResult(
-                    room_id=room_id,
-                    timeline=TimelineBatch(
-                        events=recents,
-                        prev_batch=prev_batch,
-                        limited=limited,
-                    ),
-                    state=state,
-                    ephemeral=ephemeral,
-                    account_data=acc_data,
-                    unread_notifications={},
+            # Only bother if we're still currently invited
+            should_invite = non_joins[-1].membership == Membership.INVITE
+            if should_invite:
+                room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+                if room_sync:
+                    invited.append(room_sync)
+
+            # Always include leave/ban events. Just take the last one.
+            # TODO: How do we handle ban -> leave in same batch?
+            leave_events = [
+                e for e in non_joins
+                if e.membership in (Membership.LEAVE, Membership.BAN)
+            ]
+
+            if leave_events:
+                leave_event = leave_events[-1]
+                room_sync = yield self.incremental_sync_for_archived_room(
+                    sync_config, room_id, leave_event.event_id, since_token,
+                    tags_by_room, account_data_by_room,
+                    full_state=room_id in newly_joined_rooms
                 )
-                logger.debug("Result for room %s: %r", room_id, room_sync)
-
                 if room_sync:
-                    notifs = yield self.unread_notifs_for_room_id(
-                        room_id, sync_config, all_ephemeral_by_room
-                    )
+                    archived.append(room_sync)
 
-                    if notifs is not None:
-                        notif_dict = room_sync.unread_notifications
-                        notif_dict["notification_count"] = len(notifs)
-                        notif_dict["highlight_count"] = len([
-                            1 for notif in notifs
-                            if _action_has_highlight(notif["actions"])
-                        ])
+        # Get all events for rooms we're currently joined to.
+        room_to_events = yield self.store.get_room_events_stream_for_rooms(
+            room_ids=joined_room_ids,
+            from_key=since_token.room_key,
+            to_key=now_token.room_key,
+            limit=timeline_limit + 1,
+        )
 
-                    joined.append(room_sync)
+        joined = []
+        # We loop through all room ids, even if there are no new events, in case
+        # there are non room events taht we need to notify about.
+        for room_id in joined_room_ids:
+            room_entry = room_to_events.get(room_id, None)
 
-        else:
-            logger.debug("Got %i events for incremental sync - hit limit",
-                         len(room_events))
+            if room_entry:
+                events, start_key = room_entry
 
-            invite_events = yield self.store.get_invites_for_user(
-                sync_config.user.to_string()
-            )
+                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
 
-            leave_events = yield self.store.get_leave_and_ban_events_for_user(
-                sync_config.user.to_string()
-            )
+                newly_joined_room = room_id in newly_joined_rooms
+                full_state = newly_joined_room
 
-            for room_id in joined_room_ids:
-                room_sync = yield self.incremental_sync_with_gap_for_room(
-                    room_id, sync_config, since_token, now_token,
-                    ephemeral_by_room, tags_by_room, account_data_by_room,
-                    all_ephemeral_by_room=all_ephemeral_by_room,
+                batch = yield self.load_filtered_recents(
+                    room_id, sync_config, prev_batch_token,
+                    since_token=since_token,
+                    recents=events,
+                    newly_joined_room=newly_joined_room,
                 )
-                if room_sync:
-                    joined.append(room_sync)
+            else:
+                batch = TimelineBatch(
+                    events=[],
+                    prev_batch=since_token,
+                    limited=False,
+                )
+                full_state = False
 
-        for leave_event in leave_events:
-            room_sync = yield self.incremental_sync_for_archived_room(
-                sync_config, leave_event, since_token, tags_by_room,
-                account_data_by_room
+            room_sync = yield self.incremental_sync_with_gap_for_room(
+                room_id=room_id,
+                sync_config=sync_config,
+                since_token=since_token,
+                now_token=now_token,
+                ephemeral_by_room=ephemeral_by_room,
+                tags_by_room=tags_by_room,
+                account_data_by_room=account_data_by_room,
+                batch=batch,
+                full_state=full_state,
             )
             if room_sync:
-                archived.append(room_sync)
-
-        invited = [
-            InvitedSyncResult(room_id=event.room_id, invite=event)
-            for event in invite_events
-        ]
+                joined.append(room_sync)
 
         account_data_for_user = sync_config.filter_collection.filter_account_data(
             self.account_data_for_user(account_data)
@@ -680,51 +596,73 @@ class SyncHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def load_filtered_recents(self, room_id, sync_config, now_token,
-                              since_token=None):
+                              since_token=None, recents=None, newly_joined_room=False):
         """
         :returns a Deferred TimelineBatch
         """
-        limited = True
-        recents = []
-        filtering_factor = 2
-        timeline_limit = sync_config.filter_collection.timeline_limit()
-        load_limit = max(timeline_limit * filtering_factor, 100)
-        max_repeat = 3  # Only try a few times per room, otherwise
-        room_key = now_token.room_key
-        end_key = room_key
-
-        while limited and len(recents) < timeline_limit and max_repeat:
-            events, keys = yield self.store.get_recent_events_for_room(
-                room_id,
-                limit=load_limit + 1,
-                from_token=since_token.room_key if since_token else None,
-                end_token=end_key,
-            )
-            room_key, _ = keys
-            end_key = "s" + room_key.split('-')[-1]
-            loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
-            loaded_recents = yield self._filter_events_for_client(
-                sync_config.user.to_string(),
-                loaded_recents,
-                is_peeking=sync_config.is_guest,
-            )
-            loaded_recents.extend(recents)
-            recents = loaded_recents
-            if len(events) <= load_limit:
+        with Measure(self.clock, "load_filtered_recents"):
+            filtering_factor = 2
+            timeline_limit = sync_config.filter_collection.timeline_limit()
+            load_limit = max(timeline_limit * filtering_factor, 10)
+            max_repeat = 5  # Only try a few times per room, otherwise
+            room_key = now_token.room_key
+            end_key = room_key
+
+            if recents is None or newly_joined_room or timeline_limit < len(recents):
+                limited = True
+            else:
                 limited = False
-            max_repeat -= 1
 
-        if len(recents) > timeline_limit:
-            limited = True
-            recents = recents[-timeline_limit:]
-            room_key = recents[0].internal_metadata.before
+            if recents is not None:
+                recents = sync_config.filter_collection.filter_room_timeline(recents)
+                recents = yield self._filter_events_for_client(
+                    sync_config.user.to_string(),
+                    recents,
+                    is_peeking=sync_config.is_guest,
+                )
+            else:
+                recents = []
+
+            since_key = None
+            if since_token and not newly_joined_room:
+                since_key = since_token.room_key
+
+            while limited and len(recents) < timeline_limit and max_repeat:
+                events, end_key = yield self.store.get_room_events_stream_for_room(
+                    room_id,
+                    limit=load_limit + 1,
+                    from_key=since_key,
+                    to_key=end_key,
+                )
+                loaded_recents = sync_config.filter_collection.filter_room_timeline(
+                    events
+                )
+                loaded_recents = yield self._filter_events_for_client(
+                    sync_config.user.to_string(),
+                    loaded_recents,
+                    is_peeking=sync_config.is_guest,
+                )
+                loaded_recents.extend(recents)
+                recents = loaded_recents
+
+                if len(events) <= load_limit:
+                    limited = False
+                    break
+                max_repeat -= 1
 
-        prev_batch_token = now_token.copy_and_replace(
-            "room_key", room_key
-        )
+            if len(recents) > timeline_limit:
+                limited = True
+                recents = recents[-timeline_limit:]
+                room_key = recents[0].internal_metadata.before
+
+            prev_batch_token = now_token.copy_and_replace(
+                "room_key", room_key
+            )
 
         defer.returnValue(TimelineBatch(
-            events=recents, prev_batch=prev_batch_token, limited=limited
+            events=recents,
+            prev_batch=prev_batch_token,
+            limited=limited or newly_joined_room
         ))
 
     @defer.inlineCallbacks
@@ -732,62 +670,12 @@ class SyncHandler(BaseHandler):
                                            since_token, now_token,
                                            ephemeral_by_room, tags_by_room,
                                            account_data_by_room,
-                                           all_ephemeral_by_room):
-        """ Get the incremental delta needed to bring the client up to date for
-        the room. Gives the client the most recent events and the changes to
-        state.
-        Returns:
-            A Deferred JoinedSyncResult
-        """
-        logger.debug("Doing incremental sync for room %s between %s and %s",
-                     room_id, since_token, now_token)
-
-        # TODO(mjark): Check for redactions we might have missed.
-
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, now_token, since_token,
-        )
-
-        logger.debug("Recents %r", batch)
-
-        if batch.limited:
-            current_state = yield self.get_state_at(room_id, now_token)
-
-            state_at_previous_sync = yield self.get_state_at(
-                room_id, stream_position=since_token
-            )
-
-            state = yield self.compute_state_delta(
-                since_token=since_token,
-                previous_state=state_at_previous_sync,
-                current_state=current_state,
-            )
-        else:
-            state = {
-                (event.type, event.state_key): event
-                for event in batch.events if event.is_state()
-            }
-
-        just_joined = yield self.check_joined_room(sync_config, state)
-        if just_joined:
-            state = yield self.get_state_at(room_id, now_token)
-
-        notifs = yield self.unread_notifs_for_room_id(
-            room_id, sync_config, all_ephemeral_by_room
+                                           batch, full_state=False):
+        state = yield self.compute_state_delta(
+            room_id, batch, sync_config, since_token, now_token,
+            full_state=full_state
         )
 
-        unread_notifications = {}
-        if notifs is not None:
-            unread_notifications["notification_count"] = len(notifs)
-            unread_notifications["highlight_count"] = len([
-                1 for notif in notifs if _action_has_highlight(notif["actions"])
-            ])
-
-        state = {
-            (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(state.values())
-        }
-
         account_data = self.account_data_for_room(
             room_id, tags_by_room, account_data_by_room
         )
@@ -800,6 +688,7 @@ class SyncHandler(BaseHandler):
             ephemeral_by_room.get(room_id, [])
         )
 
+        unread_notifications = {}
         room_sync = JoinedSyncResult(
             room_id=room_id,
             timeline=batch,
@@ -809,58 +698,53 @@ class SyncHandler(BaseHandler):
             unread_notifications=unread_notifications,
         )
 
+        if room_sync:
+            notifs = yield self.unread_notifs_for_room_id(
+                room_id, sync_config
+            )
+
+            if notifs is not None:
+                unread_notifications["notification_count"] = notifs["notify_count"]
+                unread_notifications["highlight_count"] = notifs["highlight_count"]
+
         logger.debug("Room sync: %r", room_sync)
 
         defer.returnValue(room_sync)
 
     @defer.inlineCallbacks
-    def incremental_sync_for_archived_room(self, sync_config, leave_event,
+    def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id,
                                            since_token, tags_by_room,
-                                           account_data_by_room):
+                                           account_data_by_room, full_state,
+                                           leave_token=None):
         """ Get the incremental delta needed to bring the client up to date for
         the archived room.
         Returns:
             A Deferred ArchivedSyncResult
         """
 
-        stream_token = yield self.store.get_stream_token_for_event(
-            leave_event.event_id
-        )
+        if not leave_token:
+            stream_token = yield self.store.get_stream_token_for_event(
+                leave_event_id
+            )
 
-        leave_token = since_token.copy_and_replace("room_key", stream_token)
+            leave_token = since_token.copy_and_replace("room_key", stream_token)
 
-        if since_token.is_after(leave_token):
+        if since_token and since_token.is_after(leave_token):
             defer.returnValue(None)
 
         batch = yield self.load_filtered_recents(
-            leave_event.room_id, sync_config, leave_token, since_token,
+            room_id, sync_config, leave_token, since_token,
         )
 
         logger.debug("Recents %r", batch)
 
-        state_events_at_leave = yield self.store.get_state_for_event(
-            leave_event.event_id
-        )
-
-        state_at_previous_sync = yield self.get_state_at(
-            leave_event.room_id, stream_position=since_token
-        )
-
         state_events_delta = yield self.compute_state_delta(
-            since_token=since_token,
-            previous_state=state_at_previous_sync,
-            current_state=state_events_at_leave,
+            room_id, batch, sync_config, since_token, leave_token,
+            full_state=full_state
         )
 
-        state_events_delta = {
-            (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(
-                state_events_delta.values()
-            )
-        }
-
         account_data = self.account_data_for_room(
-            leave_event.room_id, tags_by_room, account_data_by_room
+            room_id, tags_by_room, account_data_by_room
         )
 
         account_data = sync_config.filter_collection.filter_room_account_data(
@@ -868,7 +752,7 @@ class SyncHandler(BaseHandler):
         )
 
         room_sync = ArchivedSyncResult(
-            room_id=leave_event.room_id,
+            room_id=room_id,
             timeline=batch,
             state=state_events_delta,
             account_data=account_data,
@@ -912,15 +796,19 @@ class SyncHandler(BaseHandler):
             state = {}
         defer.returnValue(state)
 
-    def compute_state_delta(self, since_token, previous_state, current_state):
-        """ Works out the differnce in state between the current state and the
-        state the client got when it last performed a sync.
-
-        :param str since_token: the point we are comparing against
-        :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the
-            state to compare to
-        :param dict[(str,str), synapse.events.FrozenEvent] current_state: the
-            new state
+    @defer.inlineCallbacks
+    def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
+                            full_state):
+        """ Works out the differnce in state between the start of the timeline
+        and the previous sync.
+
+        :param str room_id
+        :param TimelineBatch batch: The timeline batch for the room that will
+            be sent to the user.
+        :param sync_config
+        :param str since_token: Token of the end of the previous batch. May be None.
+        :param str now_token: Token of the end of the current batch.
+        :param bool full_state: Whether to force returning the full state.
 
         :returns A new event dictionary
         """
@@ -929,12 +817,53 @@ class SyncHandler(BaseHandler):
         # updates even if they occured logically before the previous event.
         # TODO(mjark) Check for new redactions in the state events.
 
-        state_delta = {}
-        for key, event in current_state.iteritems():
-            if (key not in previous_state or
-                    previous_state[key].event_id != event.event_id):
-                state_delta[key] = event
-        return state_delta
+        with Measure(self.clock, "compute_state_delta"):
+            if full_state:
+                if batch:
+                    state = yield self.store.get_state_for_event(
+                        batch.events[0].event_id
+                    )
+                else:
+                    state = yield self.get_state_at(
+                        room_id, stream_position=now_token
+                    )
+
+                timeline_state = {
+                    (event.type, event.state_key): event
+                    for event in batch.events if event.is_state()
+                }
+
+                state = _calculate_state(
+                    timeline_contains=timeline_state,
+                    timeline_start=state,
+                    previous={},
+                )
+            elif batch.limited:
+                state_at_previous_sync = yield self.get_state_at(
+                    room_id, stream_position=since_token
+                )
+
+                state_at_timeline_start = yield self.store.get_state_for_event(
+                    batch.events[0].event_id
+                )
+
+                timeline_state = {
+                    (event.type, event.state_key): event
+                    for event in batch.events if event.is_state()
+                }
+
+                state = _calculate_state(
+                    timeline_contains=timeline_state,
+                    timeline_start=state_at_timeline_start,
+                    previous=state_at_previous_sync,
+                )
+            else:
+                state = {}
+
+            defer.returnValue({
+                (e.type, e.state_key): e
+                for e in sync_config.filter_collection.filter_room_state(state.values())
+            })
 
     def check_joined_room(self, sync_config, state_delta):
         """
@@ -955,21 +884,24 @@ class SyncHandler(BaseHandler):
         return False
 
     @defer.inlineCallbacks
-    def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
-        last_unread_event_id = self.last_read_event_id_for_room_and_user(
-            room_id, sync_config.user.to_string(), ephemeral_by_room
-        )
-
-        notifs = []
-        if last_unread_event_id:
-            notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
-                room_id, sync_config.user.to_string(), last_unread_event_id
+    def unread_notifs_for_room_id(self, room_id, sync_config):
+        with Measure(self.clock, "unread_notifs_for_room_id"):
+            last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
+                user_id=sync_config.user.to_string(),
+                room_id=room_id,
+                receipt_type="m.read"
             )
-            defer.returnValue(notifs)
 
-        # There is no new information in this period, so your notification
-        # count is whatever it was last time.
-        defer.returnValue(None)
+            notifs = []
+            if last_unread_event_id:
+                notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
+                    room_id, sync_config.user.to_string(), last_unread_event_id
+                )
+                defer.returnValue(notifs)
+
+            # There is no new information in this period, so your notification
+            # count is whatever it was last time.
+            defer.returnValue(None)
 
 
 def _action_has_highlight(actions):
@@ -981,3 +913,37 @@ def _action_has_highlight(actions):
             pass
 
     return False
+
+
+def _calculate_state(timeline_contains, timeline_start, previous):
+    """Works out what state to include in a sync response.
+
+    Args:
+        timeline_contains (dict): state in the timeline
+        timeline_start (dict): state at the start of the timeline
+        previous (dict): state at the end of the previous sync (or empty dict
+            if this is an initial sync)
+
+    Returns:
+        dict
+    """
+    event_id_to_state = {
+        e.event_id: e
+        for e in itertools.chain(
+            timeline_contains.values(),
+            previous.values(),
+            timeline_start.values(),
+        )
+    }
+
+    tc_ids = set(e.event_id for e in timeline_contains.values())
+    p_ids = set(e.event_id for e in previous.values())
+    ts_ids = set(e.event_id for e in timeline_start.values())
+
+    state_ids = (ts_ids - p_ids) - tc_ids
+
+    evs = (event_id_to_state[e] for e in state_ids)
+    return {
+        (e.type, e.state_key): e
+        for e in evs
+    }