summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2020-02-04 12:07:05 +0000
committerRichard van der Hoff <richard@matrix.org>2020-02-04 12:07:05 +0000
commit5ef91b96f1cadbdf981f4e67c2dc97d0bb290d6c (patch)
tree47c7979d13fe0f1f9dc86af39065830a0289fbcc /synapse/handlers/sync.py
parentmake FederationClient.send_invite async (diff)
parentMerge pull request #6837 from matrix-org/rav/federation_async (diff)
downloadsynapse-5ef91b96f1cadbdf981f4e67c2dc97d0bb290d6c.tar.xz
Merge remote-tracking branch 'origin/develop' into rav/federation_client_async
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py705
1 files changed, 364 insertions, 341 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index cd95f85e3f..5f060241b4 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -14,20 +14,30 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import collections
 import itertools
 import logging
+from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple
 
 from six import iteritems, itervalues
 
+import attr
 from prometheus_client import Counter
 
 from synapse.api.constants import EventTypes, Membership
+from synapse.api.filtering import FilterCollection
+from synapse.events import EventBase
 from synapse.logging.context import LoggingContext
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.roommember import MemberSummary
 from synapse.storage.state import StateFilter
-from synapse.types import RoomStreamToken
+from synapse.types import (
+    Collection,
+    JsonDict,
+    RoomStreamToken,
+    StateMap,
+    StreamToken,
+    UserID,
+)
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.caches.lrucache import LruCache
@@ -62,17 +72,22 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
 LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
 
 
-SyncConfig = collections.namedtuple(
-    "SyncConfig", ["user", "filter_collection", "is_guest", "request_key", "device_id"]
-)
+@attr.s(slots=True, frozen=True)
+class SyncConfig:
+    user = attr.ib(type=UserID)
+    filter_collection = attr.ib(type=FilterCollection)
+    is_guest = attr.ib(type=bool)
+    request_key = attr.ib(type=Tuple[Any, ...])
+    device_id = attr.ib(type=str)
 
 
-class TimelineBatch(
-    collections.namedtuple("TimelineBatch", ["prev_batch", "events", "limited"])
-):
-    __slots__ = []
+@attr.s(slots=True, frozen=True)
+class TimelineBatch:
+    prev_batch = attr.ib(type=StreamToken)
+    events = attr.ib(type=List[EventBase])
+    limited = attr.ib(bool)
 
-    def __nonzero__(self):
+    def __nonzero__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
         to tell if room needs to be part of the sync result.
         """
@@ -81,23 +96,17 @@ class TimelineBatch(
     __bool__ = __nonzero__  # python3
 
 
-class JoinedSyncResult(
-    collections.namedtuple(
-        "JoinedSyncResult",
-        [
-            "room_id",  # str
-            "timeline",  # TimelineBatch
-            "state",  # dict[(str, str), FrozenEvent]
-            "ephemeral",
-            "account_data",
-            "unread_notifications",
-            "summary",
-        ],
-    )
-):
-    __slots__ = []
-
-    def __nonzero__(self):
+@attr.s(slots=True, frozen=True)
+class JoinedSyncResult:
+    room_id = attr.ib(type=str)
+    timeline = attr.ib(type=TimelineBatch)
+    state = attr.ib(type=StateMap[EventBase])
+    ephemeral = attr.ib(type=List[JsonDict])
+    account_data = attr.ib(type=List[JsonDict])
+    unread_notifications = attr.ib(type=JsonDict)
+    summary = attr.ib(type=Optional[JsonDict])
+
+    def __nonzero__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
         to tell if room needs to be part of the sync result.
         """
