summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/sync.py978
1 files changed, 481 insertions, 497 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9ebfccc8bf..80eccf19ae 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -194,157 +194,7 @@ class SyncHandler(object):
         Returns:
             A Deferred SyncResult.
         """
-        if since_token is None or full_state:
-            return self.full_state_sync(sync_config, since_token)
-        else:
-            return self.incremental_sync_with_gap(sync_config, since_token)
-
-    @defer.inlineCallbacks
-    def full_state_sync(self, sync_config, timeline_since_token):
-        """Get a sync for a client which is starting without any state.
-
-        If a 'message_since_token' is given, only timeline events which have
-        happened since that token will be returned.
-
-        Returns:
-            A Deferred SyncResult.
-        """
-        now_token = yield self.event_sources.get_current_token()
-
-        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token
-        )
-
-        presence_stream = self.event_sources.sources["presence"]
-        # TODO (mjark): This looks wrong, shouldn't we be getting the presence
-        # UP to the present rather than after the present?
-        pagination_config = PaginationConfig(from_token=now_token)
-        presence, _ = yield presence_stream.get_pagination_rows(
-            user=sync_config.user,
-            pagination_config=pagination_config.get_source_config("presence"),
-            key=None
-        )
-
-        membership_list = (
-            Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
-        )
-
-        room_list = yield self.store.get_rooms_for_user_where_membership_is(
-            user_id=sync_config.user.to_string(),
-            membership_list=membership_list
-        )
-
-        account_data, account_data_by_room = (
-            yield self.store.get_account_data_for_user(
-                sync_config.user.to_string()
-            )
-        )
-
-        account_data['m.push_rules'] = yield self.push_rules_for_user(
-            sync_config.user
-        )
-
-        tags_by_room = yield self.store.get_tags_for_user(
-            sync_config.user.to_string()
-        )
-
-        ignored_users = account_data.get(
-            "m.ignored_user_list", {}
-        ).get("ignored_users", {}).keys()
-
-        joined = []
-        invited = []
-        archived = []
-
-        user_id = sync_config.user.to_string()
-
-        @defer.inlineCallbacks
-        def _generate_room_entry(event):
-            if event.membership == Membership.JOIN:
-                room_result = yield self.full_state_sync_for_joined_room(
-                    room_id=event.room_id,
-                    sync_config=sync_config,
-                    now_token=now_token,
-                    timeline_since_token=timeline_since_token,
-                    ephemeral_by_room=ephemeral_by_room,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
-                joined.append(room_result)
-            elif event.membership == Membership.INVITE:
-                if event.sender in ignored_users:
-                    return
-                invite = yield self.store.get_event(event.event_id)
-                invited.append(InvitedSyncResult(
-                    room_id=event.room_id,
-                    invite=invite,
-                ))
-            elif event.membership in (Membership.LEAVE, Membership.BAN):
-                # Always send down rooms we were banned or kicked from.
-                if not sync_config.filter_collection.include_leave:
-                    if event.membership == Membership.LEAVE:
-                        if user_id == event.sender:
-                            return
-
-                leave_token = now_token.copy_and_replace(
-                    "room_key", "s%d" % (event.stream_ordering,)
-                )
-                room_result = yield self.full_state_sync_for_archived_room(
-                    sync_config=sync_config,
-                    room_id=event.room_id,
-                    leave_event_id=event.event_id,
-                    leave_token=leave_token,
-                    timeline_since_token=timeline_since_token,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
-                archived.append(room_result)
-
-        yield concurrently_execute(_generate_room_entry, room_list, 10)
-
-        account_data_for_user = sync_config.filter_collection.filter_account_data(
-            self.account_data_for_user(account_data)
-        )
-
-        presence = sync_config.filter_collection.filter_presence(
-            presence
-        )
-
-        defer.returnValue(SyncResult(
-            presence=presence,
-            account_data=account_data_for_user,
-            joined=joined,
-            invited=invited,
-            archived=archived,
-            next_batch=now_token,
-        ))
-
-    @defer.inlineCallbacks
-    def full_state_sync_for_joined_room(self, room_id, sync_config,
-                                        now_token, timeline_since_token,
-                                        ephemeral_by_room, tags_by_room,
-                                        account_data_by_room):
-        """Sync a room for a client which is starting without any state
-        Returns:
-            A Deferred JoinedSyncResult.
-        """
-
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, now_token, since_token=timeline_since_token
-        )
-
-        room_sync = yield self.incremental_sync_with_gap_for_room(
-            room_id, sync_config,
-            now_token=now_token,
-            since_token=timeline_since_token,
-            ephemeral_by_room=ephemeral_by_room,
-            tags_by_room=tags_by_room,
-            account_data_by_room=account_data_by_room,
-            batch=batch,
-            full_state=True,
-        )
-
-        defer.returnValue(room_sync)
+        return self.generate_sync_result(sync_config, since_token, full_state)
 
     @defer.inlineCallbacks
     def push_rules_for_user(self, user):
@@ -365,24 +215,6 @@ class SyncHandler(object):
 
         return account_data_events
 
-    def account_data_for_room(self, room_id, tags_by_room, account_data_by_room):
-        account_data_events = []
-        tags = tags_by_room.get(room_id)
-        if tags is not None:
-            account_data_events.append({
-                "type": "m.tag",
-                "content": {"tags": tags},
-            })
-
-        account_data = account_data_by_room.get(room_id, {})
-        for account_data_type, content in account_data.items():
-            account_data_events.append({
-                "type": account_data_type,
-                "content": content,
-            })
-
-        return account_data_events
-
     @defer.inlineCallbacks
     def ephemeral_by_room(self, sync_config, now_token, since_token=None):
         """Get the ephemeral events for each room the user is in
