summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2022-10-20 14:06:35 -0500
committerEric Eastwood <erice@element.io>2022-10-20 14:06:35 -0500
commit7d70acd10587e95d4413060bc1f6933f53949c2d (patch)
treea1a62e71fd714111cc3905dba9e9dd3f63e3d119 /synapse/handlers
parentMerge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry (diff)
parentUse servlets for /key/ endpoints. (#14229) (diff)
downloadsynapse-7d70acd10587e95d4413060bc1f6933f53949c2d.tar.xz
Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts:
	poetry.lock (conflicts not fixed)
	synapse/handlers/message.py
	synapse/handlers/relations.py
	synapse/storage/databases/main/devices.py
	synapse/storage/schema/__init__.py
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/account_data.py2
-rw-r--r--synapse/handlers/appservice.py9
-rw-r--r--synapse/handlers/cas.py3
-rw-r--r--synapse/handlers/device.py5
-rw-r--r--synapse/handlers/directory.py19
-rw-r--r--synapse/handlers/event_auth.py18
-rw-r--r--synapse/handlers/federation.py105
-rw-r--r--synapse/handlers/federation_event.py96
-rw-r--r--synapse/handlers/initial_sync.py27
-rw-r--r--synapse/handlers/message.py126
-rw-r--r--synapse/handlers/pagination.py5
-rw-r--r--synapse/handlers/presence.py4
-rw-r--r--synapse/handlers/receipts.py13
-rw-r--r--synapse/handlers/relations.py137
-rw-r--r--synapse/handlers/room.py6
-rw-r--r--synapse/handlers/room_member.py17
-rw-r--r--synapse/handlers/sso.py2
-rw-r--r--synapse/handlers/sync.py186
-rw-r--r--synapse/handlers/typing.py2
-rw-r--r--synapse/handlers/ui_auth/checkers.py3
20 files changed, 490 insertions, 295 deletions
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 0478448b47..fc21d58001 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -225,7 +225,7 @@ class AccountDataEventSource(EventSource[int, JsonDict]):
         self,
         user: UserID,
         from_key: int,
-        limit: Optional[int],
+        limit: int,
         room_ids: Collection[str],
         is_guest: bool,
         explicit_room_id: Optional[str] = None,
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 203b62e015..66f5b8d108 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -109,10 +109,13 @@ class ApplicationServicesHandler:
                     last_token = await self.store.get_appservice_last_pos()
                     (
                         upper_bound,
-                        events,
                         event_to_received_ts,
-                    ) = await self.store.get_all_new_events_stream(
-                        last_token, self.current_max, limit=100, get_prev_content=True
+                    ) = await self.store.get_all_new_event_ids_stream(
+                        last_token, self.current_max, limit=100
+                    )
+
+                    events = await self.store.get_events_as_list(
+                        event_to_received_ts.keys(), get_prev_content=True
                     )
 
                     events_by_room: Dict[str, List[EventBase]] = {}
diff --git a/synapse/handlers/cas.py b/synapse/handlers/cas.py
index 7163af8004..fc467bc7c1 100644
--- a/synapse/handlers/cas.py
+++ b/synapse/handlers/cas.py
@@ -130,6 +130,9 @@ class CasHandler:
         except PartialDownloadError as pde:
             # Twisted raises this error if the connection is closed,
             # even if that's being used old-http style to signal end-of-data
+            # Assertion is for mypy's benefit. Error.response is Optional[bytes],
+            # but a PartialDownloadError should always have a non-None response.
+            assert pde.response is not None
             body = pde.response
         except HttpResponseException as e:
             description = (
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index a3b11613e9..2567954679 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -937,7 +937,10 @@ class DeviceListUpdater:
         # Check if we are partially joining any rooms. If so we need to store
         # all device list updates so that we can handle them correctly once we
         # know who is in the room.
-        partial_rooms = await self.store.get_partial_state_rooms_and_servers()
+        # TODO(faster joins): this fetches and processes a bunch of data that we don't
+        # use. Could be replaced by a tighter query e.g.
+        #   SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
+        partial_rooms = await self.store.get_partial_state_room_resync_info()
         if partial_rooms:
             await self.store.add_remote_device_list_to_pending(
                 user_id,
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 7127d5aefc..d52ebada6b 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -16,6 +16,8 @@ import logging
 import string
 from typing import TYPE_CHECKING, Iterable, List, Optional
 
+from typing_extensions import Literal
+
 from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
 from synapse.api.errors import (
     AuthError,
@@ -429,7 +431,10 @@ class DirectoryHandler:
         return await self.auth.check_can_change_room_list(room_id, requester)
 
     async def edit_published_room_list(
-        self, requester: Requester, room_id: str, visibility: str
+        self,
+        requester: Requester,
+        room_id: str,
+        visibility: Literal["public", "private"],
     ) -> None:
         """Edit the entry of the room in the published room list.
 
@@ -451,9 +456,6 @@ class DirectoryHandler:
         if requester.is_guest:
             raise AuthError(403, "Guests cannot edit the published room list")
 
-        if visibility not in ["public", "private"]:
-            raise SynapseError(400, "Invalid visibility setting")
-
         if visibility == "public" and not self.enable_room_list_search:
             # The room list has been disabled.
             raise AuthError(
@@ -505,7 +507,11 @@ class DirectoryHandler:
         await self.store.set_room_is_public(room_id, making_public)
 
     async def edit_published_appservice_room_list(
-        self, appservice_id: str, network_id: str, room_id: str, visibility: str
+        self,
+        appservice_id: str,
+        network_id: str,
+        room_id: str,
+        visibility: Literal["public", "private"],
     ) -> None:
         """Add or remove a room from the appservice/network specific public
         room list.
@@ -516,9 +522,6 @@ class DirectoryHandler:
             room_id
             visibility: either "public" or "private"
         """
-        if visibility not in ["public", "private"]:
-            raise SynapseError(400, "Invalid visibility setting")
-
         await self.store.set_room_is_public_appservice(
             room_id, appservice_id, network_id, visibility == "public"
         )
diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py
index 8249ca1ed2..3bbad0271b 100644
--- a/synapse/handlers/event_auth.py
+++ b/synapse/handlers/event_auth.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Collection, List, Optional, Union
+from typing import TYPE_CHECKING, Collection, List, Mapping, Optional, Union
 
 from synapse import event_auth
 from synapse.api.constants import (
@@ -29,7 +29,6 @@ from synapse.event_auth import (
 )
 from synapse.events import EventBase
 from synapse.events.builder import EventBuilder
-from synapse.events.snapshot import EventContext
 from synapse.types import StateMap, get_domain_from_id
 
 if TYPE_CHECKING:
@@ -51,12 +50,21 @@ class EventAuthHandler:
     async def check_auth_rules_from_context(
         self,
         event: EventBase,
-        context: EventContext,
+        batched_auth_events: Optional[Mapping[str, EventBase]] = None,
     ) -> None:
-        """Check an event passes the auth rules at its own auth events"""
-        await check_state_independent_auth_rules(self._store, event)
+        """Check an event passes the auth rules at its own auth events
+        Args:
+            event: event to be authed
+            batched_auth_events: if the event being authed is part of a batch, any events
+            from the same batch that may be necessary to auth the current event
+        """
+        await check_state_independent_auth_rules(
+            self._store, event, batched_auth_events
+        )
         auth_event_ids = event.auth_event_ids()
         auth_events_by_id = await self._store.get_events(auth_event_ids)
+        if batched_auth_events:
+            auth_events_by_id.update(batched_auth_events)
         check_state_dependent_auth_rules(event, auth_events_by_id.values())
 
     def compute_auth_events(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 610432b55a..57856b8ec5 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
     Codes,
     FederationDeniedError,
     FederationError,
+    FederationPullAttemptBackoffError,
     HttpResponseException,
     LimitExceededError,
     NotFoundError,
@@ -631,6 +632,7 @@ class FederationHandler:
                     room_id=room_id,
                     servers=ret.servers_in_room,
                     device_lists_stream_id=self.store.get_device_stream_token(),
+                    joined_via=origin,
                 )
 
             try:
@@ -781,15 +783,27 @@ class FederationHandler:
 
         # Send the signed event back to the room, and potentially receive some
         # further information about the room in the form of partial state events
-        stripped_room_state = await self.federation_client.send_knock(
-            target_hosts, event
-        )
+        knock_response = await self.federation_client.send_knock(target_hosts, event)
 
         # Store any stripped room state events in the "unsigned" key of the event.
         # This is a bit of a hack and is cribbing off of invites. Basically we
         # store the room state here and retrieve it again when this event appears
         # in the invitee's sync stream. It is stripped out for all other local users.
-        event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"]
+        stripped_room_state = (
+            knock_response.get("knock_room_state")
+            # Since v1.37, Synapse incorrectly used "knock_state_events" for this field.
+            # Thus, we also check for a 'knock_state_events' to support old instances.
+            # See https://github.com/matrix-org/synapse/issues/14088.
+            or knock_response.get("knock_state_events")
+        )
+
+        if stripped_room_state is None:
+            raise KeyError(
+                "Missing 'knock_room_state' (or legacy 'knock_state_events') field in "
+                "send_knock response"
+            )
+
+        event.unsigned["knock_room_state"] = stripped_room_state
 
         context = EventContext.for_outlier(self._storage_controllers)
         stream_id = await self._federation_event_handler.persist_events_and_notify(
@@ -928,7 +942,7 @@ class FederationHandler:
 
         # The remote hasn't signed it yet, obviously. We'll do the full checks
         # when we get the event back in `on_send_join_request`
-        await self._event_auth_handler.check_auth_rules_from_context(event, context)
+        await self._event_auth_handler.check_auth_rules_from_context(event)
         return event
 
     async def on_invite_request(
@@ -1109,7 +1123,7 @@ class FederationHandler:
         try:
             # The remote hasn't signed it yet, obviously. We'll do the full checks
             # when we get the event back in `on_send_leave_request`
-            await self._event_auth_handler.check_auth_rules_from_context(event, context)
+            await self._event_auth_handler.check_auth_rules_from_context(event)
         except AuthError as e:
             logger.warning("Failed to create new leave %r because %s", event, e)
             raise e
@@ -1168,7 +1182,7 @@ class FederationHandler:
         try:
             # The remote hasn't signed it yet, obviously. We'll do the full checks
             # when we get the event back in `on_send_knock_request`
-            await self._event_auth_handler.check_auth_rules_from_context(event, context)
+            await self._event_auth_handler.check_auth_rules_from_context(event)
         except AuthError as e:
             logger.warning("Failed to create new knock %r because %s", event, e)
             raise e
@@ -1334,9 +1348,7 @@ class FederationHandler:
 
             try:
                 validate_event_for_room_version(event)
-                await self._event_auth_handler.check_auth_rules_from_context(
-                    event, context
-                )
+                await self._event_auth_handler.check_auth_rules_from_context(event)
             except AuthError as e:
                 logger.warning("Denying new third party invite %r because %s", event, e)
                 raise e
@@ -1386,7 +1398,7 @@ class FederationHandler:
 
         try:
             validate_event_for_room_version(event)
-            await self._event_auth_handler.check_auth_rules_from_context(event, context)
+            await self._event_auth_handler.check_auth_rules_from_context(event)
         except AuthError as e:
             logger.warning("Denying third party invite %r because %s", event, e)
             raise e
@@ -1602,13 +1614,13 @@ class FederationHandler:
         """Resumes resyncing of all partial-state rooms after a restart."""
         assert not self.config.worker.worker_app
 
-        partial_state_rooms = await self.store.get_partial_state_rooms_and_servers()
-        for room_id, servers_in_room in partial_state_rooms.items():
+        partial_state_rooms = await self.store.get_partial_state_room_resync_info()
+        for room_id, resync_info in partial_state_rooms.items():
             run_as_background_process(
                 desc="sync_partial_state_room",
                 func=self._sync_partial_state_room,
-                initial_destination=None,
-                other_destinations=servers_in_room,
+                initial_destination=resync_info.joined_via,
+                other_destinations=resync_info.servers_in_room,
                 room_id=room_id,
             )
 
@@ -1637,28 +1649,12 @@ class FederationHandler:
         #   really leave, that might mean we have difficulty getting the room state over
         #   federation.
         #   https://github.com/matrix-org/synapse/issues/12802
-        #
-        # TODO(faster_joins): we need some way of prioritising which homeservers in
-        #   `other_destinations` to try first, otherwise we'll spend ages trying dead
-        #   homeservers for large rooms.
-        #   https://github.com/matrix-org/synapse/issues/12999
-
-        if initial_destination is None and len(other_destinations) == 0:
-            raise ValueError(
-                f"Cannot resync state of {room_id}: no destinations provided"
-            )
 
         # Make an infinite iterator of destinations to try. Once we find a working
         # destination, we'll stick with it until it flakes.
-        destinations: Collection[str]
-        if initial_destination is not None:
-            # Move `initial_destination` to the front of the list.
-            destinations = list(other_destinations)
-            if initial_destination in destinations:
-                destinations.remove(initial_destination)
-            destinations = [initial_destination] + destinations
-        else:
-            destinations = other_destinations
+        destinations = _prioritise_destinations_for_partial_state_resync(
+            initial_destination, other_destinations, room_id
+        )
         destination_iter = itertools.cycle(destinations)
 
         # `destination` is the current remote homeserver we're pulling from.
@@ -1708,7 +1704,22 @@ class FederationHandler:
                             destination, event
                         )
                         break
+                    except FederationPullAttemptBackoffError as exc:
+                        # Log a warning about why we failed to process the event (the error message
+                        # for `FederationPullAttemptBackoffError` is pretty good)
+                        logger.warning("_sync_partial_state_room: %s", exc)
+                        # We do not record a failed pull attempt when we backoff fetching a missing
+                        # `prev_event` because not being able to fetch the `prev_events` just means
+                        # we won't be able to de-outlier the pulled event. But we can still use an
+                        # `outlier` in the state/auth chain for another event. So we shouldn't stop
+                        # a downstream event from trying to pull it.
+                        #
+                        # This avoids a cascade of backoff for all events in the DAG downstream from
+                        # one event backoff upstream.
                     except FederationError as e:
+                        # TODO: We should `record_event_failed_pull_attempt` here,
+                        #   see https://github.com/matrix-org/synapse/issues/13700
+
                         if attempt == len(destinations) - 1:
                             # We have tried every remote server for this event. Give up.
                             # TODO(faster_joins) giving up isn't the right thing to do
@@ -1741,3 +1752,29 @@ class FederationHandler:
                             room_id,
                             destination,
                         )
+
+
+def _prioritise_destinations_for_partial_state_resync(
+    initial_destination: Optional[str],
+    other_destinations: Collection[str],
+    room_id: str,
+) -> Collection[str]:
+    """Work out the order in which we should ask servers to resync events.
+
+    If an `initial_destination` is given, it takes top priority. Otherwise
+    all servers are treated equally.
+
+    :raises ValueError: if no destination is provided at all.
+    """
+    if initial_destination is None and len(other_destinations) == 0:
+        raise ValueError(f"Cannot resync state of {room_id}: no destinations provided")
+
+    if initial_destination is None:
+        return other_destinations
+
+    # Move `initial_destination` to the front of the list.
+    destinations = list(other_destinations)
+    if initial_destination in destinations:
+        destinations.remove(initial_destination)
+    destinations = [initial_destination] + destinations
+    return destinations
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index aa0740b08d..4e1d6dea2c 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -44,6 +44,7 @@ from synapse.api.errors import (
     AuthError,
     Codes,
     FederationError,
+    FederationPullAttemptBackoffError,
     HttpResponseException,
     RequestSendFailed,
     SynapseError,
@@ -414,7 +415,9 @@ class FederationEventHandler:
 
         # First, precalculate the joined hosts so that the federation sender doesn't
         # need to.
-        await self._event_creation_handler.cache_joined_hosts_for_event(event, context)
+        await self._event_creation_handler.cache_joined_hosts_for_events(
+            [(event, context)]
+        )
 
         await self._check_for_soft_fail(event, context=context, origin=origin)
         await self._run_push_actions_and_persist_event(event, context)
@@ -565,6 +568,9 @@ class FederationEventHandler:
             event: partial-state event to be de-partial-stated
 
         Raises:
+            FederationPullAttemptBackoffError if we are are deliberately not attempting
+                to pull the given event over federation because we've already done so
+                recently and are backing off.
             FederationError if we fail to request state from the remote server.
         """
         logger.info("Updating state for %s", event.event_id)
@@ -792,9 +798,42 @@ class FederationEventHandler:
             ],
         )
 
+        # Check if we already any of these have these events.
+        # Note: we currently make a lookup in the database directly here rather than
+        # checking the event cache, due to:
+        # https://github.com/matrix-org/synapse/issues/13476
+        existing_events_map = await self._store._get_events_from_db(
+            [event.event_id for event in events]
+        )
+
+        new_events = []
+        for event in events:
+            event_id = event.event_id
+
+            # If we've already seen this event ID...
+            if event_id in existing_events_map:
+                existing_event = existing_events_map[event_id]
+
+                # ...and the event itself was not previously stored as an outlier...
+                if not existing_event.event.internal_metadata.is_outlier():
+                    # ...then there's no need to persist it. We have it already.
+                    logger.info(
+                        "_process_pulled_event: Ignoring received event %s which we "
+                        "have already seen",
+                        event.event_id,
+                    )
+                    continue
+
+                # While we have seen this event before, it was stored as an outlier.
+                # We'll now persist it as a non-outlier.
+                logger.info("De-outliering event %s", event_id)
+
+            # Continue on with the events that are new to us.
+            new_events.append(event)
+
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
-        sorted_events = sorted(events, key=lambda x: x.depth)
+        sorted_events = sorted(new_events, key=lambda x: x.depth)
         for ev in sorted_events:
             with nested_logging_context(ev.event_id):
                 await self._process_pulled_event(origin, ev, backfilled=backfilled)
@@ -846,18 +885,6 @@ class FederationEventHandler:
 
         event_id = event.event_id
 
-        existing = await self._store.get_event(
-            event_id, allow_none=True, allow_rejected=True
-        )
-        if existing:
-            if not existing.internal_metadata.is_outlier():
-                logger.info(
-                    "_process_pulled_event: Ignoring received event %s which we have already seen",
-                    event_id,
-                )
-                return
-            logger.info("De-outliering event %s", event_id)
-
         try:
             self._sanity_check_event(event)
         except SynapseError as err:
@@ -866,11 +893,6 @@ class FederationEventHandler:
                 event.room_id, event_id, str(err)
             )
             return
-        except Exception as exc:
-            await self._store.record_event_failed_pull_attempt(
-                event.room_id, event_id, str(exc)
-            )
-            raise exc
 
         try:
             try:
@@ -904,6 +926,18 @@ class FederationEventHandler:
                     context,
                     backfilled=backfilled,
                 )