@@ -113,20 +122,14 @@ class JoinedSyncResult(
     __bool__ = __nonzero__  # python3
 
 
-class ArchivedSyncResult(
-    collections.namedtuple(
-        "ArchivedSyncResult",
-        [
-            "room_id",  # str
-            "timeline",  # TimelineBatch
-            "state",  # dict[(str, str), FrozenEvent]
-            "account_data",
-        ],
-    )
-):
-    __slots__ = []
-
-    def __nonzero__(self):
+@attr.s(slots=True, frozen=True)
+class ArchivedSyncResult:
+    room_id = attr.ib(type=str)
+    timeline = attr.ib(type=TimelineBatch)
+    state = attr.ib(type=StateMap[EventBase])
+    account_data = attr.ib(type=List[JsonDict])
+
+    def __nonzero__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
         to tell if room needs to be part of the sync result.
         """
@@ -135,70 +138,88 @@ class ArchivedSyncResult(
     __bool__ = __nonzero__  # python3
 
 
-class InvitedSyncResult(
-    collections.namedtuple(
-        "InvitedSyncResult",
-        ["room_id", "invite"],  # str  # FrozenEvent: the invite event
-    )
-):
-    __slots__ = []
+@attr.s(slots=True, frozen=True)
+class InvitedSyncResult:
+    room_id = attr.ib(type=str)
+    invite = attr.ib(type=EventBase)
 
-    def __nonzero__(self):
+    def __nonzero__(self) -> bool:
         """Invited rooms should always be reported to the client"""
         return True
 
     __bool__ = __nonzero__  # python3
 
 
-class GroupsSyncResult(
-    collections.namedtuple("GroupsSyncResult", ["join", "invite", "leave"])
-):
-    __slots__ = []
+@attr.s(slots=True, frozen=True)
+class GroupsSyncResult:
+    join = attr.ib(type=JsonDict)
+    invite = attr.ib(type=JsonDict)
+    leave = attr.ib(type=JsonDict)
 
-    def __nonzero__(self):
+    def __nonzero__(self) -> bool:
         return bool(self.join or self.invite or self.leave)
 
     __bool__ = __nonzero__  # python3
 
 
-class DeviceLists(
-    collections.namedtuple(
-        "DeviceLists",
-        [
-            "changed",  # list of user_ids whose devices may have changed
-            "left",  # list of user_ids whose devices we no longer track
-        ],
-    )
-):
-    __slots__ = []
+@attr.s(slots=True, frozen=True)
+class DeviceLists:
+    """
+    Attributes:
+        changed: List of user_ids whose devices may have changed
+        left: List of user_ids whose devices we no longer track
+    """
+
+    changed = attr.ib(type=Collection[str])
+    left = attr.ib(type=Collection[str])
 
-    def __nonzero__(self):
+    def __nonzero__(self) -> bool:
         return bool(self.changed or self.left)
 
     __bool__ = __nonzero__  # python3
 
 
-class SyncResult(
-    collections.namedtuple(
-        "SyncResult",
-        [
-            "next_batch",  # Token for the next sync
-            "presence",  # List of presence events for the user.
-            "account_data",  # List of account_data events for the user.
-            "joined",  # JoinedSyncResult for each joined room.
-            "invited",  # InvitedSyncResult for each invited room.
-            "archived",  # ArchivedSyncResult for each archived room.
-            "to_device",  # List of direct messages for the device.
-            "device_lists",  # List of user_ids whose devices have changed
-            "device_one_time_keys_count",  # Dict of algorithm to count for one time keys
-            # for this device
-            "groups",
-        ],
-    )
-):
-    __slots__ = []
-
-    def __nonzero__(self):
+@attr.s
+class _RoomChanges:
+    """The set of room entries to include in the sync, plus the set of joined
+    and left room IDs since last sync.
+    """
+
+    room_entries = attr.ib(type=List["RoomSyncResultBuilder"])
+    invited = attr.ib(type=List[InvitedSyncResult])
+    newly_joined_rooms = attr.ib(type=List[str])
+    newly_left_rooms = attr.ib(type=List[str])
+
+
+@attr.s(slots=True, frozen=True)
+class SyncResult:
+    """
+    Attributes:
+        next_batch: Token for the next sync
+        presence: List of presence events for the user.
+        account_data: List of account_data events for the user.
+        joined: JoinedSyncResult for each joined room.
+        invited: InvitedSyncResult for each invited room.
+        archived: ArchivedSyncResult for each archived room.
+        to_device: List of direct messages for the device.
+        device_lists: List of user_ids whose devices have changed
+        device_one_time_keys_count: Dict of algorithm to count for one time keys
+            for this device
+        groups: Group updates, if any
+    """
+
+    next_batch = attr.ib(type=StreamToken)
+    presence = attr.ib(type=List[JsonDict])
+    account_data = attr.ib(type=List[JsonDict])
+    joined = attr.ib(type=List[JoinedSyncResult])
+    invited = attr.ib(type=List[InvitedSyncResult])
+    archived = attr.ib(type=List[ArchivedSyncResult])
+    to_device = attr.ib(type=List[JsonDict])
+    device_lists = attr.ib(type=DeviceLists)
+    device_one_time_keys_count = attr.ib(type=JsonDict)
+    groups = attr.ib(type=Optional[GroupsSyncResult])
+
+    def __nonzero__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
         to tell if the notifier needs to wait for more events when polling for
         events.
@@ -240,13 +261,15 @@ class SyncHandler(object):
         )
 
     async def wait_for_sync_for_user(
-        self, sync_config, since_token=None, timeout=0, full_state=False
-    ):
+        self,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken] = None,
+        timeout: int = 0,
+        full_state: bool = False,
+    ) -> SyncResult:
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
-        Returns:
-            Deferred[SyncResult]
         """
         # If the user is not part of the mau group, then check that limits have
         # not been exceeded (if not part of the group by this point, almost certain
@@ -265,8 +288,12 @@ class SyncHandler(object):
         return res
 
     async def _wait_for_sync_for_user(
-        self, sync_config, since_token, timeout, full_state
-    ):
+        self,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken] = None,
+        timeout: int = 0,
+        full_state: bool = False,
+    ) -> SyncResult:
         if since_token is None:
             sync_type = "initial_sync"
         elif full_state:
@@ -305,25 +332,33 @@ class SyncHandler(object):
 
         return result
 
-    def current_sync_for_user(self, sync_config, since_token=None, full_state=False):
+    async def current_sync_for_user(
+        self,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken] = None,
+        full_state: bool = False,
+    ) -> SyncResult:
         """Get the sync for client needed to match what the server has now.
-        Returns:
-            A Deferred SyncResult.
         """
