author     David Baker <dave@matrix.org>  2018-10-09 10:05:02 +0100
committer  David Baker <dave@matrix.org>  2018-10-09 10:05:02 +0100
commit     dc045ef20222bfbe8dcb5dae297e741509cce8d1 (patch)
tree       ee03ab45ce9791a06c12d15c01d3412cd101330a /synapse/handlers
parent     Apparently this blank line is Very Important (diff)
parent     Merge pull request #4017 from matrix-org/rav/optimise_filter_events_for_server (diff)
download   synapse-dc045ef20222bfbe8dcb5dae297e741509cce8d1.tar.xz

Merge remote-tracking branch 'origin/develop' into dbkr/e2e_backups
Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/auth.py          8
-rw-r--r--  synapse/handlers/directory.py    19
-rw-r--r--  synapse/handlers/e2e_keys.py      7
-rw-r--r--  synapse/handlers/federation.py  395
-rw-r--r--  synapse/handlers/message.py      25
-rw-r--r--  synapse/handlers/pagination.py    9
-rw-r--r--  synapse/handlers/profile.py      10
-rw-r--r--  synapse/handlers/register.py      4
-rw-r--r--  synapse/handlers/room_list.py     2
-rw-r--r--  synapse/handlers/room_member.py   5
-rw-r--r--  synapse/handlers/search.py       14
-rw-r--r--  synapse/handlers/sync.py        182
-rw-r--r--  synapse/handlers/typing.py       23
13 files changed, 498 insertions, 205 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 4a81bd2ba9..2a5eab124f 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -895,22 +895,24 @@ class AuthHandler(BaseHandler):
 
         Args:
             password (unicode): Password to hash.
-            stored_hash (unicode): Expected hash value.
+            stored_hash (bytes): Expected hash value.
 
         Returns:
             Deferred(bool): Whether self.hash(password) == stored_hash.
         """
-
         def _do_validate_hash():
             # Normalise the Unicode in the password
             pw = unicodedata.normalize("NFKC", password)
 
             return bcrypt.checkpw(
                 pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
-                stored_hash.encode('utf8')
+                stored_hash
             )
 
         if stored_hash:
+            if not isinstance(stored_hash, bytes):
+                stored_hash = stored_hash.encode('ascii')
+
             return make_deferred_yieldable(
                 threads.deferToThreadPool(
                     self.hs.get_reactor(),
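
The change above means stored bcrypt hashes are handled as bytes end to end. For reference, a standalone sketch of the scheme validate_hash implements (not part of this commit; the pepper value is a made-up example):

import unicodedata

import bcrypt

PEPPER = "example_pepper"  # stand-in for the configured password_pepper

def hash_password(password):
    # NFKC-normalise so visually-identical unicode passwords hash the same way
    pw = unicodedata.normalize("NFKC", password)
    return bcrypt.hashpw(pw.encode('utf8') + PEPPER.encode('utf8'), bcrypt.gensalt())

def validate_password(password, stored_hash):
    # the database may hand the hash back as unicode; coerce to bytes first,
    # just as the patched handler does
    if not isinstance(stored_hash, bytes):
        stored_hash = stored_hash.encode('ascii')
    pw = unicodedata.normalize("NFKC", password)
    return bcrypt.checkpw(pw.encode('utf8') + PEPPER.encode('utf8'), stored_hash)
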
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index ef866da1b6..18741c5fac 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -20,7 +20,14 @@ import string
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
-from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    CodeMessageException,
+    Codes,
+    NotFoundError,
+    StoreError,
+    SynapseError,
+)
 from synapse.types import RoomAlias, UserID, get_domain_from_id
 
 from ._base import BaseHandler
@@ -109,7 +116,13 @@ class DirectoryHandler(BaseHandler):
     def delete_association(self, requester, user_id, room_alias):
         # association deletion for human users
 
-        can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+        try:
+            can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+        except StoreError as e:
+            if e.code == 404:
+                raise NotFoundError("Unknown room alias")
+            raise
+
         if not can_delete:
             raise AuthError(
                 403, "You don't have permission to delete the alias.",
@@ -320,7 +333,7 @@ class DirectoryHandler(BaseHandler):
     def _user_can_delete_alias(self, alias, user_id):
         creator = yield self.store.get_room_alias_creator(alias.to_string())
 
-        if creator and creator == user_id:
+        if creator is not None and creator == user_id:
             defer.returnValue(True)
 
         is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 5816bf8b4f..9dc46aa15f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -330,7 +330,8 @@ class E2eKeysHandler(object):
                         (algorithm, key_id, ex_json, key)
                     )
             else:
-                new_keys.append((algorithm, key_id, encode_canonical_json(key)))
+                new_keys.append((
+                    algorithm, key_id, encode_canonical_json(key).decode('ascii')))
 
         yield self.store.add_e2e_one_time_keys(
             user_id, device_id, time_now, new_keys
@@ -340,7 +341,7 @@ class E2eKeysHandler(object):
 def _exception_to_failure(e):
     if isinstance(e, CodeMessageException):
         return {
-            "status": e.code, "message": e.message,
+            "status": e.code, "message": str(e),
         }
 
     if isinstance(e, NotRetryingDestination):
@@ -358,7 +359,7 @@ def _exception_to_failure(e):
     # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
     # give a string for e.message, which json then fails to serialize.
     return {
-        "status": 503, "message": str(e.message),
+        "status": 503, "message": str(e),
     }
 
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0ebf0fd188..45d955e6f5 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -18,7 +18,6 @@
 
 import itertools
 import logging
-import sys
 
 import six
 from six import iteritems, itervalues
@@ -69,6 +68,27 @@ from ._base import BaseHandler
 logger = logging.getLogger(__name__)
 
 
+def shortstr(iterable, maxitems=5):
+    """If iterable has maxitems or fewer, return the stringification of a list
+    containing those items.
+
+    Otherwise, return the stringification of a list with the first maxitems items,
+    followed by "...".
+
+    Args:
+        iterable (Iterable): iterable to truncate
+        maxitems (int): number of items to return before truncating
+
+    Returns:
+        unicode
+    """
+
+    items = list(itertools.islice(iterable, maxitems + 1))
+    if len(items) <= maxitems:
+        return str(items)
+    return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
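
For illustration (not part of the commit), given the shortstr defined above:

    assert shortstr(["$a:hs", "$b:hs"]) == "['$a:hs', '$b:hs']"
    assert shortstr("ev%i" % i for i in range(10)) == "['ev0', 'ev1', 'ev2', 'ev3', 'ev4', ...]"
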
 class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
@@ -85,7 +105,7 @@ class FederationHandler(BaseHandler):
 
         self.hs = hs
 
-        self.store = hs.get_datastore()
+        self.store = hs.get_datastore()  # type: synapse.storage.DataStore
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
@@ -114,9 +134,8 @@ class FederationHandler(BaseHandler):
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
 
     @defer.inlineCallbacks
-    @log_function
     def on_receive_pdu(
-            self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+            self, origin, pdu, sent_to_us_directly=False,
     ):
         """ Process a PDU received via a federation /send/ transaction, or
         via backfill of missing prev_events
@@ -125,14 +144,23 @@ class FederationHandler(BaseHandler):
             origin (str): server which initiated the /send/ transaction. Will
                 be used to fetch missing events or state.
             pdu (FrozenEvent): received PDU
-            get_missing (bool): True if we should fetch missing prev_events
+            sent_to_us_directly (bool): True if this event was pushed to us; False if
+                we pulled it as the result of a missing prev_event.
 
         Returns (Deferred): completes with None
         """
 
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
+        logger.info(
+            "[%s %s] handling received PDU: %s",
+            room_id, event_id, pdu,
+        )
+
         # We reprocess pdus when we have seen them only as outliers
         existing = yield self.store.get_event(
-            pdu.event_id,
+            event_id,
             allow_none=True,
             allow_rejected=True,
         )
@@ -147,7 +175,7 @@ class FederationHandler(BaseHandler):
             )
         )
         if already_seen:
-            logger.debug("Already seen pdu %s", pdu.event_id)
+            logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
             return
 
         # do some initial sanity-checking of the event. In particular, make
@@ -156,6 +184,7 @@ class FederationHandler(BaseHandler):
         try:
             self._sanity_check_event(pdu)
         except SynapseError as err:
+            logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
             raise FederationError(
                 "ERROR",
                 err.code,
@@ -165,10 +194,12 @@ class FederationHandler(BaseHandler):
 
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
-        if pdu.room_id in self.room_queues:
-            logger.info("Ignoring PDU %s for room %s from %s for now; join "
-                        "in progress", pdu.event_id, pdu.room_id, origin)
-            self.room_queues[pdu.room_id].append((pdu, origin))
+        if room_id in self.room_queues:
+            logger.info(
+                "[%s %s] Queuing PDU from %s for now: join in progress",
+                room_id, event_id, origin,
+            )
+            self.room_queues[room_id].append((pdu, origin))
             return
 
         # If we're no longer in the room just ditch the event entirely. This
@@ -179,7 +210,7 @@ class FederationHandler(BaseHandler):
         # we should check if we *are* in fact in the room. If we are then we
         # can magically rejoin the room.
         is_in_room = yield self.auth.check_host_in_room(
-            pdu.room_id,
+            room_id,
             self.server_name
         )
         if not is_in_room:
@@ -188,8 +219,8 @@ class FederationHandler(BaseHandler):
             )
             if was_in_room:
                 logger.info(
-                    "Ignoring PDU %s for room %s from %s as we've left the room!",
-                    pdu.event_id, pdu.room_id, origin,
+                    "[%s %s] Ignoring PDU from %s as we've left the room",
+                    room_id, event_id, origin,
                 )
                 defer.returnValue(None)
 
@@ -204,8 +235,8 @@ class FederationHandler(BaseHandler):
             )
 
             logger.debug(
-                "_handle_new_pdu min_depth for %s: %d",
-                pdu.room_id, min_depth
+                "[%s %s] min_depth: %d",
+                room_id, event_id, min_depth,
             )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
@@ -218,17 +249,18 @@ class FederationHandler(BaseHandler):
                 # send to the clients.
                 pdu.internal_metadata.outlier = True
             elif min_depth and pdu.depth > min_depth:
-                if get_missing and prevs - seen:
+                missing_prevs = prevs - seen
+                if sent_to_us_directly and missing_prevs:
                     # If we're missing stuff, ensure we only fetch stuff one
                     # at a time.
                     logger.info(
-                        "Acquiring lock for room %r to fetch %d missing events: %r...",
-                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+                        "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
                     with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
                         logger.info(
-                            "Acquired lock for room %r to fetch %d missing events",
-                            pdu.room_id, len(prevs - seen),
+                            "[%s %s] Acquired room lock to fetch %d missing prev_events",
+                            room_id, event_id, len(missing_prevs),
                         )
 
                         yield self._get_missing_events_for_pdu(
@@ -241,69 +273,150 @@ class FederationHandler(BaseHandler):
 
                         if not prevs - seen:
                             logger.info(
-                                "Found all missing prev events for %s", pdu.event_id
+                                "[%s %s] Found all missing prev_events",
+                                room_id, event_id,
                             )
-                elif prevs - seen:
+                elif missing_prevs:
                     logger.info(
-                        "Not fetching %d missing events for room %r,event %s: %r...",
-                        len(prevs - seen), pdu.room_id, pdu.event_id,
-                        list(prevs - seen)[:5],
+                        "[%s %s] Not recursively fetching %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
 
-            if sent_to_us_directly and prevs - seen:
-                # If they have sent it to us directly, and the server
-                # isn't telling us about the auth events that it's
-                # made a message referencing, we explode
-                raise FederationError(
-                    "ERROR",
-                    403,
-                    (
-                        "Your server isn't divulging details about prev_events "
-                        "referenced in this event."
-                    ),
-                    affected=pdu.event_id,
-                )
-            elif prevs - seen:
-                # Calculate the state of the previous events, and
-                # de-conflict them to find the current state.
-                state_groups = []
+            if prevs - seen:
+                # We've still not been able to get all of the prev_events for this event.
+                #
+                # In this case, we need to fall back to asking another server in the
+                # federation for the state at this event. That's ok provided we then
+                # resolve the state against other bits of the DAG before using it (which
+                # will ensure that you can't just take over a room by sending an event,
+                # withholding its prev_events, and declaring yourself to be an admin in
+                # the subsequent state request).
+                #
+                # Now, if we're pulling this event as a missing prev_event, then clearly
+                # this event is not going to become the only forward-extremity and we are
+                # guaranteed to resolve its state against our existing forward
+                # extremities, so that should be fine.
+                #
+                # On the other hand, if this event was pushed to us, it is possible for
+                # it to become the only forward-extremity in the room, and we would then
+                # trust its state to be the state for the whole room. This is very bad.
+                # Further, if the event was pushed to us, there is no excuse for us not to
+                # have all the prev_events. We therefore reject any such events.
+                #
+                # XXX this really feels like it could/should be merged with the above,
+                # but there is an interaction with min_depth that I'm not really
+                # following.
+
+                if sent_to_us_directly:
+                    logger.warn(
+                        "[%s %s] Failed to fetch %d prev events: rejecting",
+                        room_id, event_id, len(prevs - seen),
+                    )
+                    raise FederationError(
+                        "ERROR",
+                        403,
+                        (
+                            "Your server isn't divulging details about prev_events "
+                            "referenced in this event."
+                        ),
+                        affected=pdu.event_id,
+                    )
+
+                # Calculate the state after each of the previous events, and
+                # resolve them to find the correct state at the current event.
                 auth_chains = set()
+                event_map = {
+                    event_id: pdu,
+                }
                 try:
                     # Get the state of the events we know about
-                    ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
-                    state_groups.append(ours)
+                    ours = yield self.store.get_state_groups_ids(room_id, seen)
+
+                    # state_maps is a list of mappings from (type, state_key) to event_id
+                    # type: list[dict[tuple[str, str], str]]
+                    state_maps = list(ours.values())
+
+                    # we don't need this any more, let's delete it.
+                    del ours
 
                     # Ask the remote server for the states we don't
                     # know about
                     for p in prevs - seen:
-                        state, got_auth_chain = (
-                            yield self.federation_client.get_state_for_room(
-                                origin, pdu.room_id, p
-                            )
+                        logger.info(
+                            "[%s %s] Requesting state at missing prev_event %s",
+                            room_id, event_id, p,
                         )
-                        auth_chains.update(got_auth_chain)
-                        state_group = {(x.type, x.state_key): x.event_id for x in state}
-                        state_groups.append(state_group)
+
+                        with logcontext.nested_logging_context(p):
+                            # note that if any of the missing prevs share missing state or
+                            # auth events, the requests to fetch those events are deduped
+                            # by the get_pdu_cache in federation_client.
+                            remote_state, got_auth_chain = (
+                                yield self.federation_client.get_state_for_room(
+                                    origin, room_id, p,
+                                )
+                            )
+
+                            # we want the state *after* p; get_state_for_room returns the
+                            # state *before* p.
+                            remote_event = yield self.federation_client.get_pdu(
+                                [origin], p, outlier=True,
+                            )
+
+                            if remote_event is None:
+                                raise Exception(
+                                    "Unable to get missing prev_event %s" % (p, )
+                                )
+
+                            if remote_event.is_state():
+                                remote_state.append(remote_event)
+
+                            # XXX hrm I'm not convinced that duplicate events will compare
+                            # for equality, so I'm not sure this does what the author
+                            # hoped.
+                            auth_chains.update(got_auth_chain)
+
+                            remote_state_map = {
+                                (x.type, x.state_key): x.event_id for x in remote_state
+                            }
+                            state_maps.append(remote_state_map)
+
+                            for x in remote_state:
+                                event_map[x.event_id] = x
 
                     # Resolve any conflicting state
+                    @defer.inlineCallbacks
                     def fetch(ev_ids):
-                        return self.store.get_events(
-                            ev_ids, get_prev_content=False, check_redacted=False
+                        fetched = yield self.store.get_events(
+                            ev_ids, get_prev_content=False, check_redacted=False,
                         )
+                        # add any events we fetch here to the `event_map` so that we
+                        # can use them to build the state event list below.
+                        event_map.update(fetched)
+                        defer.returnValue(fetched)
 
-                    room_version = yield self.store.get_room_version(pdu.room_id)
+                    room_version = yield self.store.get_room_version(room_id)
                     state_map = yield resolve_events_with_factory(
-                        room_version, state_groups, {pdu.event_id: pdu}, fetch
+                        room_version, state_maps, event_map, fetch,
                     )
 
-                    state = (yield self.store.get_events(state_map.values())).values()
+                    # we need to give _process_received_pdu the actual state events
+                    # rather than event ids, so generate that now.
+                    state = [
+                        event_map[e] for e in six.itervalues(state_map)
+                    ]
                     auth_chain = list(auth_chains)
                 except Exception:
+                    logger.warn(
+                        "[%s %s] Error attempting to resolve state at missing "
+                        "prev_events",
+                        room_id, event_id, exc_info=True,
+                    )
                     raise FederationError(
                         "ERROR",
                         403,
                         "We can't get valid state history.",
-                        affected=pdu.event_id,
+                        affected=event_id,
                     )
 
         yield self._process_received_pdu(
@@ -322,15 +435,16 @@ class FederationHandler(BaseHandler):
             prevs (set(str)): List of event ids which we are missing
             min_depth (int): Minimum depth of events to return.
         """
-        # We recalculate seen, since it may have changed.
+
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
         seen = yield self.store.have_seen_events(prevs)
 
         if not prevs - seen:
             return
 
-        latest = yield self.store.get_latest_event_ids_in_room(
-            pdu.room_id
-        )
+        latest = yield self.store.get_latest_event_ids_in_room(room_id)
 
         # We add the prev events that we have seen to the latest
         # list to ensure the remote server doesn't give them to us
@@ -338,8 +452,8 @@ class FederationHandler(BaseHandler):
         latest |= seen
 
         logger.info(
-            "Missing %d events for room %r pdu %s: %r...",
-            len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+            "[%s %s]: Requesting %d prev_events: %s",
+            room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
         )
 
         # XXX: we set timeout to 10s to help workaround
@@ -360,49 +474,88 @@ class FederationHandler(BaseHandler):
         # apparently.
         #
         # see https://github.com/matrix-org/synapse/pull/1744
+        #
+        # ----
+        #
+        # Update richvdh 2018/09/18: There are a number of problems with timing this
+        # request out aggressively on the client side:
+        #
+        # - it plays badly with the server-side rate-limiter, which starts tarpitting you
+        #   if you send too many requests at once, so you end up with the server carefully
+        #   working through the backlog of your requests, which you have already timed
+        #   out.
+        #
+        # - for this request in particular, we now (as of
+        #   https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
+        #   server can't produce a plausible-looking set of prev_events - so we become
+        #   much more likely to reject the event.
+        #
+        # - contrary to what it says above, we do *not* fall back to fetching fresh state
+        #   for the room if get_missing_events times out. Rather, we give up processing
+        #   the PDU whose prevs we are missing, which then makes it much more likely that
+        #   we'll end up back here for the *next* PDU in the list, which exacerbates the
+        #   problem.
+        #
+        # - the aggressive 10s timeout was introduced to deal with incoming federation
+        #   requests taking 8 hours to process. It's not entirely clear why that was going
+        #   on; certainly there were other issues causing traffic storms which are now
+        #   resolved, and I think in any case we may be more sensible about our locking
+        #   now. We're *certainly* more sensible about our logging.
+        #
+        # All that said: Let's try increasing the timeout to 60s and see what happens.
 
         missing_events = yield self.federation_client.get_missing_events(
             origin,
-            pdu.room_id,
+            room_id,
             earliest_events_ids=list(latest),
             latest_events=[pdu],
             limit=10,
             min_depth=min_depth,
-            timeout=10000,
+            timeout=60000,
         )
 
         logger.info(
-            "Got %d events: %r...",
-            len(missing_events), [e.event_id for e in missing_events[:5]]
+            "[%s %s]: Got %d prev_events: %s",
+            room_id, event_id, len(missing_events), shortstr(missing_events),
         )
 
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
         missing_events.sort(key=lambda x: x.depth)
 
-        for e in missing_events:
-            logger.info("Handling found event %s", e.event_id)
-            try:
-                yield self.on_receive_pdu(
-                    origin,
-                    e,
-                    get_missing=False
-                )
-            except FederationError as e:
-                if e.code == 403:
-                    logger.warn("Event %s failed history check.")
-                else:
-                    raise
+        for ev in missing_events:
+            logger.info(
+                "[%s %s] Handling received prev_event %s",
+                room_id, event_id, ev.event_id,
+            )
+            with logcontext.nested_logging_context(ev.event_id):
+                try:
+                    yield self.on_receive_pdu(
+                        origin,
+                        ev,
+                        sent_to_us_directly=False,
+                    )
+                except FederationError as e:
+                    if e.code == 403:
+                        logger.warn(
+                            "[%s %s] Received prev_event %s failed history check.",
+                            room_id, event_id, ev.event_id,
+                        )
+                    else:
+                        raise
 
-    @log_function
     @defer.inlineCallbacks
-    def _process_received_pdu(self, origin, pdu, state, auth_chain):
+    def _process_received_pdu(self, origin, event, state, auth_chain):
         """ Called when we have a new pdu. We need to do auth checks and put it
         through the StateHandler.
         """
-        event = pdu
+        room_id = event.room_id
+        event_id = event.event_id
 
-        logger.debug("Processing event: %s", event)
+        logger.debug(
+            "[%s %s] Processing event: %s",
+            room_id, event_id, event,
+        )
 
         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
@@ -411,15 +564,16 @@ class FederationHandler(BaseHandler):
         # event.
         if state and auth_chain and not event.internal_metadata.is_outlier():
             is_in_room = yield self.auth.check_host_in_room(
-                event.room_id,
+                room_id,
                 self.server_name
             )
         else:
             is_in_room = True
+
         if not is_in_room:
             logger.info(
-                "Got event for room we're not in: %r %r",
-                event.room_id, event.event_id
+                "[%s %s] Got event for room we're not in",
+                room_id, event_id,
             )
 
             try:
@@ -431,7 +585,7 @@ class FederationHandler(BaseHandler):
                     "ERROR",
                     e.code,
                     e.msg,
-                    affected=event.event_id,
+                    affected=event_id,
                 )
 
         else:
@@ -464,6 +618,10 @@ class FederationHandler(BaseHandler):
                     })
                     seen_ids.add(e.event_id)
 
+                logger.info(
+                    "[%s %s] persisting newly-received auth/state events %s",
+                    room_id, event_id, [e["event"].event_id for e in event_infos]
+                )
                 yield self._handle_new_events(origin, event_infos)
 
             try:
@@ -480,12 +638,12 @@ class FederationHandler(BaseHandler):
                     affected=event.event_id,
                 )
 
-        room = yield self.store.get_room(event.room_id)
+        room = yield self.store.get_room(room_id)
 
         if not room:
             try:
                 yield self.store.store_room(
-                    room_id=event.room_id,
+                    room_id=room_id,
                     room_creator_user_id="",
                     is_public=False,
                 )
@@ -513,7 +671,7 @@ class FederationHandler(BaseHandler):
 
                 if newly_joined:
                     user = UserID.from_string(event.state_key)
-                    yield self.user_joined_room(user, event.room_id)
+                    yield self.user_joined_room(user, room_id)
 
     @log_function
     @defer.inlineCallbacks
@@ -594,7 +752,7 @@ class FederationHandler(BaseHandler):
 
         required_auth = set(
             a_id
-            for event in events + state_events.values() + auth_events.values()
+            for event in events + list(state_events.values()) + list(auth_events.values())
             for a_id, _ in event.auth_events
         )
         auth_events.update({
@@ -802,7 +960,7 @@ class FederationHandler(BaseHandler):
                     )
                     continue
                 except NotRetryingDestination as e:
-                    logger.info(e.message)
+                    logger.info(str(e))
                     continue
                 except FederationDeniedError as e:
                     logger.info(e)
@@ -1027,7 +1185,8 @@ class FederationHandler(BaseHandler):
             try:
                 logger.info("Processing queued PDU %s which was received "
                             "while we were joining %s", p.event_id, p.room_id)
-                yield self.on_receive_pdu(origin, p)
+                with logcontext.nested_logging_context(p.event_id):
+                    yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
             except Exception as e:
                 logger.warn(
                     "Error handling queued PDU %s from %s: %s",
@@ -1358,7 +1517,7 @@ class FederationHandler(BaseHandler):
         )
 
         if state_groups:
-            _, state = state_groups.items().pop()
+            _, state = list(state_groups.items()).pop()
             results = state
 
             if event.is_state():
@@ -1430,12 +1589,10 @@ class FederationHandler(BaseHandler):
         else:
             defer.returnValue(None)
 
-    @log_function
     def get_min_depth_for_context(self, context):
         return self.store.get_min_depth(context)
 
     @defer.inlineCallbacks
-    @log_function
     def _handle_new_event(self, origin, event, state=None, auth_events=None,
                           backfilled=False):
         context = yield self._prep_event(
@@ -1444,6 +1601,9 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
+        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+        # hack around with a try/finally instead.
+        success = False
         try:
             if not event.internal_metadata.is_outlier() and not backfilled:
                 yield self.action_generator.handle_push_actions_for_event(
@@ -1454,15 +1614,13 @@ class FederationHandler(BaseHandler):
                 [(event, context)],
                 backfilled=backfilled,
             )
-        except:  # noqa: E722, as we reraise the exception this is fine.
-            tp, value, tb = sys.exc_info()
-
-            logcontext.run_in_background(
-                self.store.remove_push_actions_from_staging,
-                event.event_id,
-            )
-
-            six.reraise(tp, value, tb)
+            success = True
+        finally:
+            if not success:
+                logcontext.run_in_background(
+                    self.store.remove_push_actions_from_staging,
+                    event.event_id,
+                )
 
         defer.returnValue(context)
 
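
The success-flag idiom introduced above (and used again in message.py below) replaces a bare except/six.reraise; as the comment notes, re-raising loses the stack trace under inlineCallbacks. A minimal sketch of the pattern on its own:

def run_with_cleanup_on_failure(work, cleanup):
    # set the flag only after `work` completes, and run the cleanup from
    # `finally` only when the flag was never set
    success = False
    try:
        result = work()
        success = True
        return result
    finally:
        if not success:
            cleanup()
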
@@ -1475,15 +1633,22 @@ class FederationHandler(BaseHandler):
 
         Notifies about the events where appropriate.
         """
-        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                logcontext.run_in_background(
-                    self._prep_event,
+
+        @defer.inlineCallbacks
+        def prep(ev_info):
+            event = ev_info["event"]
+            with logcontext.nested_logging_context(suffix=event.event_id):
+                res = yield self._prep_event(
                     origin,
-                    ev_info["event"],
+                    event,
                     state=ev_info.get("state"),
                     auth_events=ev_info.get("auth_events"),
                 )
+            defer.returnValue(res)
+
+        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+            [
+                logcontext.run_in_background(prep, ev_info)
                 for ev_info in event_infos
             ], consumeErrors=True,
         ))
@@ -1635,8 +1800,8 @@ class FederationHandler(BaseHandler):
             )
         except AuthError as e:
             logger.warn(
-                "Rejecting %s because %s",
-                event.event_id, e.msg
+                "[%s %s] Rejecting: %s",
+                event.room_id, event.event_id, e.msg
             )
 
             context.rejected = RejectedReason.AUTH_ERROR
@@ -1831,7 +1996,7 @@ class FederationHandler(BaseHandler):
 
                 room_version = yield self.store.get_room_version(event.room_id)
 
-                new_state = self.state_handler.resolve_events(
+                new_state = yield self.state_handler.resolve_events(
                     room_version,
                     [list(local_view.values()), list(remote_view.values())],
                     event
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e484061cc0..4954b23a0d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,9 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import sys
 
-import six
 from six import iteritems, itervalues, string_types
 
 from canonicaljson import encode_canonical_json, json
@@ -624,6 +622,9 @@ class EventCreationHandler(object):
             event, context
         )
 
+        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+        # hack around with a try/finally instead.
+        success = False
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
@@ -636,6 +637,7 @@ class EventCreationHandler(object):
                     ratelimit=ratelimit,
                     extra_users=extra_users,
                 )
+                success = True
                 return
 
             yield self.persist_and_notify_client_event(
@@ -645,17 +647,16 @@ class EventCreationHandler(object):
                 ratelimit=ratelimit,
                 extra_users=extra_users,
             )
-        except:  # noqa: E722, as we reraise the exception this is fine.
-            # Ensure that we actually remove the entries in the push actions
-            # staging area, if we calculated them.
-            tp, value, tb = sys.exc_info()
-
-            run_in_background(
-                self.store.remove_push_actions_from_staging,
-                event.event_id,
-            )
 
-            six.reraise(tp, value, tb)
+            success = True
+        finally:
+            if not success:
+                # Ensure that we actually remove the entries in the push actions
+                # staging area, if we calculated them.
+                run_in_background(
+                    self.store.remove_push_actions_from_staging,
+                    event.event_id,
+                )
 
     @defer.inlineCallbacks
     def persist_and_notify_client_event(
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 5170d093e3..a155b6e938 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -269,14 +269,7 @@ class PaginationHandler(object):
 
             if state_ids:
                 state = yield self.store.get_events(list(state_ids.values()))
-
-            if state:
-                state = yield filter_events_for_client(
-                    self.store,
-                    user_id,
-                    state.values(),
-                    is_peeking=(member_event_id is None),
-                )
+                state = state.values()
 
         time_now = self.clock.time_msec()
 
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 75b8b7ce6a..1dfbde84fd 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -142,10 +142,8 @@ class BaseProfileHandler(BaseHandler):
                 if e.code != 404:
                     logger.exception("Failed to get displayname")
                 raise
-            except Exception:
-                logger.exception("Failed to get displayname")
-            else:
-                defer.returnValue(result["displayname"])
+
+            defer.returnValue(result["displayname"])
 
     @defer.inlineCallbacks
     def set_displayname(self, target_user, requester, new_displayname, by_admin=False):
@@ -199,8 +197,6 @@ class BaseProfileHandler(BaseHandler):
                 if e.code != 404:
                     logger.exception("Failed to get avatar_url")
                 raise
-            except Exception:
-                logger.exception("Failed to get avatar_url")
 
             defer.returnValue(result["avatar_url"])
 
@@ -278,7 +274,7 @@ class BaseProfileHandler(BaseHandler):
             except Exception as e:
                 logger.warn(
                     "Failed to update join event for room %s - %s",
-                    room_id, str(e.message)
+                    room_id, str(e)
                 )
 
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index f03ee1476b..da914c46ff 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -125,6 +125,7 @@ class RegistrationHandler(BaseHandler):
         guest_access_token=None,
         make_guest=False,
         admin=False,
+        threepid=None,
     ):
         """Registers a new client on the server.
 
@@ -145,7 +146,7 @@ class RegistrationHandler(BaseHandler):
             RegistrationError if there was a problem registering.
         """
 
-        yield self.auth.check_auth_blocking()
+        yield self.auth.check_auth_blocking(threepid=threepid)
         password_hash = None
         if password:
             password_hash = yield self.auth_handler().hash(password)
@@ -533,4 +534,5 @@ class RegistrationHandler(BaseHandler):
             room_id=room_id,
             remote_room_hosts=remote_room_hosts,
             action="join",
+            ratelimit=False,
         )
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 37e41afd61..38e1737ec9 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -162,7 +162,7 @@ class RoomListHandler(BaseHandler):
         # Filter out rooms that we don't want to return
         rooms_to_scan = [
             r for r in sorted_rooms
-            if r not in newly_unpublished and rooms_to_num_joined[room_id] > 0
+            if r not in newly_unpublished and rooms_to_num_joined[r] > 0
         ]
 
         total_room_count = len(rooms_to_scan)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f643619047..07fd3e82fc 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -583,6 +583,11 @@ class RoomMemberHandler(object):
         room_id = mapping["room_id"]
         servers = mapping["servers"]
 
+        # put the server which owns the alias at the front of the server list.
+        if room_alias.domain in servers:
+            servers.remove(room_alias.domain)
+        servers.insert(0, room_alias.domain)
+
         defer.returnValue((RoomID.from_string(room_id), servers))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index c464adbd0b..0c1d52fd11 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -54,7 +54,7 @@ class SearchHandler(BaseHandler):
         batch_token = None
         if batch:
             try:
-                b = decode_base64(batch)
+                b = decode_base64(batch).decode('ascii')
                 batch_group, batch_group_key, batch_token = b.split("\n")
 
                 assert batch_group is not None
@@ -258,18 +258,18 @@ class SearchHandler(BaseHandler):
                 # it returns more from the same group (if applicable) rather
                 # than reverting to searching all results again.
                 if batch_group and batch_group_key:
-                    global_next_batch = encode_base64("%s\n%s\n%s" % (
+                    global_next_batch = encode_base64(("%s\n%s\n%s" % (
                         batch_group, batch_group_key, pagination_token
-                    ))
+                    )).encode('ascii'))
                 else:
-                    global_next_batch = encode_base64("%s\n%s\n%s" % (
+                    global_next_batch = encode_base64(("%s\n%s\n%s" % (
                         "all", "", pagination_token
-                    ))
+                    )).encode('ascii'))
 
                 for room_id, group in room_groups.items():
-                    group["next_batch"] = encode_base64("%s\n%s\n%s" % (
+                    group["next_batch"] = encode_base64(("%s\n%s\n%s" % (
                         "room_id", room_id, pagination_token
-                    ))
+                    )).encode('ascii'))
 
             allowed_events.extend(room_events)
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 648debc8aa..67b8ca28c7 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -24,6 +24,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
+from synapse.storage.roommember import MemberSummary
 from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -525,6 +526,8 @@ class SyncHandler(object):
              A deferred dict describing the room summary
         """
 
+        # FIXME: we could/should get this from room_stats when matthew/stats lands
+
         # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
         last_events, _ = yield self.store.get_recent_event_ids_for_room(
             room_id, end_token=now_token.room_key, limit=1,
@@ -537,44 +540,67 @@ class SyncHandler(object):
         last_event = last_events[-1]
         state_ids = yield self.store.get_state_ids_for_event(
             last_event.event_id, [
-                (EventTypes.Member, None),
                 (EventTypes.Name, ''),
                 (EventTypes.CanonicalAlias, ''),
             ]
         )
 
-        member_ids = {
-            state_key: event_id
-            for (t, state_key), event_id in state_ids.iteritems()
-            if t == EventTypes.Member
-        }
+        # this is heavily cached, thus: fast.
+        details = yield self.store.get_room_summary(room_id)
+
         name_id = state_ids.get((EventTypes.Name, ''))
         canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
 
         summary = {}
-
-        # FIXME: it feels very heavy to load up every single membership event
-        # just to calculate the counts.
-        member_events = yield self.store.get_events(member_ids.values())
-
-        joined_user_ids = []
-        invited_user_ids = []
-
-        for ev in member_events.values():
-            if ev.content.get("membership") == Membership.JOIN:
-                joined_user_ids.append(ev.state_key)
-            elif ev.content.get("membership") == Membership.INVITE:
-                invited_user_ids.append(ev.state_key)
+        empty_ms = MemberSummary([], 0)
 
         # TODO: only send these when they change.
-        summary["m.joined_member_count"] = len(joined_user_ids)
-        summary["m.invited_member_count"] = len(invited_user_ids)
+        summary["m.joined_member_count"] = (
+            details.get(Membership.JOIN, empty_ms).count
+        )
+        summary["m.invited_member_count"] = (
+            details.get(Membership.INVITE, empty_ms).count
+        )
 
-        if name_id or canonical_alias_id:
-            defer.returnValue(summary)
+        # if the room has a name or canonical_alias set, we can skip
+        # calculating heroes.  we assume that if the event has contents, it'll
+        # be a valid name or canonical_alias - i.e. we're checking that they
+        # haven't been "deleted" by blatting {} over the top.
+        if name_id:
+            name = yield self.store.get_event(name_id, allow_none=True)
+            if name and name.content:
+                defer.returnValue(summary)
+
+        if canonical_alias_id:
+            canonical_alias = yield self.store.get_event(
+                canonical_alias_id, allow_none=True,
+            )
+            if canonical_alias and canonical_alias.content:
+                defer.returnValue(summary)
+
+        joined_user_ids = [
+            r[0] for r in details.get(Membership.JOIN, empty_ms).members
+        ]
+        invited_user_ids = [
+            r[0] for r in details.get(Membership.INVITE, empty_ms).members
+        ]
+        gone_user_ids = (
+            [r[0] for r in details.get(Membership.LEAVE, empty_ms).members] +
+            [r[0] for r in details.get(Membership.BAN, empty_ms).members]
+        )
 
-        # FIXME: order by stream ordering, not alphabetic
+        # FIXME: only build up a member_ids list for our heroes
+        member_ids = {}
+        for membership in (
+            Membership.JOIN,
+            Membership.INVITE,
+            Membership.LEAVE,
+            Membership.BAN
+        ):
+            for user_id, event_id in details.get(membership, empty_ms).members:
+                member_ids[user_id] = event_id
 
+        # FIXME: order by stream ordering rather than as returned by SQL
         me = sync_config.user.to_string()
         if (joined_user_ids or invited_user_ids):
             summary['m.heroes'] = sorted(
@@ -586,7 +612,11 @@ class SyncHandler(object):
             )[0:5]
         else:
             summary['m.heroes'] = sorted(
-                [user_id for user_id in member_ids.keys() if user_id != me]
+                [
+                    user_id
+                    for user_id in gone_user_ids
+                    if user_id != me
+                ]
             )[0:5]
 
         if not sync_config.filter_collection.lazy_load_members():
@@ -692,6 +722,13 @@ class SyncHandler(object):
             }
 
             if full_state:
+                if lazy_load_members:
+                    # always make sure we LL ourselves so we know we're in the room
+                    # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
+                    # We only need apply this on full state syncs given we disabled
+                    # LL for incr syncs in #3840.
+                    types.append((EventTypes.Member, sync_config.user.to_string()))
+
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
                         batch.events[-1].event_id, types=types,
@@ -719,6 +756,26 @@ class SyncHandler(object):
                     lazy_load_members=lazy_load_members,
                 )
             elif batch.limited:
+                state_at_timeline_start = yield self.store.get_state_ids_for_event(
+                    batch.events[0].event_id, types=types,
+                    filtered_types=filtered_types,
+                )
+
+                # for now, we disable LL for gappy syncs - see
+                # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
+                # N.B. this slows down incr syncs as we are now processing way
+                # more state in the server than if we were LLing.
+                #
+                # We still have to filter timeline_start to LL entries (above) in order
+                # for _calculate_state's LL logic to work, as we have to include LL
+                # members for timeline senders in case they weren't loaded in the initial
+                # sync.  We do this (counterintuitively) by filtering timeline_start
+                # members to just be ones which were timeline senders, which then ensures
+                # all of the rest get included in the state block (if we need to know
+                # about them).
+                types = None
+                filtered_types = None
+
                 state_at_previous_sync = yield self.get_state_at(
                     room_id, stream_position=since_token, types=types,
                     filtered_types=filtered_types,
@@ -729,25 +786,29 @@ class SyncHandler(object):
                     filtered_types=filtered_types,
                 )
 
-                state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id, types=types,
-                    filtered_types=filtered_types,
-                )
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
                     previous=state_at_previous_sync,
                     current=current_state_ids,
+                    # we have to include LL members in case LL initial sync missed them
                     lazy_load_members=lazy_load_members,
                 )
             else:
                 state_ids = {}
                 if lazy_load_members:
-                    if types:
+                    if types and batch.events:
+                        # We're returning an incremental sync, with no
+                        # "gap" since the previous sync, so normally there would be
+                        # no state to return.
+                        # But we're lazy-loading, so the client might need some more
+                        # member events to understand the events in this timeline.
+                        # So we fish out all the member events corresponding to the
+                        # timeline here, and then dedupe any redundant ones below.
+
                         state_ids = yield self.store.get_state_ids_for_event(
                             batch.events[0].event_id, types=types,
-                            filtered_types=filtered_types,
+                            filtered_types=None,  # we only want members!
                         )
 
             if lazy_load_members and not include_redundant_members:
@@ -767,7 +828,7 @@ class SyncHandler(object):
                     logger.debug("filtering state from %r...", state_ids)
                     state_ids = {
                         t: event_id
-                        for t, event_id in state_ids.iteritems()
+                        for t, event_id in iteritems(state_ids)
                         if cache.get(t[1]) != event_id
                     }
                     logger.debug("...to %r", state_ids)
@@ -1568,6 +1629,19 @@ class SyncHandler(object):
             newly_joined_room=newly_joined,
         )
 
+        # When we join the room (or the client requests full_state), we should
+        # send down any existing tags. Usually the user won't have tags in a
+        # newly joined room, unless either a) they've joined before or b) the
+        # tag was added by synapse e.g. for server notice rooms.
+        if full_state:
+            user_id = sync_result_builder.sync_config.user.to_string()
+            tags = yield self.store.get_tags_for_room(user_id, room_id)
+
+            # If there aren't any tags, don't send the empty tags list down
+            # sync
+            if not tags:
+                tags = None
+
         account_data_events = []
         if tags is not None:
             account_data_events.append({
@@ -1596,10 +1670,24 @@ class SyncHandler(object):
         )
 
         summary = {}
+
+        # we include a summary in room responses when we're lazy loading
+        # members (as the client otherwise doesn't have enough info to form
+        # the name itself).
         if (
             sync_config.filter_collection.lazy_load_members() and
             (
+                # we recalculate the summary:
+                #   if there are membership changes in the timeline, or
+                #   if membership has changed during a gappy sync, or
+                #   if this is an initial sync.
                 any(ev.type == EventTypes.Member for ev in batch.events) or
+                (
+                    # XXX: this may include false positives in the form of LL
+                    # members which have snuck into state
+                    batch.limited and
+                    any(t == EventTypes.Member for (t, k) in state)
+                ) or
                 since_token is None
             )
         ):
@@ -1629,6 +1717,16 @@ class SyncHandler(object):
                     unread_notifications["highlight_count"] = notifs["highlight_count"]
 
                 sync_result_builder.joined.append(room_sync)
+
+            if batch.limited and since_token:
+                user_id = sync_result_builder.sync_config.user.to_string()
+                logger.info(
+                    "Incremental gappy sync of %s for user %s with %d state events" % (
+                        room_id,
+                        user_id,
+                        len(state),
+                    )
+                )
         elif room_builder.rtype == "archived":
             room_sync = ArchivedSyncResult(
                 room_id=room_id,
@@ -1722,17 +1820,17 @@ def _calculate_state(
     event_id_to_key = {
         e: key
         for key, e in itertools.chain(
-            timeline_contains.items(),
-            previous.items(),
-            timeline_start.items(),
-            current.items(),
+            iteritems(timeline_contains),
+            iteritems(previous),
+            iteritems(timeline_start),
+            iteritems(current),
         )
     }
 
-    c_ids = set(e for e in current.values())
-    ts_ids = set(e for e in timeline_start.values())
-    p_ids = set(e for e in previous.values())
-    tc_ids = set(e for e in timeline_contains.values())
+    c_ids = set(e for e in itervalues(current))
+    ts_ids = set(e for e in itervalues(timeline_start))
+    p_ids = set(e for e in itervalues(previous))
+    tc_ids = set(e for e in itervalues(timeline_contains))
 
     # If we are lazyloading room members, we explicitly add the membership events
     # for the senders in the timeline into the state block returned by /sync,
@@ -1746,7 +1844,7 @@ def _calculate_state(
 
     if lazy_load_members:
         p_ids.difference_update(
-            e for t, e in timeline_start.iteritems()
+            e for t, e in iteritems(timeline_start)
             if t[0] == EventTypes.Member
         )
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 2d2d3d5a0d..c610933dd4 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -20,6 +20,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import AuthError, SynapseError
 from synapse.types import UserID, get_domain_from_id
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -68,6 +69,11 @@ class TypingHandler(object):
         # map room IDs to sets of users currently typing
         self._room_typing = {}
 
+        # caches which room_ids changed at which serials
+        self._typing_stream_change_cache = StreamChangeCache(
+            "TypingStreamChangeCache", self._latest_room_serial,
+        )
+
         self.clock.looping_call(
             self._handle_timeouts,
             5000,
@@ -218,6 +224,7 @@ class TypingHandler(object):
 
             for domain in set(get_domain_from_id(u) for u in users):
                 if domain != self.server_name:
+                    logger.debug("sending typing update to %s", domain)
                     self.federation.send_edu(
                         destination=domain,
                         edu_type="m.typing",
@@ -274,19 +281,29 @@ class TypingHandler(object):
 
         self._latest_room_serial += 1
         self._room_serials[member.room_id] = self._latest_room_serial
+        self._typing_stream_change_cache.entity_has_changed(
+            member.room_id, self._latest_room_serial,
+        )
 
         self.notifier.on_new_event(
             "typing_key", self._latest_room_serial, rooms=[member.room_id]
         )
 
     def get_all_typing_updates(self, last_id, current_id):
-        # TODO: Work out a way to do this without scanning the entire state.
         if last_id == current_id:
             return []
 
+        changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
+            last_id,
+        )
+
+        if changed_rooms is None:
+            changed_rooms = self._room_serials
+
         rows = []
-        for room_id, serial in self._room_serials.items():
-            if last_id < serial and serial <= current_id:
+        for room_id in changed_rooms:
+            serial = self._room_serials[room_id]
+            if last_id < serial <= current_id:
                 typing = self._room_typing[room_id]
                 rows.append((serial, room_id, list(typing)))
         rows.sort()
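
With the stream change cache in place, get_all_typing_updates only looks at rooms that actually changed since last_id, falling back to scanning self._room_serials when the cache can't see that far back. A minimal sketch of the cache pattern (a toy, not Synapse's actual StreamChangeCache):

class TinyStreamChangeCache(object):
    """Remembers which entities changed at which stream serial."""

    def __init__(self, current_serial):
        self._earliest_known_serial = current_serial
        self._last_changed = {}  # entity -> serial of most recent change

    def entity_has_changed(self, entity, serial):
        self._last_changed[entity] = serial

    def get_all_entities_changed(self, since):
        if since < self._earliest_known_serial:
            # the cache doesn't go back that far; caller must do a full scan
            return None
        return [e for e, s in self._last_changed.items() if s > since]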