@@ -445,237 +277,6 @@ class SyncHandler(object):
 
         defer.returnValue((now_token, ephemeral_by_room))
 
-    def full_state_sync_for_archived_room(self, room_id, sync_config,
-                                          leave_event_id, leave_token,
-                                          timeline_since_token, tags_by_room,
-                                          account_data_by_room):
-        """Sync a room for a client which is starting without any state
-        Returns:
-            A Deferred ArchivedSyncResult.
-        """
-
-        return self.incremental_sync_for_archived_room(
-            sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room,
-            account_data_by_room, full_state=True, leave_token=leave_token,
-        )
-
-    @defer.inlineCallbacks
-    def incremental_sync_with_gap(self, sync_config, since_token):
-        """ Get the incremental delta needed to bring the client up to
-        date with the server.
-        Returns:
-            A Deferred SyncResult.
-        """
-        now_token = yield self.event_sources.get_current_token()
-
-        rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
-        room_ids = [room.room_id for room in rooms]
-
-        presence_source = self.event_sources.sources["presence"]
-        presence, presence_key = yield presence_source.get_new_events(
-            user=sync_config.user,
-            from_key=since_token.presence_key,
-            limit=sync_config.filter_collection.presence_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("presence_key", presence_key)
-
-        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token, since_token
-        )
-
-        app_service = yield self.store.get_app_service_by_user_id(
-            sync_config.user.to_string()
-        )
-        if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
-        else:
-            rooms = yield self.store.get_rooms_for_user(
-                sync_config.user.to_string()
-            )
-            joined_room_ids = set(r.room_id for r in rooms)
-
-        user_id = sync_config.user.to_string()
-
-        timeline_limit = sync_config.filter_collection.timeline_limit()
-
-        tags_by_room = yield self.store.get_updated_tags(
-            user_id,
-            since_token.account_data_key,
-        )
-
-        account_data, account_data_by_room = (
-            yield self.store.get_updated_account_data_for_user(
-                user_id,
-                since_token.account_data_key,
-            )
-        )
-
-        push_rules_changed = yield self.store.have_push_rules_changed_for_user(
-            user_id, int(since_token.push_rules_key)
-        )
-
-        if push_rules_changed:
-            account_data["m.push_rules"] = yield self.push_rules_for_user(
-                sync_config.user
-            )
-
-        ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
-            "m.ignored_user_list", user_id=user_id,
-        )
-
-        if ignored_account_data:
-            ignored_users = ignored_account_data.get("ignored_users", {}).keys()
-        else:
-            ignored_users = frozenset()
-
-        # Get a list of membership change events that have happened.
-        rooms_changed = yield self.store.get_membership_changes_for_user(
-            user_id, since_token.room_key, now_token.room_key
-        )
-
-        mem_change_events_by_room_id = {}
-        for event in rooms_changed:
-            mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
-
-        newly_joined_rooms = []
-        archived = []
-        invited = []
-        for room_id, events in mem_change_events_by_room_id.items():
-            non_joins = [e for e in events if e.membership != Membership.JOIN]
-            has_join = len(non_joins) != len(events)
-
-            # We want to figure out if we joined the room at some point since
-            # the last sync (even if we have since left). This is to make sure
-            # we do send down the room, and with full state, where necessary
-            if room_id in joined_room_ids or has_join:
-                old_state = yield self.get_state_at(room_id, since_token)
-                old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
-                if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
-                        newly_joined_rooms.append(room_id)
-
-                if room_id in joined_room_ids:
-                    continue
-
-            if not non_joins:
-                continue
-
-            # Only bother if we're still currently invited
-            should_invite = non_joins[-1].membership == Membership.INVITE
-            if should_invite:
-                if event.sender not in ignored_users:
-                    room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
-                    if room_sync:
-                        invited.append(room_sync)
-
-            # Always include leave/ban events. Just take the last one.
-            # TODO: How do we handle ban -> leave in same batch?
-            leave_events = [
-                e for e in non_joins
-                if e.membership in (Membership.LEAVE, Membership.BAN)
-            ]
-
-            if leave_events:
-                leave_event = leave_events[-1]
-                room_sync = yield self.incremental_sync_for_archived_room(
-                    sync_config, room_id, leave_event.event_id, since_token,
-                    tags_by_room, account_data_by_room,
-                    full_state=room_id in newly_joined_rooms
-                )
-                if room_sync:
-                    archived.append(room_sync)
-
-        # Get all events for rooms we're currently joined to.
-        room_to_events = yield self.store.get_room_events_stream_for_rooms(
-            room_ids=joined_room_ids,
-            from_key=since_token.room_key,
-            to_key=now_token.room_key,
-            limit=timeline_limit + 1,
-        )
-
-        joined = []
-        # We loop through all room ids, even if there are no new events, in case
-        # there are non room events taht we need to notify about.
-        for room_id in joined_room_ids:
-            room_entry = room_to_events.get(room_id, None)
-
-            if room_entry:
-                events, start_key = room_entry
-
-                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
-
-                newly_joined_room = room_id in newly_joined_rooms
-                full_state = newly_joined_room
-
-                batch = yield self.load_filtered_recents(
-                    room_id, sync_config, prev_batch_token,
-                    since_token=since_token,
-                    recents=events,
-                    newly_joined_room=newly_joined_room,
-                )
-            else:
-                batch = TimelineBatch(
-                    events=[],
-                    prev_batch=since_token,
-                    limited=False,
-                )
-                full_state = False
-
-            room_sync = yield self.incremental_sync_with_gap_for_room(
-                room_id=room_id,
-                sync_config=sync_config,
-                since_token=since_token,
-                now_token=now_token,
-                ephemeral_by_room=ephemeral_by_room,
-                tags_by_room=tags_by_room,
-                account_data_by_room=account_data_by_room,
-                batch=batch,
-                full_state=full_state,
-            )
-            if room_sync:
-                joined.append(room_sync)
-
-        # For each newly joined room, we want to send down presence of
-        # existing users.
-        presence_handler = self.presence_handler
-        extra_presence_users = set()
-        for room_id in newly_joined_rooms:
-            users = yield self.store.get_users_in_room(event.room_id)
-            extra_presence_users.update(users)
-
-        # For each new member, send down presence.
-        for joined_sync in joined:
-            it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values())
-            for event in it:
-                if event.type == EventTypes.Member:
-                    if event.membership == Membership.JOIN:
-                        extra_presence_users.add(event.state_key)
-
-        states = yield presence_handler.get_states(
-            [u for u in extra_presence_users if u != user_id],
-            as_event=True,
-        )
-        presence.extend(states)
-
-        account_data_for_user = sync_config.filter_collection.filter_account_data(
-            self.account_data_for_user(account_data)
-        )
-
-        presence = sync_config.filter_collection.filter_presence(
-            presence
-        )
-
-        defer.returnValue(SyncResult(
-            presence=presence,
-            account_data=account_data_for_user,
-            joined=joined,
-            invited=invited,
-            archived=archived,
-            next_batch=now_token,
-        ))
-
     @defer.inlineCallbacks
     def load_filtered_recents(self, room_id, sync_config, now_token,
                               since_token=None, recents=None, newly_joined_room=False):