-        return self.generate_sync_result(sync_config, since_token, full_state)
+        return await self.generate_sync_result(sync_config, since_token, full_state)
 
-    async def push_rules_for_user(self, user):
+    async def push_rules_for_user(self, user: UserID) -> JsonDict:
         user_id = user.to_string()
         rules = await self.store.get_push_rules_for_user(user_id)
         rules = format_push_rules_for_user(user, rules)
         return rules
 
-    async def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
+    async def ephemeral_by_room(
+        self,
+        sync_result_builder: "SyncResultBuilder",
+        now_token: StreamToken,
+        since_token: Optional[StreamToken] = None,
+    ) -> Tuple[StreamToken, Dict[str, List[JsonDict]]]:
         """Get the ephemeral events for each room the user is in
         Args:
-            sync_result_builder(SyncResultBuilder)
-            now_token (StreamToken): Where the server is currently up to.
-            since_token (StreamToken): Where the server was when the client
+            sync_result_builder
+            now_token: Where the server is currently up to.
+            since_token: Where the server was when the client
                 last synced.
         Returns:
             A tuple of the now StreamToken, updated to reflect the which typing
@@ -348,7 +383,7 @@ class SyncHandler(object):
             )
             now_token = now_token.copy_and_replace("typing_key", typing_key)
 
-            ephemeral_by_room = {}
+            ephemeral_by_room = {}  # type: JsonDict
 
             for event in typing:
                 # we want to exclude the room_id from the event, but modifying the
@@ -380,13 +415,13 @@ class SyncHandler(object):
 
     async def _load_filtered_recents(
         self,
-        room_id,
-        sync_config,
-        now_token,
-        since_token=None,
-        recents=None,
-        newly_joined_room=False,
-    ):
+        room_id: str,
+        sync_config: SyncConfig,
+        now_token: StreamToken,
+        since_token: Optional[StreamToken] = None,
+        potential_recents: Optional[List[EventBase]] = None,
+        newly_joined_room: bool = False,
+    ) -> TimelineBatch:
         """
         Returns:
             a Deferred TimelineBatch
@@ -397,21 +432,29 @@ class SyncHandler(object):
                 sync_config.filter_collection.blocks_all_room_timeline()
             )
 
-            if recents is None or newly_joined_room or timeline_limit < len(recents):
+            if (
+                potential_recents is None
+                or newly_joined_room
+                or timeline_limit < len(potential_recents)
+            ):
                 limited = True
             else:
                 limited = False
 
-            if recents:
-                recents = sync_config.filter_collection.filter_room_timeline(recents)
+            if potential_recents:
+                recents = sync_config.filter_collection.filter_room_timeline(
+                    potential_recents
+                )
 
                 # We check if there are any state events, if there are then we pass
                 # all current state events to the filter_events function. This is to
                 # ensure that we always include current state in the timeline
-                current_state_ids = frozenset()
+                current_state_ids = frozenset()  # type: FrozenSet[str]
                 if any(e.is_state() for e in recents):
