summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py979
1 files changed, 501 insertions, 478 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 19bca6717f..6bdb24baff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018, 2019 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,22 +14,30 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import collections
 import itertools
 import logging
+from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple
 
 from six import iteritems, itervalues
 
+import attr
 from prometheus_client import Counter
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes, Membership
-from synapse.logging.context import LoggingContext
+from synapse.api.filtering import FilterCollection
+from synapse.events import EventBase
+from synapse.logging.context import current_context
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.roommember import MemberSummary
 from synapse.storage.state import StateFilter
-from synapse.types import RoomStreamToken
+from synapse.types import (
+    Collection,
+    JsonDict,
+    RoomStreamToken,
+    StateMap,
+    StreamToken,
+    UserID,
+)
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.caches.lrucache import LruCache
@@ -64,17 +72,22 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
 LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
 
 
-SyncConfig = collections.namedtuple(
-    "SyncConfig", ["user", "filter_collection", "is_guest", "request_key", "device_id"]
-)
+@attr.s(slots=True, frozen=True)
+class SyncConfig:
+    user = attr.ib(type=UserID)
+    filter_collection = attr.ib(type=FilterCollection)
+    is_guest = attr.ib(type=bool)
+    request_key = attr.ib(type=Tuple[Any, ...])
+    device_id = attr.ib(type=str)
 
 
-class TimelineBatch(
-    collections.namedtuple("TimelineBatch", ["prev_batch", "events", "limited"])
-):
-    __slots__ = []
+@attr.s(slots=True, frozen=True)
+class TimelineBatch:
+    prev_batch = attr.ib(type=StreamToken)
+    events = attr.ib(type=List[EventBase])
+    limited = attr.ib(bool)
 
-    def __nonzero__(self):
+    def __nonzero__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
         to tell if room needs to be part of the sync result.
         """
@@ -83,23 +96,17 @@ class TimelineBatch(
     __bool__ = __nonzero__  # python3
 
 
-class JoinedSyncResult(
-    collections.namedtuple(
-        "JoinedSyncResult",
-        [
-            "room_id",  # str
-            "timeline",  # TimelineBatch
-            "state",  # dict[(str, str), FrozenEvent]
-            "ephemeral",
-            "account_data",
-            "unread_notifications",
-            "summary",
-        ],
-    )
-):
-    __slots__ = []
-
-    def __nonzero__(self):
+@attr.s(slots=True, frozen=True)
+class JoinedSyncResult:
+    room_id = attr.ib(type=str)
+    timeline = attr.ib(type=TimelineBatch)
+    state = attr.ib(type=StateMap[EventBase])
+    ephemeral = attr.ib(type=List[JsonDict])
+    account_data = attr.ib(type=List[JsonDict])
+    unread_notifications = attr.ib(type=JsonDict)
+    summary = attr.ib(type=Optional[JsonDict])
+
+    def __nonzero__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
         to tell if room needs to be part of the sync result.
         """