@@ -696,6 +297,10 @@ class SyncHandler(object):
             else:
                 limited = False
 
+            if since_token:
+                if not now_token.is_after(since_token):
+                    limited = False
+
             if recents is not None:
                 recents = sync_config.filter_collection.filter_room_timeline(recents)
                 recents = yield filter_events_for_client(
@@ -749,103 +354,6 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def incremental_sync_with_gap_for_room(self, room_id, sync_config,
-                                           since_token, now_token,
-                                           ephemeral_by_room, tags_by_room,
-                                           account_data_by_room,
-                                           batch, full_state=False):
-        state = yield self.compute_state_delta(
-            room_id, batch, sync_config, since_token, now_token,
-            full_state=full_state
-        )
-
-        account_data = self.account_data_for_room(
-            room_id, tags_by_room, account_data_by_room
-        )
-
-        account_data = sync_config.filter_collection.filter_room_account_data(
-            account_data
-        )
-
-        ephemeral = sync_config.filter_collection.filter_room_ephemeral(
-            ephemeral_by_room.get(room_id, [])
-        )
-
-        unread_notifications = {}
-        room_sync = JoinedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=state,
-            ephemeral=ephemeral,
-            account_data=account_data,
-            unread_notifications=unread_notifications,
-        )
-
-        if room_sync:
-            notifs = yield self.unread_notifs_for_room_id(
-                room_id, sync_config
-            )
-
-            if notifs is not None:
-                unread_notifications["notification_count"] = notifs["notify_count"]
-                unread_notifications["highlight_count"] = notifs["highlight_count"]
-
-        logger.debug("Room sync: %r", room_sync)
-
-        defer.returnValue(room_sync)
-
-    @defer.inlineCallbacks
-    def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id,
-                                           since_token, tags_by_room,
-                                           account_data_by_room, full_state,
-                                           leave_token=None):
-        """ Get the incremental delta needed to bring the client up to date for
-        the archived room.
-        Returns:
-            A Deferred ArchivedSyncResult
-        """
-
-        if not leave_token:
-            stream_token = yield self.store.get_stream_token_for_event(
-                leave_event_id
-            )
-
-            leave_token = since_token.copy_and_replace("room_key", stream_token)
-
-        if since_token and since_token.is_after(leave_token):
-            defer.returnValue(None)
-
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, leave_token, since_token,
-        )
-
-        logger.debug("Recents %r", batch)
-
-        state_events_delta = yield self.compute_state_delta(
-            room_id, batch, sync_config, since_token, leave_token,
-            full_state=full_state
-        )
-
-        account_data = self.account_data_for_room(
-            room_id, tags_by_room, account_data_by_room
-        )
-
-        account_data = sync_config.filter_collection.filter_room_account_data(
-            account_data
-        )
-
-        room_sync = ArchivedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=state_events_delta,
-            account_data=account_data,
-        )
-
-        logger.debug("Room sync: %r", room_sync)
-
-        defer.returnValue(room_sync)
-
-    @defer.inlineCallbacks
     def get_state_after_event(self, event):
         """
         Get the room state after the given event
@@ -1010,6 +518,457 @@ class SyncHandler(object):
             # count is whatever it was last time.
             defer.returnValue(None)
 
+    @defer.inlineCallbacks
+    def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+        now_token = yield self.event_sources.get_current_token()
+
+        sync_result_builer = SyncResultBuilder(
+            sync_config, full_state,
+            since_token=since_token,
+            now_token=now_token,
+        )
+
+        account_data_by_room = yield self.generate_sync_entry_for_account_data(
+            sync_result_builer
+        )
+
+        newly_joined_rooms, newly_joined_users = yield self.generate_sync_entry_for_rooms(
+            sync_result_builer, account_data_by_room
+        )
+
+        yield self.generate_sync_entry_for_presence(
+            sync_result_builer, newly_joined_rooms, newly_joined_users
+        )
+
+        defer.returnValue(SyncResult(
+            presence=sync_result_builer.presence,
+            account_data=sync_result_builer.account_data,
+            joined=sync_result_builer.joined,
+            invited=sync_result_builer.invited,
+            archived=sync_result_builer.archived,
+            next_batch=sync_result_builer.now_token,
+        ))
+
+    @defer.inlineCallbacks
+    def generate_sync_entry_for_account_data(self, sync_result_builer):
+        sync_config = sync_result_builer.sync_config
+        user_id = sync_result_builer.sync_config.user.to_string()
+        since_token = sync_result_builer.since_token
+
+        if since_token and not sync_result_builer.full_state:
+            account_data, account_data_by_room = (
+                yield self.store.get_updated_account_data_for_user(
+                    user_id,
+                    since_token.account_data_key,
+                )
+            )
+
+            push_rules_changed = yield self.store.have_push_rules_changed_for_user(
+                user_id, int(since_token.push_rules_key)
+            )
+
+            if push_rules_changed:
+                account_data["m.push_rules"] = yield self.push_rules_for_user(
+                    sync_config.user
+                )
+        else:
+            account_data, account_data_by_room = (
+                yield self.store.get_account_data_for_user(
+                    sync_config.user.to_string()
+                )
+            )
+
+            account_data['m.push_rules'] = yield self.push_rules_for_user(
+                sync_config.user
+            )
+
+        account_data_for_user = sync_config.filter_collection.filter_account_data(
+            self.account_data_for_user(account_data)
+        )
+
+        sync_result_builer.account_data = account_data_for_user
+
+        defer.returnValue(account_data_by_room)
+
+    @defer.inlineCallbacks
+    def generate_sync_entry_for_presence(self, sync_result_builer, newly_joined_rooms,
+                                         newly_joined_users):
+        now_token = sync_result_builer.now_token
+        sync_config = sync_result_builer.sync_config
+        user = sync_result_builer.sync_config.user
+
+        presence_source = self.event_sources.sources["presence"]
+
+        since_token = sync_result_builer.since_token
+        if since_token and not sync_result_builer.full_state:
+            presence_key = since_token.presence_key
+        else:
+            presence_key = None
+
+        presence, presence_key = yield presence_source.get_new_events(
+            user=user,
+            from_key=presence_key,
+            is_guest=sync_config.is_guest,
+        )
+        sync_result_builer.now_token = now_token.copy_and_replace(
+            "presence_key", presence_key
+        )
+
+        extra_users_ids = set(newly_joined_users)
+        for room_id in newly_joined_rooms:
+            users = yield self.store.get_users_in_room(room_id)
+            extra_users_ids.update(users)
+        extra_users_ids.discard(user.to_string())
+
+        states = yield self.presence_handler.get_states(
+            extra_users_ids,
+            as_event=True,
+        )
+        presence.extend(states)
+
+        presence = sync_config.filter_collection.filter_presence(
+            presence
+        )
+
+        sync_result_builer.presence = presence
+
+    @defer.inlineCallbacks
+    def generate_sync_entry_for_rooms(self, sync_result_builer, account_data_by_room):
+        user_id = sync_result_builer.sync_config.user.to_string()
+
+        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+            sync_result_builer.sync_config, sync_result_builer.now_token
+        )
+        sync_result_builer.now_token = now_token
+
+        ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
+            "m.ignored_user_list", user_id=user_id,
+        )
+
+        if ignored_account_data:
+            ignored_users = ignored_account_data.get("ignored_users", {}).keys()
+        else:
+            ignored_users = frozenset()
+
+        if sync_result_builer.since_token:
+            res = yield self._get_rooms_changed(sync_result_builer, ignored_users)
+            joined, invited, archived, newly_joined_rooms = res
+
+            tags_by_room = yield self.store.get_updated_tags(
+                user_id,
+                sync_result_builer.since_token.account_data_key,
+            )
+        else:
+            res = yield self._get_all_rooms(sync_result_builer, ignored_users)
+            joined, invited, archived, newly_joined_rooms = res
+
+            tags_by_room = yield self.store.get_tags_for_user(user_id)
+
+        for room_entry in joined:
+            yield self._generate_room_entry(
+                "joined",
+                sync_result_builer,
+                ignored_users,
+                room_entry,
+                ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
+                tags=tags_by_room.get(room_entry.room_id),
+                account_data=account_data_by_room.get(room_entry.room_id, {}),
+                always_include=sync_result_builer.full_state,
+            )
+        for room_entry in archived:
+            yield self._generate_room_entry(
+                "archived",
+                sync_result_builer,
+                ignored_users,
+                room_entry,
+                ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
+                tags=tags_by_room.get(room_entry.room_id),
+                account_data=account_data_by_room.get(room_entry.room_id, {}),
+                always_include=sync_result_builer.full_state,
+            )
+
+        sync_result_builer.invited.extend(invited)
+
+        # Now we want to get any newly joined users
+        newly_joined_users = set()
+        for joined_sync in sync_result_builer.joined:
+            it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values())
+            for event in it:
+                if event.type == EventTypes.Member:
+                    if event.membership == Membership.JOIN:
+                        newly_joined_users.add(event.state_key)
+
+        defer.returnValue((newly_joined_rooms, newly_joined_users))
+
+    @defer.inlineCallbacks
+    def _get_rooms_changed(self, sync_result_builer, ignored_users):
+        user_id = sync_result_builer.sync_config.user.to_string()
+        since_token = sync_result_builer.since_token
+        now_token = sync_result_builer.now_token
+        sync_config = sync_result_builer.sync_config
+
+        assert since_token
+
+        app_service = yield self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            rooms = yield self.store.get_app_service_rooms(app_service)
+            joined_room_ids = set(r.room_id for r in rooms)
+        else:
+            rooms = yield self.store.get_rooms_for_user(user_id)
+            joined_room_ids = set(r.room_id for r in rooms)
+
+        # Get a list of membership change events that have happened.
+        rooms_changed = yield self.store.get_membership_changes_for_user(
+            user_id, since_token.room_key, now_token.room_key
+        )
+
+        mem_change_events_by_room_id = {}
+        for event in rooms_changed:
+            mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
+
+        newly_joined_rooms = []
+        archived = []
+        invited = []
+        for room_id, events in mem_change_events_by_room_id.items():
+            non_joins = [e for e in events if e.membership != Membership.JOIN]
+            has_join = len(non_joins) != len(events)
+
+            # We want to figure out if we joined the room at some point since
+            # the last sync (even if we have since left). This is to make sure
+            # we do send down the room, and with full state, where necessary
+            if room_id in joined_room_ids or has_join:
+                old_state = yield self.get_state_at(room_id, since_token)
+                old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
+                if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
+                    newly_joined_rooms.append(room_id)
+
+                if room_id in joined_room_ids:
+                    continue
+
+            if not non_joins:
+                continue
+
+            # Only bother if we're still currently invited
+            should_invite = non_joins[-1].membership == Membership.INVITE
+            if should_invite:
+                if event.sender not in ignored_users:
+                    room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+                    if room_sync:
+                        invited.append(room_sync)
+
+            # Always include leave/ban events. Just take the last one.
+            # TODO: How do we handle ban -> leave in same batch?
+            leave_events = [
+                e for e in non_joins
+                if e.membership in (Membership.LEAVE, Membership.BAN)
+            ]
+
+            if leave_events:
+                leave_event = leave_events[-1]
+                leave_stream_token = yield self.store.get_stream_token_for_event(
+                    leave_event.event_id
+                )
+                leave_token = since_token.copy_and_replace(
+                    "room_key", leave_stream_token
+                )
+
+                if since_token and since_token.is_after(leave_token):
+                    continue
+
+                archived.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    events=None,
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=since_token,
+                    upto_token=leave_token,
+                ))
+
+        timeline_limit = sync_config.filter_collection.timeline_limit()
+
+        # Get all events for rooms we're currently joined to.
+        room_to_events = yield self.store.get_room_events_stream_for_rooms(
+            room_ids=joined_room_ids,
+            from_key=since_token.room_key,
+            to_key=now_token.room_key,
+            limit=timeline_limit + 1,
+        )
+
+        joined = []
+        # We loop through all room ids, even if there are no new events, in case
+        # there are non room events taht we need to notify about.
+        for room_id in joined_room_ids:
+            room_entry = room_to_events.get(room_id, None)
+
+            if room_entry:
+                events, start_key = room_entry
+
+                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
+
+                joined.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    events=events,
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=None if room_id in newly_joined_rooms else since_token,
+                    upto_token=prev_batch_token,
+                ))
+            else:
+                joined.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    events=[],
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=since_token,
+                    upto_token=since_token,
+                ))
+
+        defer.returnValue((joined, invited, archived, newly_joined_rooms))
+
+    @defer.inlineCallbacks
+    def _get_all_rooms(self, sync_result_builer, ignored_users):
+        user_id = sync_result_builer.sync_config.user.to_string()
+        since_token = sync_result_builer.since_token
+        now_token = sync_result_builer.now_token
+        sync_config = sync_result_builer.sync_config
+
+        membership_list = (
+            Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
+        )
+
+        room_list = yield self.store.get_rooms_for_user_where_membership_is(
+            user_id=user_id,
+            membership_list=membership_list
+        )
+
+        joined = []
+        invited = []
+        archived = []
+
+        for event in room_list:
+            if event.membership == Membership.JOIN:
+                joined.append(RoomSyncResultBuilder(
+                    room_id=event.room_id,
+                    events=None,
+                    newly_joined=False,
+                    full_state=True,
+                    since_token=since_token,
+                    upto_token=now_token,
+                ))
+            elif event.membership == Membership.INVITE:
+                if event.sender in ignored_users:
+                    continue
+                invite = yield self.store.get_event(event.event_id)
+                invited.append(InvitedSyncResult(
+                    room_id=event.room_id,
+                    invite=invite,
+                ))
+            elif event.membership in (Membership.LEAVE, Membership.BAN):
+                # Always send down rooms we were banned or kicked from.
+                if not sync_config.filter_collection.include_leave:
+                    if event.membership == Membership.LEAVE:
+                        if user_id == event.sender:
+                            continue
+
+                leave_token = now_token.copy_and_replace(
+                    "room_key", "s%d" % (event.stream_ordering,)
+                )
+                archived.append(RoomSyncResultBuilder(
+                    room_id=event.room_id,
+                    events=None,
+                    newly_joined=False,
+                    full_state=True,
+                    since_token=since_token,
+                    upto_token=leave_token,
+                ))
+
+        defer.returnValue((joined, invited, archived, []))
+
+    @defer.inlineCallbacks
+    def _generate_room_entry(self, room_type, sync_result_builer, ignored_users,
+                             room_builder, ephemeral, tags, account_data,
+                             always_include=False):
+        since_token = sync_result_builer.since_token
+        now_token = sync_result_builer.now_token
+        sync_config = sync_result_builer.sync_config
+
+        room_id = room_builder.room_id
+        events = room_builder.events
+        newly_joined = room_builder.newly_joined
+        full_state = (
+            room_builder.full_state
+            or newly_joined
+            or sync_result_builer.full_state
+        )
+        since_token = room_builder.since_token
+        upto_token = room_builder.upto_token
+
+        batch = yield self.load_filtered_recents(
+            room_id, sync_config,
+            now_token=upto_token,
+            since_token=since_token,
+            recents=events,
+            newly_joined_room=newly_joined,  # FIXME
+        )
+
+        account_data_events = []
+        if tags is not None:
+            account_data_events.append({
+                "type": "m.tag",
+                "content": {"tags": tags},
+            })
+
+        for account_data_type, content in account_data.items():
+            account_data_events.append({
+                "type": account_data_type,
+                "content": content,
+            })
+
+        account_data = sync_config.filter_collection.filter_room_account_data(
+            account_data_events
+        )
+
+        ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+
+        if not (always_include or batch or account_data or ephemeral or full_state):
+            return
+
+        state = yield self.compute_state_delta(
+            room_id, batch, sync_config, since_token, now_token,
+            full_state=full_state
+        )
+
+        if room_type == "joined":
+            unread_notifications = {}
+            room_sync = JoinedSyncResult(
+                room_id=room_id,
+                timeline=batch,
+                state=state,
+                ephemeral=ephemeral,
+                account_data=account_data_events,
+                unread_notifications=unread_notifications,
+            )
+
+            if room_sync or always_include:
+                notifs = yield self.unread_notifs_for_room_id(
+                    room_id, sync_config
+                )
+
+                if notifs is not None:
+                    unread_notifications["notification_count"] = notifs["notify_count"]
+                    unread_notifications["highlight_count"] = notifs["highlight_count"]
+
+                sync_result_builer.joined.append(room_sync)
+        elif room_type == "archived":
+            room_sync = ArchivedSyncResult(
+                room_id=room_id,
+                timeline=batch,
+                state=state,
+                account_data=account_data,
+            )
+            if room_sync or always_include:
+                sync_result_builer.archived.append(room_sync)
+
 
 def _action_has_highlight(actions):
     for action in actions:
@@ -1057,3 +1016,28 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
         (e.type, e.state_key): e
         for e in evs
     }
+
+
+class SyncResultBuilder(object):
+    def __init__(self, sync_config, full_state, since_token, now_token):
+        self.sync_config = sync_config
+        self.full_state = full_state
+        self.since_token = since_token
+        self.now_token = now_token
+
+        self.presence = []
+        self.account_data = []
+        self.joined = []
+        self.invited = []
+        self.archived = []
+
+
+class RoomSyncResultBuilder(object):
+    def __init__(self, room_id, events, newly_joined, full_state, since_token,
+                 upto_token):
+        self.room_id = room_id
+        self.events = events
+        self.newly_joined = newly_joined
+        self.full_state = full_state
+        self.since_token = since_token
+        self.upto_token = upto_token