-                    current_state_ids = await self.state.get_current_state_ids(room_id)
-                    current_state_ids = frozenset(itervalues(current_state_ids))
+                    current_state_ids_map = await self.state.get_current_state_ids(
+                        room_id
+                    )
+                    current_state_ids = frozenset(itervalues(current_state_ids_map))
 
                 recents = await filter_events_for_client(
                     self.storage,
@@ -463,8 +506,10 @@ class SyncHandler(object):
                 # ensure that we always include current state in the timeline
                 current_state_ids = frozenset()
                 if any(e.is_state() for e in loaded_recents):
-                    current_state_ids = await self.state.get_current_state_ids(room_id)
-                    current_state_ids = frozenset(itervalues(current_state_ids))
+                    current_state_ids_map = await self.state.get_current_state_ids(
+                        room_id
+                    )
+                    current_state_ids = frozenset(itervalues(current_state_ids_map))
 
                 loaded_recents = await filter_events_for_client(
                     self.storage,
@@ -493,17 +538,15 @@ class SyncHandler(object):
             limited=limited or newly_joined_room,
         )
 
-    async def get_state_after_event(self, event, state_filter=StateFilter.all()):
+    async def get_state_after_event(
+        self, event: EventBase, state_filter: StateFilter = StateFilter.all()
+    ) -> StateMap[str]:
         """
         Get the room state after the given event
 
         Args:
-            event(synapse.events.EventBase): event of interest
-            state_filter (StateFilter): The state filter used to fetch state
-                from the database.
-
-        Returns:
-            A Deferred map from ((type, state_key)->Event)
+            event: event of interest
+            state_filter: The state filter used to fetch state from the database.
         """
         state_ids = await self.state_store.get_state_ids_for_event(
             event.event_id, state_filter=state_filter
@@ -514,18 +557,17 @@ class SyncHandler(object):
         return state_ids
 
     async def get_state_at(
-        self, room_id, stream_position, state_filter=StateFilter.all()
-    ):
+        self,
+        room_id: str,
+        stream_position: StreamToken,
+        state_filter: StateFilter = StateFilter.all(),
+    ) -> StateMap[str]:
         """ Get the room state at a particular stream position
 
         Args:
-            room_id(str): room for which to get state
-            stream_position(StreamToken): point at which to get state
-            state_filter (StateFilter): The state filter used to fetch state
-                from the database.
-
-        Returns:
-            A Deferred map from ((type, state_key)->Event)
+            room_id: room for which to get state
+            stream_position: point at which to get state
+            state_filter: The state filter used to fetch state from the database.
         """
         # FIXME this claims to get the state at a stream position, but
         # get_recent_events_for_room operates by topo ordering. This therefore
@@ -546,23 +588,25 @@ class SyncHandler(object):
             state = {}
         return state
 
-    async def compute_summary(self, room_id, sync_config, batch, state, now_token):
+    async def compute_summary(
+        self,
+        room_id: str,
+        sync_config: SyncConfig,
+        batch: TimelineBatch,
+        state: StateMap[EventBase],
+        now_token: StreamToken,
+    ) -> Optional[JsonDict]:
         """ Works out a room summary block for this room, summarising the number
         of joined members in the room, and providing the 'hero' members if the
         room has no name so clients can consistently name rooms.  Also adds
         state events to 'state' if needed to describe the heroes.
 
-        Args:
-            room_id(str):
-            sync_config(synapse.handlers.sync.SyncConfig):
-            batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
-                the room that will be sent to the user.
-            state(dict): dict of (type, state_key) -> Event as returned by
-                compute_state_delta
-            now_token(str): Token of the end of the current batch.
-
-        Returns:
-             A deferred dict describing the room summary
+        Args
+            room_id
+            sync_config
+            batch: The timeline batch for the room that will be sent to the user.
+            state: State as returned by compute_state_delta
+            now_token: Token of the end of the current batch.
         """
 
         # FIXME: we could/should get this from room_stats when matthew/stats lands
@@ -681,7 +725,7 @@ class SyncHandler(object):
 
         return summary
 
-    def get_lazy_loaded_members_cache(self, cache_key):
+    def get_lazy_loaded_members_cache(self, cache_key: Tuple[str, str]) -> LruCache:
         cache = self.lazy_loaded_members_cache.get(cache_key)
         if cache is None:
             logger.debug("creating LruCache for %r", cache_key)
@@ -692,23 +736,24 @@ class SyncHandler(object):
         return cache
 
     async def compute_state_delta(
-        self, room_id, batch, sync_config, since_token, now_token, full_state
-    ):
+        self,
+        room_id: str,
+        batch: TimelineBatch,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken],
+        now_token: StreamToken,
+        full_state: bool,
+    ) -> StateMap[EventBase]:
         """ Works out the difference in state between the start of the timeline
         and the previous sync.
 
         Args:
-            room_id(str):
-            batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
-                the room that will be sent to the user.
-            sync_config(synapse.handlers.sync.SyncConfig):
-            since_token(str|None): Token of the end of the previous batch. May
-                be None.
-            now_token(str): Token of the end of the current batch.
-            full_state(bool): Whether to force returning the full state.
-
-        Returns:
-             A deferred dict of (type, state_key) -> Event
+            room_id:
+            batch: The timeline batch for the room that will be sent to the user.
+            sync_config:
+            since_token: Token of the end of the previous batch. May be None.
+            now_token: Token of the end of the current batch.
+            full_state: Whether to force returning the full state.
         """
         # TODO(mjark) Check if the state events were received by the server
         # after the previous sync, since we need to include those state
@@ -800,6 +845,10 @@ class SyncHandler(object):
                 # about them).
                 state_filter = StateFilter.all()
 
+                # If this is an initial sync then full_state should be set, and
+                # that case is handled above. We assert here to ensure that this
+                # is indeed the case.
+                assert since_token is not None
                 state_at_previous_sync = await self.get_state_at(
                     room_id, stream_position=since_token, state_filter=state_filter
                 )
@@ -874,7 +923,7 @@ class SyncHandler(object):
                     if t[0] == EventTypes.Member:
                         cache.set(t[1], event_id)
 
-        state = {}
+        state = {}  # type: Dict[str, EventBase]
         if state_ids:
             state = await self.store.get_events(list(state_ids.values()))
 
@@ -885,7 +934,9 @@ class SyncHandler(object):
             )
         }
 
-    async def unread_notifs_for_room_id(self, room_id, sync_config):
+    async def unread_notifs_for_room_id(
+        self, room_id: str, sync_config: SyncConfig
+    ) -> Optional[Dict[str, str]]:
         with Measure(self.clock, "unread_notifs_for_room_id"):
             last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
                 user_id=sync_config.user.to_string(),
@@ -893,7 +944,6 @@ class SyncHandler(object):
                 receipt_type="m.read",
             )
 
-            notifs = []
             if last_unread_event_id:
                 notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
                     room_id, sync_config.user.to_string(), last_unread_event_id
@@ -905,17 +955,12 @@ class SyncHandler(object):
         return None
 
     async def generate_sync_result(
-        self, sync_config, since_token=None, full_state=False
-    ):
+        self,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken] = None,
+        full_state: bool = False,
+    ) -> SyncResult:
         """Generates a sync result.
-
-        Args:
-            sync_config (SyncConfig)
-            since_token (StreamToken)
-            full_state (bool)
-
-        Returns:
-            Deferred(SyncResult)
         """
         # NB: The now_token gets changed by some of the generate_sync_* methods,
         # this is due to some of the underlying streams not supporting the ability
@@ -977,7 +1022,7 @@ class SyncHandler(object):
         )
 
         device_id = sync_config.device_id
-        one_time_key_counts = {}
+        one_time_key_counts = {}  # type: JsonDict
         if device_id:
             one_time_key_counts = await self.store.count_e2e_one_time_keys(
                 user_id, device_id
@@ -1007,7 +1052,9 @@ class SyncHandler(object):
         )
 
     @measure_func("_generate_sync_entry_for_groups")
-    async def _generate_sync_entry_for_groups(self, sync_result_builder):
+    async def _generate_sync_entry_for_groups(
+        self, sync_result_builder: "SyncResultBuilder"
+    ) -> None:
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
         now_token = sync_result_builder.now_token
@@ -1052,27 +1099,22 @@ class SyncHandler(object):
     @measure_func("_generate_sync_entry_for_device_list")
     async def _generate_sync_entry_for_device_list(
         self,
-        sync_result_builder,
-        newly_joined_rooms,
-        newly_joined_or_invited_users,
-        newly_left_rooms,
-        newly_left_users,
-    ):
+        sync_result_builder: "SyncResultBuilder",
+        newly_joined_rooms: Set[str],
+        newly_joined_or_invited_users: Set[str],
+        newly_left_rooms: Set[str],
+        newly_left_users: Set[str],
+    ) -> DeviceLists:
         """Generate the DeviceLists section of sync
 
         Args:
-            sync_result_builder (SyncResultBuilder)
-            newly_joined_rooms (set[str]): Set of rooms user has joined since
-                previous sync
-            newly_joined_or_invited_users (set[str]): Set of users that have
-                joined or been invited to a room since previous sync.
-            newly_left_rooms (set[str]): Set of rooms user has left since
+            sync_result_builder
+            newly_joined_rooms: Set of rooms user has joined since previous sync
+            newly_joined_or_invited_users: Set of users that have joined or
+                been invited to a room since previous sync.
+            newly_left_rooms: Set of rooms user has left since previous sync
+            newly_left_users: Set of users that have left a room we're in since
                 previous sync
-            newly_left_users (set[str]): Set of users that have left a room
-                we're in since previous sync
-
-        Returns:
-            Deferred[DeviceLists]
         """
 
         user_id = sync_result_builder.sync_config.user.to_string()
@@ -1133,15 +1175,11 @@ class SyncHandler(object):
         else:
             return DeviceLists(changed=[], left=[])
 
-    async def _generate_sync_entry_for_to_device(self, sync_result_builder):
+    async def _generate_sync_entry_for_to_device(
+        self, sync_result_builder: "SyncResultBuilder"
+    ) -> None:
         """Generates the portion of the sync response. Populates
         `sync_result_builder` with the result.
-
-        Args:
-            sync_result_builder(SyncResultBuilder)
-
-        Returns:
-            Deferred(dict): A dictionary containing the per room account data.
         """
         user_id = sync_result_builder.sync_config.user.to_string()
         device_id = sync_result_builder.sync_config.device_id
@@ -1179,15 +1217,17 @@ class SyncHandler(object):
         else:
             sync_result_builder.to_device = []
 
-    async def _generate_sync_entry_for_account_data(self, sync_result_builder):
+    async def _generate_sync_entry_for_account_data(
+        self, sync_result_builder: "SyncResultBuilder"
+    ) -> Dict[str, Dict[str, JsonDict]]:
         """Generates the account data portion of the sync response. Populates
         `sync_result_builder` with the result.
 
         Args:
-            sync_result_builder(SyncResultBuilder)
+            sync_result_builder
 
         Returns:
-            Deferred(dict): A dictionary containing the per room account data.
+            A dictionary containing the per room account data.
         """
         sync_config = sync_result_builder.sync_config
         user_id = sync_result_builder.sync_config.user.to_string()
@@ -1231,18 +1271,21 @@ class SyncHandler(object):
         return account_data_by_room
 
     async def _generate_sync_entry_for_presence(
-        self, sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
-    ):
+        self,
+        sync_result_builder: "SyncResultBuilder",
+        newly_joined_rooms: Set[str],
+        newly_joined_or_invited_users: Set[str],
+    ) -> None:
         """Generates the presence portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
         Args:
-            sync_result_builder(SyncResultBuilder)
-            newly_joined_rooms(list): List of rooms that the user has joined
-                since the last sync (or empty if an initial sync)
-            newly_joined_or_invited_users(list): List of users that have joined
-                or been invited to rooms since the last sync (or empty if an initial
-                sync)
+            sync_result_builder
+            newly_joined_rooms: Set of rooms that the user has joined since
+                the last sync (or empty if an initial sync)
+            newly_joined_or_invited_users: Set of users that have joined or
+                been invited to rooms since the last sync (or empty if an
+                initial sync)
         """
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
@@ -1286,17 +1329,19 @@ class SyncHandler(object):
         sync_result_builder.presence = presence
 
     async def _generate_sync_entry_for_rooms(
-        self, sync_result_builder, account_data_by_room
-    ):
+        self,
+        sync_result_builder: "SyncResultBuilder",
+        account_data_by_room: Dict[str, Dict[str, JsonDict]],
+    ) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
         """Generates the rooms portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
         Args:
-            sync_result_builder(SyncResultBuilder)
-            account_data_by_room(dict): Dictionary of per room account data
+            sync_result_builder
+            account_data_by_room: Dictionary of per room account data
 
         Returns:
-            Deferred(tuple): Returns a 4-tuple of
+            Returns a 4-tuple of
             `(newly_joined_rooms, newly_joined_or_invited_users,
             newly_left_rooms, newly_left_users)`
         """