@@ -115,20 +122,14 @@ class JoinedSyncResult(
     __bool__ = __nonzero__  # python3
 
 
-class ArchivedSyncResult(
-    collections.namedtuple(
-        "ArchivedSyncResult",
-        [
-            "room_id",  # str
-            "timeline",  # TimelineBatch
-            "state",  # dict[(str, str), FrozenEvent]
-            "account_data",
-        ],
-    )
-):
-    __slots__ = []
-
-    def __nonzero__(self):
+@attr.s(slots=True, frozen=True)
+class ArchivedSyncResult:
+    room_id = attr.ib(type=str)
+    timeline = attr.ib(type=TimelineBatch)
+    state = attr.ib(type=StateMap[EventBase])
+    account_data = attr.ib(type=List[JsonDict])
+
+    def __nonzero__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
         to tell if room needs to be part of the sync result.
         """
@@ -137,70 +138,88 @@ class ArchivedSyncResult(
     __bool__ = __nonzero__  # python3
 
 
-class InvitedSyncResult(
-    collections.namedtuple(
-        "InvitedSyncResult",
-        ["room_id", "invite"],  # str  # FrozenEvent: the invite event
-    )
-):
-    __slots__ = []
+@attr.s(slots=True, frozen=True)
+class InvitedSyncResult:
+    room_id = attr.ib(type=str)
+    invite = attr.ib(type=EventBase)
 
-    def __nonzero__(self):
+    def __nonzero__(self) -> bool:
         """Invited rooms should always be reported to the client"""
         return True
 
     __bool__ = __nonzero__  # python3
 
 
-class GroupsSyncResult(
-    collections.namedtuple("GroupsSyncResult", ["join", "invite", "leave"])
-):
-    __slots__ = []
+@attr.s(slots=True, frozen=True)
+class GroupsSyncResult:
+    join = attr.ib(type=JsonDict)
+    invite = attr.ib(type=JsonDict)
+    leave = attr.ib(type=JsonDict)
 
-    def __nonzero__(self):
+    def __nonzero__(self) -> bool:
         return bool(self.join or self.invite or self.leave)
 
     __bool__ = __nonzero__  # python3
 
 
-class DeviceLists(
-    collections.namedtuple(
-        "DeviceLists",
-        [
-            "changed",  # list of user_ids whose devices may have changed
-            "left",  # list of user_ids whose devices we no longer track
-        ],
-    )
-):
-    __slots__ = []
+@attr.s(slots=True, frozen=True)
+class DeviceLists:
+    """
+    Attributes:
+        changed: List of user_ids whose devices may have changed
+        left: List of user_ids whose devices we no longer track
+    """
+
+    changed = attr.ib(type=Collection[str])
+    left = attr.ib(type=Collection[str])
 
-    def __nonzero__(self):
+    def __nonzero__(self) -> bool:
         return bool(self.changed or self.left)
 
     __bool__ = __nonzero__  # python3
 
 
-class SyncResult(
-    collections.namedtuple(
-        "SyncResult",
-        [
-            "next_batch",  # Token for the next sync
-            "presence",  # List of presence events for the user.
-            "account_data",  # List of account_data events for the user.
-            "joined",  # JoinedSyncResult for each joined room.
-            "invited",  # InvitedSyncResult for each invited room.
-            "archived",  # ArchivedSyncResult for each archived room.
-            "to_device",  # List of direct messages for the device.
-            "device_lists",  # List of user_ids whose devices have changed
-            "device_one_time_keys_count",  # Dict of algorithm to count for one time keys
-            # for this device
-            "groups",
-        ],
-    )
-):
-    __slots__ = []
-
-    def __nonzero__(self):
+@attr.s
+class _RoomChanges:
+    """The set of room entries to include in the sync, plus the set of joined
+    and left room IDs since last sync.
+    """
+
+    room_entries = attr.ib(type=List["RoomSyncResultBuilder"])
+    invited = attr.ib(type=List[InvitedSyncResult])
+    newly_joined_rooms = attr.ib(type=List[str])
+    newly_left_rooms = attr.ib(type=List[str])
+
+
+@attr.s(slots=True, frozen=True)
+class SyncResult:
+    """
+    Attributes:
+        next_batch: Token for the next sync
+        presence: List of presence events for the user.
+        account_data: List of account_data events for the user.
+        joined: JoinedSyncResult for each joined room.
+        invited: InvitedSyncResult for each invited room.
+        archived: ArchivedSyncResult for each archived room.
+        to_device: List of direct messages for the device.
+        device_lists: List of user_ids whose devices have changed
+        device_one_time_keys_count: Dict of algorithm to count for one time keys
+            for this device
+        groups: Group updates, if any
+    """
+
+    next_batch = attr.ib(type=StreamToken)
+    presence = attr.ib(type=List[JsonDict])
+    account_data = attr.ib(type=List[JsonDict])
+    joined = attr.ib(type=List[JoinedSyncResult])
+    invited = attr.ib(type=List[InvitedSyncResult])
+    archived = attr.ib(type=List[ArchivedSyncResult])
+    to_device = attr.ib(type=List[JsonDict])
+    device_lists = attr.ib(type=DeviceLists)
+    device_one_time_keys_count = attr.ib(type=JsonDict)
+    groups = attr.ib(type=Optional[GroupsSyncResult])
+
+    def __nonzero__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
         to tell if the notifier needs to wait for more events when polling for
         events.
@@ -230,6 +249,8 @@ class SyncHandler(object):
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
 
         # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
         self.lazy_loaded_members_cache = ExpiringCache(
@@ -239,23 +260,24 @@ class SyncHandler(object):
             expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
         )
 
-    @defer.inlineCallbacks
-    def wait_for_sync_for_user(
-        self, sync_config, since_token=None, timeout=0, full_state=False
-    ):
+    async def wait_for_sync_for_user(
+        self,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken] = None,
+        timeout: int = 0,
+        full_state: bool = False,
+    ) -> SyncResult:
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
-        Returns:
-            Deferred[SyncResult]
         """
         # If the user is not part of the mau group, then check that limits have
         # not been exceeded (if not part of the group by this point, almost certain
         # auth_blocking will occur)
         user_id = sync_config.user.to_string()
-        yield self.auth.check_auth_blocking(user_id)
+        await self.auth.check_auth_blocking(user_id)
 
-        res = yield self.response_cache.wrap(
+        res = await self.response_cache.wrap(
             sync_config.request_key,
             self._wait_for_sync_for_user,
             sync_config,
@@ -265,8 +287,13 @@ class SyncHandler(object):
         )
         return res
 
-    @defer.inlineCallbacks
-    def _wait_for_sync_for_user(self, sync_config, since_token, timeout, full_state):
+    async def _wait_for_sync_for_user(
+        self,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken] = None,
+        timeout: int = 0,
+        full_state: bool = False,
+    ) -> SyncResult:
         if since_token is None:
             sync_type = "initial_sync"
         elif full_state:
@@ -274,14 +301,14 @@ class SyncHandler(object):
         else:
             sync_type = "incremental_sync"
 
-        context = LoggingContext.current_context()
+        context = current_context()
         if context:
             context.tag = sync_type
 
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
-            result = yield self.current_sync_for_user(
+            result = await self.current_sync_for_user(
                 sync_config, since_token, full_state=full_state
             )
         else:
@@ -289,7 +316,7 @@ class SyncHandler(object):
             def current_sync_callback(before_token, after_token):
                 return self.current_sync_for_user(sync_config, since_token)
 
-            result = yield self.notifier.wait_for_events(
+            result = await self.notifier.wait_for_events(
                 sync_config.user.to_string(),
                 timeout,
                 current_sync_callback,
@@ -305,27 +332,33 @@ class SyncHandler(object):
 
         return result
 
-    def current_sync_for_user(self, sync_config, since_token=None, full_state=False):
+    async def current_sync_for_user(
+        self,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken] = None,
+        full_state: bool = False,
+    ) -> SyncResult:
         """Get the sync for client needed to match what the server has now.
-        Returns:
-            A Deferred SyncResult.
         """
-        return self.generate_sync_result(sync_config, since_token, full_state)
+        return await self.generate_sync_result(sync_config, since_token, full_state)
 
-    @defer.inlineCallbacks
-    def push_rules_for_user(self, user):
+    async def push_rules_for_user(self, user: UserID) -> JsonDict:
         user_id = user.to_string()
-        rules = yield self.store.get_push_rules_for_user(user_id)
+        rules = await self.store.get_push_rules_for_user(user_id)
         rules = format_push_rules_for_user(user, rules)
         return rules
 
-    @defer.inlineCallbacks
-    def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
+    async def ephemeral_by_room(
+        self,
+        sync_result_builder: "SyncResultBuilder",
+        now_token: StreamToken,
+        since_token: Optional[StreamToken] = None,
+    ) -> Tuple[StreamToken, Dict[str, List[JsonDict]]]:
         """Get the ephemeral events for each room the user is in
         Args:
-            sync_result_builder(SyncResultBuilder)
-            now_token (StreamToken): Where the server is currently up to.
-            since_token (StreamToken): Where the server was when the client
+            sync_result_builder
+            now_token: Where the server is currently up to.
+            since_token: Where the server was when the client
                 last synced.
         Returns:
             A tuple of the now StreamToken, updated to reflect the which typing
@@ -341,7 +374,7 @@ class SyncHandler(object):
             room_ids = sync_result_builder.joined_room_ids
 
             typing_source = self.event_sources.sources["typing"]
-            typing, typing_key = yield typing_source.get_new_events(
+            typing, typing_key = await typing_source.get_new_events(
                 user=sync_config.user,
                 from_key=typing_key,
                 limit=sync_config.filter_collection.ephemeral_limit(),
@@ -350,7 +383,7 @@ class SyncHandler(object):
             )
             now_token = now_token.copy_and_replace("typing_key", typing_key)
 
-            ephemeral_by_room = {}
+            ephemeral_by_room = {}  # type: JsonDict
 
             for event in typing:
                 # we want to exclude the room_id from the event, but modifying the
@@ -363,7 +396,7 @@ class SyncHandler(object):
             receipt_key = since_token.receipt_key if since_token else "0"
 
             receipt_source = self.event_sources.sources["receipt"]
-            receipts, receipt_key = yield receipt_source.get_new_events(
+            receipts, receipt_key = await receipt_source.get_new_events(
                 user=sync_config.user,
                 from_key=receipt_key,
                 limit=sync_config.filter_collection.ephemeral_limit(),
@@ -380,16 +413,15 @@ class SyncHandler(object):
 
         return now_token, ephemeral_by_room
 
-    @defer.inlineCallbacks
-    def _load_filtered_recents(
+    async def _load_filtered_recents(
         self,
-        room_id,
-        sync_config,
-        now_token,
-        since_token=None,
-        recents=None,
-        newly_joined_room=False,
-    ):
+        room_id: str,
+        sync_config: SyncConfig,
+        now_token: StreamToken,
+        since_token: Optional[StreamToken] = None,
+        potential_recents: Optional[List[EventBase]] = None,
+        newly_joined_room: bool = False,
+    ) -> TimelineBatch:
         """
         Returns:
             a Deferred TimelineBatch
@@ -400,24 +432,32 @@ class SyncHandler(object):
                 sync_config.filter_collection.blocks_all_room_timeline()
             )
 
-            if recents is None or newly_joined_room or timeline_limit < len(recents):
+            if (
+                potential_recents is None
+                or newly_joined_room
+                or timeline_limit < len(potential_recents)
+            ):
                 limited = True
             else:
                 limited = False
 
-            if recents:
-                recents = sync_config.filter_collection.filter_room_timeline(recents)
+            if potential_recents:
+                recents = sync_config.filter_collection.filter_room_timeline(
+                    potential_recents
+                )
 
                 # We check if there are any state events, if there are then we pass
                 # all current state events to the filter_events function. This is to
                 # ensure that we always include current state in the timeline
-                current_state_ids = frozenset()
+                current_state_ids = frozenset()  # type: FrozenSet[str]
                 if any(e.is_state() for e in recents):
-                    current_state_ids = yield self.state.get_current_state_ids(room_id)
-                    current_state_ids = frozenset(itervalues(current_state_ids))
+                    current_state_ids_map = await self.state.get_current_state_ids(
+                        room_id
+                    )
+                    current_state_ids = frozenset(itervalues(current_state_ids_map))
 
-                recents = yield filter_events_for_client(
-                    self.store,
+                recents = await filter_events_for_client(
+                    self.storage,
                     sync_config.user.to_string(),
                     recents,
                     always_include_ids=current_state_ids,
@@ -447,14 +487,14 @@ class SyncHandler(object):
                 # Otherwise, we want to return the last N events in the room
                 # in toplogical ordering.
                 if since_key:
-                    events, end_key = yield self.store.get_room_events_stream_for_room(
+                    events, end_key = await self.store.get_room_events_stream_for_room(
                         room_id,
                         limit=load_limit + 1,
                         from_key=since_key,
                         to_key=end_key,
                     )
                 else:
-                    events, end_key = yield self.store.get_recent_events_for_room(
+                    events, end_key = await self.store.get_recent_events_for_room(
                         room_id, limit=load_limit + 1, end_token=end_key
                     )
                 loaded_recents = sync_config.filter_collection.filter_room_timeline(
@@ -466,11 +506,13 @@ class SyncHandler(object):
                 # ensure that we always include current state in the timeline
                 current_state_ids = frozenset()
                 if any(e.is_state() for e in loaded_recents):
-                    current_state_ids = yield self.state.get_current_state_ids(room_id)
-                    current_state_ids = frozenset(itervalues(current_state_ids))
+                    current_state_ids_map = await self.state.get_current_state_ids(
+                        room_id
+                    )
+                    current_state_ids = frozenset(itervalues(current_state_ids_map))
 
-                loaded_recents = yield filter_events_for_client(
-                    self.store,
+                loaded_recents = await filter_events_for_client(
+                    self.storage,
                     sync_config.user.to_string(),
                     loaded_recents,
                     always_include_ids=current_state_ids,
@@ -496,20 +538,17 @@ class SyncHandler(object):
             limited=limited or newly_joined_room,
         )
 
-    @defer.inlineCallbacks
-    def get_state_after_event(self, event, state_filter=StateFilter.all()):
+    async def get_state_after_event(
+        self, event: EventBase, state_filter: StateFilter = StateFilter.all()
+    ) -> StateMap[str]:
         """
         Get the room state after the given event
 
         Args:
-            event(synapse.events.EventBase): event of interest
-            state_filter (StateFilter): The state filter used to fetch state
-                from the database.
-
-        Returns:
-            A Deferred map from ((type, state_key)->Event)
+            event: event of interest
+            state_filter: The state filter used to fetch state from the database.
         """
-        state_ids = yield self.store.get_state_ids_for_event(
+        state_ids = await self.state_store.get_state_ids_for_event(
             event.event_id, state_filter=state_filter
         )
         if event.is_state():
@@ -517,30 +556,30 @@ class SyncHandler(object):
             state_ids[(event.type, event.state_key)] = event.event_id
         return state_ids
 
-    @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()):
+    async def get_state_at(
+        self,
+        room_id: str,
+        stream_position: StreamToken,
+        state_filter: StateFilter = StateFilter.all(),
+    ) -> StateMap[str]:
         """ Get the room state at a particular stream position
 
         Args:
-            room_id(str): room for which to get state
-            stream_position(StreamToken): point at which to get state
-            state_filter (StateFilter): The state filter used to fetch state
-                from the database.
-
-        Returns:
-            A Deferred map from ((type, state_key)->Event)
+            room_id: room for which to get state
+            stream_position: point at which to get state
+            state_filter: The state filter used to fetch state from the database.
         """
         # FIXME this claims to get the state at a stream position, but
         # get_recent_events_for_room operates by topo ordering. This therefore
         # does not reliably give you the state at the given stream position.
         # (https://github.com/matrix-org/synapse/issues/3305)
-        last_events, _ = yield self.store.get_recent_events_for_room(
+        last_events, _ = await self.store.get_recent_events_for_room(
             room_id, end_token=stream_position.room_key, limit=1
         )
 
         if last_events:
             last_event = last_events[-1]
-            state = yield self.get_state_after_event(
+            state = await self.get_state_after_event(
                 last_event, state_filter=state_filter
             )
 
@@ -549,30 +588,31 @@ class SyncHandler(object):
             state = {}
         return state
 
-    @defer.inlineCallbacks
-    def compute_summary(self, room_id, sync_config, batch, state, now_token):
+    async def compute_summary(
+        self,
+        room_id: str,
+        sync_config: SyncConfig,
+        batch: TimelineBatch,
+        state: StateMap[EventBase],
+        now_token: StreamToken,
+    ) -> Optional[JsonDict]:
         """ Works out a room summary block for this room, summarising the number
         of joined members in the room, and providing the 'hero' members if the
         room has no name so clients can consistently name rooms.  Also adds
         state events to 'state' if needed to describe the heroes.
 
-        Args:
-            room_id(str):
-            sync_config(synapse.handlers.sync.SyncConfig):
-            batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
-                the room that will be sent to the user.
-            state(dict): dict of (type, state_key) -> Event as returned by
-                compute_state_delta
-            now_token(str): Token of the end of the current batch.
-
-        Returns:
-             A deferred dict describing the room summary
+        Args
+            room_id
+            sync_config
+            batch: The timeline batch for the room that will be sent to the user.
+            state: State as returned by compute_state_delta
+            now_token: Token of the end of the current batch.
         """
 
         # FIXME: we could/should get this from room_stats when matthew/stats lands
 
         # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
-        last_events, _ = yield self.store.get_recent_event_ids_for_room(
+        last_events, _ = await self.store.get_recent_event_ids_for_room(
             room_id, end_token=now_token.room_key, limit=1
         )
 
@@ -580,7 +620,7 @@ class SyncHandler(object):
             return None
 
         last_event = last_events[-1]
-        state_ids = yield self.store.get_state_ids_for_event(
+        state_ids = await self.state_store.get_state_ids_for_event(
             last_event.event_id,
             state_filter=StateFilter.from_types(
                 [(EventTypes.Name, ""), (EventTypes.CanonicalAlias, "")]
@@ -588,7 +628,7 @@ class SyncHandler(object):
         )
 
         # this is heavily cached, thus: fast.
-        details = yield self.store.get_room_summary(room_id)
+        details = await self.store.get_room_summary(room_id)
 
         name_id = state_ids.get((EventTypes.Name, ""))
         canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ""))
@@ -606,12 +646,12 @@ class SyncHandler(object):
         # calculating heroes. Empty strings are falsey, so we check
         # for the "name" value and default to an empty string.
         if name_id:
-            name = yield self.store.get_event(name_id, allow_none=True)
+            name = await self.store.get_event(name_id, allow_none=True)
             if name and name.content.get("name"):
                 return summary
 
         if canonical_alias_id:
-            canonical_alias = yield self.store.get_event(
+            canonical_alias = await self.store.get_event(
                 canonical_alias_id, allow_none=True
             )
             if canonical_alias and canonical_alias.content.get("alias"):
@@ -642,11 +682,9 @@ class SyncHandler(object):
 
         # FIXME: order by stream ordering rather than as returned by SQL
         if joined_user_ids or invited_user_ids:
-            summary["m.heroes"] = sorted(
-                [user_id for user_id in (joined_user_ids + invited_user_ids)]
-            )[0:5]
+            summary["m.heroes"] = sorted(joined_user_ids + invited_user_ids)[0:5]
         else:
-            summary["m.heroes"] = sorted([user_id for user_id in gone_user_ids])[0:5]
+            summary["m.heroes"] = sorted(gone_user_ids)[0:5]
 
         if not sync_config.filter_collection.lazy_load_members():
             return summary
@@ -657,9 +695,9 @@ class SyncHandler(object):
 
         # track which members the client should already know about via LL:
         # Ones which are already in state...
-        existing_members = set(
+        existing_members = {
             user_id for (typ, user_id) in state.keys() if typ == EventTypes.Member
-        )
+        }
 
         # ...or ones which are in the timeline...
         for ev in batch.events:
@@ -676,7 +714,7 @@ class SyncHandler(object):
             )
         ]
 
-        missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
+        missing_hero_state = await self.store.get_events(missing_hero_event_ids)
         missing_hero_state = missing_hero_state.values()
 
         for s in missing_hero_state:
@@ -685,7 +723,7 @@ class SyncHandler(object):
 
         return summary
 
-    def get_lazy_loaded_members_cache(self, cache_key):
+    def get_lazy_loaded_members_cache(self, cache_key: Tuple[str, str]) -> LruCache:
         cache = self.lazy_loaded_members_cache.get(cache_key)
         if cache is None:
             logger.debug("creating LruCache for %r", cache_key)
@@ -695,25 +733,25 @@ class SyncHandler(object):
             logger.debug("found LruCache for %r", cache_key)
         return cache
 
-    @defer.inlineCallbacks
-    def compute_state_delta(
-        self, room_id, batch, sync_config, since_token, now_token, full_state
-    ):
+    async def compute_state_delta(
+        self,
+        room_id: str,
+        batch: TimelineBatch,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken],
+        now_token: StreamToken,
+        full_state: bool,
+    ) -> StateMap[EventBase]:
         """ Works out the difference in state between the start of the timeline
         and the previous sync.
 
         Args:
-            room_id(str):
-            batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
-                the room that will be sent to the user.
-            sync_config(synapse.handlers.sync.SyncConfig):
-            since_token(str|None): Token of the end of the previous batch. May
-                be None.
-            now_token(str): Token of the end of the current batch.
-            full_state(bool): Whether to force returning the full state.
-
-        Returns:
-             A deferred dict of (type, state_key) -> Event
+            room_id:
+            batch: The timeline batch for the room that will be sent to the user.
+            sync_config:
+            since_token: Token of the end of the previous batch. May be None.
+            now_token: Token of the end of the current batch.
+            full_state: Whether to force returning the full state.
         """
         # TODO(mjark) Check if the state events were received by the server
         # after the previous sync, since we need to include those state
@@ -733,10 +771,10 @@ class SyncHandler(object):
                 # We only request state for the members needed to display the
                 # timeline:
 
-                members_to_fetch = set(
+                members_to_fetch = {
                     event.sender  # FIXME: we also care about invite targets etc.
                     for event in batch.events
-                )
+                }
 
                 if full_state:
                     # always make sure we LL ourselves so we know we're in the room
@@ -757,16 +795,16 @@ class SyncHandler(object):
 
             if full_state:
                 if batch:
-                    current_state_ids = yield self.store.get_state_ids_for_event(
+                    current_state_ids = await self.state_store.get_state_ids_for_event(
                         batch.events[-1].event_id, state_filter=state_filter
                     )
 
-                    state_ids = yield self.store.get_state_ids_for_event(
+                    state_ids = await self.state_store.get_state_ids_for_event(
                         batch.events[0].event_id, state_filter=state_filter
                     )
 
                 else:
-                    current_state_ids = yield self.get_state_at(
+                    current_state_ids = await self.get_state_at(
                         room_id, stream_position=now_token, state_filter=state_filter
                     )
 
@@ -781,13 +819,13 @@ class SyncHandler(object):
                 )
             elif batch.limited:
                 if batch:
-                    state_at_timeline_start = yield self.store.get_state_ids_for_event(
+                    state_at_timeline_start = await self.state_store.get_state_ids_for_event(
                         batch.events[0].event_id, state_filter=state_filter
                     )
                 else:
                     # We can get here if the user has ignored the senders of all
                     # the recent events.
-                    state_at_timeline_start = yield self.get_state_at(
+                    state_at_timeline_start = await self.get_state_at(
                         room_id, stream_position=now_token, state_filter=state_filter
                     )
 
@@ -805,19 +843,23 @@ class SyncHandler(object):
                 # about them).
                 state_filter = StateFilter.all()
 
-                state_at_previous_sync = yield self.get_state_at(
+                # If this is an initial sync then full_state should be set, and
+                # that case is handled above. We assert here to ensure that this
+                # is indeed the case.
+                assert since_token is not None
+                state_at_previous_sync = await self.get_state_at(
                     room_id, stream_position=since_token, state_filter=state_filter
                 )
 
                 if batch:
-                    current_state_ids = yield self.store.get_state_ids_for_event(
+                    current_state_ids = await self.state_store.get_state_ids_for_event(
                         batch.events[-1].event_id, state_filter=state_filter
                     )
                 else:
                     # Its not clear how we get here, but empirically we do
                     # (#5407). Logging has been added elsewhere to try and
                     # figure out where this state comes from.
-                    current_state_ids = yield self.get_state_at(
+                    current_state_ids = await self.get_state_at(
                         room_id, stream_position=now_token, state_filter=state_filter
                     )
 
@@ -841,7 +883,7 @@ class SyncHandler(object):
                         # So we fish out all the member events corresponding to the
                         # timeline here, and then dedupe any redundant ones below.
 
-                        state_ids = yield self.store.get_state_ids_for_event(
+                        state_ids = await self.state_store.get_state_ids_for_event(
                             batch.events[0].event_id,
                             # we only want members!
                             state_filter=StateFilter.from_types(
@@ -879,29 +921,30 @@ class SyncHandler(object):
                     if t[0] == EventTypes.Member:
                         cache.set(t[1], event_id)
 
-        state = {}
+        state = {}  # type: Dict[str, EventBase]
         if state_ids:
-            state = yield self.store.get_events(list(state_ids.values()))
+            state = await self.store.get_events(list(state_ids.values()))
 
         return {
             (e.type, e.state_key): e
             for e in sync_config.filter_collection.filter_room_state(
                 list(state.values())
             )
+            if e.type != EventTypes.Aliases  # until MSC2261 or alternative solution
         }
 
-    @defer.inlineCallbacks
-    def unread_notifs_for_room_id(self, room_id, sync_config):
+    async def unread_notifs_for_room_id(
+        self, room_id: str, sync_config: SyncConfig
+    ) -> Optional[Dict[str, str]]:
         with Measure(self.clock, "unread_notifs_for_room_id"):
-            last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
+            last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
                 user_id=sync_config.user.to_string(),
                 room_id=room_id,
                 receipt_type="m.read",
             )
 
-            notifs = []
             if last_unread_event_id:
-                notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
+                notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
                     room_id, sync_config.user.to_string(), last_unread_event_id
                 )
                 return notifs
@@ -910,25 +953,21 @@ class SyncHandler(object):
         # count is whatever it was last time.
         return None
 
-    @defer.inlineCallbacks
-    def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+    async def generate_sync_result(
+        self,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken] = None,
+        full_state: bool = False,
+    ) -> SyncResult:
         """Generates a sync result.
-
-        Args:
-            sync_config (SyncConfig)
-            since_token (StreamToken)
-            full_state (bool)
-
-        Returns:
-            Deferred(SyncResult)
         """
         # NB: The now_token gets changed by some of the generate_sync_* methods,
         # this is due to some of the underlying streams not supporting the ability
         # to query up to a given point.
         # Always use the `now_token` in `SyncResultBuilder`
-        now_token = yield self.event_sources.get_current_token()
+        now_token = await self.event_sources.get_current_token()
 
-        logger.info(
+        logger.debug(
             "Calculating sync response for %r between %s and %s",
             sync_config.user,
             since_token,
@@ -942,10 +981,9 @@ class SyncHandler(object):
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
         else:
-            joined_room_ids = yield self.get_rooms_for_user_at(
+            joined_room_ids = await self.get_rooms_for_user_at(
                 user_id, now_token.room_stream_id
             )
-
         sync_result_builder = SyncResultBuilder(
             sync_config,
             full_state,
@@ -954,11 +992,11 @@ class SyncHandler(object):
             joined_room_ids=joined_room_ids,
         )
 
-        account_data_by_room = yield self._generate_sync_entry_for_account_data(
+        account_data_by_room = await self._generate_sync_entry_for_account_data(
             sync_result_builder
         )
 
-        res = yield self._generate_sync_entry_for_rooms(
+        res = await self._generate_sync_entry_for_rooms(
             sync_result_builder, account_data_by_room
         )
         newly_joined_rooms, newly_joined_or_invited_users, _, _ = res
@@ -968,13 +1006,13 @@ class SyncHandler(object):
             since_token is None and sync_config.filter_collection.blocks_all_presence()
         )
         if self.hs_config.use_presence and not block_all_presence_data:
-            yield self._generate_sync_entry_for_presence(
+            await self._generate_sync_entry_for_presence(
                 sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
             )
 
-        yield self._generate_sync_entry_for_to_device(sync_result_builder)
+        await self._generate_sync_entry_for_to_device(sync_result_builder)
 
-        device_lists = yield self._generate_sync_entry_for_device_list(
+        device_lists = await self._generate_sync_entry_for_device_list(
             sync_result_builder,
             newly_joined_rooms=newly_joined_rooms,
             newly_joined_or_invited_users=newly_joined_or_invited_users,
@@ -983,13 +1021,13 @@ class SyncHandler(object):
         )
 
         device_id = sync_config.device_id
-        one_time_key_counts = {}
+        one_time_key_counts = {}  # type: JsonDict
         if device_id:
-            one_time_key_counts = yield self.store.count_e2e_one_time_keys(
+            one_time_key_counts = await self.store.count_e2e_one_time_keys(
                 user_id, device_id
             )
 
-        yield self._generate_sync_entry_for_groups(sync_result_builder)
+        await self._generate_sync_entry_for_groups(sync_result_builder)
 
         # debug for https://github.com/matrix-org/synapse/issues/4422
         for joined_room in sync_result_builder.joined:
@@ -1013,18 +1051,19 @@ class SyncHandler(object):
         )
 
     @measure_func("_generate_sync_entry_for_groups")
-    @defer.inlineCallbacks
-    def _generate_sync_entry_for_groups(self, sync_result_builder):
+    async def _generate_sync_entry_for_groups(
+        self, sync_result_builder: "SyncResultBuilder"
+    ) -> None:
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
         now_token = sync_result_builder.now_token
 
         if since_token and since_token.groups_key:
-            results = yield self.store.get_groups_changes_for_user(
+            results = await self.store.get_groups_changes_for_user(
                 user_id, since_token.groups_key, now_token.groups_key
             )
         else:
-            results = yield self.store.get_all_groups_for_user(
+            results = await self.store.get_all_groups_for_user(
                 user_id, now_token.groups_key
             )
 
@@ -1057,30 +1096,24 @@ class SyncHandler(object):
         )
 
     @measure_func("_generate_sync_entry_for_device_list")
-    @defer.inlineCallbacks
-    def _generate_sync_entry_for_device_list(
+    async def _generate_sync_entry_for_device_list(
         self,
-        sync_result_builder,
-        newly_joined_rooms,
-        newly_joined_or_invited_users,
-        newly_left_rooms,
-        newly_left_users,
-    ):
+        sync_result_builder: "SyncResultBuilder",
+        newly_joined_rooms: Set[str],
+        newly_joined_or_invited_users: Set[str],
+        newly_left_rooms: Set[str],
+        newly_left_users: Set[str],
+    ) -> DeviceLists:
         """Generate the DeviceLists section of sync
 
         Args:
-            sync_result_builder (SyncResultBuilder)
-            newly_joined_rooms (set[str]): Set of rooms user has joined since
-                previous sync
-            newly_joined_or_invited_users (set[str]): Set of users that have
-                joined or been invited to a room since previous sync.
-            newly_left_rooms (set[str]): Set of rooms user has left since
+            sync_result_builder
+            newly_joined_rooms: Set of rooms user has joined since previous sync
+            newly_joined_or_invited_users: Set of users that have joined or
+                been invited to a room since previous sync.
+            newly_left_rooms: Set of rooms user has left since previous sync
+            newly_left_users: Set of users that have left a room we're in since
                 previous sync
-            newly_left_users (set[str]): Set of users that have left a room
-                we're in since previous sync
-
-        Returns:
-            Deferred[DeviceLists]
         """
 
         user_id = sync_result_builder.sync_config.user.to_string()
@@ -1106,27 +1139,41 @@ class SyncHandler(object):
             # room with by looking at all users that have left a room plus users
             # that were in a room we've left.
 
-            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            users_who_share_room = await self.store.get_users_who_share_room_with_user(
                 user_id
             )
 
+            # Always tell the user about their own devices. We check as the user
+            # ID is almost certainly already included (unless they're not in any
+            # rooms) and taking a copy of the set is relatively expensive.
+            if user_id not in users_who_share_room:
+                users_who_share_room = set(users_who_share_room)
+                users_who_share_room.add(user_id)
+
+            tracked_users = users_who_share_room
+
             # Step 1a, check for changes in devices of users we share a room with
-            users_that_have_changed = yield self.store.get_users_whose_devices_changed(
-                since_token.device_list_key, users_who_share_room
+            users_that_have_changed = await self.store.get_users_whose_devices_changed(
+                since_token.device_list_key, tracked_users
             )
 
             # Step 1b, check for newly joined rooms
             for room_id in newly_joined_rooms:
-                joined_users = yield self.state.get_current_users_in_room(room_id)
+                joined_users = await self.state.get_current_users_in_room(room_id)
                 newly_joined_or_invited_users.update(joined_users)
 
             # TODO: Check that these users are actually new, i.e. either they
             # weren't in the previous sync *or* they left and rejoined.
             users_that_have_changed.update(newly_joined_or_invited_users)
 
+            user_signatures_changed = await self.store.get_users_whose_signatures_changed(
+                user_id, since_token.device_list_key
+            )
+            users_that_have_changed.update(user_signatures_changed)
+
             # Now find users that we no longer track
             for room_id in newly_left_rooms:
-                left_users = yield self.state.get_current_users_in_room(room_id)
+                left_users = await self.state.get_current_users_in_room(room_id)
                 newly_left_users.update(left_users)
 
             # Remove any users that we still share a room with.
@@ -1136,16 +1183,11 @@ class SyncHandler(object):
         else:
             return DeviceLists(changed=[], left=[])
 
-    @defer.inlineCallbacks
-    def _generate_sync_entry_for_to_device(self, sync_result_builder):
+    async def _generate_sync_entry_for_to_device(
+        self, sync_result_builder: "SyncResultBuilder"
+    ) -> None:
         """Generates the portion of the sync response. Populates
         `sync_result_builder` with the result.
-
-        Args:
-            sync_result_builder(SyncResultBuilder)
-
-        Returns:
-            Deferred(dict): A dictionary containing the per room account data.
         """
         user_id = sync_result_builder.sync_config.user.to_string()
         device_id = sync_result_builder.sync_config.device_id
@@ -1158,14 +1200,14 @@ class SyncHandler(object):
             # We only delete messages when a new message comes in, but that's
             # fine so long as we delete them at some point.
 
-            deleted = yield self.store.delete_messages_for_device(
+            deleted = await self.store.delete_messages_for_device(
                 user_id, device_id, since_stream_id
             )
             logger.debug(
                 "Deleted %d to-device messages up to %d", deleted, since_stream_id
             )
 
-            messages, stream_id = yield self.store.get_new_messages_for_device(
+            messages, stream_id = await self.store.get_new_messages_for_device(
                 user_id, device_id, since_stream_id, now_token.to_device_key
             )
 
@@ -1183,42 +1225,45 @@ class SyncHandler(object):
         else:
             sync_result_builder.to_device = []
 
-    @defer.inlineCallbacks
-    def _generate_sync_entry_for_account_data(self, sync_result_builder):
+    async def _generate_sync_entry_for_account_data(
+        self, sync_result_builder: "SyncResultBuilder"
+    ) -> Dict[str, Dict[str, JsonDict]]:
         """Generates the account data portion of the sync response. Populates
         `sync_result_builder` with the result.
 
         Args:
-            sync_result_builder(SyncResultBuilder)
+            sync_result_builder
 
         Returns:
-            Deferred(dict): A dictionary containing the per room account data.
+            A dictionary containing the per room account data.
         """
         sync_config = sync_result_builder.sync_config
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
 
         if since_token and not sync_result_builder.full_state:
-            account_data, account_data_by_room = (
-                yield self.store.get_updated_account_data_for_user(
-                    user_id, since_token.account_data_key
-                )
+            (
+                account_data,
+                account_data_by_room,
+            ) = await self.store.get_updated_account_data_for_user(
+                user_id, since_token.account_data_key
             )
 
-            push_rules_changed = yield self.store.have_push_rules_changed_for_user(
+            push_rules_changed = await self.store.have_push_rules_changed_for_user(
                 user_id, int(since_token.push_rules_key)
             )
 
             if push_rules_changed:
-                account_data["m.push_rules"] = yield self.push_rules_for_user(
+                account_data["m.push_rules"] = await self.push_rules_for_user(
                     sync_config.user
                 )
         else:
-            account_data, account_data_by_room = (
-                yield self.store.get_account_data_for_user(sync_config.user.to_string())
-            )
+            (
+                account_data,
+                account_data_by_room,
+            ) = await self.store.get_account_data_for_user(sync_config.user.to_string())
 
-            account_data["m.push_rules"] = yield self.push_rules_for_user(
+            account_data["m.push_rules"] = await self.push_rules_for_user(
                 sync_config.user
             )
 
@@ -1233,20 +1278,22 @@ class SyncHandler(object):
 
         return account_data_by_room
 
-    @defer.inlineCallbacks
-    def _generate_sync_entry_for_presence(
-        self, sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
-    ):
+    async def _generate_sync_entry_for_presence(
+        self,
+        sync_result_builder: "SyncResultBuilder",
+        newly_joined_rooms: Set[str],
+        newly_joined_or_invited_users: Set[str],
+    ) -> None:
         """Generates the presence portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
         Args:
-            sync_result_builder(SyncResultBuilder)
-            newly_joined_rooms(list): List of rooms that the user has joined
-                since the last sync (or empty if an initial sync)
-            newly_joined_or_invited_users(list): List of users that have joined
-                or been invited to rooms since the last sync (or empty if an initial
-                sync)
+            sync_result_builder
+            newly_joined_rooms: Set of rooms that the user has joined since
+                the last sync (or empty if an initial sync)
+            newly_joined_or_invited_users: Set of users that have joined or
+                been invited to rooms since the last sync (or empty if an
+                initial sync)
         """
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
@@ -1262,7 +1309,7 @@ class SyncHandler(object):
             presence_key = None
             include_offline = False
 
-        presence, presence_key = yield presence_source.get_new_events(
+        presence, presence_key = await presence_source.get_new_events(
             user=user,
             from_key=presence_key,
             is_guest=sync_config.is_guest,
@@ -1274,12 +1321,12 @@ class SyncHandler(object):
 
         extra_users_ids = set(newly_joined_or_invited_users)
         for room_id in newly_joined_rooms:
-            users = yield self.state.get_current_users_in_room(room_id)
+            users = await self.state.get_current_users_in_room(room_id)
             extra_users_ids.update(users)
         extra_users_ids.discard(user.to_string())
 
         if extra_users_ids:
-            states = yield self.presence_handler.get_states(extra_users_ids)
+            states = await self.presence_handler.get_states(extra_users_ids)
             presence.extend(states)
 
             # Deduplicate the presence entries so that there's at most one per user
@@ -1289,17 +1336,20 @@ class SyncHandler(object):
 
         sync_result_builder.presence = presence
 
-    @defer.inlineCallbacks
-    def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
+    async def _generate_sync_entry_for_rooms(
+        self,
+        sync_result_builder: "SyncResultBuilder",
+        account_data_by_room: Dict[str, Dict[str, JsonDict]],
+    ) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
         """Generates the rooms portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
         Args:
-            sync_result_builder(SyncResultBuilder)
-            account_data_by_room(dict): Dictionary of per room account data
+            sync_result_builder
+            account_data_by_room: Dictionary of per room account data
 
         Returns:
-            Deferred(tuple): Returns a 4-tuple of
+            Returns a 4-tuple of
             `(newly_joined_rooms, newly_joined_or_invited_users,
             newly_left_rooms, newly_left_users)`
         """
@@ -1310,9 +1360,9 @@ class SyncHandler(object):
         )
 
         if block_all_room_ephemeral:
-            ephemeral_by_room = {}
+            ephemeral_by_room = {}  # type: Dict[str, List[JsonDict]]
         else:
-            now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+            now_token, ephemeral_by_room = await self.ephemeral_by_room(
                 sync_result_builder,
                 now_token=sync_result_builder.now_token,
                 since_token=sync_result_builder.since_token,
@@ -1320,20 +1370,20 @@ class SyncHandler(object):
             sync_result_builder.now_token = now_token
 
         # We check up front if anything has changed, if it hasn't then there is
-        # no point in going futher.
+        # no point in going further.
         since_token = sync_result_builder.since_token
         if not sync_result_builder.full_state:
             if since_token and not ephemeral_by_room and not account_data_by_room:
-                have_changed = yield self._have_rooms_changed(sync_result_builder)
+                have_changed = await self._have_rooms_changed(sync_result_builder)
                 if not have_changed:
-                    tags_by_room = yield self.store.get_updated_tags(
+                    tags_by_room = await self.store.get_updated_tags(
                         user_id, since_token.account_data_key
                     )
                     if not tags_by_room:
                         logger.debug("no-oping sync")
-                        return [], [], [], []
+                        return set(), set(), set(), set()
 
-        ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
+        ignored_account_data = await self.store.get_global_account_data_by_type_for_user(
             "m.ignored_user_list", user_id=user_id
         )
 
@@ -1343,18 +1393,21 @@ class SyncHandler(object):
             ignored_users = frozenset()
 
         if since_token:
-            res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
-            room_entries, invited, newly_joined_rooms, newly_left_rooms = res
-
-            tags_by_room = yield self.store.get_updated_tags(
+            room_changes = await self._get_rooms_changed(
+                sync_result_builder, ignored_users
+            )
+            tags_by_room = await self.store.get_updated_tags(
                 user_id, since_token.account_data_key
             )
         else:
-            res = yield self._get_all_rooms(sync_result_builder, ignored_users)
-            room_entries, invited, newly_joined_rooms = res
-            newly_left_rooms = []
+            room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
 
-            tags_by_room = yield self.store.get_tags_for_user(user_id)
+            tags_by_room = await self.store.get_tags_for_user(user_id)
+
+        room_entries = room_changes.room_entries
+        invited = room_changes.invited
+        newly_joined_rooms = room_changes.newly_joined_rooms
+        newly_left_rooms = room_changes.newly_left_rooms
 
         def handle_room_entries(room_entry):
             return self._generate_room_entry(
@@ -1367,7 +1420,7 @@ class SyncHandler(object):
                 always_include=sync_result_builder.full_state,
             )
 
-        yield concurrently_execute(handle_room_entries, room_entries, 10)
+        await concurrently_execute(handle_room_entries, room_entries, 10)
 
         sync_result_builder.invited.extend(invited)
 
@@ -1395,14 +1448,15 @@ class SyncHandler(object):
         newly_left_users -= newly_joined_or_invited_users
 
         return (
-            newly_joined_rooms,
+            set(newly_joined_rooms),
             newly_joined_or_invited_users,
-            newly_left_rooms,
+            set(newly_left_rooms),
             newly_left_users,
         )
 
-    @defer.inlineCallbacks
-    def _have_rooms_changed(self, sync_result_builder):
+    async def _have_rooms_changed(
+        self, sync_result_builder: "SyncResultBuilder"
+    ) -> bool:
         """Returns whether there may be any new events that should be sent down
         the sync. Returns True if there are.
         """
@@ -1413,7 +1467,7 @@ class SyncHandler(object):
         assert since_token
 
         # Get a list of membership change events that have happened.
-        rooms_changed = yield self.store.get_membership_changes_for_user(
+        rooms_changed = await self.store.get_membership_changes_for_user(
             user_id, since_token.room_key, now_token.room_key
         )
 
@@ -1426,23 +1480,10 @@ class SyncHandler(object):
                 return True
         return False
 
-    @defer.inlineCallbacks
-    def _get_rooms_changed(self, sync_result_builder, ignored_users):
+    async def _get_rooms_changed(
+        self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+    ) -> _RoomChanges:
         """Gets the the changes that have happened since the last sync.
-
-        Args:
-            sync_result_builder(SyncResultBuilder)
-            ignored_users(set(str)): Set of users ignored by user.
-
-        Returns:
-            Deferred(tuple): Returns a tuple of the form:
-            `(room_entries, invited_rooms, newly_joined_rooms, newly_left_rooms)`
-
-            where:
-                room_entries is a list [RoomSyncResultBuilder]
-                invited_rooms is a list [InvitedSyncResult]
-                newly_joined_rooms is a list[str] of room ids
-                newly_left_rooms is a list[str] of room ids
         """
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
@@ -1452,11 +1493,11 @@ class SyncHandler(object):
         assert since_token
 
         # Get a list of membership change events that have happened.
-        rooms_changed = yield self.store.get_membership_changes_for_user(
+        rooms_changed = await self.store.get_membership_changes_for_user(
             user_id, since_token.room_key, now_token.room_key
         )
 
-        mem_change_events_by_room_id = {}
+        mem_change_events_by_room_id = {}  # type: Dict[str, List[EventBase]]
         for event in rooms_changed:
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
@@ -1465,7 +1506,7 @@ class SyncHandler(object):
         room_entries = []
         invited = []
         for room_id, events in iteritems(mem_change_events_by_room_id):
-            logger.info(
+            logger.debug(
                 "Membership changes in %s: [%s]",
                 room_id,
                 ", ".join(("%s (%s)" % (e.event_id, e.membership) for e in events)),
@@ -1490,11 +1531,11 @@ class SyncHandler(object):
                 continue
 
             if room_id in sync_result_builder.joined_room_ids or has_join:
-                old_state_ids = yield self.get_state_at(room_id, since_token)
+                old_state_ids = await self.get_state_at(room_id, since_token)
                 old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
                 old_mem_ev = None
                 if old_mem_ev_id:
-                    old_mem_ev = yield self.store.get_event(
+                    old_mem_ev = await self.store.get_event(
                         old_mem_ev_id, allow_none=True
                     )
 
@@ -1527,13 +1568,13 @@ class SyncHandler(object):
                     newly_left_rooms.append(room_id)
                 else:
                     if not old_state_ids:
-                        old_state_ids = yield self.get_state_at(room_id, since_token)
+                        old_state_ids = await self.get_state_at(room_id, since_token)
                         old_mem_ev_id = old_state_ids.get(
                             (EventTypes.Member, user_id), None
                         )
                         old_mem_ev = None
                         if old_mem_ev_id:
-                            old_mem_ev = yield self.store.get_event(
+                            old_mem_ev = await self.store.get_event(
                                 old_mem_ev_id, allow_none=True
                             )
                     if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
@@ -1557,7 +1598,7 @@ class SyncHandler(object):
 
             if leave_events:
                 leave_event = leave_events[-1]
-                leave_stream_token = yield self.store.get_stream_token_for_event(
+                leave_stream_token = await self.store.get_stream_token_for_event(
                     leave_event.event_id
                 )
                 leave_token = since_token.copy_and_replace(
@@ -1575,7 +1616,7 @@ class SyncHandler(object):
                 # This is all screaming out for a refactor, as the logic here is
                 # subtle and the moving parts numerous.
                 if leave_event.internal_metadata.is_out_of_band_membership():
-                    batch_events = [leave_event]
+                    batch_events = [leave_event]  # type: Optional[List[EventBase]]
                 else:
                     batch_events = None
 
@@ -1594,7 +1635,7 @@ class SyncHandler(object):
         timeline_limit = sync_config.filter_collection.timeline_limit()
 
         # Get all events for rooms we're currently joined to.
-        room_to_events = yield self.store.get_room_events_stream_for_rooms(
+        room_to_events = await self.store.get_room_events_stream_for_rooms(
             room_ids=sync_result_builder.joined_room_ids,
             from_key=since_token.room_key,
             to_key=now_token.room_key,
@@ -1602,7 +1643,7 @@ class SyncHandler(object):
         )
 
         # We loop through all room ids, even if there are no new events, in case
-        # there are non room events taht we need to notify about.
+        # there are non room events that we need to notify about.
         for room_id in sync_result_builder.joined_room_ids:
             room_entry = room_to_events.get(room_id, None)
 
@@ -1641,19 +1682,17 @@ class SyncHandler(object):
                 )
             room_entries.append(entry)
 
-        return room_entries, invited, newly_joined_rooms, newly_left_rooms
+        return _RoomChanges(room_entries, invited, newly_joined_rooms, newly_left_rooms)
 
-    @defer.inlineCallbacks
-    def _get_all_rooms(self, sync_result_builder, ignored_users):
+    async def _get_all_rooms(
+        self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+    ) -> _RoomChanges:
         """Returns entries for all rooms for the user.
 
         Args:
-            sync_result_builder(SyncResultBuilder)
-            ignored_users(set(str)): Set of users ignored by user.
+            sync_result_builder
+            ignored_users: Set of users ignored by user.
 
-        Returns:
-            Deferred(tuple): Returns a tuple of the form:
-            `([RoomSyncResultBuilder], [InvitedSyncResult], [])`
         """
 
         user_id = sync_result_builder.sync_config.user.to_string()
@@ -1668,7 +1707,7 @@ class SyncHandler(object):
             Membership.BAN,
         )
 
-        room_list = yield self.store.get_rooms_for_user_where_membership_is(
+        room_list = await self.store.get_rooms_for_local_user_where_membership_is(
             user_id=user_id, membership_list=membership_list
         )
 
@@ -1691,7 +1730,7 @@ class SyncHandler(object):
             elif event.membership == Membership.INVITE:
                 if event.sender in ignored_users:
                     continue
-                invite = yield self.store.get_event(event.event_id)
+                invite = await self.store.get_event(event.event_id)
                 invited.append(InvitedSyncResult(room_id=event.room_id, invite=invite))
             elif event.membership in (Membership.LEAVE, Membership.BAN):
                 # Always send down rooms we were banned or kicked from.
@@ -1715,31 +1754,30 @@ class SyncHandler(object):
                     )
                 )
 
-        return room_entries, invited, []
+        return _RoomChanges(room_entries, invited, [], [])
 
-    @defer.inlineCallbacks
-    def _generate_room_entry(
+    async def _generate_room_entry(
         self,
-        sync_result_builder,
-        ignored_users,
-        room_builder,
-        ephemeral,
-        tags,
-        account_data,
-        always_include=False,
+        sync_result_builder: "SyncResultBuilder",
+        ignored_users: Set[str],
+        room_builder: "RoomSyncResultBuilder",
+        ephemeral: List[JsonDict],
+        tags: Optional[List[JsonDict]],
+        account_data: Dict[str, JsonDict],
+        always_include: bool = False,
     ):
         """Populates the `joined` and `archived` section of `sync_result_builder`
         based on the `room_builder`.
 
         Args:
-            sync_result_builder(SyncResultBuilder)
-            ignored_users(set(str)): Set of users ignored by user.
-            room_builder(RoomSyncResultBuilder)
-            ephemeral(list): List of new ephemeral events for room
-            tags(list): List of *all* tags for room, or None if there has been
+            sync_result_builder
+            ignored_users: Set of users ignored by user.
+            room_builder
+            ephemeral: List of new ephemeral events for room
+            tags: List of *all* tags for room, or None if there has been
                 no change.
-            account_data(list): List of new account data for room
-            always_include(bool): Always include this room in the sync response,
+            account_data: List of new account data for room
+            always_include: Always include this room in the sync response,
                 even if empty.
         """
         newly_joined = room_builder.newly_joined
@@ -1760,12 +1798,12 @@ class SyncHandler(object):
         since_token = room_builder.since_token
         upto_token = room_builder.upto_token
 
-        batch = yield self._load_filtered_recents(
+        batch = await self._load_filtered_recents(
             room_id,
             sync_config,
             now_token=upto_token,
             since_token=since_token,
-            recents=events,
+            potential_recents=events,
             newly_joined_room=newly_joined,
         )
 
@@ -1787,7 +1825,7 @@ class SyncHandler(object):
         # tag was added by synapse e.g. for server notice rooms.
         if full_state:
             user_id = sync_result_builder.sync_config.user.to_string()
-            tags = yield self.store.get_tags_for_room(user_id, room_id)
+            tags = await self.store.get_tags_for_room(user_id, room_id)
 
             # If there aren't any tags, don't send the empty tags list down
             # sync
@@ -1812,11 +1850,11 @@ class SyncHandler(object):
         ):
             return
 
-        state = yield self.compute_state_delta(
+        state = await self.compute_state_delta(
             room_id, batch, sync_config, since_token, now_token, full_state=full_state
         )
 
-        summary = {}
+        summary = {}  # type: Optional[JsonDict]
 
         # we include a summary in room responses when we're lazy loading
         # members (as the client otherwise doesn't have enough info to form
@@ -1835,12 +1873,12 @@ class SyncHandler(object):
             )
             or since_token is None
         ):
-            summary = yield self.compute_summary(
+            summary = await self.compute_summary(
                 room_id, sync_config, batch, state, now_token
             )
 
         if room_builder.rtype == "joined":
-            unread_notifications = {}
+            unread_notifications = {}  # type: Dict[str, str]
             room_sync = JoinedSyncResult(
                 room_id=room_id,
                 timeline=batch,
@@ -1852,7 +1890,7 @@ class SyncHandler(object):
             )
 
             if room_sync or always_include:
-                notifs = yield self.unread_notifs_for_room_id(room_id, sync_config)
+                notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
 
                 if notifs is not None:
                     unread_notifications["notification_count"] = notifs["notify_count"]
@@ -1862,24 +1900,25 @@ class SyncHandler(object):
 
             if batch.limited and since_token:
                 user_id = sync_result_builder.sync_config.user.to_string()
-                logger.info(
+                logger.debug(
                     "Incremental gappy sync of %s for user %s with %d state events"
                     % (room_id, user_id, len(state))
                 )
         elif room_builder.rtype == "archived":
-            room_sync = ArchivedSyncResult(
+            archived_room_sync = ArchivedSyncResult(
                 room_id=room_id,
                 timeline=batch,
                 state=state,
                 account_data=account_data_events,
             )
-            if room_sync or always_include:
-                sync_result_builder.archived.append(room_sync)
+            if archived_room_sync or always_include:
+                sync_result_builder.archived.append(archived_room_sync)
         else:
             raise Exception("Unrecognized rtype: %r", room_builder.rtype)
 
-    @defer.inlineCallbacks
-    def get_rooms_for_user_at(self, user_id, stream_ordering):
+    async def get_rooms_for_user_at(
+        self, user_id: str, stream_ordering: int
+    ) -> FrozenSet[str]:
         """Get set of joined rooms for a user at the given stream ordering.
 
         The stream ordering *must* be recent, otherwise this may throw an
@@ -1887,14 +1926,13 @@ class SyncHandler(object):
         current token, which should be perfectly fine).
 
         Args:
-            user_id (str)
-            stream_ordering (int)
+            user_id
+            stream_ordering
 
         ReturnValue:
-            Deferred[frozenset[str]]: Set of room_ids the user is in at given
-            stream_ordering.
+            Set of room_ids the user is in at given stream_ordering.
         """
-        joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(user_id)
+        joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
 
         joined_room_ids = set()
 
@@ -1912,18 +1950,17 @@ class SyncHandler(object):
 
             logger.info("User joined room after current token: %s", room_id)
 
-            extrems = yield self.store.get_forward_extremeties_for_room(
+            extrems = await self.store.get_forward_extremeties_for_room(
                 room_id, stream_ordering
             )
-            users_in_room = yield self.state.get_current_users_in_room(room_id, extrems)
+            users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
             if user_id in users_in_room:
                 joined_room_ids.add(room_id)
 
-        joined_room_ids = frozenset(joined_room_ids)
-        return joined_room_ids
+        return frozenset(joined_room_ids)
 
 
-def _action_has_highlight(actions):
+def _action_has_highlight(actions: List[JsonDict]) -> bool:
     for action in actions:
         try:
             if action.get("set_tweak", None) == "highlight":
@@ -1935,22 +1972,23 @@ def _action_has_highlight(actions):
 
 
 def _calculate_state(
-    timeline_contains, timeline_start, previous, current, lazy_load_members
-):
+    timeline_contains: StateMap[str],
+    timeline_start: StateMap[str],
+    previous: StateMap[str],
+    current: StateMap[str],
+    lazy_load_members: bool,
+) -> StateMap[str]:
     """Works out what state to include in a sync response.
 
     Args:
-        timeline_contains (dict): state in the timeline
-        timeline_start (dict): state at the start of the timeline
-        previous (dict): state at the end of the previous sync (or empty dict
+        timeline_contains: state in the timeline
+        timeline_start: state at the start of the timeline
+        previous: state at the end of the previous sync (or empty dict
             if this is an initial sync)
-        current (dict): state at the end of the timeline
-        lazy_load_members (bool): whether to return members from timeline_start
+        current: state at the end of the timeline
+        lazy_load_members: whether to return members from timeline_start
             or not.  assumes that timeline_start has already been filtered to
             include only the members the client needs to know about.
-
-    Returns:
-        dict
     """
     event_id_to_key = {
         e: key
@@ -1962,10 +2000,10 @@ def _calculate_state(
         )
     }
 
-    c_ids = set(e for e in itervalues(current))
-    ts_ids = set(e for e in itervalues(timeline_start))
-    p_ids = set(e for e in itervalues(previous))
-    tc_ids = set(e for e in itervalues(timeline_contains))
+    c_ids = set(itervalues(current))
+    ts_ids = set(itervalues(timeline_start))
+    p_ids = set(itervalues(previous))
+    tc_ids = set(itervalues(timeline_contains))
 
     # If we are lazyloading room members, we explicitly add the membership events
     # for the senders in the timeline into the state block returned by /sync,
@@ -1987,15 +2025,16 @@ def _calculate_state(
     return {event_id_to_key[e]: e for e in state_ids}
 
 
-class SyncResultBuilder(object):
+@attr.s
+class SyncResultBuilder:
     """Used to help build up a new SyncResult for a user
 
     Attributes:
-        sync_config (SyncConfig)
-        full_state (bool)
-        since_token (StreamToken)
-        now_token (StreamToken)
-        joined_room_ids (list[str])
+        sync_config
+        full_state: The full_state flag as specified by user
+        since_token: The token supplied by user, or None.
+        now_token: The token to sync up to.
+        joined_room_ids: List of rooms the user is joined to
 
         # The following mirror the fields in a sync response
         presence (list)
@@ -2003,61 +2042,45 @@ class SyncResultBuilder(object):
         joined (list[JoinedSyncResult])
         invited (list[InvitedSyncResult])
         archived (list[ArchivedSyncResult])
-        device (list)
         groups (GroupsSyncResult|None)
         to_device (list)
     """
 
-    def __init__(
-        self, sync_config, full_state, since_token, now_token, joined_room_ids
-    ):
-        """
-        Args:
-            sync_config (SyncConfig)
-            full_state (bool): The full_state flag as specified by user
-            since_token (StreamToken): The token supplied by user, or None.
-            now_token (StreamToken): The token to sync up to.
-            joined_room_ids (list[str]): List of rooms the user is joined to
-        """
-        self.sync_config = sync_config
-        self.full_state = full_state
-        self.since_token = since_token
-        self.now_token = now_token
-        self.joined_room_ids = joined_room_ids
-
-        self.presence = []
-        self.account_data = []
-        self.joined = []
-        self.invited = []
-        self.archived = []
-        self.device = []
-        self.groups = None
-        self.to_device = []
+    sync_config = attr.ib(type=SyncConfig)
+    full_state = attr.ib(type=bool)
+    since_token = attr.ib(type=Optional[StreamToken])
+    now_token = attr.ib(type=StreamToken)
+    joined_room_ids = attr.ib(type=FrozenSet[str])
+
+    presence = attr.ib(type=List[JsonDict], default=attr.Factory(list))
+    account_data = attr.ib(type=List[JsonDict], default=attr.Factory(list))
+    joined = attr.ib(type=List[JoinedSyncResult], default=attr.Factory(list))
+    invited = attr.ib(type=List[InvitedSyncResult], default=attr.Factory(list))
+    archived = attr.ib(type=List[ArchivedSyncResult], default=attr.Factory(list))
+    groups = attr.ib(type=Optional[GroupsSyncResult], default=None)
+    to_device = attr.ib(type=List[JsonDict], default=attr.Factory(list))
 
 
+@attr.s
 class RoomSyncResultBuilder(object):
     """Stores information needed to create either a `JoinedSyncResult` or
     `ArchivedSyncResult`.
+
+    Attributes:
+        room_id
+        rtype: One of `"joined"` or `"archived"`
+        events: List of events to include in the room (more events may be added
+            when generating result).
+        newly_joined: If the user has newly joined the room
+        full_state: Whether the full state should be sent in result
+        since_token: Earliest point to return events from, or None
+        upto_token: Latest point to return events from.
     """
 
-    def __init__(
-        self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token
-    ):
-        """
-        Args:
-            room_id(str)
-            rtype(str): One of `"joined"` or `"archived"`
-            events(list[FrozenEvent]): List of events to include in the room
-                (more events may be added when generating result).
-            newly_joined(bool): If the user has newly joined the room
-            full_state(bool): Whether the full state should be sent in result
-            since_token(StreamToken): Earliest point to return events from, or None
-            upto_token(StreamToken): Latest point to return events from.
-        """
-        self.room_id = room_id
-        self.rtype = rtype
-        self.events = events
-        self.newly_joined = newly_joined
-        self.full_state = full_state
-        self.since_token = since_token
-        self.upto_token = upto_token
+    room_id = attr.ib(type=str)
+    rtype = attr.ib(type=str)
+    events = attr.ib(type=Optional[List[EventBase]])
+    newly_joined = attr.ib(type=bool)
+    full_state = attr.ib(type=bool)
+    since_token = attr.ib(type=Optional[StreamToken])
+    upto_token = attr.ib(type=StreamToken)