path: root/synapse/federation
Diffstat (limited to 'synapse/federation')
-rw-r--r--  synapse/federation/__init__.py                         3
-rw-r--r--  synapse/federation/federation_base.py                 46
-rw-r--r--  synapse/federation/federation_client.py               70
-rw-r--r--  synapse/federation/federation_server.py               27
-rw-r--r--  synapse/federation/persistence.py                      2
-rw-r--r--  synapse/federation/sender/__init__.py                229
-rw-r--r--  synapse/federation/sender/per_destination_queue.py    25
-rw-r--r--  synapse/federation/transport/client.py                27
-rw-r--r--  synapse/federation/transport/server/__init__.py        4
-rw-r--r--  synapse/federation/transport/server/_base.py           2
-rw-r--r--  synapse/federation/transport/server/federation.py      7
-rw-r--r--  synapse/federation/units.py                           28
12 files changed, 332 insertions, 138 deletions
diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py

index a571eff590..61e28bff66 100644
--- a/synapse/federation/__init__.py
+++ b/synapse/federation/__init__.py
@@ -19,5 +19,4 @@
 #
 #
 
-""" This package includes all the federation specific logic.
-"""
+"""This package includes all the federation specific logic."""
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index b101a389ef..45593430e8 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -20,7 +20,7 @@
 #
 #
 import logging
-from typing import TYPE_CHECKING, Awaitable, Callable, Optional
+from typing import TYPE_CHECKING, Awaitable, Callable, List, Optional, Sequence
 
 from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
 from synapse.api.errors import Codes, SynapseError
@@ -29,6 +29,8 @@ from synapse.crypto.event_signing import check_event_content_hash
 from synapse.crypto.keyring import Keyring
 from synapse.events import EventBase, make_event_from_dict
 from synapse.events.utils import prune_event, validate_canonicaljson
+from synapse.federation.units import filter_pdus_for_valid_depth
+from synapse.handlers.room_policy import RoomPolicyHandler
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.opentracing import log_kv, trace
 from synapse.types import JsonDict, get_domain_from_id
@@ -63,6 +65,24 @@ class FederationBase:
         self._clock = hs.get_clock()
         self._storage_controllers = hs.get_storage_controllers()
 
+        # We need to define this lazily otherwise we get a cyclic dependency.
+        # self._policy_handler = hs.get_room_policy_handler()
+        self._policy_handler: Optional[RoomPolicyHandler] = None
+
+    def _lazily_get_policy_handler(self) -> RoomPolicyHandler:
+        """Lazily get the room policy handler.
+
+        This is required to avoid an import cycle: RoomPolicyHandler requires a
+        FederationClient, which requires a FederationBase, which requires a
+        RoomPolicyHandler.
+
+        Returns:
+            RoomPolicyHandler: The room policy handler.
+        """
+        if self._policy_handler is None:
+            self._policy_handler = self.hs.get_room_policy_handler()
+        return self._policy_handler
+
     @trace
     async def _check_sigs_and_hash(
         self,
@@ -79,6 +99,10 @@
         Also runs the event through the spam checker; if it fails, redacts the event
         and flags it as soft-failed.
 
+        Also checks that the event is allowed by the policy server, if the room uses
+        a policy server. If the event is not allowed, the event is flagged as
+        soft-failed but not redacted.
+
         Args:
             room_version: The room version of the PDU
             pdu: the event to be checked
@@ -144,6 +168,17 @@
             )
             return redacted_event
 
+        policy_allowed = await self._lazily_get_policy_handler().is_event_allowed(pdu)
+        if not policy_allowed:
+            logger.warning(
+                "Event not allowed by policy server, soft-failing %s", pdu.event_id
+            )
+            pdu.internal_metadata.soft_failed = True
+            # Note: we don't redact the event so admins can inspect the event after the
+            # fact. Other processes may redact the event, but that won't be applied to
+            # the database copy of the event until the server's config requires it.
+            return pdu
+
         spam_check = await self._spam_checker_module_callbacks.check_event_for_spam(pdu)
 
         if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
@@ -267,6 +302,15 @@ def _is_invite_via_3pid(event: EventBase) -> bool:
     )
 
 
+def parse_events_from_pdu_json(
+    pdus_json: Sequence[JsonDict], room_version: RoomVersion
+) -> List[EventBase]:
+    return [
+        event_from_pdu_json(pdu_json, room_version)
+        for pdu_json in filter_pdus_for_valid_depth(pdus_json)
+    ]
+
+
 def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventBase:
     """Construct an EventBase from an event json received over federation
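The soft-fail path added above can be illustrated with a small standalone sketch. The PolicyHandler and Event classes below are simplified, hypothetical stand-ins for Synapse's RoomPolicyHandler and EventBase; only the shape of is_event_allowed (returning a bool) is taken from the diff.

# Minimal sketch of the soft-fail behaviour, assuming simplified stand-ins
# for RoomPolicyHandler and EventBase (hypothetical classes, not Synapse's).
import asyncio
from dataclasses import dataclass, field


@dataclass
class InternalMetadata:
    soft_failed: bool = False


@dataclass
class Event:
    event_id: str
    internal_metadata: InternalMetadata = field(default_factory=InternalMetadata)


class PolicyHandler:
    async def is_event_allowed(self, event: Event) -> bool:
        # A real handler would ask the room's policy server, if any; rooms
        # without a policy server are treated as "allowed".
        return event.event_id != "$spammy"


async def check(handler: PolicyHandler, event: Event) -> Event:
    if not await handler.is_event_allowed(event):
        # Soft-fail rather than redact, so admins can still inspect the event.
        event.internal_metadata.soft_failed = True
    return event


event = asyncio.run(check(PolicyHandler(), Event("$spammy")))
assert event.internal_metadata.soft_failed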
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7d80ff6998..7c485aa7e0 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -68,12 +68,14 @@ from synapse.federation.federation_base import (
     FederationBase,
     InvalidEventSignatureError,
     event_from_pdu_json,
+    parse_events_from_pdu_json,
 )
 from synapse.federation.transport.client import SendJoinResponse
 from synapse.http.client import is_unknown_endpoint
 from synapse.http.types import QueryParams
 from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
 from synapse.types import JsonDict, StrCollection, UserID, get_domain_from_id
+from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.retryutils import NotRetryingDestination
@@ -349,7 +351,7 @@ class FederationClient(FederationBase):
 
         room_version = await self.store.get_room_version(room_id)
 
-        pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus]
+        pdus = parse_events_from_pdu_json(transaction_data_pdus, room_version)
 
         # Check signatures and hash of pdus, removing any from the list that fail checks
         pdus[:] = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
@@ -393,9 +395,7 @@ class FederationClient(FederationBase):
             transaction_data,
         )
 
-        pdu_list: List[EventBase] = [
-            event_from_pdu_json(p, room_version) for p in transaction_data["pdus"]
-        ]
+        pdu_list = parse_events_from_pdu_json(transaction_data["pdus"], room_version)
 
         if pdu_list and pdu_list[0]:
             pdu = pdu_list[0]
@@ -424,6 +424,62 @@ class FederationClient(FederationBase):
 
     @trace
     @tag_args
+    async def get_pdu_policy_recommendation(
+        self, destination: str, pdu: EventBase, timeout: Optional[int] = None
+    ) -> str:
+        """Requests that the destination server (typically a policy server)
+        check the event and return its recommendation on how to handle the
+        event.
+
+        If the policy server could not be contacted or the policy server
+        returned an unknown recommendation, this returns an OK recommendation.
+        This type fixing behaviour is done because the typical caller will be
+        in a critical call path and would generally interpret a `None` or similar
+        response as "weird value; don't care; move on without taking action". We
+        just frontload that logic here.
+
+
+        Args:
+            destination: The remote homeserver to ask (a policy server)
+            pdu: The event to check
+            timeout: How long to try (in ms) the destination for before
+                giving up. None indicates no timeout.
+
+        Returns:
+            The policy recommendation, or RECOMMENDATION_OK if the policy server was
+            uncontactable or returned an unknown recommendation.
+        """
+
+        logger.debug(
+            "get_pdu_policy_recommendation for event_id=%s from %s",
+            pdu.event_id,
+            destination,
+        )
+
+        try:
+            res = await self.transport_layer.get_policy_recommendation_for_pdu(
+                destination, pdu, timeout=timeout
+            )
+            recommendation = res.get("recommendation")
+            if not isinstance(recommendation, str):
+                raise InvalidResponseError("recommendation is not a string")
+            if recommendation not in (RECOMMENDATION_OK, RECOMMENDATION_SPAM):
+                logger.warning(
+                    "get_pdu_policy_recommendation: unknown recommendation: %s",
+                    recommendation,
+                )
+                return RECOMMENDATION_OK
+            return recommendation
+        except Exception as e:
+            logger.warning(
+                "get_pdu_policy_recommendation: server %s responded with error, assuming OK recommendation: %s",
+                destination,
+                e,
+            )
+            return RECOMMENDATION_OK
+
+    @trace
+    @tag_args
     async def get_pdu(
         self,
         destinations: Collection[str],
@@ -809,7 +865,7 @@ class FederationClient(FederationBase):
 
         room_version = await self.store.get_room_version(room_id)
 
-        auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]]
+        auth_chain = parse_events_from_pdu_json(res["auth_chain"], room_version)
 
         signed_auth = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
             destination, auth_chain, room_version=room_version
@@ -1529,9 +1585,7 @@ class FederationClient(FederationBase):
 
         room_version = await self.store.get_room_version(room_id)
 
-        events = [
-            event_from_pdu_json(e, room_version) for e in content.get("events", [])
-        ]
+        events = parse_events_from_pdu_json(content.get("events", []), room_version)
 
         signed_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
             destination, events, room_version=room_version
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 1932fa82a4..2f2c78babc 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -66,7 +66,7 @@ from synapse.federation.federation_base import (
     event_from_pdu_json,
 )
 from synapse.federation.persistence import TransactionActions
-from synapse.federation.units import Edu, Transaction
+from synapse.federation.units import Edu, Transaction, serialize_and_filter_pdus
 from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import (
@@ -469,7 +469,12 @@ class FederationServer(FederationBase):
                     logger.info("Ignoring PDU: %s", e)
                     continue
 
-                event = event_from_pdu_json(p, room_version)
+                try:
+                    event = event_from_pdu_json(p, room_version)
+                except SynapseError as e:
+                    logger.info("Ignoring PDU for failing to deserialize: %s", e)
+                    continue
+
                 pdus_by_room.setdefault(room_id, []).append(event)
 
                 if event.origin_server_ts > newest_pdu_ts:
@@ -636,8 +641,8 @@ class FederationServer(FederationBase):
         )
 
         return {
-            "pdus": [pdu.get_pdu_json() for pdu in pdus],
-            "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
+            "pdus": serialize_and_filter_pdus(pdus),
+            "auth_chain": serialize_and_filter_pdus(auth_chain),
         }
 
     async def on_pdu_request(
@@ -696,6 +701,12 @@ class FederationServer(FederationBase):
         pdu = event_from_pdu_json(content, room_version)
         origin_host, _ = parse_server_name(origin)
         await self.check_server_matches_acl(origin_host, pdu.room_id)
+        if await self._spam_checker_module_callbacks.should_drop_federated_event(pdu):
+            logger.info(
+                "Federated event contains spam, dropping %s",
+                pdu.event_id,
+            )
+            raise SynapseError(403, Codes.FORBIDDEN)
         try:
             pdu = await self._check_sigs_and_hash(room_version, pdu)
         except InvalidEventSignatureError as e:
@@ -761,8 +772,8 @@ class FederationServer(FederationBase):
         event_json = event.get_pdu_json(time_now)
         resp = {
             "event": event_json,
-            "state": [p.get_pdu_json(time_now) for p in state_events],
-            "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events],
+            "state": serialize_and_filter_pdus(state_events, time_now),
+            "auth_chain": serialize_and_filter_pdus(auth_chain_events, time_now),
             "members_omitted": caller_supports_partial_state,
         }
 
@@ -1005,7 +1016,7 @@ class FederationServer(FederationBase):
         time_now = self._clock.time_msec()
         auth_pdus = await self.handler.on_event_auth(event_id)
-        res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]}
+        res = {"auth_chain": serialize_and_filter_pdus(auth_pdus, time_now)}
         return 200, res
 
     async def on_query_client_keys(
@@ -1090,7 +1101,7 @@ class FederationServer(FederationBase):
 
         time_now = self._clock.time_msec()
-        return {"events": [ev.get_pdu_json(time_now) for ev in missing_events]}
+        return {"events": serialize_and_filter_pdus(missing_events, time_now)}
 
     async def on_openid_userinfo(self, token: str) -> Optional[str]:
         ts_now_ms = self._clock.time_msec()
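The invite path above now consults should_drop_federated_event before signature checking. A module supplying that callback might look roughly like the sketch below; the callback name and the register_spam_checker_callbacks registration follow Synapse's module API, while the blocking rule itself is a made-up example.

# Rough sketch of a spam-checker module supplying the callback consulted above.
class ExampleSpamChecker:
    def __init__(self, config: dict, api):
        self._api = api
        api.register_spam_checker_callbacks(
            should_drop_federated_event=self.should_drop_federated_event,
        )

    async def should_drop_federated_event(self, event) -> bool:
        # Returning True causes the server to drop the event; for invites
        # this now surfaces as a 403 to the sending server.
        return event.sender.endswith(":bad.example.org")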
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 0bfde00315..8340b48503 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -20,7 +20,7 @@
 #
 #
 
-""" This module contains all the persistence actions done by the federation
+"""This module contains all the persistence actions done by the federation
 package.
 
 These actions are mostly only used by the :py:mod:`.replication` module.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 1888480881..2eef7b707d 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -139,14 +139,13 @@ from typing import (
     Hashable,
     Iterable,
     List,
+    Literal,
     Optional,
-    Set,
     Tuple,
 )
 
 import attr
 from prometheus_client import Counter
-from typing_extensions import Literal
 
 from twisted.internet import defer
 
@@ -170,7 +169,13 @@ from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
 )
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
+from synapse.types import (
+    JsonDict,
+    ReadReceipt,
+    RoomStreamToken,
+    StrCollection,
+    get_domain_from_id,
+)
 from synapse.util import Clock
 from synapse.util.metrics import Measure
 from synapse.util.retryutils import filter_destinations_by_retry_limiter
@@ -297,12 +302,10 @@ class _DestinationWakeupQueue:
     # being woken up.
     _MAX_TIME_IN_QUEUE = 30.0
 
-    # The maximum duration in seconds between waking up consecutive destination
-    # queues.
-    _MAX_DELAY = 0.1
-
     sender: "FederationSender" = attr.ib()
     clock: Clock = attr.ib()
+    max_delay_s: int = attr.ib()
+
     queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
     processing: bool = attr.ib(default=False)
 
@@ -332,13 +335,15 @@ class _DestinationWakeupQueue:
         # We also add an upper bound to the delay, to gracefully handle the
         # case where the queue only has a few entries in it.
         current_sleep_seconds = min(
-            self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
+            self.max_delay_s, self._MAX_TIME_IN_QUEUE / len(self.queue)
        )
 
         while self.queue:
             destination, _ = self.queue.popitem(last=False)
 
             queue = self.sender._get_per_destination_queue(destination)
+            if queue is None:
+                continue
 
             if not queue._new_data_to_send:
                 # The per destination queue has already been woken up.
@@ -416,19 +421,14 @@ class FederationSender(AbstractFederationSender):
         self._is_processing = False
         self._last_poked_id = -1
 
-        # map from room_id to a set of PerDestinationQueues which we believe are
-        # awaiting a call to flush_read_receipts_for_room. The presence of an entry
-        # here for a given room means that we are rate-limiting RR flushes to that room,
-        # and that there is a pending call to _flush_rrs_for_room in the system.
-        self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
+        self._external_cache = hs.get_external_cache()
 
-        self._rr_txn_interval_per_room_ms = (
-            1000.0
-            / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
+        rr_txn_interval_per_room_s = (
+            1.0 / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
+        )
+        self._destination_wakeup_queue = _DestinationWakeupQueue(
+            self, self.clock, max_delay_s=rr_txn_interval_per_room_s
         )
-
-        self._external_cache = hs.get_external_cache()
-        self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)
 
         # Regularly wake up destinations that have outstanding PDUs to be caught up
         self.clock.looping_call_now(
@@ -438,12 +438,23 @@ class FederationSender(AbstractFederationSender):
             self._wake_destinations_needing_catchup,
         )
 
-    def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
+    def _get_per_destination_queue(
+        self, destination: str
+    ) -> Optional[PerDestinationQueue]:
         """Get or create a PerDestinationQueue for the given destination
 
         Args:
             destination: server_name of remote server
+
+        Returns:
+            None if the destination is not allowed by the federation whitelist.
+            Otherwise a PerDestinationQueue for this destination.
         """
+        if not self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
+            destination
+        ):
+            return None
+
         queue = self._per_destination_queues.get(destination)
         if not queue:
             queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
@@ -720,6 +731,16 @@ class FederationSender(AbstractFederationSender):
             # track the fact that we have a PDU for these destinations,
             # to allow us to perform catch-up later on if the remote is unreachable
             # for a while.
+            # Filter out any destinations not present in the federation_domain_whitelist, if
+            # the whitelist exists. These destinations should not be sent to so let's not
+            # waste time or space keeping track of events destined for them.
+            destinations = [
+                d
+                for d in destinations
+                if self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
+                    d
+                )
+            ]
             await self.store.store_destination_rooms_entries(
                 destinations,
                 pdu.room_id,
@@ -734,7 +755,12 @@ class FederationSender(AbstractFederationSender):
             )
 
             for destination in destinations:
-                self._get_per_destination_queue(destination).send_pdu(pdu)
+                queue = self._get_per_destination_queue(destination)
+                # We expect `queue` to not be None as we already filtered out
+                # non-whitelisted destinations above.
+                assert queue is not None
+
+                queue.send_pdu(pdu)
 
     async def send_read_receipt(self, receipt: ReadReceipt) -> None:
         """Send a RR to any other servers in the room
@@ -745,37 +771,48 @@ class FederationSender(AbstractFederationSender):
         # Some background on the rate-limiting going on here.
         #
-        # It turns out that if we attempt to send out RRs as soon as we get them from
-        # a client, then we end up trying to do several hundred Hz of federation
-        # transactions. (The number of transactions scales as O(N^2) on the size of a
-        # room, since in a large room we have both more RRs coming in, and more servers
-        # to send them to.)
+        # It turns out that if we attempt to send out RRs as soon as we get them
+        # from a client, then we end up trying to do several hundred Hz of
+        # federation transactions. (The number of transactions scales as O(N^2)
+        # on the size of a room, since in a large room we have both more RRs
+        # coming in, and more servers to send them to.)
         #
-        # This leads to a lot of CPU load, and we end up getting behind. The solution
-        # currently adopted is as follows:
+        # This leads to a lot of CPU load, and we end up getting behind. The
+        # solution currently adopted is to differentiate between receipts and
+        # destinations we should immediately send to, and those we can trickle
+        # the receipts to.
         #
-        # The first receipt in a given room is sent out immediately, at time T0. Any
-        # further receipts are, in theory, batched up for N seconds, where N is calculated
-        # based on the number of servers in the room to achieve a transaction frequency
-        # of around 50Hz. So, for example, if there were 100 servers in the room, then
-        # N would be 100 / 50Hz = 2 seconds.
+        # The current logic is to send receipts out immediately if:
+        #   - the room is "small", i.e. there's only N servers to send receipts
+        #     to, and so sending out the receipts immediately doesn't cause too
+        #     much load; or
+        #   - the receipt is for an event that happened recently, as users
+        #     notice if receipts are delayed when they know other users are
+        #     currently reading the room; or
+        #   - the receipt is being sent to the server that sent the event, so
+        #     that users see receipts for their own receipts quickly.
         #
-        # Then, after T+N, we flush out any receipts that have accumulated, and restart
-        # the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
-        # we stop the cycle and go back to the start.
+        # For destinations that we should delay sending the receipt to, we queue
+        # the receipts up to be sent in the next transaction, but don't trigger
+        # a new transaction to be sent. We then add the destination to the
+        # `DestinationWakeupQueue`, which will slowly iterate over each
+        # destination and trigger a new transaction to be sent.
         #
-        # However, in practice, it is often possible to flush out receipts earlier: in
-        # particular, if we are sending a transaction to a given server anyway (for
-        # example, because we have a PDU or a RR in another room to send), then we may
-        # as well send out all of the pending RRs for that server. So it may be that
-        # by the time we get to T+N, we don't actually have any RRs left to send out.
-        # Nevertheless we continue to buffer up RRs for the room in question until we
-        # reach the point that no RRs arrive between timer ticks.
+        # However, in practice, it is often possible to send out delayed
+        # receipts earlier: in particular, if we are sending a transaction to a
+        # given server anyway (for example, because we have a PDU or a RR in
+        # another room to send), then we may as well send out all of the pending
+        # RRs for that server. So it may be that by the time we get to waking up
+        # the destination, we don't actually have any RRs left to send out.
        #
-        # For even more background, see https://github.com/matrix-org/synapse/issues/4730.
+        # For even more background, see
+        # https://github.com/matrix-org/synapse/issues/4730.
 
         room_id = receipt.room_id
 
+        # Local read receipts always have 1 event ID.
+        event_id = receipt.event_ids[0]
+
         # Work out which remote servers should be poked and poke them.
         domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
             room_id
@@ -797,49 +834,55 @@ class FederationSender(AbstractFederationSender):
         if not domains:
             return
 
-        queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)
+        # We now split which domains we want to wake up immediately vs which we
+        # want to delay waking up.
+        immediate_domains: StrCollection
+        delay_domains: StrCollection
 
-        # if there is no flush yet scheduled, we will send out these receipts with
-        # immediate flushes, and schedule the next flush for this room.
-        if queues_pending_flush is not None:
-            logger.debug("Queuing receipt for: %r", domains)
+        if len(domains) < 10:
+            # For "small" rooms send to all domains immediately
+            immediate_domains = domains
+            delay_domains = ()
         else:
-            logger.debug("Sending receipt to: %r", domains)
-            self._schedule_rr_flush_for_room(room_id, len(domains))
+            metadata = await self.store.get_metadata_for_event(
+                receipt.room_id, event_id
+            )
+            assert metadata is not None
 
-        for domain in domains:
-            queue = self._get_per_destination_queue(domain)
-            queue.queue_read_receipt(receipt)
+            sender_domain = get_domain_from_id(metadata.sender)
 
-            # if there is already a RR flush pending for this room, then make sure this
-            # destination is registered for the flush
-            if queues_pending_flush is not None:
-                queues_pending_flush.add(queue)
+            if self.clock.time_msec() - metadata.received_ts < 60_000:
+                # We always send receipts for recent messages immediately
+                immediate_domains = domains
+                delay_domains = ()
             else:
-                queue.flush_read_receipts_for_room(room_id)
-
-    def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
-        # that is going to cause approximately len(domains) transactions, so now back
-        # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
-        backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
-
-        logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
-        self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
-        self._queues_awaiting_rr_flush_by_room[room_id] = set()
-
-    def _flush_rrs_for_room(self, room_id: str) -> None:
-        queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
-        logger.debug("Flushing RRs in %s to %s", room_id, queues)
-
-        if not queues:
-            # no more RRs arrived for this room; we are done.
-            return
+                # Otherwise, we delay waking up all destinations except for the
+                # sender's domain.
+                immediate_domains = []
+                delay_domains = []
+
+                for domain in domains:
+                    if domain == sender_domain:
+                        immediate_domains.append(domain)
+                    else:
+                        delay_domains.append(domain)
+
+        for domain in immediate_domains:
+            # Add to destination queue and wake the destination up
+            queue = self._get_per_destination_queue(domain)
+            if queue is None:
+                continue
+            queue.queue_read_receipt(receipt)
+            queue.attempt_new_transaction()
 
-        # schedule the next flush
-        self._schedule_rr_flush_for_room(room_id, len(queues))
+        for domain in delay_domains:
+            # Add to destination queue...
+            queue = self._get_per_destination_queue(domain)
+            if queue is None:
+                continue
+            queue.queue_read_receipt(receipt)
 
-        for queue in queues:
-            queue.flush_read_receipts_for_room(room_id)
+            # ... and schedule the destination to be woken up.
+            self._destination_wakeup_queue.add_to_queue(domain)
 
     async def send_presence_to_destinations(
         self, states: Iterable[UserPresenceState], destinations: Iterable[str]
@@ -871,9 +914,10 @@ class FederationSender(AbstractFederationSender):
             if self.is_mine_server_name(destination):
                 continue
 
-            self._get_per_destination_queue(destination).send_presence(
-                states, start_loop=False
-            )
+            queue = self._get_per_destination_queue(destination)
+            if queue is None:
+                continue
+            queue.send_presence(states, start_loop=False)
 
             self._destination_wakeup_queue.add_to_queue(destination)
 
@@ -923,6 +967,8 @@ class FederationSender(AbstractFederationSender):
             return
 
         queue = self._get_per_destination_queue(edu.destination)
+        if queue is None:
+            return
         if key:
             queue.send_keyed_edu(edu, key)
         else:
@@ -947,9 +993,15 @@ class FederationSender(AbstractFederationSender):
         for destination in destinations:
             if immediate:
-                self._get_per_destination_queue(destination).attempt_new_transaction()
+                queue = self._get_per_destination_queue(destination)
+                if queue is None:
+                    continue
+                queue.attempt_new_transaction()
             else:
-                self._get_per_destination_queue(destination).mark_new_data()
+                queue = self._get_per_destination_queue(destination)
+                if queue is None:
+                    continue
+                queue.mark_new_data()
                 self._destination_wakeup_queue.add_to_queue(destination)
 
     def wake_destination(self, destination: str) -> None:
@@ -968,7 +1020,9 @@ class FederationSender(AbstractFederationSender):
         ):
             return
 
-        self._get_per_destination_queue(destination).attempt_new_transaction()
+        queue = self._get_per_destination_queue(destination)
+        if queue is not None:
+            queue.attempt_new_transaction()
 
     @staticmethod
     def get_current_token() -> int:
@@ -1013,6 +1067,9 @@ class FederationSender(AbstractFederationSender):
             d
             for d in destinations_to_wake
             if self._federation_shard_config.should_handle(self._instance_name, d)
+            and self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
+                d
+            )
         ]
 
         for destination in destinations_to_wake:
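The wakeup pacing after this change reduces to a one-line rule, sketched below: drain the whole queue within _MAX_TIME_IN_QUEUE seconds overall, but never wake consecutive destinations more often than max_delay_s, which is now derived from the per-room receipt ratelimit. The 50-per-second default used in the example is an assumption based on Synapse's stock configuration.

# Sketch of the pacing rule in _DestinationWakeupQueue after this change.
MAX_TIME_IN_QUEUE = 30.0  # mirrors _MAX_TIME_IN_QUEUE above


def sleep_between_wakeups(queue_len: int, max_delay_s: float) -> float:
    # Aim to drain the queue within MAX_TIME_IN_QUEUE seconds overall, but
    # cap the per-destination delay at max_delay_s.
    return min(max_delay_s, MAX_TIME_IN_QUEUE / queue_len)


# With an (assumed) default ratelimit of 50 RR transactions per room per
# second, max_delay_s = 1.0 / 50 = 0.02s:
assert sleep_between_wakeups(10, 0.02) == 0.02    # capped by max_delay_s
assert sleep_between_wakeups(3000, 0.02) == 0.01  # spread over the full 30s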
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index d097e65ea7..b3f65e8237 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -156,7 +156,6 @@ class PerDestinationQueue:
         # Each receipt can only have a single receipt per
         # (room ID, receipt type, user ID, thread ID) tuple.
         self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
-        self._rrs_pending_flush = False
 
         # stream_id of last successfully sent to-device message.
         # NB: may be a long or an int.
@@ -258,15 +257,7 @@ class PerDestinationQueue:
             }
         )
 
-    def flush_read_receipts_for_room(self, room_id: str) -> None:
-        # If there are any pending receipts for this room then force-flush them
-        # in a new transaction.
-        for edu in self._pending_receipt_edus:
-            if room_id in edu:
-                self._rrs_pending_flush = True
-                self.attempt_new_transaction()
-                # No use in checking remaining EDUs if the room was found.
-                break
+        self.mark_new_data()
 
     def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
         self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -603,12 +594,9 @@ class PerDestinationQueue:
             self._destination, last_successful_stream_ordering
         )
 
-    def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
+    def _get_receipt_edus(self, limit: int) -> Iterable[Edu]:
         if not self._pending_receipt_edus:
             return
-        if not force_flush and not self._rrs_pending_flush:
-            # not yet time for this lot
-            return
 
         # Send at most limit EDUs for receipts.
         for content in self._pending_receipt_edus[:limit]:
@@ -747,7 +735,7 @@ class _TransactionQueueManager:
         )
 
         # Add read receipt EDUs.
-        pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
+        pending_edus.extend(self.queue._get_receipt_edus(limit=5))
         edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
 
         # Next, prioritize to-device messages so that existing encryption channels
@@ -795,13 +783,6 @@ class _TransactionQueueManager:
         if not self._pdus and not pending_edus:
             return [], []
 
-        # if we've decided to send a transaction anyway, and we have room, we
-        # may as well send any pending RRs
-        if edu_limit:
-            pending_edus.extend(
-                self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
-            )
-
        if self._pdus:
             self._last_stream_ordering = self._pdus[
                 -1
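With the force-flush flag gone, pending receipt EDUs are simply drained up to a fixed per-transaction budget; anything left over waits for the next transaction, which the _DestinationWakeupQueue eventually triggers. A toy version of that drain (plain list slicing; the real code builds Edu objects and the limit of 5 is taken from the diff):

def get_receipt_edus(pending_receipt_edus: list, limit: int = 5) -> list:
    # Take at most `limit` receipt EDUs; the remainder stays queued for the
    # next transaction rather than forcing an immediate flush.
    batch = pending_receipt_edus[:limit]
    del pending_receipt_edus[:limit]
    return batch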
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 206e91ed14..62bf96ce91 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -143,6 +143,33 @@ class TransportLayerClient:
             destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
         )
 
+    async def get_policy_recommendation_for_pdu(
+        self, destination: str, event: EventBase, timeout: Optional[int] = None
+    ) -> JsonDict:
+        """Requests the policy recommendation for the given pdu from the given policy server.
+
+        Args:
+            destination: The host name of the remote homeserver checking the event.
+            event: The event to check.
+            timeout: How long to try (in ms) the destination for before giving up.
+                None indicates no timeout.
+
+        Returns:
+            The full recommendation object from the remote server.
+        """
+        logger.debug(
+            "get_policy_recommendation_for_pdu dest=%s, event_id=%s",
+            destination,
+            event.event_id,
+        )
+        return await self.client.post_json(
+            destination=destination,
+            path=f"/_matrix/policy/unstable/org.matrix.msc4284/event/{event.event_id}/check",
+            data=event.get_pdu_json(),
+            ignore_backoff=True,
+            timeout=timeout,
+        )
+
     async def backfill(
         self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
     ) -> Optional[Union[JsonDict, list]]:
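For reference, the request issued by get_policy_recommendation_for_pdu can be approximated standalone as follows. The wire format (a POST of the PDU JSON, with a {"recommendation": ...} response) is inferred from the code above; note that real federation traffic is signed, which this plain POST is not.

# Unsigned, standalone approximation of the MSC4284 check request.
import json
import urllib.request


def check_event(policy_server: str, event_id: str, pdu: dict) -> str:
    url = (
        f"https://{policy_server}/_matrix/policy/unstable/"
        f"org.matrix.msc4284/event/{event_id}/check"
    )
    req = urllib.request.Request(
        url,
        data=json.dumps(pdu).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        # Default to "ok" on a missing field, matching the fail-open callers.
        return json.load(resp).get("recommendation", "ok")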
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 43102567db..174d02ab6b 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -20,9 +20,7 @@
 #
 #
 import logging
-from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Type
-
-from typing_extensions import Literal
+from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional, Tuple, Type
 
 from synapse.api.errors import FederationDeniedError, SynapseError
 from synapse.federation.transport.server._base import (
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index 9094201da0..cba309635b 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -113,7 +113,7 @@ class Authenticator:
         ):
             raise AuthenticationError(
                 HTTPStatus.UNAUTHORIZED,
-                "Destination mismatch in auth header",
+                f"Destination mismatch in auth header, received: {destination!r}",
                 Codes.UNAUTHORIZED,
             )
         if (
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 20f87c885e..eb96ff27f9 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -24,6 +24,7 @@ from typing import (
     TYPE_CHECKING,
     Dict,
     List,
+    Literal,
     Mapping,
     Optional,
     Sequence,
@@ -32,8 +33,6 @@ from typing import (
     Union,
 )
 
-from typing_extensions import Literal
-
 from synapse.api.constants import Direction, EduTypes
 from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import RoomVersions
@@ -509,6 +508,9 @@ class FederationV2InviteServlet(BaseFederationServerServlet):
         event = content["event"]
         invite_room_state = content.get("invite_room_state", [])
 
+        if not isinstance(invite_room_state, list):
+            invite_room_state = []
+
         # Synapse expects invite_room_state to be in unsigned, as it is in v1
         # API
 
@@ -859,7 +861,6 @@ class FederationMediaThumbnailServlet(BaseFederationServerServlet):
         request: SynapseRequest,
         media_id: str,
     ) -> None:
-
         width = parse_integer(request, "width", required=True)
         height = parse_integer(request, "height", required=True)
         method = parse_string(request, "method", "scale")
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index b2c8ba5887..3bb5f824b7 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -19,15 +19,17 @@
 #
 #
 
-""" Defines the JSON structure of the protocol units used by the server to
+"""Defines the JSON structure of the protocol units used by the server to
 server protocol.
 """
 
 import logging
-from typing import List, Optional
+from typing import List, Optional, Sequence
 
 import attr
 
+from synapse.api.constants import CANONICALJSON_MAX_INT, CANONICALJSON_MIN_INT
+from synapse.events import EventBase
 from synapse.types import JsonDict
 
 logger = logging.getLogger(__name__)
@@ -104,8 +106,28 @@ class Transaction:
         result = {
             "origin": self.origin,
             "origin_server_ts": self.origin_server_ts,
-            "pdus": self.pdus,
+            "pdus": filter_pdus_for_valid_depth(self.pdus),
         }
         if self.edus:
             result["edus"] = self.edus
         return result
+
+
+def filter_pdus_for_valid_depth(pdus: Sequence[JsonDict]) -> List[JsonDict]:
+    filtered_pdus = []
+    for pdu in pdus:
+        # Drop PDUs that have a depth that is outside of the range allowed
+        # by canonical json.
+        if (
+            "depth" in pdu
+            and CANONICALJSON_MIN_INT <= pdu["depth"] <= CANONICALJSON_MAX_INT
+        ):
+            filtered_pdus.append(pdu)
+
+    return filtered_pdus
+
+
+def serialize_and_filter_pdus(
+    pdus: Sequence[EventBase], time_now: Optional[int] = None
+) -> List[JsonDict]:
+    return filter_pdus_for_valid_depth([pdu.get_pdu_json(time_now) for pdu in pdus])
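The depth filter at the heart of serialize_and_filter_pdus can be demonstrated in isolation. Canonical JSON restricts integers to those exactly representable in an IEEE-754 double; the +/-(2**53 - 1) bounds below are an assumption based on that spec, standing in for the constants imported from synapse.api.constants.

# Demonstration of the depth filtering introduced above.
CANONICALJSON_MAX_INT = 2**53 - 1  # assumed value of the Synapse constant
CANONICALJSON_MIN_INT = -CANONICALJSON_MAX_INT

pdus = [
    {"event_id": "$ok", "depth": 42},
    {"event_id": "$too_deep", "depth": 2**60},  # out of range: dropped
    {"event_id": "$no_depth"},                  # missing depth: also dropped
]

filtered = [
    p
    for p in pdus
    if "depth" in p
    and CANONICALJSON_MIN_INT <= p["depth"] <= CANONICALJSON_MAX_INT
]
assert [p["event_id"] for p in filtered] == ["$ok"]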