@@ -1307,7 +1352,7 @@ class SyncHandler(object):
         )
 
         if block_all_room_ephemeral:
-            ephemeral_by_room = {}
+            ephemeral_by_room = {}  # type: Dict[str, List[JsonDict]]
         else:
             now_token, ephemeral_by_room = await self.ephemeral_by_room(
                 sync_result_builder,
@@ -1328,7 +1373,7 @@ class SyncHandler(object):
                     )
                     if not tags_by_room:
                         logger.debug("no-oping sync")
-                        return [], [], [], []
+                        return set(), set(), set(), set()
 
         ignored_account_data = await self.store.get_global_account_data_by_type_for_user(
             "m.ignored_user_list", user_id=user_id
@@ -1340,19 +1385,22 @@ class SyncHandler(object):
             ignored_users = frozenset()
 
         if since_token:
-            res = await self._get_rooms_changed(sync_result_builder, ignored_users)
-            room_entries, invited, newly_joined_rooms, newly_left_rooms = res
-
+            room_changes = await self._get_rooms_changed(
+                sync_result_builder, ignored_users
+            )
             tags_by_room = await self.store.get_updated_tags(
                 user_id, since_token.account_data_key
             )
         else:
-            res = await self._get_all_rooms(sync_result_builder, ignored_users)
-            room_entries, invited, newly_joined_rooms = res
-            newly_left_rooms = []
+            room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
 
             tags_by_room = await self.store.get_tags_for_user(user_id)
 
+        room_entries = room_changes.room_entries
+        invited = room_changes.invited
+        newly_joined_rooms = room_changes.newly_joined_rooms
+        newly_left_rooms = room_changes.newly_left_rooms
+
         def handle_room_entries(room_entry):
             return self._generate_room_entry(
                 sync_result_builder,
@@ -1392,13 +1440,15 @@ class SyncHandler(object):
         newly_left_users -= newly_joined_or_invited_users
 
         return (
-            newly_joined_rooms,
+            set(newly_joined_rooms),
             newly_joined_or_invited_users,
-            newly_left_rooms,
+            set(newly_left_rooms),
             newly_left_users,
         )
 
-    async def _have_rooms_changed(self, sync_result_builder):
+    async def _have_rooms_changed(
+        self, sync_result_builder: "SyncResultBuilder"
+    ) -> bool:
         """Returns whether there may be any new events that should be sent down
         the sync. Returns True if there are.
         """
@@ -1422,22 +1472,10 @@ class SyncHandler(object):
                 return True
         return False
 
-    async def _get_rooms_changed(self, sync_result_builder, ignored_users):
+    async def _get_rooms_changed(
+        self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+    ) -> _RoomChanges:
         """Gets the the changes that have happened since the last sync.
-
-        Args:
-            sync_result_builder(SyncResultBuilder)
-            ignored_users(set(str)): Set of users ignored by user.
-
-        Returns:
-            Deferred(tuple): Returns a tuple of the form:
-            `(room_entries, invited_rooms, newly_joined_rooms, newly_left_rooms)`
-
-            where:
-                room_entries is a list [RoomSyncResultBuilder]
-                invited_rooms is a list [InvitedSyncResult]
-                newly_joined_rooms is a list[str] of room ids
-                newly_left_rooms is a list[str] of room ids
         """
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
@@ -1451,7 +1489,7 @@ class SyncHandler(object):
             user_id, since_token.room_key, now_token.room_key
         )
 
-        mem_change_events_by_room_id = {}
+        mem_change_events_by_room_id = {}  # type: Dict[str, List[EventBase]]
         for event in rooms_changed:
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
@@ -1570,7 +1608,7 @@ class SyncHandler(object):
                 # This is all screaming out for a refactor, as the logic here is
                 # subtle and the moving parts numerous.
                 if leave_event.internal_metadata.is_out_of_band_membership():
-                    batch_events = [leave_event]
+                    batch_events = [leave_event]  # type: Optional[List[EventBase]]
                 else:
                     batch_events = None
 
@@ -1636,18 +1674,17 @@ class SyncHandler(object):
                 )
             room_entries.append(entry)
 
-        return room_entries, invited, newly_joined_rooms, newly_left_rooms
+        return _RoomChanges(room_entries, invited, newly_joined_rooms, newly_left_rooms)
 
