Diffstat (limited to 'synapse/federation')

-rw-r--r--  synapse/federation/federation_client.py             171
-rw-r--r--  synapse/federation/federation_server.py              40
-rw-r--r--  synapse/federation/sender/__init__.py                48
-rw-r--r--  synapse/federation/sender/per_destination_queue.py  186
-rw-r--r--  synapse/federation/transport/client.py               16
-rw-r--r--  synapse/federation/transport/server/__init__.py       8
-rw-r--r--  synapse/federation/transport/server/_base.py          4
-rw-r--r--  synapse/federation/transport/server/federation.py    10

8 files changed, 350 insertions, 133 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 4dca711cd2..137cfb3346 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -80,6 +80,18 @@ PDU_RETRY_TIME_MS = 1 * 60 * 1000
 
 T = TypeVar("T")
 
+
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class PulledPduInfo:
+    """
+    A result object that stores the PDU and info about it like which homeserver we
+    pulled it from (`pull_origin`)
+    """
+
+    pdu: EventBase
+    # Which homeserver we pulled the PDU from
+    pull_origin: str
+
+
 class InvalidResponseError(RuntimeError):
     """Helper for _try_destination_list: indicates that the server returned a
     response we couldn't parse
@@ -114,7 +126,9 @@ class FederationClient(FederationBase):
         self.hostname = hs.hostname
         self.signing_key = hs.signing_key
 
-        self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache(
+        # Cache mapping `event_id` to a tuple of the event itself and the `pull_origin`
+        # (which server we pulled the event from)
+        self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache(
             cache_name="get_pdu_cache",
             clock=self._clock,
             max_len=1000,
@@ -352,11 +366,11 @@ class FederationClient(FederationBase):
     @tag_args
     async def get_pdu(
         self,
-        destinations: Iterable[str],
+        destinations: Collection[str],
         event_id: str,
         room_version: RoomVersion,
         timeout: Optional[int] = None,
-    ) -> Optional[EventBase]:
+    ) -> Optional[PulledPduInfo]:
         """Requests the PDU with given origin and ID from the remote home
         servers.
@@ -371,11 +385,11 @@ class FederationClient(FederationBase):
                 moving to the next destination. None indicates no timeout.
 
         Returns:
-            The requested PDU, or None if we were unable to find it.
+            The requested PDU wrapped in `PulledPduInfo`, or None if we were unable to find it.
         """
 
         logger.debug(
-            "get_pdu: event_id=%s from destinations=%s", event_id, destinations
+            "get_pdu(event_id=%s): from destinations=%s", event_id, destinations
         )
 
         # TODO: Rate limit the number of times we try and get the same event.
@@ -384,19 +398,25 @@ class FederationClient(FederationBase):
         # it gets persisted to the database), so we cache the results of the lookup.
         # Note that this is separate to the regular get_event cache which caches
         # events once they have been persisted.
-        event = self._get_pdu_cache.get(event_id)
+        get_pdu_cache_entry = self._get_pdu_cache.get(event_id)
 
+        event = None
+        pull_origin = None
+        if get_pdu_cache_entry:
+            event, pull_origin = get_pdu_cache_entry
         # If we don't see the event in the cache, go try to fetch it from the
         # provided remote federated destinations
-        if not event:
+        else:
             pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
 
+            # TODO: We can probably refactor this to use `_try_destination_list`
             for destination in destinations:
                 now = self._clock.time_msec()
                 last_attempt = pdu_attempts.get(destination, 0)
                 if last_attempt + PDU_RETRY_TIME_MS > now:
                     logger.debug(
-                        "get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
+                        "get_pdu(event_id=%s): skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
+                        event_id,
                         destination,
                         last_attempt,
                         PDU_RETRY_TIME_MS,
@@ -411,43 +431,48 @@ class FederationClient(FederationBase):
                         room_version=room_version,
                         timeout=timeout,
                     )
+                    pull_origin = destination
 
                     pdu_attempts[destination] = now
 
                     if event:
                         # Prime the cache
-                        self._get_pdu_cache[event.event_id] = event
+                        self._get_pdu_cache[event.event_id] = (event, pull_origin)
 
                         # Now that we have an event, we can break out of this
                         # loop and stop asking other destinations.
                         break
 
+                except NotRetryingDestination as e:
+                    logger.info("get_pdu(event_id=%s): %s", event_id, e)
+                    continue
+                except FederationDeniedError:
+                    logger.info(
+                        "get_pdu(event_id=%s): Not attempting to fetch PDU from %s because the homeserver is not on our federation whitelist",
+                        event_id,
+                        destination,
+                    )
+                    continue
                 except SynapseError as e:
                     logger.info(
-                        "Failed to get PDU %s from %s because %s",
+                        "get_pdu(event_id=%s): Failed to get PDU from %s because %s",
                         event_id,
                         destination,
                         e,
                     )
                     continue
-                except NotRetryingDestination as e:
-                    logger.info(str(e))
-                    continue
-                except FederationDeniedError as e:
-                    logger.info(str(e))
-                    continue
                 except Exception as e:
                     pdu_attempts[destination] = now
 
                     logger.info(
-                        "Failed to get PDU %s from %s because %s",
+                        "get_pdu(event_id=%s): Failed to get PDU from %s because %s",
                         event_id,
                         destination,
                         e,
                     )
                     continue
 
-        if not event:
+        if not event or not pull_origin:
             return None
 
         # `event` now refers to an object stored in `get_pdu_cache`. Our
@@ -459,7 +484,7 @@ class FederationClient(FederationBase):
             event.room_version,
         )
 
-        return event_copy
+        return PulledPduInfo(event_copy, pull_origin)
 
     @trace
     @tag_args
@@ -699,12 +724,14 @@ class FederationClient(FederationBase):
             pdu_origin = get_domain_from_id(pdu.sender)
             if not res and pdu_origin != origin:
                 try:
-                    res = await self.get_pdu(
+                    pulled_pdu_info = await self.get_pdu(
                         destinations=[pdu_origin],
                         event_id=pdu.event_id,
                         room_version=room_version,
                         timeout=10000,
                     )
+                    if pulled_pdu_info is not None:
+                        res = pulled_pdu_info.pdu
                 except SynapseError:
                     pass
@@ -744,17 +771,28 @@ class FederationClient(FederationBase):
         """
         if synapse_error is None:
             synapse_error = e.to_synapse_error()
-        # There is no good way to detect an "unknown" endpoint.
-        #
-        # Dendrite returns a 404 (with a body of "404 page not found");
-        # Conduit returns a 404 (with no body); and Synapse returns a 400
-        # with M_UNRECOGNIZED.
+        # MSC3743 specifies that servers should return a 404 or 405 with an errcode
+        # of M_UNRECOGNIZED when they receive a request to an unknown endpoint or
+        # to an unknown method, respectively.
         #
-        # This needs to be rather specific as some endpoints truly do return 404
-        # errors.
+        # Older versions of servers don't properly handle this. This needs to be
+        # rather specific as some endpoints truly do return 404 errors.
         return (
-            e.code == 404 and (not e.response or e.response == b"404 page not found")
-        ) or (e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED)
+            # 404 is an unknown endpoint, 405 is a known endpoint, but unknown method.
+            (e.code == 404 or e.code == 405)
+            and (
+                # Older Dendrites returned a text or empty body.
+                # Older Conduit returned an empty body.
+                not e.response
+                or e.response == b"404 page not found"
+                # The proper response JSON with M_UNRECOGNIZED errcode.
+                or synapse_error.errcode == Codes.UNRECOGNIZED
+            )
+        ) or (
+            # Older Synapses returned a 400 error.
+            e.code == 400
+            and synapse_error.errcode == Codes.UNRECOGNIZED
+        )
 
     async def _try_destination_list(
         self,
@@ -806,6 +844,7 @@ class FederationClient(FederationBase):
         )
 
         for destination in destinations:
+            # We don't want to ask our own server for information we don't have
             if destination == self.server_name:
                 continue
 
@@ -814,9 +853,21 @@ class FederationClient(FederationBase):
             except (
                 RequestSendFailed,
                 InvalidResponseError,
-                NotRetryingDestination,
             ) as e:
                 logger.warning("Failed to %s via %s: %s", description, destination, e)
+                # Skip to the next homeserver in the list to try.
+                continue
+            except NotRetryingDestination as e:
+                logger.info("%s: %s", description, e)
+                continue
+            except FederationDeniedError:
+                logger.info(
+                    "%s: Not attempting to %s from %s because the homeserver is not on our federation whitelist",
+                    description,
+                    description,
+                    destination,
+                )
+                continue
             except UnsupportedRoomVersionError:
                 raise
             except HttpResponseException as e:
@@ -1294,7 +1345,7 @@ class FederationClient(FederationBase):
         return resp[1]
 
     async def send_knock(self, destinations: List[str], pdu: EventBase) -> JsonDict:
-        """Attempts to send a knock event to given a list of servers. Iterates
+        """Attempts to send a knock event to a given list of servers. Iterates
         through the list until one attempt succeeds.
 
         Doing so will cause the remote server to add the event to the graph,
@@ -1609,6 +1660,64 @@ class FederationClient(FederationBase):
         return result
 
     async def timestamp_to_event(
+        self, *, destinations: List[str], room_id: str, timestamp: int, direction: str
+    ) -> Optional["TimestampToEventResponse"]:
+        """
+        Calls each remote federating server from `destinations` asking for their closest
+        event to the given timestamp in the given direction until we get a response.
+        Also validates the response to always return the expected keys or raises an
+        error.
+
+        Args:
+            destinations: The domains of homeservers to try fetching from
+            room_id: Room to fetch the event from
+            timestamp: The point in time (inclusive) we should navigate from in
+                the given direction to find the closest event.
+            direction: ["f"|"b"] to indicate whether we should navigate forward
+                or backward from the given timestamp to find the closest event.
+
+        Returns:
+            A parsed TimestampToEventResponse including the closest event_id
+            and origin_server_ts or None if no destination has a response.
+        """
+
+        async def _timestamp_to_event_from_destination(
+            destination: str,
+        ) -> TimestampToEventResponse:
+            return await self._timestamp_to_event_from_destination(
+                destination, room_id, timestamp, direction
+            )
+
+        try:
+            # Loop through each homeserver candidate until we get a successful response
+            timestamp_to_event_response = await self._try_destination_list(
+                "timestamp_to_event",
+                destinations,
+                # TODO: The requested timestamp may lie in a part of the
+                # event graph that the remote server *also* didn't have,
+                # in which case they will have returned another event
+                # which may be nowhere near the requested timestamp. In
+                # the future, we may need to reconcile that gap and ask
+                # other homeservers, and/or extend `/timestamp_to_event`
+                # to return events on *both* sides of the timestamp to
+                # help reconcile the gap faster.
+                _timestamp_to_event_from_destination,
+                # Since this endpoint is new, we should try other servers before giving up.
+                # We can safely remove this in a year (remove after 2023-11-16).
+                failover_on_unknown_endpoint=True,
+            )
+            return timestamp_to_event_response
+        except SynapseError as e:
+            logger.warn(
+                "timestamp_to_event(room_id=%s, timestamp=%s, direction=%s): encountered error when trying to fetch from destinations: %s",
+                room_id,
+                timestamp,
+                direction,
+                e,
+            )
+            return None
+
+    async def _timestamp_to_event_from_destination(
         self, destination: str, room_id: str, timestamp: int, direction: str
     ) -> "TimestampToEventResponse":
         """
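Callers of `get_pdu` now receive a `PulledPduInfo` rather than a bare `EventBase`. A minimal sketch of how calling code might adapt to the new return type; the `client` object and destination names here are illustrative, not part of the diff:

from typing import Optional, Tuple

async def fetch_event_with_origin(
    client, event_id: str, room_version
) -> Optional[Tuple["EventBase", str]]:
    # `get_pdu` returns a PulledPduInfo wrapping the event plus the server it
    # was pulled from, or None if no destination could supply the event.
    pulled_pdu_info = await client.get_pdu(
        destinations=["remote1.example.com", "remote2.example.com"],
        event_id=event_id,
        room_version=room_version,
        timeout=10000,
    )
    if pulled_pdu_info is None:
        return None
    # `pull_origin` records which homeserver actually served the PDU; useful
    # for logging and for attributing where a backfilled event came from.
    return pulled_pdu_info.pdu, pulled_pdu_info.pull_origin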
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 907940e19e..bb20af6e91 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -74,6 +74,8 @@ from synapse.replication.http.federation import (
 )
 from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.lock import Lock
+from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.roommember import MemberSummary
 from synapse.types import JsonDict, StateMap, get_domain_from_id
 from synapse.util import json_decoder, unwrapFirstError
 from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
@@ -481,6 +483,14 @@ class FederationServer(FederationBase):
                 pdu_results[pdu.event_id] = await process_pdu(pdu)
 
         async def process_pdu(pdu: EventBase) -> JsonDict:
+            """
+            Processes a pushed PDU sent to us via a `/send` transaction
+
+            Returns:
+                JsonDict representing a "PDU Processing Result" that will be bundled up
+                with the other processed PDUs in the `/send` transaction and sent back
+                to the remote homeserver.
+            """
             event_id = pdu.event_id
             with nested_logging_context(event_id):
                 try:
@@ -683,8 +693,9 @@ class FederationServer(FederationBase):
         state_event_ids: Collection[str]
         servers_in_room: Optional[Collection[str]]
         if caller_supports_partial_state:
+            summary = await self.store.get_room_summary(room_id)
             state_event_ids = _get_event_ids_for_partial_state_join(
-                event, prev_state_ids
+                event, prev_state_ids, summary
             )
             servers_in_room = await self.state.get_hosts_in_room_at_events(
                 room_id, event_ids=event.prev_event_ids()
             )
@@ -824,7 +835,14 @@ class FederationServer(FederationBase):
                 context, self._room_prejoin_state_types
             )
         )
-        return {"knock_state_events": stripped_room_state}
+        return {
+            "knock_room_state": stripped_room_state,
+            # Since v1.37, Synapse incorrectly used "knock_state_events" for this field.
+            # Thus, we also populate "knock_state_events" with the same content to
+            # support old instances.
+            # See https://github.com/matrix-org/synapse/issues/14088.
+            "knock_state_events": stripped_room_state,
+        }
 
     async def _on_send_membership_event(
         self, origin: str, content: JsonDict, membership_type: str, room_id: str
@@ -1480,6 +1498,7 @@ class FederationHandlerRegistry:
 def _get_event_ids_for_partial_state_join(
     join_event: EventBase,
     prev_state_ids: StateMap[str],
+    summary: Dict[str, MemberSummary],
 ) -> Collection[str]:
     """Calculate state to be returned in a partial_state send_join
@@ -1506,8 +1525,19 @@ def _get_event_ids_for_partial_state_join(
     if current_membership_event_id is not None:
         state_event_ids.add(current_membership_event_id)
 
-    # TODO: return a few more members:
-    #   - those with invites
-    #   - those that are kicked? / banned
+    name_id = prev_state_ids.get((EventTypes.Name, ""))
+    canonical_alias_id = prev_state_ids.get((EventTypes.CanonicalAlias, ""))
+    if not name_id and not canonical_alias_id:
+        # Also include the hero members of the room (for DM rooms without a title).
+        # To do this properly, we should select the correct subset of membership events
+        # from `prev_state_ids`. Instead, we are lazier and use the (cached)
+        # `get_room_summary` function, which is based on the current state of the room.
+        # This introduces races; we choose to ignore them because a) they should be rare
+        # and b) even if it's wrong, joining servers will get the full state eventually.
+        heroes = extract_heroes_from_room_summary(summary, join_event.state_key)
+        for hero in heroes:
+            membership_event_id = prev_state_ids.get((EventTypes.Member, hero))
+            if membership_event_id:
+                state_event_ids.add(membership_event_id)
 
     return state_event_ids
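Because the knock response above now carries the room state under both the correct `knock_room_state` key and the legacy `knock_state_events` key, a receiving implementation would read the spec-compliant field and fall back to the misspelled one. A sketch under that assumption; the helper name is hypothetical:

from typing import Any, Dict, List

def get_knock_room_state(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    # Prefer the correct field name; fall back to the legacy
    # "knock_state_events" emitted by Synapse since v1.37 for compatibility.
    if "knock_room_state" in response:
        return response["knock_room_state"]
    return response.get("knock_state_events", [])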
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index a6cb3ba58f..30ebd62883 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -353,21 +353,25 @@ class FederationSender(AbstractFederationSender):
                 last_token = await self.store.get_federation_out_pos("events")
                 (
                     next_token,
-                    events,
                     event_to_received_ts,
-                ) = await self.store.get_all_new_events_stream(
+                ) = await self.store.get_all_new_event_ids_stream(
                     last_token, self._last_poked_id, limit=100
                 )
 
+                event_ids = event_to_received_ts.keys()
+                event_entries = await self.store.get_unredacted_events_from_cache_or_db(
+                    event_ids
+                )
+
                 logger.debug(
                     "Handling %i -> %i: %i events to send (current id %i)",
                     last_token,
                     next_token,
-                    len(events),
+                    len(event_entries),
                     self._last_poked_id,
                 )
 
-                if not events and next_token >= self._last_poked_id:
+                if not event_entries and next_token >= self._last_poked_id:
                     logger.debug("All events processed")
                     break
@@ -430,7 +434,23 @@ class FederationSender(AbstractFederationSender):
                         # If there are no prev event IDs then the state is empty
                         # and so no remote servers in the room
                         destinations = set()
-                    else:
+
+                    if destinations is None:
+                        # During partial join we use the set of servers that we got
+                        # when beginning the join. It's still possible that we send
+                        # events to servers that left the room in the meantime, but
+                        # we consider that an acceptable risk since it is only our own
+                        # events that we leak and not other servers' ones.
+                        partial_state_destinations = (
+                            await self.store.get_partial_state_servers_at_join(
+                                event.room_id
+                            )
+                        )
+
+                        if len(partial_state_destinations) > 0:
+                            destinations = partial_state_destinations
+
+                    if destinations is None:
                         # We check the external cache for the destinations, which is
                         # stored per state group.
@@ -508,8 +528,14 @@ class FederationSender(AbstractFederationSender):
                     await handle_event(event)
 
                 events_by_room: Dict[str, List[EventBase]] = {}
-                for event in events:
-                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                for event_id in event_ids:
+                    # `event_entries` is unsorted, so we have to iterate over `event_ids`
+                    # to ensure the events are in the right order
+                    event_cache = event_entries.get(event_id)
+                    if event_cache:
+                        event = event_cache.event
+                        events_by_room.setdefault(event.room_id, []).append(event)
 
                 await make_deferred_yieldable(
                     defer.gatherResults(
@@ -524,9 +550,9 @@ class FederationSender(AbstractFederationSender):
                 logger.debug("Successfully handled up to %i", next_token)
                 await self.store.update_federation_out_pos("events", next_token)
 
-                if events:
+                if event_entries:
                     now = self.clock.time_msec()
-                    ts = event_to_received_ts[events[-1].event_id]
+                    ts = max(t for t in event_to_received_ts.values() if t)
                     assert ts is not None
 
                     synapse.metrics.event_processing_lag.labels(
@@ -536,7 +562,7 @@ class FederationSender(AbstractFederationSender):
                         "federation_sender"
                     ).set(ts)
 
-                events_processed_counter.inc(len(events))
+                events_processed_counter.inc(len(event_entries))
 
                 event_processing_loop_room_count.labels("federation_sender").inc(
                     len(events_by_room)
@@ -621,7 +647,7 @@ class FederationSender(AbstractFederationSender):
         room_id = receipt.room_id
 
         # Work out which remote servers should be poked and poke them.
-        domains_set = await self._storage_controllers.state.get_current_hosts_in_room(
+        domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
            room_id
        )
        domains = [
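The sender loop above now refetches events by ID, and the diff's comment notes that `event_entries` is unsorted. A standalone sketch of that regrouping step, with simplified illustrative names and types:

from typing import Dict, List

def group_events_by_room(
    event_ids: List[str],
    event_entries: Dict[str, "EventCacheEntry"],
) -> Dict[str, List["EventBase"]]:
    # The cache lookup returns entries keyed by event ID in arbitrary order,
    # so iterate over the stream-ordered `event_ids` to preserve ordering.
    events_by_room: Dict[str, List["EventBase"]] = {}
    for event_id in event_ids:
        entry = event_entries.get(event_id)
        if entry:
            event = entry.event
            events_by_room.setdefault(event.room_id, []).append(event)
    return events_by_room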
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 084c45a95c..ffc9d95ee7 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -35,7 +35,7 @@ from synapse.logging import issue9533_logger
 from synapse.logging.opentracing import SynapseTags, set_tag
 from synapse.metrics import sent_transactions_counter
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import JsonDict, ReadReceipt
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.visibility import filter_events_for_server
 
@@ -136,8 +136,11 @@ class PerDestinationQueue:
         #   destination
         self._pending_presence: Dict[str, UserPresenceState] = {}
 
-        # room_id -> receipt_type -> user_id -> receipt_dict
-        self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
+        # List of room_id -> receipt_type -> user_id -> receipt_dict,
+        #
+        # Each EDU can only have a single receipt per
+        # (room ID, receipt type, user ID, thread ID) tuple.
+        self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
         self._rrs_pending_flush = False
 
         # stream_id of last successfully sent to-device message.
@@ -202,17 +205,53 @@ class PerDestinationQueue:
         Args:
             receipt: receipt to be queued
         """
-        self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
-            receipt.receipt_type, {}
-        )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
+        serialized_receipt: JsonDict = {
+            "event_ids": receipt.event_ids,
+            "data": receipt.data,
+        }
+        if receipt.thread_id is not None:
+            serialized_receipt["data"]["thread_id"] = receipt.thread_id
+
+        # Find which EDU to add this receipt to. There are three situations depending
+        # on the (room ID, receipt type, user, thread ID) tuple:
+        #
+        # 1. If it fully matches, clobber the information.
+        # 2. If it is missing, add the information.
+        # 3. If the subset tuple of (room ID, receipt type, user) matches, check
+        #    the next EDU (or add a new EDU).
+        for edu in self._pending_receipt_edus:
+            receipt_content = edu.setdefault(receipt.room_id, {}).setdefault(
+                receipt.receipt_type, {}
+            )
+
+            # If this room ID, receipt type, user ID is not in this EDU, OR if
+            # the full tuple matches, use the current EDU.
+            if (
+                receipt.user_id not in receipt_content
+                or receipt_content[receipt.user_id].get("thread_id")
+                == receipt.thread_id
+            ):
+                receipt_content[receipt.user_id] = serialized_receipt
+                break
+
+        # If no matching EDU was found, create a new one.
+        else:
+            self._pending_receipt_edus.append(
+                {
+                    receipt.room_id: {
+                        receipt.receipt_type: {receipt.user_id: serialized_receipt}
+                    }
+                }
+            )
 
     def flush_read_receipts_for_room(self, room_id: str) -> None:
-        # if we don't have any read-receipts for this room, it may be that we've already
-        # sent them out, so we don't need to flush.
-        if room_id not in self._pending_rrs:
-            return
-        self._rrs_pending_flush = True
-        self.attempt_new_transaction()
+        # If there are any pending receipts for this room then force-flush them
+        # in a new transaction.
+        for edu in self._pending_receipt_edus:
+            if room_id in edu:
+                self._rrs_pending_flush = True
+                self.attempt_new_transaction()
+                # No use in checking remaining EDUs if the room was found.
+                break
 
     def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
         self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -351,7 +390,7 @@ class PerDestinationQueue:
             self._pending_edus = []
             self._pending_edus_keyed = {}
             self._pending_presence = {}
-            self._pending_rrs = {}
+            self._pending_receipt_edus = []
 
             self._start_catching_up()
         except FederationDeniedError as e:
@@ -505,6 +544,7 @@ class PerDestinationQueue:
                 new_pdus = await filter_events_for_server(
                     self._storage_controllers,
                     self._destination,
+                    self._server_name,
                     new_pdus,
                     redact=False,
                 )
@@ -542,22 +582,27 @@ class PerDestinationQueue:
             self._destination, last_successful_stream_ordering
         )
 
-    def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
-        if not self._pending_rrs:
+    def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
+        if not self._pending_receipt_edus:
             return
         if not force_flush and not self._rrs_pending_flush:
             # not yet time for this lot
             return
 
-        edu = Edu(
-            origin=self._server_name,
-            destination=self._destination,
-            edu_type=EduTypes.RECEIPT,
-            content=self._pending_rrs,
-        )
-        self._pending_rrs = {}
-        self._rrs_pending_flush = False
-        yield edu
+        # Send at most `limit` EDUs for receipts.
+        for content in self._pending_receipt_edus[:limit]:
+            yield Edu(
+                origin=self._server_name,
+                destination=self._destination,
+                edu_type=EduTypes.RECEIPT,
+                content=content,
+            )
+        self._pending_receipt_edus = self._pending_receipt_edus[limit:]
+
+        # If there are still pending read receipts, don't reset the pending flush
+        # flag.
+        if not self._pending_receipt_edus:
+            self._rrs_pending_flush = False
 
     def _pop_pending_edus(self, limit: int) -> List[Edu]:
         pending_edus = self._pending_edus
@@ -596,7 +641,7 @@ class PerDestinationQueue:
             if not message_id:
                 continue
 
-            set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+            set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
 
         edus = [
             Edu(
@@ -644,27 +689,61 @@ class _TransactionQueueManager:
     async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
         # First we calculate the EDUs we want to send, if any.
 
-        # We start by fetching device related EDUs, i.e device updates and to
-        # device messages. We have to keep 2 free slots for presence and rr_edus.
-        device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
+        # There's a maximum number of EDUs that can be sent with a transaction;
+        # generally device updates and to-device messages get priority, but we
+        # want to ensure that there's room for some other EDUs as well.
+        #
+        # This is done by:
+        #
+        # * Add a presence EDU, if one exists.
+        # * Add up to a small limit of read receipt EDUs.
+        # * Add to-device EDUs, but leave some space for device list updates.
+        # * Add device list update EDUs.
+        # * If there's any remaining room, add other EDUs.
+        pending_edus = []
+
+        # Add presence EDU.
+        if self.queue._pending_presence:
+            pending_edus.append(
+                Edu(
+                    origin=self.queue._server_name,
+                    destination=self.queue._destination,
+                    edu_type=EduTypes.PRESENCE,
+                    content={
+                        "push": [
+                            format_user_presence_state(
+                                presence, self.queue._clock.time_msec()
+                            )
+                            for presence in self.queue._pending_presence.values()
+                        ]
+                    },
+                )
+            )
+            self.queue._pending_presence = {}
 
-        # We prioritize to-device messages so that existing encryption channels
+        # Add read receipt EDUs.
+        pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
+        edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
+
+        # Next, prioritize to-device messages so that existing encryption channels
         # work. We also keep a few slots spare (by reducing the limit) so that
         # we can still trickle out some device list updates.
         (
             to_device_edus,
             device_stream_id,
-        ) = await self.queue._get_to_device_message_edus(device_edu_limit - 10)
+        ) = await self.queue._get_to_device_message_edus(edu_limit - 10)
 
         if to_device_edus:
             self._device_stream_id = device_stream_id
         else:
             self.queue._last_device_stream_id = device_stream_id
 
-        device_edu_limit -= len(to_device_edus)
+        pending_edus.extend(to_device_edus)
+        edu_limit -= len(to_device_edus)
 
+        # Add device list update EDUs.
         device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
-            device_edu_limit
+            edu_limit
         )
 
         if device_update_edus:
@@ -672,40 +751,17 @@ class _TransactionQueueManager:
         else:
             self.queue._last_device_list_stream_id = dev_list_id
 
-        pending_edus = device_update_edus + to_device_edus
-
-        # Now add the read receipt EDU.
-        pending_edus.extend(self.queue._get_rr_edus(force_flush=False))
-
-        # And presence EDU.
-        if self.queue._pending_presence:
-            pending_edus.append(
-                Edu(
-                    origin=self.queue._server_name,
-                    destination=self.queue._destination,
-                    edu_type=EduTypes.PRESENCE,
-                    content={
-                        "push": [
-                            format_user_presence_state(
-                                presence, self.queue._clock.time_msec()
-                            )
-                            for presence in self.queue._pending_presence.values()
-                        ]
-                    },
-                )
-            )
-            self.queue._pending_presence = {}
+        pending_edus.extend(device_update_edus)
+        edu_limit -= len(device_update_edus)
 
         # Finally add any other types of EDUs if there is room.
-        pending_edus.extend(
-            self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
-        )
-        while (
-            len(pending_edus) < MAX_EDUS_PER_TRANSACTION
-            and self.queue._pending_edus_keyed
-        ):
+        other_edus = self.queue._pop_pending_edus(edu_limit)
+        pending_edus.extend(other_edus)
+        edu_limit -= len(other_edus)
+        while edu_limit > 0 and self.queue._pending_edus_keyed:
             _, val = self.queue._pending_edus_keyed.popitem()
             pending_edus.append(val)
+            edu_limit -= 1
 
         # Now we look for any PDUs to send, by getting up to 50 PDUs from the
         # queue
@@ -716,8 +772,10 @@ class _TransactionQueueManager:
 
         # if we've decided to send a transaction anyway, and we have room, we
         # may as well send any pending RRs
-        if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
-            pending_edus.extend(self.queue._get_rr_edus(force_flush=True))
+        if edu_limit:
+            pending_edus.extend(
+                self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
+            )
 
         if self._pdus:
             self._last_stream_ordering = self._pdus[
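The EDU-bucketing rule in `queue_read_receipt` above can be summarised in isolation: a receipt clobbers an existing entry only when the full (room, receipt type, user, thread) tuple matches, while a same-user receipt for a different thread spills into the next EDU. A simplified sketch of that rule; for brevity the thread ID is stored at the top level of the serialized receipt here, whereas the diff keeps it under `data`:

from typing import Dict, List, Optional

PendingEdu = Dict[str, Dict[str, Dict[str, dict]]]

def add_receipt_to_edus(
    pending_edus: List[PendingEdu],
    room_id: str,
    receipt_type: str,
    user_id: str,
    thread_id: Optional[str],
    serialized: dict,
) -> None:
    for edu in pending_edus:
        content = edu.setdefault(room_id, {}).setdefault(receipt_type, {})
        # Reuse this EDU if the user has no receipt here yet, or if the
        # existing receipt is for the same thread (clobber it).
        if user_id not in content or content[user_id].get("thread_id") == thread_id:
            content[user_id] = serialized
            return
    # Every existing EDU already holds a different-thread receipt for this
    # user: start a new EDU.
    pending_edus.append({room_id: {receipt_type: {user_id: serialized}}})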
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index cd39d4d111..77f1f39cac 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -185,9 +185,8 @@ class TransportLayerClient:
         Raises:
             Various exceptions when the request fails
         """
-        path = _create_path(
-            FEDERATION_UNSTABLE_PREFIX,
-            "/org.matrix.msc3030/timestamp_to_event/%s",
+        path = _create_v1_path(
+            "/timestamp_to_event/%s",
             room_id,
         )
 
@@ -280,12 +279,11 @@ class TransportLayerClient:
         Note that this does not append any events to any graphs.
 
         Args:
-            destination (str): address of remote homeserver
-            room_id (str): room to join/leave
-            user_id (str): user to be joined/left
-            membership (str): one of join/leave
-            params (dict[str, str|Iterable[str]]): Query parameters to include in the
-                request.
+            destination: address of remote homeserver
+            room_id: room to join/leave
+            user_id: user to be joined/left
+            membership: one of join/leave
+            params: Query parameters to include in the request.
 
         Returns:
             Succeeds when we get a 2xx HTTP response. The result
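The client now builds the stable `/v1` path via `_create_v1_path` instead of interpolating the unstable MSC3030 prefix. As an illustration of what such a helper does, here is an assumed equivalent, not Synapse's actual implementation:

import urllib.parse

FEDERATION_V1_PREFIX = "/_matrix/federation/v1"

def create_v1_path(path_template: str, *args: str) -> str:
    # Interpolate URL-escaped arguments into the template, e.g.
    #   create_v1_path("/timestamp_to_event/%s", room_id)
    #   -> "/_matrix/federation/v1/timestamp_to_event/%21abc%3Aexample.com"
    return FEDERATION_V1_PREFIX + path_template % tuple(
        urllib.parse.quote(arg, safe="") for arg in args
    )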
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 50623cd385..2725f53cf6 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -25,7 +25,6 @@ from synapse.federation.transport.server._base import (
 from synapse.federation.transport.server.federation import (
     FEDERATION_SERVLET_CLASSES,
     FederationAccountStatusServlet,
-    FederationTimestampLookupServlet,
 )
 from synapse.http.server import HttpServer, JsonResource
 from synapse.http.servlet import (
@@ -291,13 +290,6 @@ def register_servlets(
             )
 
             for servletclass in SERVLET_GROUPS[servlet_group]:
-                # Only allow the `/timestamp_to_event` servlet if msc3030 is enabled
-                if (
-                    servletclass == FederationTimestampLookupServlet
-                    and not hs.config.experimental.msc3030_enabled
-                ):
-                    continue
-
                 # Only allow the `/account_status` servlet if msc3720 is enabled
                 if (
                     servletclass == FederationAccountStatusServlet
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index 1db8009d6c..cdaf0d5de7 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -224,10 +224,10 @@ class BaseFederationServlet:
 
         With arguments:
 
-            origin (unicode|None): The authenticated server_name of the calling server,
+            origin (str|None): The authenticated server_name of the calling server,
                 unless REQUIRE_AUTH is set to False and authentication failed.
 
-            content (unicode|None): decoded json body of the request. None if the
+            content (str|None): decoded json body of the request. None if the
                 request was a GET.
 
             query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 6bb4659c4c..53e77b4bb6 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -218,14 +218,13 @@ class FederationTimestampLookupServlet(BaseFederationServerServlet):
     `dir` can be `f` or `b` to indicate forwards and backwards in time from the
     given timestamp.
 
-    GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
+    GET /_matrix/federation/v1/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
     {
         "event_id": ...
     }
     """
 
     PATH = "/timestamp_to_event/(?P<room_id>[^/]*)/?"
-    PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030"
 
     async def on_GET(
         self,
@@ -489,7 +488,7 @@ class FederationV2InviteServlet(BaseFederationServerServlet):
         room_version = content["room_version"]
         event = content["event"]
-        invite_room_state = content["invite_room_state"]
+        invite_room_state = content.get("invite_room_state", [])
 
         # Synapse expects invite_room_state to be in unsigned, as it is in v1
         # API
@@ -499,6 +498,11 @@ class FederationV2InviteServlet(BaseFederationServerServlet):
         result = await self.handler.on_invite_request(
             origin, event, room_version_id=room_version
         )
+
+        # We only store invite_room_state for internal use, so remove it before
+        # returning the event to the remote homeserver.
+        result["event"].get("unsigned", {}).pop("invite_room_state", None)
+
         return 200, result
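The final hunk strips `invite_room_state` from the event's `unsigned` block before the `/v2/invite` response goes back over federation. The same scrubbing step in isolation; the function name is illustrative and `result` is shaped like the servlet's response dict in the diff:

from typing import Any, Dict

def scrub_invite_response(result: Dict[str, Any]) -> Dict[str, Any]:
    # invite_room_state is stored in unsigned for internal use only; remove it
    # before the event is returned to the remote homeserver. pop() with a
    # default is a no-op if the key (or unsigned itself) is absent.
    result["event"].get("unsigned", {}).pop("invite_room_state", None)
    return result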