summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2023-02-06 12:55:59 +0000
committerDavid Robertson <davidr@element.io>2023-02-06 12:55:59 +0000
commitbe45edb35b2a235be159b0d974d2880539bfbc0c (patch)
tree327fab0685eeae5337649c6bb03ee34df3c53dc5 /synapse/federation
parentManual format of md example (diff)
parentProperly typecheck tests.api (#14983) (diff)
downloadsynapse-be45edb35b2a235be159b0d974d2880539bfbc0c.tar.xz
Merge remote-tracking branch 'origin/develop' into mv/mypy-unused-awaitable
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_base.py2
-rw-r--r--synapse/federation/federation_client.py111
-rw-r--r--synapse/federation/federation_server.py22
-rw-r--r--synapse/federation/sender/__init__.py20
-rw-r--r--synapse/federation/sender/per_destination_queue.py185
-rw-r--r--synapse/federation/transport/client.py54
-rw-r--r--synapse/federation/transport/server/__init__.py8
-rw-r--r--synapse/federation/transport/server/federation.py20
8 files changed, 295 insertions, 127 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 6bd4742140..29fae716f5 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -280,7 +280,7 @@ def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventB
         _strip_unsigned_values(pdu_json)
 
     depth = pdu_json["depth"]
-    if not isinstance(depth, int):
+    if type(depth) is not int:
         raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON)
 
     if depth < 0:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index c4c0bc7315..0ac85a3be7 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,6 +19,7 @@ import itertools
 import logging
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Awaitable,
     Callable,
     Collection,
@@ -37,7 +38,7 @@ from typing import (
 import attr
 from prometheus_client import Counter
 
-from synapse.api.constants import EventContentFields, EventTypes, Membership
+from synapse.api.constants import Direction, EventContentFields, EventTypes, Membership
 from synapse.api.errors import (
     CodeMessageException,
     Codes,
@@ -110,8 +111,9 @@ class SendJoinResult:
     # True if 'state' elides non-critical membership events
     partial_state: bool
 
-    # if 'partial_state' is set, a list of the servers in the room (otherwise empty)
-    servers_in_room: List[str]
+    # If 'partial_state' is set, a set of the servers in the room (otherwise empty).
+    # Always contains the server we joined off.
+    servers_in_room: AbstractSet[str]
 
 
 class FederationClient(FederationBase):
@@ -771,17 +773,28 @@ class FederationClient(FederationBase):
         """
         if synapse_error is None:
             synapse_error = e.to_synapse_error()
-        # There is no good way to detect an "unknown" endpoint.
+        # MSC3743 specifies that servers should return a 404 or 405 with an errcode
+        # of M_UNRECOGNIZED when they receive a request to an unknown endpoint or
+        # to an unknown method, respectively.
         #
-        # Dendrite returns a 404 (with a body of "404 page not found");
-        # Conduit returns a 404 (with no body); and Synapse returns a 400
-        # with M_UNRECOGNIZED.
-        #
-        # This needs to be rather specific as some endpoints truly do return 404
-        # errors.
+        # Older versions of servers don't properly handle this. This needs to be
+        # rather specific as some endpoints truly do return 404 errors.
         return (
-            e.code == 404 and (not e.response or e.response == b"404 page not found")
-        ) or (e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED)
+            # 404 is an unknown endpoint, 405 is a known endpoint, but unknown method.
+            (e.code == 404 or e.code == 405)
+            and (
+                # Older Dendrites returned a text or empty body.
+                # Older Conduit returned an empty body.
+                not e.response
+                or e.response == b"404 page not found"
+                # The proper response JSON with M_UNRECOGNIZED errcode.
+                or synapse_error.errcode == Codes.UNRECOGNIZED
+            )
+        ) or (
+            # Older Synapses returned a 400 error.
+            e.code == 400
+            and synapse_error.errcode == Codes.UNRECOGNIZED
+        )
 
     async def _try_destination_list(
         self,
@@ -1003,7 +1016,11 @@ class FederationClient(FederationBase):
         )
 
     async def send_join(
-        self, destinations: Iterable[str], pdu: EventBase, room_version: RoomVersion
+        self,
+        destinations: Iterable[str],
+        pdu: EventBase,
+        room_version: RoomVersion,
+        partial_state: bool = True,
     ) -> SendJoinResult:
         """Sends a join event to one of a list of homeservers.
 
@@ -1016,6 +1033,10 @@ class FederationClient(FederationBase):
             pdu: event to be sent
             room_version: the version of the room (according to the server that
                 did the make_join)
+            partial_state: whether to ask the remote server to omit membership state
+                events from the response. If the remote server complies,
+                `partial_state` in the send join result will be set. Defaults to
+                `True`.
 
         Returns:
             The result of the send join request.
@@ -1026,7 +1047,9 @@ class FederationClient(FederationBase):
         """
 
         async def send_request(destination: str) -> SendJoinResult:
-            response = await self._do_send_join(room_version, destination, pdu)
+            response = await self._do_send_join(
+                room_version, destination, pdu, omit_members=partial_state
+            )
 
             # If an event was returned (and expected to be returned):
             #
@@ -1131,18 +1154,32 @@ class FederationClient(FederationBase):
                     % (auth_chain_create_events,)
                 )
 
-            if response.partial_state and not response.servers_in_room:
-                raise InvalidResponseError(
-                    "partial_state was set, but no servers were listed in the room"
-                )
+            servers_in_room = None
+            if response.servers_in_room is not None:
+                servers_in_room = set(response.servers_in_room)
+
+            if response.members_omitted:
+                if not servers_in_room:
+                    raise InvalidResponseError(
+                        "members_omitted was set, but no servers were listed in the room"
+                    )
+
+                if not partial_state:
+                    raise InvalidResponseError(
+                        "members_omitted was set, but we asked for full state"
+                    )
+
+                # `servers_in_room` is supposed to be a complete list.
+                # Fix things up in case the remote homeserver is badly behaved.
+                servers_in_room.add(destination)
 
             return SendJoinResult(
                 event=event,
                 state=signed_state,
                 auth_chain=signed_auth,
                 origin=destination,
-                partial_state=response.partial_state,
-                servers_in_room=response.servers_in_room or [],
+                partial_state=response.members_omitted,
+                servers_in_room=servers_in_room or frozenset(),
             )
 
         # MSC3083 defines additional error codes for room joins.
@@ -1166,7 +1203,11 @@ class FederationClient(FederationBase):
         )
 
     async def _do_send_join(
-        self, room_version: RoomVersion, destination: str, pdu: EventBase
+        self,
+        room_version: RoomVersion,
+        destination: str,
+        pdu: EventBase,
+        omit_members: bool,
     ) -> SendJoinResponse:
         time_now = self._clock.time_msec()
 
@@ -1177,6 +1218,7 @@ class FederationClient(FederationBase):
                 room_id=pdu.room_id,
                 event_id=pdu.event_id,
                 content=pdu.get_pdu_json(time_now),
+                omit_members=omit_members,
             )
         except HttpResponseException as e:
             # If an error is received that is due to an unrecognised endpoint,
@@ -1649,7 +1691,12 @@ class FederationClient(FederationBase):
         return result
 
     async def timestamp_to_event(
-        self, *, destinations: List[str], room_id: str, timestamp: int, direction: str
+        self,
+        *,
+        destinations: List[str],
+        room_id: str,
+        timestamp: int,
+        direction: Direction,
     ) -> Optional["TimestampToEventResponse"]:
         """
         Calls each remote federating server from `destinations` asking for their closest
@@ -1662,7 +1709,7 @@ class FederationClient(FederationBase):
             room_id: Room to fetch the event from
             timestamp: The point in time (inclusive) we should navigate from in
                 the given direction to find the closest event.
-            direction: ["f"|"b"] to indicate whether we should navigate forward
+            direction: indicates whether we should navigate forward
                 or backward from the given timestamp to find the closest event.
 
         Returns:
@@ -1691,13 +1738,23 @@ class FederationClient(FederationBase):
                 #   to return events on *both* sides of the timestamp to
                 #   help reconcile the gap faster.
                 _timestamp_to_event_from_destination,
+                # Since this endpoint is new, we should try other servers before giving up.
+                # We can safely remove this in a year (remove after 2023-11-16).
+                failover_on_unknown_endpoint=True,
             )
             return timestamp_to_event_response
-        except SynapseError:
+        except SynapseError as e:
+            logger.warn(
+                "timestamp_to_event(room_id=%s, timestamp=%s, direction=%s): encountered error when trying to fetch from destinations: %s",
+                room_id,
+                timestamp,
+                direction,
+                e,
+            )
             return None
 
     async def _timestamp_to_event_from_destination(
-        self, destination: str, room_id: str, timestamp: int, direction: str
+        self, destination: str, room_id: str, timestamp: int, direction: Direction
     ) -> "TimestampToEventResponse":
         """
         Calls a remote federating server at `destination` asking for their
@@ -1710,7 +1767,7 @@ class FederationClient(FederationBase):
             room_id: Room to fetch the event from
             timestamp: The point in time (inclusive) we should navigate from in
                 the given direction to find the closest event.
-            direction: ["f"|"b"] to indicate whether we should navigate forward
+            direction: indicates whether we should navigate forward
                 or backward from the given timestamp to find the closest event.
 
         Returns:
@@ -1823,7 +1880,7 @@ class TimestampToEventResponse:
             )
 
         origin_server_ts = d.get("origin_server_ts")
-        if not isinstance(origin_server_ts, int):
+        if type(origin_server_ts) is not int:
             raise ValueError(
                 "Invalid response: 'origin_server_ts' must be a int but received %r"
                 % origin_server_ts
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a1be969199..3b115bacdd 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -34,7 +34,13 @@ from prometheus_client import Counter, Gauge, Histogram
 from twisted.internet.abstract import isIPAddress
 from twisted.python import failure
 
-from synapse.api.constants import EduTypes, EventContentFields, EventTypes, Membership
+from synapse.api.constants import (
+    Direction,
+    EduTypes,
+    EventContentFields,
+    EventTypes,
+    Membership,
+)
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -62,7 +68,9 @@ from synapse.logging.context import (
     run_in_background,
 )
 from synapse.logging.opentracing import (
+    SynapseTags,
     log_kv,
+    set_tag,
     start_active_span_from_edu,
     tag_args,
     trace,
@@ -216,7 +224,7 @@ class FederationServer(FederationBase):
         return 200, res
 
     async def on_timestamp_to_event_request(
-        self, origin: str, room_id: str, timestamp: int, direction: str
+        self, origin: str, room_id: str, timestamp: int, direction: Direction
     ) -> Tuple[int, Dict[str, Any]]:
         """When we receive a federated `/timestamp_to_event` request,
         handle all of the logic for validating and fetching the event.
@@ -226,7 +234,7 @@ class FederationServer(FederationBase):
             room_id: Room to fetch the event from
             timestamp: The point in time (inclusive) we should navigate from in
                 the given direction to find the closest event.
-            direction: ["f"|"b"] to indicate whether we should navigate forward
+            direction: indicates whether we should navigate forward
                 or backward from the given timestamp to find the closest event.
 
         Returns:
@@ -678,6 +686,10 @@ class FederationServer(FederationBase):
         room_id: str,
         caller_supports_partial_state: bool = False,
     ) -> Dict[str, Any]:
+        set_tag(
+            SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE,
+            caller_supports_partial_state,
+        )
         await self._room_member_handler._join_rate_per_room_limiter.ratelimit(  # type: ignore[has-type]
             requester=None,
             key=room_id,
@@ -725,10 +737,12 @@ class FederationServer(FederationBase):
             "state": [p.get_pdu_json(time_now) for p in state_events],
             "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events],
             "org.matrix.msc3706.partial_state": caller_supports_partial_state,
+            "members_omitted": caller_supports_partial_state,
         }
 
         if servers_in_room is not None:
             resp["org.matrix.msc3706.servers_in_room"] = list(servers_in_room)
+            resp["servers_in_room"] = list(servers_in_room)
 
         return resp
 
@@ -1500,7 +1514,7 @@ def _get_event_ids_for_partial_state_join(
     prev_state_ids: StateMap[str],
     summary: Dict[str, MemberSummary],
 ) -> Collection[str]:
-    """Calculate state to be retuned in a partial_state send_join
+    """Calculate state to be returned in a partial_state send_join
 
     Args:
         join_event: the join event being send_joined
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index b6fdd1aa50..3e500e3643 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -434,7 +434,23 @@ class FederationSender(AbstractFederationSender):
                         # If there are no prev event IDs then the state is empty
                         # and so no remote servers in the room
                         destinations = set()
-                    else:
+
+                    if destinations is None:
+                        # During partial join we use the set of servers that we got
+                        # when beginning the join. It's still possible that we send
+                        # events to servers that left the room in the meantime, but
+                        # we consider that an acceptable risk since it is only our own
+                        # events that we leak and not other server's ones.
+                        partial_state_destinations = (
+                            await self.store.get_partial_state_servers_at_join(
+                                event.room_id
+                            )
+                        )
+
+                        if partial_state_destinations is not None:
+                            destinations = partial_state_destinations
+
+                    if destinations is None:
                         # We check the external cache for the destinations, which is
                         # stored per state group.
 
@@ -631,7 +647,7 @@ class FederationSender(AbstractFederationSender):
         room_id = receipt.room_id
 
         # Work out which remote servers should be poked and poke them.
-        domains_set = await self._storage_controllers.state.get_current_hosts_in_room(
+        domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
             room_id
         )
         domains = [
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index b4202d3e55..a2a907d6aa 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -35,7 +35,7 @@ from synapse.logging import issue9533_logger
 from synapse.logging.opentracing import SynapseTags, set_tag
 from synapse.metrics import sent_transactions_counter
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import JsonDict, ReadReceipt
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.visibility import filter_events_for_server
 
@@ -136,8 +136,11 @@ class PerDestinationQueue:
         # destination
         self._pending_presence: Dict[str, UserPresenceState] = {}
 
-        # room_id -> receipt_type -> user_id -> receipt_dict
-        self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
+        # List of room_id -> receipt_type -> user_id -> receipt_dict,
+        #
+        # Each receipt can only have a single receipt per
+        # (room ID, receipt type, user ID, thread ID) tuple.
+        self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
         self._rrs_pending_flush = False
 
         # stream_id of last successfully sent to-device message.
@@ -202,17 +205,53 @@ class PerDestinationQueue:
         Args:
             receipt: receipt to be queued
         """
-        self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
-            receipt.receipt_type, {}
-        )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
+        serialized_receipt: JsonDict = {
+            "event_ids": receipt.event_ids,
+            "data": receipt.data,
+        }
+        if receipt.thread_id is not None:
+            serialized_receipt["data"]["thread_id"] = receipt.thread_id
+
+        # Find which EDU to add this receipt to. There's three situations depending
+        # on the (room ID, receipt type, user, thread ID) tuple:
+        #
+        # 1. If it fully matches, clobber the information.
+        # 2. If it is missing, add the information.
+        # 3. If the subset tuple of (room ID, receipt type, user) matches, check
+        #    the next EDU (or add a new EDU).
+        for edu in self._pending_receipt_edus:
+            receipt_content = edu.setdefault(receipt.room_id, {}).setdefault(
+                receipt.receipt_type, {}
+            )
+            # If this room ID, receipt type, user ID is not in this EDU, OR if
+            # the full tuple matches, use the current EDU.
+            if (
+                receipt.user_id not in receipt_content
+                or receipt_content[receipt.user_id].get("thread_id")
+                == receipt.thread_id
+            ):
+                receipt_content[receipt.user_id] = serialized_receipt
+                break
+
+        # If no matching EDU was found, create a new one.
+        else:
+            self._pending_receipt_edus.append(
+                {
+                    receipt.room_id: {
+                        receipt.receipt_type: {receipt.user_id: serialized_receipt}
+                    }
+                }
+            )
 
     def flush_read_receipts_for_room(self, room_id: str) -> None:
-        # if we don't have any read-receipts for this room, it may be that we've already
-        # sent them out, so we don't need to flush.
-        if room_id not in self._pending_rrs:
-            return
-        self._rrs_pending_flush = True
-        self.attempt_new_transaction()
+        # If there are any pending receipts for this room then force-flush them
+        # in a new transaction.
+        for edu in self._pending_receipt_edus:
+            if room_id in edu:
+                self._rrs_pending_flush = True
+                self.attempt_new_transaction()
+                # No use in checking remaining EDUs if the room was found.
+                break
 
     def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
         self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -351,7 +390,7 @@ class PerDestinationQueue:
                 self._pending_edus = []
                 self._pending_edus_keyed = {}
                 self._pending_presence = {}
-                self._pending_rrs = {}
+                self._pending_receipt_edus = []
 
                 self._start_catching_up()
         except FederationDeniedError as e:
@@ -543,22 +582,27 @@ class PerDestinationQueue:
                     self._destination, last_successful_stream_ordering
                 )
 
-    def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
-        if not self._pending_rrs:
+    def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
+        if not self._pending_receipt_edus:
             return
         if not force_flush and not self._rrs_pending_flush:
             # not yet time for this lot
             return
 
-        edu = Edu(
-            origin=self._server_name,
-            destination=self._destination,
-            edu_type=EduTypes.RECEIPT,
-            content=self._pending_rrs,
-        )
-        self._pending_rrs = {}
-        self._rrs_pending_flush = False
-        yield edu
+        # Send at most limit EDUs for receipts.
+        for content in self._pending_receipt_edus[:limit]:
+            yield Edu(
+                origin=self._server_name,
+                destination=self._destination,
+                edu_type=EduTypes.RECEIPT,
+                content=content,
+            )
+        self._pending_receipt_edus = self._pending_receipt_edus[limit:]
+
+        # If there are still pending read-receipts, don't reset the pending flush
+        # flag.
+        if not self._pending_receipt_edus:
+            self._rrs_pending_flush = False
 
     def _pop_pending_edus(self, limit: int) -> List[Edu]:
         pending_edus = self._pending_edus
@@ -597,7 +641,7 @@ class PerDestinationQueue:
             if not message_id:
                 continue
 
-            set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+            set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
 
         edus = [
             Edu(
@@ -645,27 +689,61 @@ class _TransactionQueueManager:
     async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
         # First we calculate the EDUs we want to send, if any.
 
-        # We start by fetching device related EDUs, i.e device updates and to
-        # device messages. We have to keep 2 free slots for presence and rr_edus.
-        device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
+        # There's a maximum number of EDUs that can be sent with a transaction,
+        # generally device updates and to-device messages get priority, but we
+        # want to ensure that there's room for some other EDUs as well.
+        #
+        # This is done by:
+        #
+        # * Add a presence EDU, if one exists.
+        # * Add up-to a small limit of read receipt EDUs.
+        # * Add to-device EDUs, but leave some space for device list updates.
+        # * Add device list updates EDUs.
+        # * If there's any remaining room, add other EDUs.
+        pending_edus = []
+
+        # Add presence EDU.
+        if self.queue._pending_presence:
+            pending_edus.append(
+                Edu(
+                    origin=self.queue._server_name,
+                    destination=self.queue._destination,
+                    edu_type=EduTypes.PRESENCE,
+                    content={
+                        "push": [
+                            format_user_presence_state(
+                                presence, self.queue._clock.time_msec()
+                            )
+                            for presence in self.queue._pending_presence.values()
+                        ]
+                    },
+                )
+            )
+            self.queue._pending_presence = {}
 
-        # We prioritize to-device messages so that existing encryption channels
+        # Add read receipt EDUs.
+        pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
+        edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
+
+        # Next, prioritize to-device messages so that existing encryption channels
         # work. We also keep a few slots spare (by reducing the limit) so that
         # we can still trickle out some device list updates.
         (
             to_device_edus,
             device_stream_id,
-        ) = await self.queue._get_to_device_message_edus(device_edu_limit - 10)
+        ) = await self.queue._get_to_device_message_edus(edu_limit - 10)
 
         if to_device_edus:
             self._device_stream_id = device_stream_id
         else:
             self.queue._last_device_stream_id = device_stream_id
 
-        device_edu_limit -= len(to_device_edus)
+        pending_edus.extend(to_device_edus)
+        edu_limit -= len(to_device_edus)
 
+        # Add device list update EDUs.
         device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
-            device_edu_limit
+            edu_limit
         )
 
         if device_update_edus:
@@ -673,40 +751,17 @@ class _TransactionQueueManager:
         else:
             self.queue._last_device_list_stream_id = dev_list_id
 
-        pending_edus = device_update_edus + to_device_edus
-
-        # Now add the read receipt EDU.
-        pending_edus.extend(self.queue._get_rr_edus(force_flush=False))
-
-        # And presence EDU.
-        if self.queue._pending_presence:
-            pending_edus.append(
-                Edu(
-                    origin=self.queue._server_name,
-                    destination=self.queue._destination,
-                    edu_type=EduTypes.PRESENCE,
-                    content={
-                        "push": [
-                            format_user_presence_state(
-                                presence, self.queue._clock.time_msec()
-                            )
-                            for presence in self.queue._pending_presence.values()
-                        ]
-                    },
-                )
-            )
-            self.queue._pending_presence = {}
+        pending_edus.extend(device_update_edus)
+        edu_limit -= len(device_update_edus)
 
         # Finally add any other types of EDUs if there is room.
-        pending_edus.extend(
-            self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
-        )
-        while (
-            len(pending_edus) < MAX_EDUS_PER_TRANSACTION
-            and self.queue._pending_edus_keyed
-        ):
+        other_edus = self.queue._pop_pending_edus(edu_limit)
+        pending_edus.extend(other_edus)
+        edu_limit -= len(other_edus)
+        while edu_limit > 0 and self.queue._pending_edus_keyed:
             _, val = self.queue._pending_edus_keyed.popitem()
             pending_edus.append(val)
+            edu_limit -= 1
 
         # Now we look for any PDUs to send, by getting up to 50 PDUs from the
         # queue
@@ -717,8 +772,10 @@ class _TransactionQueueManager:
 
         # if we've decided to send a transaction anyway, and we have room, we
         # may as well send any pending RRs
-        if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
-            pending_edus.extend(self.queue._get_rr_edus(force_flush=True))
+        if edu_limit:
+            pending_edus.extend(
+                self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
+            )
 
         if self._pdus:
             self._last_stream_ordering = self._pdus[
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index a3cfc701cd..c05d598b70 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -32,7 +32,7 @@ from typing import (
 import attr
 import ijson
 
-from synapse.api.constants import Membership
+from synapse.api.constants import Direction, Membership
 from synapse.api.errors import Codes, HttpResponseException, SynapseError
 from synapse.api.room_versions import RoomVersion
 from synapse.api.urls import (
@@ -102,6 +102,10 @@ class TransportLayerClient:
             destination,
             path=path,
             args={"event_id": event_id},
+            # This can take a looooooong time for large rooms. Give this a generous
+            # timeout of 10 minutes to avoid the partial state resync timing out early
+            # and trying a bunch of servers who haven't seen our join yet.
+            timeout=600_000,
             parser=_StateParser(room_version),
         )
 
@@ -165,7 +169,7 @@ class TransportLayerClient:
         )
 
     async def timestamp_to_event(
-        self, destination: str, room_id: str, timestamp: int, direction: str
+        self, destination: str, room_id: str, timestamp: int, direction: Direction
     ) -> Union[JsonDict, List]:
         """
         Calls a remote federating server at `destination` asking for their
@@ -176,7 +180,7 @@ class TransportLayerClient:
             room_id: Room to fetch the event from
             timestamp: The point in time (inclusive) we should navigate from in
                 the given direction to find the closest event.
-            direction: ["f"|"b"] to indicate whether we should navigate forward
+            direction: indicates whether we should navigate forward
                 or backward from the given timestamp to find the closest event.
 
         Returns:
@@ -185,13 +189,12 @@ class TransportLayerClient:
         Raises:
             Various exceptions when the request fails
         """
-        path = _create_path(
-            FEDERATION_UNSTABLE_PREFIX,
-            "/org.matrix.msc3030/timestamp_to_event/%s",
+        path = _create_v1_path(
+            "/timestamp_to_event/%s",
             room_id,
         )
 
-        args = {"ts": [str(timestamp)], "dir": [direction]}
+        args = {"ts": [str(timestamp)], "dir": [direction.value]}
 
         remote_response = await self.client.get_json(
             destination, path=path, args=args, try_trailing_slash_on_400=True
@@ -352,12 +355,16 @@ class TransportLayerClient:
         room_id: str,
         event_id: str,
         content: JsonDict,
+        omit_members: bool,
     ) -> "SendJoinResponse":
         path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
         query_params: Dict[str, str] = {}
         if self._faster_joins_enabled:
             # lazy-load state on join
-            query_params["org.matrix.msc3706.partial_state"] = "true"
+            query_params["org.matrix.msc3706.partial_state"] = (
+                "true" if omit_members else "false"
+            )
+            query_params["omit_members"] = "true" if omit_members else "false"
 
         return await self.client.put_json(
             destination=destination,
@@ -795,7 +802,7 @@ class SendJoinResponse:
     event: Optional[EventBase] = None
 
     # The room state is incomplete
-    partial_state: bool = False
+    members_omitted: bool = False
 
     # List of servers in the room
     servers_in_room: Optional[List[str]] = None
@@ -835,16 +842,18 @@ def _event_list_parser(
 
 
 @ijson.coroutine
-def _partial_state_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
+def _members_omitted_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
     """Helper function for use with `ijson.items_coro`
 
-    Parses the partial_state field in send_join responses
+    Parses the members_omitted field in send_join responses
     """
     while True:
         val = yield
         if not isinstance(val, bool):
-            raise TypeError("partial_state must be a boolean")
-        response.partial_state = val
+            raise TypeError(
+                "members_omitted (formerly org.matrix.msc370c.partial_state) must be a boolean"
+            )
+        response.members_omitted = val
 
 
 @ijson.coroutine
@@ -905,11 +914,19 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
         if not v1_api:
             self._coros.append(
                 ijson.items_coro(
-                    _partial_state_parser(self._response),
+                    _members_omitted_parser(self._response),
                     "org.matrix.msc3706.partial_state",
                     use_float="True",
                 )
             )
+            # The stable field name comes last, so it "wins" if the fields disagree
+            self._coros.append(
+                ijson.items_coro(
+                    _members_omitted_parser(self._response),
+                    "members_omitted",
+                    use_float="True",
+                )
+            )
 
             self._coros.append(
                 ijson.items_coro(
@@ -919,6 +936,15 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
                 )
             )
 
+            # Again, stable field name comes last
+            self._coros.append(
+                ijson.items_coro(
+                    _servers_in_room_parser(self._response),
+                    "servers_in_room",
+                    use_float="True",
+                )
+            )
+
     def write(self, data: bytes) -> int:
         for c in self._coros:
             c.send(data)
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 50623cd385..2725f53cf6 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -25,7 +25,6 @@ from synapse.federation.transport.server._base import (
 from synapse.federation.transport.server.federation import (
     FEDERATION_SERVLET_CLASSES,
     FederationAccountStatusServlet,
-    FederationTimestampLookupServlet,
 )
 from synapse.http.server import HttpServer, JsonResource
 from synapse.http.servlet import (
@@ -291,13 +290,6 @@ def register_servlets(
             )
 
         for servletclass in SERVLET_GROUPS[servlet_group]:
-            # Only allow the `/timestamp_to_event` servlet if msc3030 is enabled
-            if (
-                servletclass == FederationTimestampLookupServlet
-                and not hs.config.experimental.msc3030_enabled
-            ):
-                continue
-
             # Only allow the `/account_status` servlet if msc3720 is enabled
             if (
                 servletclass == FederationAccountStatusServlet
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 205fd16daa..f7ca87adc4 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -26,7 +26,7 @@ from typing import (
 
 from typing_extensions import Literal
 
-from synapse.api.constants import EduTypes
+from synapse.api.constants import Direction, EduTypes
 from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import RoomVersions
 from synapse.api.urls import FEDERATION_UNSTABLE_PREFIX, FEDERATION_V2_PREFIX
@@ -218,14 +218,13 @@ class FederationTimestampLookupServlet(BaseFederationServerServlet):
     `dir` can be `f` or `b` to indicate forwards and backwards in time from the
     given timestamp.
 
-    GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
+    GET /_matrix/federation/v1/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
     {
         "event_id": ...
     }
     """
 
     PATH = "/timestamp_to_event/(?P<room_id>[^/]*)/?"
-    PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030"
 
     async def on_GET(
         self,
@@ -235,9 +234,10 @@ class FederationTimestampLookupServlet(BaseFederationServerServlet):
         room_id: str,
     ) -> Tuple[int, JsonDict]:
         timestamp = parse_integer_from_args(query, "ts", required=True)
-        direction = parse_string_from_args(
-            query, "dir", default="f", allowed_values=["f", "b"], required=True
+        direction_str = parse_string_from_args(
+            query, "dir", allowed_values=["f", "b"], required=True
         )
+        direction = Direction(direction_str)
 
         return await self.handler.on_timestamp_to_event_request(
             origin, room_id, timestamp, direction
@@ -423,7 +423,7 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet):
         server_name: str,
     ):
         super().__init__(hs, authenticator, ratelimiter, server_name)
-        self._msc3706_enabled = hs.config.experimental.msc3706_enabled
+        self._read_msc3706_query_param = hs.config.experimental.msc3706_enabled
 
     async def on_PUT(
         self,
@@ -437,10 +437,16 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet):
         #   match those given in content
 
         partial_state = False
-        if self._msc3706_enabled:
+        # The stable query parameter wins, if it disagrees with the unstable
+        # parameter for some reason.
+        stable_param = parse_boolean_from_args(query, "omit_members", default=None)
+        if stable_param is not None:
+            partial_state = stable_param
+        elif self._read_msc3706_query_param:
             partial_state = parse_boolean_from_args(
                 query, "org.matrix.msc3706.partial_state", default=False
             )
+
         result = await self.handler.on_send_join_request(
             origin, content, room_id, caller_supports_partial_state=partial_state
         )