-    async def _get_all_rooms(self, sync_result_builder, ignored_users):
+    async def _get_all_rooms(
+        self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+    ) -> _RoomChanges:
         """Returns entries for all rooms for the user.
 
         Args:
-            sync_result_builder(SyncResultBuilder)
-            ignored_users(set(str)): Set of users ignored by user.
+            sync_result_builder
+            ignored_users: Set of users ignored by user.
 
-        Returns:
-            Deferred(tuple): Returns a tuple of the form:
-            `([RoomSyncResultBuilder], [InvitedSyncResult], [])`
         """
 
         user_id = sync_result_builder.sync_config.user.to_string()
@@ -1709,30 +1746,30 @@ class SyncHandler(object):
                     )
                 )
 
-        return room_entries, invited, []
+        return _RoomChanges(room_entries, invited, [], [])
 
     async def _generate_room_entry(
         self,
-        sync_result_builder,
-        ignored_users,
-        room_builder,
-        ephemeral,
-        tags,
-        account_data,
-        always_include=False,
+        sync_result_builder: "SyncResultBuilder",
+        ignored_users: Set[str],
+        room_builder: "RoomSyncResultBuilder",
+        ephemeral: List[JsonDict],
+        tags: Optional[List[JsonDict]],
+        account_data: Dict[str, JsonDict],
+        always_include: bool = False,
     ):
         """Populates the `joined` and `archived` section of `sync_result_builder`
         based on the `room_builder`.
 
         Args:
-            sync_result_builder(SyncResultBuilder)
-            ignored_users(set(str)): Set of users ignored by user.
-            room_builder(RoomSyncResultBuilder)
-            ephemeral(list): List of new ephemeral events for room
-            tags(list): List of *all* tags for room, or None if there has been
+            sync_result_builder
+            ignored_users: Set of users ignored by user.
+            room_builder
+            ephemeral: List of new ephemeral events for room
+            tags: List of *all* tags for room, or None if there has been
                 no change.
-            account_data(list): List of new account data for room
-            always_include(bool): Always include this room in the sync response,
+            account_data: List of new account data for room
+            always_include: Always include this room in the sync response,
                 even if empty.
         """
         newly_joined = room_builder.newly_joined
@@ -1758,7 +1795,7 @@ class SyncHandler(object):
             sync_config,
             now_token=upto_token,
             since_token=since_token,
-            recents=events,
+            potential_recents=events,
             newly_joined_room=newly_joined,
         )
 
@@ -1809,7 +1846,7 @@ class SyncHandler(object):
             room_id, batch, sync_config, since_token, now_token, full_state=full_state
         )
 
-        summary = {}
+        summary = {}  # type: Optional[JsonDict]
 
         # we include a summary in room responses when we're lazy loading
         # members (as the client otherwise doesn't have enough info to form
@@ -1833,7 +1870,7 @@ class SyncHandler(object):
             )
 
         if room_builder.rtype == "joined":
-            unread_notifications = {}
+            unread_notifications = {}  # type: Dict[str, str]
             room_sync = JoinedSyncResult(
                 room_id=room_id,
                 timeline=batch,
@@ -1860,18 +1897,20 @@ class SyncHandler(object):
                     % (room_id, user_id, len(state))
                 )
         elif room_builder.rtype == "archived":