+        except FederationPullAttemptBackoffError as exc:
+            # Log a warning about why we failed to process the event (the error message
+            # for `FederationPullAttemptBackoffError` is pretty good)
+            logger.warning("_process_pulled_event: %s", exc)
+            # We do not record a failed pull attempt when we backoff fetching a missing
+            # `prev_event` because not being able to fetch the `prev_events` just means
+            # we won't be able to de-outlier the pulled event. But we can still use an
+            # `outlier` in the state/auth chain for another event. So we shouldn't stop
+            # a downstream event from trying to pull it.
+            #
+            # This avoids a cascade of backoff for all events in the DAG downstream from
+            # one event backoff upstream.
         except FederationError as e:
             await self._store.record_event_failed_pull_attempt(
                 event.room_id, event_id, str(e)
@@ -913,11 +947,6 @@ class FederationEventHandler:
                 logger.warning("Pulled event %s failed history check.", event_id)
             else:
                 raise
-        except Exception as exc:
-            await self._store.record_event_failed_pull_attempt(
-                event.room_id, event_id, str(exc)
-            )
-            raise exc
 
     @trace
     async def _compute_event_context_with_maybe_missing_prevs(
@@ -955,6 +984,9 @@ class FederationEventHandler:
             The event context.
 
         Raises:
+            FederationPullAttemptBackoffError if we are are deliberately not attempting
+                to pull the given event over federation because we've already done so
+                recently and are backing off.
             FederationError if we fail to get the state from the remote server after any
                 missing `prev_event`s.
         """
@@ -965,6 +997,18 @@ class FederationEventHandler:
         seen = await self._store.have_events_in_timeline(prevs)
         missing_prevs = prevs - seen
 
+        # If we've already recently attempted to pull this missing event, don't
+        # try it again so soon. Since we have to fetch all of the prev_events, we can
+        # bail early here if we find any to ignore.
+        prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
+            room_id, missing_prevs
+        )
+        if len(prevs_to_ignore) > 0:
+            raise FederationPullAttemptBackoffError(
+                event_ids=prevs_to_ignore,
+                message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
+            )
+
         if not missing_prevs:
             return await self._state_handler.compute_event_context(event)
 
@@ -2250,8 +2294,8 @@ class FederationEventHandler:
         event_pos = PersistedEventPosition(
             self._instance_name, event.internal_metadata.stream_ordering
         )
-        await self._notifier.on_new_room_event(
-            event, event_pos, max_stream_token, extra_users=extra_users
+        await self._notifier.on_new_room_events(
+            [(event, event_pos)], max_stream_token, extra_users=extra_users
         )
 
         if event.type == EventTypes.Member and event.membership == Membership.JOIN:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 860c82c110..9c335e6863 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -57,13 +57,7 @@ class InitialSyncHandler:
         self.validator = EventValidator()
         self.snapshot_cache: ResponseCache[
             Tuple[
-                str,
-                Optional[StreamToken],
-                Optional[StreamToken],
-                str,
-                Optional[int],
-                bool,
-                bool,
+                str, Optional[StreamToken], Optional[StreamToken], str, int, bool, bool
             ]
         ] = ResponseCache(hs.get_clock(), "initial_sync_cache")
         self._event_serializer = hs.get_event_client_serializer()
@@ -154,11 +148,6 @@ class InitialSyncHandler:
 
         public_room_ids = await self.store.get_public_room_ids()
 
-        if pagin_config.limit is not None:
-            limit = pagin_config.limit
-        else:
-            limit = 10
-
         serializer_options = SerializeEventConfig(as_client_event=as_client_event)
 
         async def handle_room(event: RoomsForUser) -> None:
@@ -210,7 +199,7 @@ class InitialSyncHandler:
                             run_in_background(
                                 self.store.get_recent_events_for_room,
                                 event.room_id,
-                                limit=limit,
+                                limit=pagin_config.limit,
                                 end_token=room_end_token,
                             ),
                             deferred_room_state,
@@ -360,15 +349,11 @@ class InitialSyncHandler:
             member_event_id
         )
 
-        limit = pagin_config.limit if pagin_config else None
-        if limit is None:
-            limit = 10
-
         leave_position = await self.store.get_position_for_event(member_event_id)
         stream_token = leave_position.to_room_stream_token()
 
         messages, token = await self.store.get_recent_events_for_room(
-            room_id, limit=limit, end_token=stream_token
+            room_id, limit=pagin_config.limit, end_token=stream_token
         )
 
         messages = await filter_events_for_client(
@@ -420,10 +405,6 @@ class InitialSyncHandler:
 
         now_token = self.hs.get_event_sources().get_current_token()
 
-        limit = pagin_config.limit if pagin_config else None
-        if limit is None:
-            limit = 10
-
         room_members = [
             m
             for m in current_state.values()
@@ -467,7 +448,7 @@ class InitialSyncHandler:
                     run_in_background(
                         self.store.get_recent_events_for_room,
                         room_id,
-                        limit=limit,
+                        limit=pagin_config.limit,
                         end_token=now_token.room_key,
                     ),
                 ),
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5123c93893..e8b0a758a4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1360,8 +1360,16 @@ class EventCreationHandler:
             else:
                 try:
                     validate_event_for_room_version(event)
+                    # If we are persisting a batch of events the event(s) needed to auth the
+                    # current event may be part of the batch and will not be in the DB yet
+                    event_id_to_event = {e.event_id: e for e, _ in events_and_context}
+                    batched_auth_events = {}
+                    for event_id in event.auth_event_ids():
+                        auth_event = event_id_to_event.get(event_id)
+                        if auth_event:
+                            batched_auth_events[event_id] = auth_event
                     await self._event_auth_handler.check_auth_rules_from_context(
-                        event, context
+                        event, batched_auth_events
                     )
                 except AuthError as err:
                     logger.warning("Denying new event %r because %s", event, err)
@@ -1390,7 +1398,7 @@ class EventCreationHandler:
                             extra_users=extra_users,
                         ),
                         run_in_background(
-                            self.cache_joined_hosts_for_event, event, context
+                            self.cache_joined_hosts_for_events, events_and_context
                         ).addErrback(
                             log_failure, "cache_joined_hosts_for_event failed"
                         ),
@@ -1491,62 +1499,65 @@ class EventCreationHandler:
                 await self.store.remove_push_actions_from_staging(event.event_id)
             raise
 
-    async def cache_joined_hosts_for_event(
-        self, event: EventBase, context: EventContext
+    async def cache_joined_hosts_for_events(
+        self, events_and_context: List[Tuple[EventBase, EventContext]]
     ) -> None:
-        """Precalculate the joined hosts at the event, when using Redis, so that
+        """Precalculate the joined hosts at each of the given events, when using Redis, so that
         external federation senders don't have to recalculate it themselves.
         """
 
-        if not self._external_cache.is_enabled():
-            return
-
-        # If external cache is enabled we should always have this.
-        assert self._external_cache_joined_hosts_updates is not None
+        for event, _ in events_and_context:
+            if not self._external_cache.is_enabled():
+                return
 
-        # We actually store two mappings, event ID -> prev state group,
-        # state group -> joined hosts, which is much more space efficient
-        # than event ID -> joined hosts.
-        #
-        # Note: We have to cache event ID -> prev state group, as we don't
-        # store that in the DB.
-        #
-        # Note: We set the state group -> joined hosts cache if it hasn't been
-        # set for a while, so that the expiry time is reset.
+            # If external cache is enabled we should always have this.
+            assert self._external_cache_joined_hosts_updates is not None
 
-        state_entry = await self.state.resolve_state_groups_for_events(
-            event.room_id, event_ids=event.prev_event_ids()
-        )
+            # We actually store two mappings, event ID -> prev state group,
+            # state group -> joined hosts, which is much more space efficient
+            # than event ID -> joined hosts.
+            #
+            # Note: We have to cache event ID -> prev state group, as we don't
+            # store that in the DB.
+            #
+            # Note: We set the state group -> joined hosts cache if it hasn't been
+            # set for a while, so that the expiry time is reset.
 
-        if state_entry.state_group:
-            await self._external_cache.set(
-                "event_to_prev_state_group",
-                event.event_id,
-                state_entry.state_group,
-                expiry_ms=60 * 60 * 1000,
+            state_entry = await self.state.resolve_state_groups_for_events(
+                event.room_id, event_ids=event.prev_event_ids()
             )
 
-            if state_entry.state_group in self._external_cache_joined_hosts_updates:
-                return
+            if state_entry.state_group:
+                await self._external_cache.set(
+                    "event_to_prev_state_group",
+                    event.event_id,
+                    state_entry.state_group,
+                    expiry_ms=60 * 60 * 1000,
+                )
 
-            state = await state_entry.get_state(
-                self._storage_controllers.state, StateFilter.all()
-            )
-            with tracing.start_active_span("get_joined_hosts"):
-                joined_hosts = await self.store.get_joined_hosts(
-                    event.room_id, state, state_entry
+                if state_entry.state_group in self._external_cache_joined_hosts_updates:
+                    return
+
+                state = await state_entry.get_state(
+                    self._storage_controllers.state, StateFilter.all()
                 )
+                with tracing.start_active_span("get_joined_hosts"):
+                    joined_hosts = await self.store.get_joined_hosts(
+                        event.room_id, state, state_entry
+                    )
 
-            # Note that the expiry times must be larger than the expiry time in
-            # _external_cache_joined_hosts_updates.
-            await self._external_cache.set(
-                "get_joined_hosts",
-                str(state_entry.state_group),
-                list(joined_hosts),
-                expiry_ms=60 * 60 * 1000,
-            )
+                # Note that the expiry times must be larger than the expiry time in
+                # _external_cache_joined_hosts_updates.
+                await self._external_cache.set(
+                    "get_joined_hosts",
+                    str(state_entry.state_group),
+                    list(joined_hosts),
+                    expiry_ms=60 * 60 * 1000,
+                )
 
-            self._external_cache_joined_hosts_updates[state_entry.state_group] = None
+                self._external_cache_joined_hosts_updates[
+                    state_entry.state_group
+                ] = None
 
     async def _validate_canonical_alias(
         self,
@@ -1872,6 +1883,7 @@ class EventCreationHandler:
             events_and_context, backfilled=backfilled
         )
 
+        events_and_pos = []
         for event in persisted_events:
             if self._ephemeral_events_enabled:
                 # If there's an expiry timestamp on the event, schedule its expiry.
@@ -1880,25 +1892,23 @@ class EventCreationHandler:
             stream_ordering = event.internal_metadata.stream_ordering
             assert stream_ordering is not None
             pos = PersistedEventPosition(self._instance_name, stream_ordering)
-
-            async def _notify() -> None:
-                try:
-                    await self.notifier.on_new_room_event(
-                        event, pos, max_stream_token, extra_users=extra_users
-                    )
-                except Exception:
-                    logger.exception(
-                        "Error notifying about new room event %s",
-                        event.event_id,
-                    )
-
-            run_in_background(_notify)
+            events_and_pos.append((event, pos))
 
             if event.type == EventTypes.Message:
                 # We don't want to block sending messages on any presence code. This
                 # matters as sometimes presence code can take a while.
                 run_in_background(self._bump_active_time, requester.user)
 
+        async def _notify() -> None:
+            try:
+                await self.notifier.on_new_room_events(
+                    events_and_pos, max_stream_token, extra_users=extra_users
+                )
+            except Exception:
+                logger.exception("Error notifying about new room events")
+
+        run_in_background(_notify)
+
         return persisted_events[-1]
 
     async def _maybe_kick_guest_users(
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d865ee6e73..fcb8572348 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -458,11 +458,6 @@ class PaginationHandler:
             # `/messages` should still works with live tokens when manually provided.
             assert from_token.room_key.topological is not None
 
-        if pagin_config.limit is None:
-            # This shouldn't happen as we've set a default limit before this
-            # gets called.
-            raise Exception("limit not set")
-
         room_token = from_token.room_key
 
         async with self.pagination_lock.read(room_id):
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 4e575ffbaa..2670e561d7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1596,7 +1596,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
         self,
         user: UserID,
         from_key: Optional[int],
-        limit: Optional[int] = None,
+        # Having a default limit doesn't match the EventSource API, but some
+        # callers do not provide it. It is unused in this class.
+        limit: int = 0,
         room_ids: Optional[Collection[str]] = None,
         is_guest: bool = False,
         explicit_room_id: Optional[str] = None,
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 4768a34c07..ac01582442 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -63,8 +63,6 @@ class ReceiptsHandler:
         self.clock = self.hs.get_clock()
         self.state = hs.get_state_handler()
 
-        self._msc3771_enabled = hs.config.experimental.msc3771_enabled
-
     async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None:
         """Called when we receive an EDU of type m.receipt from a remote HS."""
         receipts = []
@@ -96,11 +94,10 @@ class ReceiptsHandler:
                     # Check if these receipts apply to a thread.
                     thread_id = None
                     data = user_values.get("data", {})
-                    if self._msc3771_enabled and isinstance(data, dict):
-                        thread_id = data.get("thread_id")
-                        # If the thread ID is invalid, consider it missing.
-                        if not isinstance(thread_id, str):
-                            thread_id = None
+                    thread_id = data.get("thread_id")
+                    # If the thread ID is invalid, consider it missing.
+                    if not isinstance(thread_id, str):
+                        thread_id = None
 
                     receipts.append(
                         ReadReceipt(
@@ -260,7 +257,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
         self,
         user: UserID,
         from_key: int,
-        limit: Optional[int],
+        limit: int,
         room_ids: Iterable[str],
         is_guest: bool,
         explicit_room_id: Optional[str] = None,
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 49c9d6e3c6..8fdb9df012 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import enum
 import logging
 from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Tuple
 
@@ -20,7 +21,8 @@ from synapse.api.constants import RelationTypes
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase, relation_from_event
 from synapse.logging.tracing import trace
-from synapse.storage.databases.main.relations import _RelatedEvent
+from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent
+from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, Requester, StreamToken, UserID
 from synapse.visibility import filter_events_for_client
 
@@ -31,6 +33,13 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+class ThreadsListInclude(str, enum.Enum):
+    """Valid values for the 'include' flag of /threads."""
+
+    all = "all"
+    participated = "participated"
+
+
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _ThreadAggregation:
     # The latest event in the thread.
@@ -72,12 +81,10 @@ class RelationsHandler:
         requester: Requester,
         event_id: str,
         room_id: str,
+        pagin_config: PaginationConfig,
+        include_original_event: bool,
         relation_type: Optional[str] = None,
         event_type: Optional[str] = None,
-        limit: int = 5,
-        direction: str = "b",
-        from_token: Optional[StreamToken] = None,
-        to_token: Optional[StreamToken] = None,
     ) -> JsonDict:
         """Get related events of a event, ordered by topological ordering.
 
@@ -87,13 +94,10 @@ class RelationsHandler:
             requester: The user requesting the relations.
             event_id: Fetch events that relate to this event ID.
             room_id: The room the event belongs to.
+            pagin_config: The pagination config rules to apply, if any.
+            include_original_event: Whether to include the parent event.
             relation_type: Only fetch events with this relation type, if given.
             event_type: Only fetch events with this event type, if given.
-            limit: Only fetch the most recent `limit` events.
-            direction: Whether to fetch the most recent first (`"b"`) or the
-                oldest first (`"f"`).
-            from_token: Fetch rows from the given token, or from the start if None.
-            to_token: Fetch rows up to the given token, or up to the end if None.
 
         Returns:
             The pagination chunk.
@@ -121,10 +125,10 @@ class RelationsHandler:
             room_id=room_id,
             relation_type=relation_type,
             event_type=event_type,
-            limit=limit,
-            direction=direction,
-            from_token=from_token,
-            to_token=to_token,
+            limit=pagin_config.limit,
+            direction=pagin_config.direction,
+            from_token=pagin_config.from_token,
+            to_token=pagin_config.to_token,
         )
 
         events = await self._main_store.get_events_as_list(
@@ -138,31 +142,32 @@ class RelationsHandler:
             is_peeking=(member_event_id is None),
         )
 
-        now = self._clock.time_msec()
-        # Do not bundle aggregations when retrieving the original event because
-        # we want the content before relations are applied to it.
-        original_event = self._event_serializer.serialize_event(
-            event, now, bundle_aggregations=None
-        )
         # The relations returned for the requested event do include their
         # bundled aggregations.
         aggregations = await self.get_bundled_aggregations(
             events, requester.user.to_string()
         )
-        serialized_events = self._event_serializer.serialize_events(
-            events, now, bundle_aggregations=aggregations
-        )
 
-        return_value = {
-            "chunk": serialized_events,
-            "original_event": original_event,
+        now = self._clock.time_msec()
+        return_value: JsonDict = {
+            "chunk": self._event_serializer.serialize_events(
+                events, now, bundle_aggregations=aggregations
+            ),
         }
+        if include_original_event:
+            # Do not bundle aggregations when retrieving the original event because
+            # we want the content before relations are applied to it.
+            return_value["original_event"] = self._event_serializer.serialize_event(
+                event, now, bundle_aggregations=None
+            )
 
         if next_token:
             return_value["next_batch"] = await next_token.to_string(self._main_store)
 
-        if from_token:
-            return_value["prev_batch"] = await from_token.to_string(self._main_store)
+        if pagin_config.from_token:
+            return_value["prev_batch"] = await pagin_config.from_token.to_string(
+                self._main_store
+            )
 
         return return_value
 
@@ -482,3 +487,79 @@ class RelationsHandler:
             results.setdefault(event_id, BundledAggregations()).replace = edit
 
         return results
+
+    async def get_threads(
+        self,
+        requester: Requester,
+        room_id: str,
+        include: ThreadsListInclude,
+        limit: int = 5,
+        from_token: Optional[ThreadsNextBatch] = None,
+    ) -> JsonDict:
+        """Get related events of a event, ordered by topological ordering.
+
+        Args:
+            requester: The user requesting the relations.
+            room_id: The room the event belongs to.
+            include: One of "all" or "participated" to indicate which threads should
+                be returned.
+            limit: Only fetch the most recent `limit` events.
+            from_token: Fetch rows from the given token, or from the start if None.
+
+        Returns:
+            The pagination chunk.
+        """
+
+        user_id = requester.user.to_string()
+
+        # TODO Properly handle a user leaving a room.
+        (_, member_event_id) = await self._auth.check_user_in_room_or_world_readable(
+            room_id, requester, allow_departed_users=True
+        )
+
+        # Note that ignored users are not passed into get_relations_for_event
+        # below. Ignored users are handled in filter_events_for_client (and by
+        # not passing them in here we should get a better cache hit rate).
+        thread_roots, next_batch = await self._main_store.get_threads(
+            room_id=room_id, limit=limit, from_token=from_token
+        )
+
+        events = await self._main_store.get_events_as_list(thread_roots)
+
+        if include == ThreadsListInclude.participated:
+            # Pre-seed thread participation with whether the requester sent the event.
+            participated = {event.event_id: event.sender == user_id for event in events}
+            # For events the requester did not send, check the database for whether
+            # the requester sent a threaded reply.
+            participated.update(
+                await self._main_store.get_threads_participated(
+                    [eid for eid, p in participated.items() if not p],
+                    user_id,
+                )
+            )
+
+            # Limit the returned threads to those the user has participated in.
+            events = [event for event in events if participated[event.event_id]]
+
+        events = await filter_events_for_client(
+            self._storage_controllers,
+            user_id,
+            events,
+            is_peeking=(member_event_id is None),
+        )
+
+        aggregations = await self.get_bundled_aggregations(
+            events, requester.user.to_string()
+        )
+
+        now = self._clock.time_msec()
+        serialized_events = self._event_serializer.serialize_events(
+            events, now, bundle_aggregations=aggregations
+        )
+
+        return_value: JsonDict = {"chunk": serialized_events}
+
+        if next_batch:
+            return_value["next_batch"] = str(next_batch)
+
+        return return_value
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 57ab05ad25..638f54051a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -229,9 +229,7 @@ class RoomCreationHandler:
             },
         )
         validate_event_for_room_version(tombstone_event)
-        await self._event_auth_handler.check_auth_rules_from_context(
-            tombstone_event, tombstone_context
-        )
+        await self._event_auth_handler.check_auth_rules_from_context(tombstone_event)
 
         # Upgrade the room
         #
@@ -1646,7 +1644,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
         self,
         user: UserID,
         from_key: RoomStreamToken,
-        limit: Optional[int],
+        limit: int,
         room_ids: Collection[str],
         is_guest: bool,
         explicit_room_id: Optional[str] = None,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index fbf874da15..2ebd2e6eb7 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -322,6 +322,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
+        origin_server_ts: Optional[int] = None,
     ) -> Tuple[str, int]:
         """
         Internal membership update function to get an existing event or create
@@ -361,6 +362,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             historical: Indicates whether the message is being inserted
                 back in time around some existing events. This is used to skip
                 a few checks and mark the event as backfilled.
+            origin_server_ts: The origin_server_ts to use if a new event is created. Uses
+                the current timestamp if set to None.
 
         Returns:
             Tuple of event ID and stream ordering position
@@ -399,6 +402,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 "state_key": user_id,
                 # For backwards compatibility:
                 "membership": membership,
+                "origin_server_ts": origin_server_ts,
             },
             txn_id=txn_id,
             allow_no_prev_events=allow_no_prev_events,
@@ -504,6 +508,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         prev_event_ids: Optional[List[str]] = None,
         state_event_ids: Optional[List[str]] = None,
         depth: Optional[int] = None,
+        origin_server_ts: Optional[int] = None,
     ) -> Tuple[str, int]:
         """Update a user's membership in a room.
 
@@ -542,6 +547,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
+            origin_server_ts: The origin_server_ts to use if a new event is created. Uses
+                the current timestamp if set to None.
 
         Returns:
             A tuple of the new event ID and stream ID.
@@ -583,6 +590,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                         prev_event_ids=prev_event_ids,
                         state_event_ids=state_event_ids,
                         depth=depth,
+                        origin_server_ts=origin_server_ts,
                     )
 
         return result
@@ -606,6 +614,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         prev_event_ids: Optional[List[str]] = None,
         state_event_ids: Optional[List[str]] = None,
         depth: Optional[int] = None,
+        origin_server_ts: Optional[int] = None,
     ) -> Tuple[str, int]:
         """Helper for update_membership.
 
@@ -646,6 +655,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
+            origin_server_ts: The origin_server_ts to use if a new event is created. Uses
+                the current timestamp if set to None.
 
         Returns:
             A tuple of the new event ID and stream ID.
@@ -785,6 +796,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 require_consent=require_consent,
                 outlier=outlier,
                 historical=historical,
+                origin_server_ts=origin_server_ts,
             )
 
         latest_event_ids = await self.store.get_prev_events_for_room(room_id)
@@ -1030,6 +1042,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             content=content,
             require_consent=require_consent,
             outlier=outlier,
+            origin_server_ts=origin_server_ts,
         )
 
     async def _should_perform_remote_join(
@@ -1150,8 +1163,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         logger.info("Transferring room state from %s to %s", old_room_id, room_id)
 
         # Find all local users that were in the old room and copy over each user's state
-        users = await self.store.get_users_in_room(old_room_id)
-        await self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)
+        local_users = await self.store.get_local_users_in_room(old_room_id)
+        await self.copy_user_state_on_room_upgrade(old_room_id, room_id, local_users)
 
         # Add new room to the room directory if the old room was there
         # Remove old room from the room directory
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index e035677b8a..5943f08e91 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -874,7 +874,7 @@ class SsoHandler:
         )
 
     async def handle_terms_accepted(
-        self, request: Request, session_id: str, terms_version: str
+        self, request: SynapseRequest, session_id: str, terms_version: str
     ) -> None:
         """Handle a request to the new-user 'consent' endpoint
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index fed9701ece..05eb041ec7 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -45,7 +45,7 @@ from synapse.logging.tracing import (
     start_active_span,
 )
 from synapse.push.clientformat import format_push_rules_for_user
-from synapse.storage.databases.main.event_push_actions import NotifCounts
+from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
 from synapse.storage.roommember import MemberSummary
 from synapse.storage.state import StateFilter
 from synapse.types import (
@@ -133,6 +133,7 @@ class JoinedSyncResult:
     ephemeral: List[JsonDict]
     account_data: List[JsonDict]
     unread_notifications: JsonDict
+    unread_thread_notifications: JsonDict
     summary: Optional[JsonDict]
     unread_count: int
 
@@ -1293,7 +1294,7 @@ class SyncHandler:
 
     async def unread_notifs_for_room_id(
         self, room_id: str, sync_config: SyncConfig
-    ) -> NotifCounts:
+    ) -> RoomNotifCounts:
         with Measure(self.clock, "unread_notifs_for_room_id"):
 
             return await self.store.get_unread_event_push_actions_by_room_for_user(
@@ -1319,6 +1320,19 @@ class SyncHandler:
         At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
         instance to signify that the sync calculation is complete.
         """
+
+        user_id = sync_config.user.to_string()
+        app_service = self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
+
+        # Note: we get the users room list *before* we get the current token, this
+        # avoids checking back in history if rooms are joined after the token is fetched.
+        token_before_rooms = self.event_sources.get_current_token()
+        mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
+
         # NB: The now_token gets changed by some of the generate_sync_* methods,
         # this is due to some of the underlying streams not supporting the ability
         # to query up to a given point.
@@ -1326,6 +1340,57 @@ class SyncHandler:
         now_token = self.event_sources.get_current_token()
         log_kv({"now_token": str(now_token)})
 
+        # Since we fetched the users room list before the token, there's a small window
+        # during which membership events may have been persisted, so we fetch these now
+        # and modify the joined room list for any changes between the get_rooms_for_user
+        # call and the get_current_token call.
+        membership_change_events = []
+        if since_token:
+            membership_change_events = await self.store.get_membership_changes_for_user(
+                user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
+            )
+
+            mem_last_change_by_room_id: Dict[str, EventBase] = {}
+            for event in membership_change_events:
+                mem_last_change_by_room_id[event.room_id] = event
+
+            # For the latest membership event in each room found, add/remove the room ID
+            # from the joined room list accordingly. In this case we only care if the
+            # latest change is JOIN.
+
+            for room_id, event in mem_last_change_by_room_id.items():
+                assert event.internal_metadata.stream_ordering
+                if (
+                    event.internal_metadata.stream_ordering
+                    < token_before_rooms.room_key.stream
+                ):
+                    continue
+
+                logger.info(
+                    "User membership change between getting rooms and current token: %s %s %s",
+                    user_id,
+                    event.membership,
+                    room_id,
+                )
+                # User joined a room - we have to then check the room state to ensure we
+                # respect any bans if there's a race between the join and ban events.
+                if event.membership == Membership.JOIN:
+                    user_ids_in_room = await self.store.get_users_in_room(room_id)
+                    if user_id in user_ids_in_room:
+                        mutable_joined_room_ids.add(room_id)
+                # The user left the room, or left and was re-invited but not joined yet
+                else:
+                    mutable_joined_room_ids.discard(room_id)
+
+        # Now we have our list of joined room IDs, exclude as configured and freeze
+        joined_room_ids = frozenset(
+            (
+                room_id
+                for room_id in mutable_joined_room_ids
+                if room_id not in self.rooms_to_exclude
+            )
+        )
+
         logger.debug(
             "Calculating sync response for %r between %s and %s",
             sync_config.user,
@@ -1333,22 +1398,13 @@ class SyncHandler:
             now_token,
         )
 
-        user_id = sync_config.user.to_string()
-        app_service = self.store.get_app_service_by_user_id(user_id)
-        if app_service:
-            # We no longer support AS users using /sync directly.
-            # See https://github.com/matrix-org/matrix-doc/issues/1144
-            raise NotImplementedError()
-        else:
-            joined_room_ids = await self.get_rooms_for_user_at(
-                user_id, now_token.room_key
-            )
         sync_result_builder = SyncResultBuilder(
             sync_config,
             full_state,
             since_token=since_token,
             now_token=now_token,
             joined_room_ids=joined_room_ids,
+            membership_change_events=membership_change_events,
         )
 
         logger.debug("Fetching account data")
@@ -1829,19 +1885,12 @@ class SyncHandler:
 
         Does not modify the `sync_result_builder`.
         """
-        user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
-        now_token = sync_result_builder.now_token
+        membership_change_events = sync_result_builder.membership_change_events
 
         assert since_token
 
-        # Get a list of membership change events that have happened to the user
-        # requesting the sync.
-        membership_changes = await self.store.get_membership_changes_for_user(
-            user_id, since_token.room_key, now_token.room_key
-        )
-
-        if membership_changes:
+        if membership_change_events:
             return True
 
         stream_id = since_token.room_key.stream
@@ -1880,16 +1929,10 @@ class SyncHandler:
         since_token = sync_result_builder.since_token
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
+        membership_change_events = sync_result_builder.membership_change_events
 
         assert since_token
 
-        # TODO: we've already called this function and ran this query in
-        #       _have_rooms_changed. We could keep the results in memory to avoid a
-        #       second query, at the cost of more complicated source code.
-        membership_change_events = await self.store.get_membership_changes_for_user(
-            user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
-        )
-
         mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
         for event in membership_change_events:
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
@@ -2358,6 +2401,7 @@ class SyncHandler:
                     ephemeral=ephemeral,
                     account_data=account_data_events,
                     unread_notifications=unread_notifications,
+                    unread_thread_notifications={},
                     summary=summary,
                     unread_count=0,
                 )
@@ -2365,10 +2409,33 @@ class SyncHandler:
                 if room_sync or always_include:
                     notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
 
-                    unread_notifications["notification_count"] = notifs.notify_count
-                    unread_notifications["highlight_count"] = notifs.highlight_count
-
-                    room_sync.unread_count = notifs.unread_count
+                    # Notifications for the main timeline.
+                    notify_count = notifs.main_timeline.notify_count
+                    highlight_count = notifs.main_timeline.highlight_count
+                    unread_count = notifs.main_timeline.unread_count
+
+                    # Check the sync configuration.
+                    if sync_config.filter_collection.unread_thread_notifications():
+                        # And add info for each thread.
+                        room_sync.unread_thread_notifications = {
+                            thread_id: {
+                                "notification_count": thread_notifs.notify_count,
+                                "highlight_count": thread_notifs.highlight_count,
+                            }
+                            for thread_id, thread_notifs in notifs.threads.items()
+                            if thread_id is not None
+                        }
+
+                    else:
+                        # Combine the unread counts for all threads and main timeline.
+                        for thread_notifs in notifs.threads.values():
+                            notify_count += thread_notifs.notify_count
+                            highlight_count += thread_notifs.highlight_count
+                            unread_count += thread_notifs.unread_count
+
+                    unread_notifications["notification_count"] = notify_count
+                    unread_notifications["highlight_count"] = highlight_count
+                    room_sync.unread_count = unread_count
 
                     sync_result_builder.joined.append(room_sync)
 
@@ -2390,60 +2457,6 @@ class SyncHandler:
             else:
                 raise Exception("Unrecognized rtype: %r", room_builder.rtype)
 
-    async def get_rooms_for_user_at(
-        self,
-        user_id: str,
-        room_key: RoomStreamToken,
-    ) -> FrozenSet[str]:
-        """Get set of joined rooms for a user at the given stream ordering.
-
-        The stream ordering *must* be recent, otherwise this may throw an
-        exception if older than a month. (This function is called with the
-        current token, which should be perfectly fine).
-
-        Args:
-            user_id
-            stream_ordering
-
-        ReturnValue:
-            Set of room_ids the user is in at given stream_ordering.
-        """
-        joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
-
-        joined_room_ids = set()
-
-        # We need to check that the stream ordering of the join for each room
-        # is before the stream_ordering asked for. This might not be the case
-        # if the user joins a room between us getting the current token and
-        # calling `get_rooms_for_user_with_stream_ordering`.
-        # If the membership's stream ordering is after the given stream
-        # ordering, we need to go and work out if the user was in the room
-        # before.
-        # We also need to check whether the room should be excluded from sync
-        # responses as per the homeserver config.
-        for joined_room in joined_rooms:
-            if joined_room.room_id in self.rooms_to_exclude:
-                continue
-
-            if not joined_room.event_pos.persisted_after(room_key):
-                joined_room_ids.add(joined_room.room_id)
-                continue
-
-            logger.info("User joined room after current token: %s", joined_room.room_id)
-
-            extrems = (
-                await self.store.get_forward_extremities_for_room_at_stream_ordering(
-                    joined_room.room_id, joined_room.event_pos.stream
-                )
-            )
-            user_ids_in_room = await self.state.get_current_user_ids_in_room(
-                joined_room.room_id, extrems
-            )
-            if user_id in user_ids_in_room:
-                joined_room_ids.add(joined_room.room_id)
-
-        return frozenset(joined_room_ids)
-
 
 def _action_has_highlight(actions: List[JsonDict]) -> bool:
     for action in actions:
@@ -2540,6 +2553,7 @@ class SyncResultBuilder:
     since_token: Optional[StreamToken]
     now_token: StreamToken
     joined_room_ids: FrozenSet[str]
+    membership_change_events: List[EventBase]
 
     presence: List[UserPresenceState] = attr.Factory(list)
     account_data: List[JsonDict] = attr.Factory(list)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index f953691669..a0ea719430 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -513,7 +513,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
         self,
         user: UserID,
         from_key: int,
-        limit: Optional[int],
+        limit: int,
         room_ids: Iterable[str],
         is_guest: bool,
         explicit_room_id: Optional[str] = None,
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index a744d68c64..332edcca24 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -119,6 +119,9 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
         except PartialDownloadError as pde:
             # Twisted is silly
             data = pde.response
+            # For mypy's benefit. A general Error.response is Optional[bytes], but
+            # a PartialDownloadError.response should be bytes AFAICS.
+            assert data is not None
             resp_body = json_decoder.decode(data.decode("utf-8"))
 
         if "success" in resp_body: