diff options
author | David Robertson <davidr@element.io> | 2023-02-06 12:55:59 +0000 |
---|---|---|
committer | David Robertson <davidr@element.io> | 2023-02-06 12:55:59 +0000 |
commit | be45edb35b2a235be159b0d974d2880539bfbc0c (patch) | |
tree | 327fab0685eeae5337649c6bb03ee34df3c53dc5 /synapse/federation | |
parent | Manual format of md example (diff) | |
parent | Properly typecheck tests.api (#14983) (diff) | |
download | synapse-be45edb35b2a235be159b0d974d2880539bfbc0c.tar.xz |
Merge remote-tracking branch 'origin/develop' into mv/mypy-unused-awaitable
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_base.py | 2 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 111 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 22 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 20 | ||||
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 185 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 54 | ||||
-rw-r--r-- | synapse/federation/transport/server/__init__.py | 8 | ||||
-rw-r--r-- | synapse/federation/transport/server/federation.py | 20 |
8 files changed, 295 insertions, 127 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 6bd4742140..29fae716f5 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -280,7 +280,7 @@ def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventB _strip_unsigned_values(pdu_json) depth = pdu_json["depth"] - if not isinstance(depth, int): + if type(depth) is not int: raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON) if depth < 0: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c4c0bc7315..0ac85a3be7 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,6 +19,7 @@ import itertools import logging from typing import ( TYPE_CHECKING, + AbstractSet, Awaitable, Callable, Collection, @@ -37,7 +38,7 @@ from typing import ( import attr from prometheus_client import Counter -from synapse.api.constants import EventContentFields, EventTypes, Membership +from synapse.api.constants import Direction, EventContentFields, EventTypes, Membership from synapse.api.errors import ( CodeMessageException, Codes, @@ -110,8 +111,9 @@ class SendJoinResult: # True if 'state' elides non-critical membership events partial_state: bool - # if 'partial_state' is set, a list of the servers in the room (otherwise empty) - servers_in_room: List[str] + # If 'partial_state' is set, a set of the servers in the room (otherwise empty). + # Always contains the server we joined off. + servers_in_room: AbstractSet[str] class FederationClient(FederationBase): @@ -771,17 +773,28 @@ class FederationClient(FederationBase): """ if synapse_error is None: synapse_error = e.to_synapse_error() - # There is no good way to detect an "unknown" endpoint. + # MSC3743 specifies that servers should return a 404 or 405 with an errcode + # of M_UNRECOGNIZED when they receive a request to an unknown endpoint or + # to an unknown method, respectively. # - # Dendrite returns a 404 (with a body of "404 page not found"); - # Conduit returns a 404 (with no body); and Synapse returns a 400 - # with M_UNRECOGNIZED. - # - # This needs to be rather specific as some endpoints truly do return 404 - # errors. + # Older versions of servers don't properly handle this. This needs to be + # rather specific as some endpoints truly do return 404 errors. return ( - e.code == 404 and (not e.response or e.response == b"404 page not found") - ) or (e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED) + # 404 is an unknown endpoint, 405 is a known endpoint, but unknown method. + (e.code == 404 or e.code == 405) + and ( + # Older Dendrites returned a text or empty body. + # Older Conduit returned an empty body. + not e.response + or e.response == b"404 page not found" + # The proper response JSON with M_UNRECOGNIZED errcode. + or synapse_error.errcode == Codes.UNRECOGNIZED + ) + ) or ( + # Older Synapses returned a 400 error. + e.code == 400 + and synapse_error.errcode == Codes.UNRECOGNIZED + ) async def _try_destination_list( self, @@ -1003,7 +1016,11 @@ class FederationClient(FederationBase): ) async def send_join( - self, destinations: Iterable[str], pdu: EventBase, room_version: RoomVersion + self, + destinations: Iterable[str], + pdu: EventBase, + room_version: RoomVersion, + partial_state: bool = True, ) -> SendJoinResult: """Sends a join event to one of a list of homeservers. @@ -1016,6 +1033,10 @@ class FederationClient(FederationBase): pdu: event to be sent room_version: the version of the room (according to the server that did the make_join) + partial_state: whether to ask the remote server to omit membership state + events from the response. If the remote server complies, + `partial_state` in the send join result will be set. Defaults to + `True`. Returns: The result of the send join request. @@ -1026,7 +1047,9 @@ class FederationClient(FederationBase): """ async def send_request(destination: str) -> SendJoinResult: - response = await self._do_send_join(room_version, destination, pdu) + response = await self._do_send_join( + room_version, destination, pdu, omit_members=partial_state + ) # If an event was returned (and expected to be returned): # @@ -1131,18 +1154,32 @@ class FederationClient(FederationBase): % (auth_chain_create_events,) ) - if response.partial_state and not response.servers_in_room: - raise InvalidResponseError( - "partial_state was set, but no servers were listed in the room" - ) + servers_in_room = None + if response.servers_in_room is not None: + servers_in_room = set(response.servers_in_room) + + if response.members_omitted: + if not servers_in_room: + raise InvalidResponseError( + "members_omitted was set, but no servers were listed in the room" + ) + + if not partial_state: + raise InvalidResponseError( + "members_omitted was set, but we asked for full state" + ) + + # `servers_in_room` is supposed to be a complete list. + # Fix things up in case the remote homeserver is badly behaved. + servers_in_room.add(destination) return SendJoinResult( event=event, state=signed_state, auth_chain=signed_auth, origin=destination, - partial_state=response.partial_state, - servers_in_room=response.servers_in_room or [], + partial_state=response.members_omitted, + servers_in_room=servers_in_room or frozenset(), ) # MSC3083 defines additional error codes for room joins. @@ -1166,7 +1203,11 @@ class FederationClient(FederationBase): ) async def _do_send_join( - self, room_version: RoomVersion, destination: str, pdu: EventBase + self, + room_version: RoomVersion, + destination: str, + pdu: EventBase, + omit_members: bool, ) -> SendJoinResponse: time_now = self._clock.time_msec() @@ -1177,6 +1218,7 @@ class FederationClient(FederationBase): room_id=pdu.room_id, event_id=pdu.event_id, content=pdu.get_pdu_json(time_now), + omit_members=omit_members, ) except HttpResponseException as e: # If an error is received that is due to an unrecognised endpoint, @@ -1649,7 +1691,12 @@ class FederationClient(FederationBase): return result async def timestamp_to_event( - self, *, destinations: List[str], room_id: str, timestamp: int, direction: str + self, + *, + destinations: List[str], + room_id: str, + timestamp: int, + direction: Direction, ) -> Optional["TimestampToEventResponse"]: """ Calls each remote federating server from `destinations` asking for their closest @@ -1662,7 +1709,7 @@ class FederationClient(FederationBase): room_id: Room to fetch the event from timestamp: The point in time (inclusive) we should navigate from in the given direction to find the closest event. - direction: ["f"|"b"] to indicate whether we should navigate forward + direction: indicates whether we should navigate forward or backward from the given timestamp to find the closest event. Returns: @@ -1691,13 +1738,23 @@ class FederationClient(FederationBase): # to return events on *both* sides of the timestamp to # help reconcile the gap faster. _timestamp_to_event_from_destination, + # Since this endpoint is new, we should try other servers before giving up. + # We can safely remove this in a year (remove after 2023-11-16). + failover_on_unknown_endpoint=True, ) return timestamp_to_event_response - except SynapseError: + except SynapseError as e: + logger.warn( + "timestamp_to_event(room_id=%s, timestamp=%s, direction=%s): encountered error when trying to fetch from destinations: %s", + room_id, + timestamp, + direction, + e, + ) return None async def _timestamp_to_event_from_destination( - self, destination: str, room_id: str, timestamp: int, direction: str + self, destination: str, room_id: str, timestamp: int, direction: Direction ) -> "TimestampToEventResponse": """ Calls a remote federating server at `destination` asking for their @@ -1710,7 +1767,7 @@ class FederationClient(FederationBase): room_id: Room to fetch the event from timestamp: The point in time (inclusive) we should navigate from in the given direction to find the closest event. - direction: ["f"|"b"] to indicate whether we should navigate forward + direction: indicates whether we should navigate forward or backward from the given timestamp to find the closest event. Returns: @@ -1823,7 +1880,7 @@ class TimestampToEventResponse: ) origin_server_ts = d.get("origin_server_ts") - if not isinstance(origin_server_ts, int): + if type(origin_server_ts) is not int: raise ValueError( "Invalid response: 'origin_server_ts' must be a int but received %r" % origin_server_ts diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a1be969199..3b115bacdd 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -34,7 +34,13 @@ from prometheus_client import Counter, Gauge, Histogram from twisted.internet.abstract import isIPAddress from twisted.python import failure -from synapse.api.constants import EduTypes, EventContentFields, EventTypes, Membership +from synapse.api.constants import ( + Direction, + EduTypes, + EventContentFields, + EventTypes, + Membership, +) from synapse.api.errors import ( AuthError, Codes, @@ -62,7 +68,9 @@ from synapse.logging.context import ( run_in_background, ) from synapse.logging.opentracing import ( + SynapseTags, log_kv, + set_tag, start_active_span_from_edu, tag_args, trace, @@ -216,7 +224,7 @@ class FederationServer(FederationBase): return 200, res async def on_timestamp_to_event_request( - self, origin: str, room_id: str, timestamp: int, direction: str + self, origin: str, room_id: str, timestamp: int, direction: Direction ) -> Tuple[int, Dict[str, Any]]: """When we receive a federated `/timestamp_to_event` request, handle all of the logic for validating and fetching the event. @@ -226,7 +234,7 @@ class FederationServer(FederationBase): room_id: Room to fetch the event from timestamp: The point in time (inclusive) we should navigate from in the given direction to find the closest event. - direction: ["f"|"b"] to indicate whether we should navigate forward + direction: indicates whether we should navigate forward or backward from the given timestamp to find the closest event. Returns: @@ -678,6 +686,10 @@ class FederationServer(FederationBase): room_id: str, caller_supports_partial_state: bool = False, ) -> Dict[str, Any]: + set_tag( + SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE, + caller_supports_partial_state, + ) await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] requester=None, key=room_id, @@ -725,10 +737,12 @@ class FederationServer(FederationBase): "state": [p.get_pdu_json(time_now) for p in state_events], "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events], "org.matrix.msc3706.partial_state": caller_supports_partial_state, + "members_omitted": caller_supports_partial_state, } if servers_in_room is not None: resp["org.matrix.msc3706.servers_in_room"] = list(servers_in_room) + resp["servers_in_room"] = list(servers_in_room) return resp @@ -1500,7 +1514,7 @@ def _get_event_ids_for_partial_state_join( prev_state_ids: StateMap[str], summary: Dict[str, MemberSummary], ) -> Collection[str]: - """Calculate state to be retuned in a partial_state send_join + """Calculate state to be returned in a partial_state send_join Args: join_event: the join event being send_joined diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index b6fdd1aa50..3e500e3643 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -434,7 +434,23 @@ class FederationSender(AbstractFederationSender): # If there are no prev event IDs then the state is empty # and so no remote servers in the room destinations = set() - else: + + if destinations is None: + # During partial join we use the set of servers that we got + # when beginning the join. It's still possible that we send + # events to servers that left the room in the meantime, but + # we consider that an acceptable risk since it is only our own + # events that we leak and not other server's ones. + partial_state_destinations = ( + await self.store.get_partial_state_servers_at_join( + event.room_id + ) + ) + + if partial_state_destinations is not None: + destinations = partial_state_destinations + + if destinations is None: # We check the external cache for the destinations, which is # stored per state group. @@ -631,7 +647,7 @@ class FederationSender(AbstractFederationSender): room_id = receipt.room_id # Work out which remote servers should be poked and poke them. - domains_set = await self._storage_controllers.state.get_current_hosts_in_room( + domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id ) domains = [ diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index b4202d3e55..a2a907d6aa 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -35,7 +35,7 @@ from synapse.logging import issue9533_logger from synapse.logging.opentracing import SynapseTags, set_tag from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import ReadReceipt +from synapse.types import JsonDict, ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.visibility import filter_events_for_server @@ -136,8 +136,11 @@ class PerDestinationQueue: # destination self._pending_presence: Dict[str, UserPresenceState] = {} - # room_id -> receipt_type -> user_id -> receipt_dict - self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {} + # List of room_id -> receipt_type -> user_id -> receipt_dict, + # + # Each receipt can only have a single receipt per + # (room ID, receipt type, user ID, thread ID) tuple. + self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = [] self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. @@ -202,17 +205,53 @@ class PerDestinationQueue: Args: receipt: receipt to be queued """ - self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( - receipt.receipt_type, {} - )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data} + serialized_receipt: JsonDict = { + "event_ids": receipt.event_ids, + "data": receipt.data, + } + if receipt.thread_id is not None: + serialized_receipt["data"]["thread_id"] = receipt.thread_id + + # Find which EDU to add this receipt to. There's three situations depending + # on the (room ID, receipt type, user, thread ID) tuple: + # + # 1. If it fully matches, clobber the information. + # 2. If it is missing, add the information. + # 3. If the subset tuple of (room ID, receipt type, user) matches, check + # the next EDU (or add a new EDU). + for edu in self._pending_receipt_edus: + receipt_content = edu.setdefault(receipt.room_id, {}).setdefault( + receipt.receipt_type, {} + ) + # If this room ID, receipt type, user ID is not in this EDU, OR if + # the full tuple matches, use the current EDU. + if ( + receipt.user_id not in receipt_content + or receipt_content[receipt.user_id].get("thread_id") + == receipt.thread_id + ): + receipt_content[receipt.user_id] = serialized_receipt + break + + # If no matching EDU was found, create a new one. + else: + self._pending_receipt_edus.append( + { + receipt.room_id: { + receipt.receipt_type: {receipt.user_id: serialized_receipt} + } + } + ) def flush_read_receipts_for_room(self, room_id: str) -> None: - # if we don't have any read-receipts for this room, it may be that we've already - # sent them out, so we don't need to flush. - if room_id not in self._pending_rrs: - return - self._rrs_pending_flush = True - self.attempt_new_transaction() + # If there are any pending receipts for this room then force-flush them + # in a new transaction. + for edu in self._pending_receipt_edus: + if room_id in edu: + self._rrs_pending_flush = True + self.attempt_new_transaction() + # No use in checking remaining EDUs if the room was found. + break def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: self._pending_edus_keyed[(edu.edu_type, key)] = edu @@ -351,7 +390,7 @@ class PerDestinationQueue: self._pending_edus = [] self._pending_edus_keyed = {} self._pending_presence = {} - self._pending_rrs = {} + self._pending_receipt_edus = [] self._start_catching_up() except FederationDeniedError as e: @@ -543,22 +582,27 @@ class PerDestinationQueue: self._destination, last_successful_stream_ordering ) - def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: - if not self._pending_rrs: + def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: + if not self._pending_receipt_edus: return if not force_flush and not self._rrs_pending_flush: # not yet time for this lot return - edu = Edu( - origin=self._server_name, - destination=self._destination, - edu_type=EduTypes.RECEIPT, - content=self._pending_rrs, - ) - self._pending_rrs = {} - self._rrs_pending_flush = False - yield edu + # Send at most limit EDUs for receipts. + for content in self._pending_receipt_edus[:limit]: + yield Edu( + origin=self._server_name, + destination=self._destination, + edu_type=EduTypes.RECEIPT, + content=content, + ) + self._pending_receipt_edus = self._pending_receipt_edus[limit:] + + # If there are still pending read-receipts, don't reset the pending flush + # flag. + if not self._pending_receipt_edus: + self._rrs_pending_flush = False def _pop_pending_edus(self, limit: int) -> List[Edu]: pending_edus = self._pending_edus @@ -597,7 +641,7 @@ class PerDestinationQueue: if not message_id: continue - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id) edus = [ Edu( @@ -645,27 +689,61 @@ class _TransactionQueueManager: async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: # First we calculate the EDUs we want to send, if any. - # We start by fetching device related EDUs, i.e device updates and to - # device messages. We have to keep 2 free slots for presence and rr_edus. - device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2 + # There's a maximum number of EDUs that can be sent with a transaction, + # generally device updates and to-device messages get priority, but we + # want to ensure that there's room for some other EDUs as well. + # + # This is done by: + # + # * Add a presence EDU, if one exists. + # * Add up-to a small limit of read receipt EDUs. + # * Add to-device EDUs, but leave some space for device list updates. + # * Add device list updates EDUs. + # * If there's any remaining room, add other EDUs. + pending_edus = [] + + # Add presence EDU. + if self.queue._pending_presence: + pending_edus.append( + Edu( + origin=self.queue._server_name, + destination=self.queue._destination, + edu_type=EduTypes.PRESENCE, + content={ + "push": [ + format_user_presence_state( + presence, self.queue._clock.time_msec() + ) + for presence in self.queue._pending_presence.values() + ] + }, + ) + ) + self.queue._pending_presence = {} - # We prioritize to-device messages so that existing encryption channels + # Add read receipt EDUs. + pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5)) + edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus) + + # Next, prioritize to-device messages so that existing encryption channels # work. We also keep a few slots spare (by reducing the limit) so that # we can still trickle out some device list updates. ( to_device_edus, device_stream_id, - ) = await self.queue._get_to_device_message_edus(device_edu_limit - 10) + ) = await self.queue._get_to_device_message_edus(edu_limit - 10) if to_device_edus: self._device_stream_id = device_stream_id else: self.queue._last_device_stream_id = device_stream_id - device_edu_limit -= len(to_device_edus) + pending_edus.extend(to_device_edus) + edu_limit -= len(to_device_edus) + # Add device list update EDUs. device_update_edus, dev_list_id = await self.queue._get_device_update_edus( - device_edu_limit + edu_limit ) if device_update_edus: @@ -673,40 +751,17 @@ class _TransactionQueueManager: else: self.queue._last_device_list_stream_id = dev_list_id - pending_edus = device_update_edus + to_device_edus - - # Now add the read receipt EDU. - pending_edus.extend(self.queue._get_rr_edus(force_flush=False)) - - # And presence EDU. - if self.queue._pending_presence: - pending_edus.append( - Edu( - origin=self.queue._server_name, - destination=self.queue._destination, - edu_type=EduTypes.PRESENCE, - content={ - "push": [ - format_user_presence_state( - presence, self.queue._clock.time_msec() - ) - for presence in self.queue._pending_presence.values() - ] - }, - ) - ) - self.queue._pending_presence = {} + pending_edus.extend(device_update_edus) + edu_limit -= len(device_update_edus) # Finally add any other types of EDUs if there is room. - pending_edus.extend( - self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)) - ) - while ( - len(pending_edus) < MAX_EDUS_PER_TRANSACTION - and self.queue._pending_edus_keyed - ): + other_edus = self.queue._pop_pending_edus(edu_limit) + pending_edus.extend(other_edus) + edu_limit -= len(other_edus) + while edu_limit > 0 and self.queue._pending_edus_keyed: _, val = self.queue._pending_edus_keyed.popitem() pending_edus.append(val) + edu_limit -= 1 # Now we look for any PDUs to send, by getting up to 50 PDUs from the # queue @@ -717,8 +772,10 @@ class _TransactionQueueManager: # if we've decided to send a transaction anyway, and we have room, we # may as well send any pending RRs - if len(pending_edus) < MAX_EDUS_PER_TRANSACTION: - pending_edus.extend(self.queue._get_rr_edus(force_flush=True)) + if edu_limit: + pending_edus.extend( + self.queue._get_receipt_edus(force_flush=True, limit=edu_limit) + ) if self._pdus: self._last_stream_ordering = self._pdus[ diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index a3cfc701cd..c05d598b70 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -32,7 +32,7 @@ from typing import ( import attr import ijson -from synapse.api.constants import Membership +from synapse.api.constants import Direction, Membership from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.api.room_versions import RoomVersion from synapse.api.urls import ( @@ -102,6 +102,10 @@ class TransportLayerClient: destination, path=path, args={"event_id": event_id}, + # This can take a looooooong time for large rooms. Give this a generous + # timeout of 10 minutes to avoid the partial state resync timing out early + # and trying a bunch of servers who haven't seen our join yet. + timeout=600_000, parser=_StateParser(room_version), ) @@ -165,7 +169,7 @@ class TransportLayerClient: ) async def timestamp_to_event( - self, destination: str, room_id: str, timestamp: int, direction: str + self, destination: str, room_id: str, timestamp: int, direction: Direction ) -> Union[JsonDict, List]: """ Calls a remote federating server at `destination` asking for their @@ -176,7 +180,7 @@ class TransportLayerClient: room_id: Room to fetch the event from timestamp: The point in time (inclusive) we should navigate from in the given direction to find the closest event. - direction: ["f"|"b"] to indicate whether we should navigate forward + direction: indicates whether we should navigate forward or backward from the given timestamp to find the closest event. Returns: @@ -185,13 +189,12 @@ class TransportLayerClient: Raises: Various exceptions when the request fails """ - path = _create_path( - FEDERATION_UNSTABLE_PREFIX, - "/org.matrix.msc3030/timestamp_to_event/%s", + path = _create_v1_path( + "/timestamp_to_event/%s", room_id, ) - args = {"ts": [str(timestamp)], "dir": [direction]} + args = {"ts": [str(timestamp)], "dir": [direction.value]} remote_response = await self.client.get_json( destination, path=path, args=args, try_trailing_slash_on_400=True @@ -352,12 +355,16 @@ class TransportLayerClient: room_id: str, event_id: str, content: JsonDict, + omit_members: bool, ) -> "SendJoinResponse": path = _create_v2_path("/send_join/%s/%s", room_id, event_id) query_params: Dict[str, str] = {} if self._faster_joins_enabled: # lazy-load state on join - query_params["org.matrix.msc3706.partial_state"] = "true" + query_params["org.matrix.msc3706.partial_state"] = ( + "true" if omit_members else "false" + ) + query_params["omit_members"] = "true" if omit_members else "false" return await self.client.put_json( destination=destination, @@ -795,7 +802,7 @@ class SendJoinResponse: event: Optional[EventBase] = None # The room state is incomplete - partial_state: bool = False + members_omitted: bool = False # List of servers in the room servers_in_room: Optional[List[str]] = None @@ -835,16 +842,18 @@ def _event_list_parser( @ijson.coroutine -def _partial_state_parser(response: SendJoinResponse) -> Generator[None, Any, None]: +def _members_omitted_parser(response: SendJoinResponse) -> Generator[None, Any, None]: """Helper function for use with `ijson.items_coro` - Parses the partial_state field in send_join responses + Parses the members_omitted field in send_join responses """ while True: val = yield if not isinstance(val, bool): - raise TypeError("partial_state must be a boolean") - response.partial_state = val + raise TypeError( + "members_omitted (formerly org.matrix.msc370c.partial_state) must be a boolean" + ) + response.members_omitted = val @ijson.coroutine @@ -905,11 +914,19 @@ class SendJoinParser(ByteParser[SendJoinResponse]): if not v1_api: self._coros.append( ijson.items_coro( - _partial_state_parser(self._response), + _members_omitted_parser(self._response), "org.matrix.msc3706.partial_state", use_float="True", ) ) + # The stable field name comes last, so it "wins" if the fields disagree + self._coros.append( + ijson.items_coro( + _members_omitted_parser(self._response), + "members_omitted", + use_float="True", + ) + ) self._coros.append( ijson.items_coro( @@ -919,6 +936,15 @@ class SendJoinParser(ByteParser[SendJoinResponse]): ) ) + # Again, stable field name comes last + self._coros.append( + ijson.items_coro( + _servers_in_room_parser(self._response), + "servers_in_room", + use_float="True", + ) + ) + def write(self, data: bytes) -> int: for c in self._coros: c.send(data) diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py index 50623cd385..2725f53cf6 100644 --- a/synapse/federation/transport/server/__init__.py +++ b/synapse/federation/transport/server/__init__.py @@ -25,7 +25,6 @@ from synapse.federation.transport.server._base import ( from synapse.federation.transport.server.federation import ( FEDERATION_SERVLET_CLASSES, FederationAccountStatusServlet, - FederationTimestampLookupServlet, ) from synapse.http.server import HttpServer, JsonResource from synapse.http.servlet import ( @@ -291,13 +290,6 @@ def register_servlets( ) for servletclass in SERVLET_GROUPS[servlet_group]: - # Only allow the `/timestamp_to_event` servlet if msc3030 is enabled - if ( - servletclass == FederationTimestampLookupServlet - and not hs.config.experimental.msc3030_enabled - ): - continue - # Only allow the `/account_status` servlet if msc3720 is enabled if ( servletclass == FederationAccountStatusServlet diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 205fd16daa..f7ca87adc4 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -26,7 +26,7 @@ from typing import ( from typing_extensions import Literal -from synapse.api.constants import EduTypes +from synapse.api.constants import Direction, EduTypes from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersions from synapse.api.urls import FEDERATION_UNSTABLE_PREFIX, FEDERATION_V2_PREFIX @@ -218,14 +218,13 @@ class FederationTimestampLookupServlet(BaseFederationServerServlet): `dir` can be `f` or `b` to indicate forwards and backwards in time from the given timestamp. - GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction> + GET /_matrix/federation/v1/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction> { "event_id": ... } """ PATH = "/timestamp_to_event/(?P<room_id>[^/]*)/?" - PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030" async def on_GET( self, @@ -235,9 +234,10 @@ class FederationTimestampLookupServlet(BaseFederationServerServlet): room_id: str, ) -> Tuple[int, JsonDict]: timestamp = parse_integer_from_args(query, "ts", required=True) - direction = parse_string_from_args( - query, "dir", default="f", allowed_values=["f", "b"], required=True + direction_str = parse_string_from_args( + query, "dir", allowed_values=["f", "b"], required=True ) + direction = Direction(direction_str) return await self.handler.on_timestamp_to_event_request( origin, room_id, timestamp, direction @@ -423,7 +423,7 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet): server_name: str, ): super().__init__(hs, authenticator, ratelimiter, server_name) - self._msc3706_enabled = hs.config.experimental.msc3706_enabled + self._read_msc3706_query_param = hs.config.experimental.msc3706_enabled async def on_PUT( self, @@ -437,10 +437,16 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet): # match those given in content partial_state = False - if self._msc3706_enabled: + # The stable query parameter wins, if it disagrees with the unstable + # parameter for some reason. + stable_param = parse_boolean_from_args(query, "omit_members", default=None) + if stable_param is not None: + partial_state = stable_param + elif self._read_msc3706_query_param: partial_state = parse_boolean_from_args( query, "org.matrix.msc3706.partial_state", default=False ) + result = await self.handler.on_send_join_request( origin, content, room_id, caller_supports_partial_state=partial_state ) |