-            room_sync = ArchivedSyncResult(
+            archived_room_sync = ArchivedSyncResult(
                 room_id=room_id,
                 timeline=batch,
                 state=state,
                 account_data=account_data_events,
             )
-            if room_sync or always_include:
-                sync_result_builder.archived.append(room_sync)
+            if archived_room_sync or always_include:
+                sync_result_builder.archived.append(archived_room_sync)
         else:
             raise Exception("Unrecognized rtype: %r", room_builder.rtype)
 
-    async def get_rooms_for_user_at(self, user_id, stream_ordering):
+    async def get_rooms_for_user_at(
+        self, user_id: str, stream_ordering: int
+    ) -> FrozenSet[str]:
         """Get set of joined rooms for a user at the given stream ordering.
 
         The stream ordering *must* be recent, otherwise this may throw an
@@ -1879,12 +1918,11 @@ class SyncHandler(object):
         current token, which should be perfectly fine).
 
         Args:
-            user_id (str)
-            stream_ordering (int)
+            user_id
+            stream_ordering
 
         ReturnValue:
-            Deferred[frozenset[str]]: Set of room_ids the user is in at given
-            stream_ordering.
+            Set of room_ids the user is in at given stream_ordering.
         """
         joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
 
@@ -1911,11 +1949,10 @@ class SyncHandler(object):
             if user_id in users_in_room:
                 joined_room_ids.add(room_id)
 
-        joined_room_ids = frozenset(joined_room_ids)
-        return joined_room_ids
+        return frozenset(joined_room_ids)
 
 
-def _action_has_highlight(actions):
+def _action_has_highlight(actions: List[JsonDict]) -> bool:
     for action in actions:
         try:
             if action.get("set_tweak", None) == "highlight":
@@ -1927,22 +1964,23 @@ def _action_has_highlight(actions):
 
 
 def _calculate_state(
-    timeline_contains, timeline_start, previous, current, lazy_load_members
-):
+    timeline_contains: StateMap[str],
+    timeline_start: StateMap[str],
+    previous: StateMap[str],
+    current: StateMap[str],
+    lazy_load_members: bool,
+) -> StateMap[str]:
     """Works out what state to include in a sync response.
 
     Args:
-        timeline_contains (dict): state in the timeline
-        timeline_start (dict): state at the start of the timeline
-        previous (dict): state at the end of the previous sync (or empty dict
+        timeline_contains: state in the timeline
+        timeline_start: state at the start of the timeline
+        previous: state at the end of the previous sync (or empty dict
             if this is an initial sync)
-        current (dict): state at the end of the timeline
-        lazy_load_members (bool): whether to return members from timeline_start
+        current: state at the end of the timeline
+        lazy_load_members: whether to return members from timeline_start
             or not.  assumes that timeline_start has already been filtered to
             include only the members the client needs to know about.
-
-    Returns:
-        dict
     """
     event_id_to_key = {
         e: key
@@ -1979,15 +2017,16 @@ def _calculate_state(
     return {event_id_to_key[e]: e for e in state_ids}
 
 
-class SyncResultBuilder(object):
+@attr.s
+class SyncResultBuilder:
     """Used to help build up a new SyncResult for a user
 
     Attributes:
-        sync_config (SyncConfig)
-        full_state (bool)
-        since_token (StreamToken)
-        now_token (StreamToken)
-        joined_room_ids (list[str])
+        sync_config
+        full_state: The full_state flag as specified by user
+        since_token: The token supplied by user, or None.
+        now_token: The token to sync up to.
+        joined_room_ids: List of rooms the user is joined to
 
         # The following mirror the fields in a sync response
         presence (list)
@@ -1995,61 +2034,45 @@ class SyncResultBuilder(object):
         joined (list[JoinedSyncResult])
         invited (list[InvitedSyncResult])
         archived (list[ArchivedSyncResult])
-        device (list)
         groups (GroupsSyncResult|None)
         to_device (list)
     """
 
-    def __init__(
-        self, sync_config, full_state, since_token, now_token, joined_room_ids
-    ):
-        """
-        Args:
-            sync_config (SyncConfig)
-            full_state (bool): The full_state flag as specified by user
-            since_token (StreamToken): The token supplied by user, or None.
-            now_token (StreamToken): The token to sync up to.
-            joined_room_ids (list[str]): List of rooms the user is joined to
-        """
-        self.sync_config = sync_config
-        self.full_state = full_state
-        self.since_token = since_token
-        self.now_token = now_token
-        self.joined_room_ids = joined_room_ids
-
-        self.presence = []
-        self.account_data = []
-        self.joined = []
-        self.invited = []
-        self.archived = []
-        self.device = []
-        self.groups = None
-        self.to_device = []
+    sync_config = attr.ib(type=SyncConfig)
+    full_state = attr.ib(type=bool)
+    since_token = attr.ib(type=Optional[StreamToken])
+    now_token = attr.ib(type=StreamToken)
+    joined_room_ids = attr.ib(type=FrozenSet[str])
+
+    presence = attr.ib(type=List[JsonDict], default=attr.Factory(list))
+    account_data = attr.ib(type=List[JsonDict], default=attr.Factory(list))
+    joined = attr.ib(type=List[JoinedSyncResult], default=attr.Factory(list))
+    invited = attr.ib(type=List[InvitedSyncResult], default=attr.Factory(list))
+    archived = attr.ib(type=List[ArchivedSyncResult], default=attr.Factory(list))
+    groups = attr.ib(type=Optional[GroupsSyncResult], default=None)
+    to_device = attr.ib(type=List[JsonDict], default=attr.Factory(list))
 
 
+@attr.s
 class RoomSyncResultBuilder(object):
     """Stores information needed to create either a `JoinedSyncResult` or
     `ArchivedSyncResult`.
+
+    Attributes:
+        room_id
+        rtype: One of `"joined"` or `"archived"`
+        events: List of events to include in the room (more events may be added
+            when generating result).
+        newly_joined: If the user has newly joined the room
+        full_state: Whether the full state should be sent in result
+        since_token: Earliest point to return events from, or None
+        upto_token: Latest point to return events from.
     """
 
-    def __init__(
-        self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token
-    ):
-        """
-        Args:
-            room_id(str)
-            rtype(str): One of `"joined"` or `"archived"`
-            events(list[FrozenEvent]): List of events to include in the room
-                (more events may be added when generating result).
-            newly_joined(bool): If the user has newly joined the room
-            full_state(bool): Whether the full state should be sent in result
-            since_token(StreamToken): Earliest point to return events from, or None
-            upto_token(StreamToken): Latest point to return events from.
-        """
-        self.room_id = room_id
-        self.rtype = rtype
-        self.events = events
-        self.newly_joined = newly_joined
-        self.full_state = full_state
-        self.since_token = since_token
-        self.upto_token = upto_token
+    room_id = attr.ib(type=str)
+    rtype = attr.ib(type=str)
+    events = attr.ib(type=Optional[List[EventBase]])
+    newly_joined = attr.ib(type=bool)
+    full_state = attr.ib(type=bool)
+    since_token = attr.ib(type=Optional[StreamToken])
+    upto_token = attr.ib(type=StreamToken)