diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py
index a571eff590..61e28bff66 100644
--- a/synapse/federation/__init__.py
+++ b/synapse/federation/__init__.py
@@ -19,5 +19,4 @@
#
#
-""" This package includes all the federation specific logic.
-"""
+"""This package includes all the federation specific logic."""
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index b101a389ef..45593430e8 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -20,7 +20,7 @@
#
#
import logging
-from typing import TYPE_CHECKING, Awaitable, Callable, Optional
+from typing import TYPE_CHECKING, Awaitable, Callable, List, Optional, Sequence
from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
@@ -29,6 +29,8 @@ from synapse.crypto.event_signing import check_event_content_hash
from synapse.crypto.keyring import Keyring
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event, validate_canonicaljson
+from synapse.federation.units import filter_pdus_for_valid_depth
+from synapse.handlers.room_policy import RoomPolicyHandler
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.opentracing import log_kv, trace
from synapse.types import JsonDict, get_domain_from_id
@@ -63,6 +65,24 @@ class FederationBase:
self._clock = hs.get_clock()
self._storage_controllers = hs.get_storage_controllers()
+ # We need to define this lazily otherwise we get a cyclic dependency.
+ # self._policy_handler = hs.get_room_policy_handler()
+ self._policy_handler: Optional[RoomPolicyHandler] = None
+
+ def _lazily_get_policy_handler(self) -> RoomPolicyHandler:
+ """Lazily get the room policy handler.
+
+ This is required to avoid an import cycle: RoomPolicyHandler requires a
+ FederationClient, which requires a FederationBase, which requires a
+ RoomPolicyHandler.
+
+ Returns:
+ RoomPolicyHandler: The room policy handler.
+ """
+ if self._policy_handler is None:
+ self._policy_handler = self.hs.get_room_policy_handler()
+ return self._policy_handler
+
@trace
async def _check_sigs_and_hash(
self,
@@ -79,6 +99,10 @@ class FederationBase:
Also runs the event through the spam checker; if it fails, redacts the event
and flags it as soft-failed.
+ Also checks that the event is allowed by the policy server, if the room uses
+ a policy server. If the event is not allowed, the event is flagged as
+ soft-failed but not redacted.
+
Args:
room_version: The room version of the PDU
pdu: the event to be checked
@@ -144,6 +168,17 @@ class FederationBase:
)
return redacted_event
+ policy_allowed = await self._lazily_get_policy_handler().is_event_allowed(pdu)
+ if not policy_allowed:
+ logger.warning(
+ "Event not allowed by policy server, soft-failing %s", pdu.event_id
+ )
+ pdu.internal_metadata.soft_failed = True
+ # Note: we don't redact the event so admins can inspect the event after the
+ # fact. Other processes may redact the event, but that won't be applied to
+ # the database copy of the event until the server's config requires it.
+ return pdu
+
spam_check = await self._spam_checker_module_callbacks.check_event_for_spam(pdu)
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
@@ -267,6 +302,15 @@ def _is_invite_via_3pid(event: EventBase) -> bool:
)
+def parse_events_from_pdu_json(
+ pdus_json: Sequence[JsonDict], room_version: RoomVersion
+) -> List[EventBase]:
+ return [
+ event_from_pdu_json(pdu_json, room_version)
+ for pdu_json in filter_pdus_for_valid_depth(pdus_json)
+ ]
+
+
def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventBase:
"""Construct an EventBase from an event json received over federation
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7d80ff6998..7c485aa7e0 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -68,12 +68,14 @@ from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
event_from_pdu_json,
+ parse_events_from_pdu_json,
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.client import is_unknown_endpoint
from synapse.http.types import QueryParams
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.types import JsonDict, StrCollection, UserID, get_domain_from_id
+from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
@@ -349,7 +351,7 @@ class FederationClient(FederationBase):
room_version = await self.store.get_room_version(room_id)
- pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus]
+ pdus = parse_events_from_pdu_json(transaction_data_pdus, room_version)
# Check signatures and hash of pdus, removing any from the list that fail checks
pdus[:] = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
@@ -393,9 +395,7 @@ class FederationClient(FederationBase):
transaction_data,
)
- pdu_list: List[EventBase] = [
- event_from_pdu_json(p, room_version) for p in transaction_data["pdus"]
- ]
+ pdu_list = parse_events_from_pdu_json(transaction_data["pdus"], room_version)
if pdu_list and pdu_list[0]:
pdu = pdu_list[0]
@@ -424,6 +424,62 @@ class FederationClient(FederationBase):
@trace
@tag_args
+ async def get_pdu_policy_recommendation(
+ self, destination: str, pdu: EventBase, timeout: Optional[int] = None
+ ) -> str:
+ """Requests that the destination server (typically a policy server)
+ check the event and return its recommendation on how to handle the
+ event.
+
+ If the policy server could not be contacted or the policy server
+ returned an unknown recommendation, this returns an OK recommendation.
+ This type fixing behaviour is done because the typical caller will be
+ in a critical call path and would generally interpret a `None` or similar
+ response as "weird value; don't care; move on without taking action". We
+ just frontload that logic here.
+
+
+ Args:
+ destination: The remote homeserver to ask (a policy server)
+ pdu: The event to check
+ timeout: How long to try (in ms) the destination for before
+ giving up. None indicates no timeout.
+
+ Returns:
+ The policy recommendation, or RECOMMENDATION_OK if the policy server was
+ uncontactable or returned an unknown recommendation.
+ """
+
+ logger.debug(
+ "get_pdu_policy_recommendation for event_id=%s from %s",
+ pdu.event_id,
+ destination,
+ )
+
+ try:
+ res = await self.transport_layer.get_policy_recommendation_for_pdu(
+ destination, pdu, timeout=timeout
+ )
+ recommendation = res.get("recommendation")
+ if not isinstance(recommendation, str):
+ raise InvalidResponseError("recommendation is not a string")
+ if recommendation not in (RECOMMENDATION_OK, RECOMMENDATION_SPAM):
+ logger.warning(
+ "get_pdu_policy_recommendation: unknown recommendation: %s",
+ recommendation,
+ )
+ return RECOMMENDATION_OK
+ return recommendation
+ except Exception as e:
+ logger.warning(
+ "get_pdu_policy_recommendation: server %s responded with error, assuming OK recommendation: %s",
+ destination,
+ e,
+ )
+ return RECOMMENDATION_OK
+
+ @trace
+ @tag_args
async def get_pdu(
self,
destinations: Collection[str],
@@ -809,7 +865,7 @@ class FederationClient(FederationBase):
room_version = await self.store.get_room_version(room_id)
- auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]]
+ auth_chain = parse_events_from_pdu_json(res["auth_chain"], room_version)
signed_auth = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, auth_chain, room_version=room_version
@@ -1529,9 +1585,7 @@ class FederationClient(FederationBase):
room_version = await self.store.get_room_version(room_id)
- events = [
- event_from_pdu_json(e, room_version) for e in content.get("events", [])
- ]
+ events = parse_events_from_pdu_json(content.get("events", []), room_version)
signed_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, events, room_version=room_version
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 1932fa82a4..2f2c78babc 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -66,7 +66,7 @@ from synapse.federation.federation_base import (
event_from_pdu_json,
)
from synapse.federation.persistence import TransactionActions
-from synapse.federation.units import Edu, Transaction
+from synapse.federation.units import Edu, Transaction, serialize_and_filter_pdus
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
@@ -469,7 +469,12 @@ class FederationServer(FederationBase):
logger.info("Ignoring PDU: %s", e)
continue
- event = event_from_pdu_json(p, room_version)
+ try:
+ event = event_from_pdu_json(p, room_version)
+ except SynapseError as e:
+ logger.info("Ignoring PDU for failing to deserialize: %s", e)
+ continue
+
pdus_by_room.setdefault(room_id, []).append(event)
if event.origin_server_ts > newest_pdu_ts:
@@ -636,8 +641,8 @@ class FederationServer(FederationBase):
)
return {
- "pdus": [pdu.get_pdu_json() for pdu in pdus],
- "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
+ "pdus": serialize_and_filter_pdus(pdus),
+ "auth_chain": serialize_and_filter_pdus(auth_chain),
}
async def on_pdu_request(
@@ -696,6 +701,12 @@ class FederationServer(FederationBase):
pdu = event_from_pdu_json(content, room_version)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
+ if await self._spam_checker_module_callbacks.should_drop_federated_event(pdu):
+ logger.info(
+ "Federated event contains spam, dropping %s",
+ pdu.event_id,
+ )
+ raise SynapseError(403, Codes.FORBIDDEN)
try:
pdu = await self._check_sigs_and_hash(room_version, pdu)
except InvalidEventSignatureError as e:
@@ -761,8 +772,8 @@ class FederationServer(FederationBase):
event_json = event.get_pdu_json(time_now)
resp = {
"event": event_json,
- "state": [p.get_pdu_json(time_now) for p in state_events],
- "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events],
+ "state": serialize_and_filter_pdus(state_events, time_now),
+ "auth_chain": serialize_and_filter_pdus(auth_chain_events, time_now),
"members_omitted": caller_supports_partial_state,
}
@@ -1005,7 +1016,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
auth_pdus = await self.handler.on_event_auth(event_id)
- res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]}
+ res = {"auth_chain": serialize_and_filter_pdus(auth_pdus, time_now)}
return 200, res
async def on_query_client_keys(
@@ -1090,7 +1101,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
- return {"events": [ev.get_pdu_json(time_now) for ev in missing_events]}
+ return {"events": serialize_and_filter_pdus(missing_events, time_now)}
async def on_openid_userinfo(self, token: str) -> Optional[str]:
ts_now_ms = self._clock.time_msec()
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 0bfde00315..8340b48503 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -20,7 +20,7 @@
#
#
-""" This module contains all the persistence actions done by the federation
+"""This module contains all the persistence actions done by the federation
package.
These actions are mostly only used by the :py:mod:`.replication` module.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 1888480881..2eef7b707d 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -139,14 +139,13 @@ from typing import (
Hashable,
Iterable,
List,
+ Literal,
Optional,
- Set,
Tuple,
)
import attr
from prometheus_client import Counter
-from typing_extensions import Literal
from twisted.internet import defer
@@ -170,7 +169,13 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
+from synapse.types import (
+ JsonDict,
+ ReadReceipt,
+ RoomStreamToken,
+ StrCollection,
+ get_domain_from_id,
+)
from synapse.util import Clock
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter
@@ -297,12 +302,10 @@ class _DestinationWakeupQueue:
# being woken up.
_MAX_TIME_IN_QUEUE = 30.0
- # The maximum duration in seconds between waking up consecutive destination
- # queues.
- _MAX_DELAY = 0.1
-
sender: "FederationSender" = attr.ib()
clock: Clock = attr.ib()
+ max_delay_s: int = attr.ib()
+
queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
processing: bool = attr.ib(default=False)
@@ -332,13 +335,15 @@ class _DestinationWakeupQueue:
# We also add an upper bound to the delay, to gracefully handle the
# case where the queue only has a few entries in it.
current_sleep_seconds = min(
- self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
+ self.max_delay_s, self._MAX_TIME_IN_QUEUE / len(self.queue)
)
while self.queue:
destination, _ = self.queue.popitem(last=False)
queue = self.sender._get_per_destination_queue(destination)
+ if queue is None:
+ continue
if not queue._new_data_to_send:
# The per destination queue has already been woken up.
@@ -416,19 +421,14 @@ class FederationSender(AbstractFederationSender):
self._is_processing = False
self._last_poked_id = -1
- # map from room_id to a set of PerDestinationQueues which we believe are
- # awaiting a call to flush_read_receipts_for_room. The presence of an entry
- # here for a given room means that we are rate-limiting RR flushes to that room,
- # and that there is a pending call to _flush_rrs_for_room in the system.
- self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
+ self._external_cache = hs.get_external_cache()
- self._rr_txn_interval_per_room_ms = (
- 1000.0
- / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
+ rr_txn_interval_per_room_s = (
+ 1.0 / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
+ )
+ self._destination_wakeup_queue = _DestinationWakeupQueue(
+ self, self.clock, max_delay_s=rr_txn_interval_per_room_s
)
-
- self._external_cache = hs.get_external_cache()
- self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)
# Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call_now(
@@ -438,12 +438,23 @@ class FederationSender(AbstractFederationSender):
self._wake_destinations_needing_catchup,
)
- def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
+ def _get_per_destination_queue(
+ self, destination: str
+ ) -> Optional[PerDestinationQueue]:
"""Get or create a PerDestinationQueue for the given destination
Args:
destination: server_name of remote server
+
+ Returns:
+ None if the destination is not allowed by the federation whitelist.
+ Otherwise a PerDestinationQueue for this destination.
"""
+ if not self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
+ destination
+ ):
+ return None
+
queue = self._per_destination_queues.get(destination)
if not queue:
queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
@@ -720,6 +731,16 @@ class FederationSender(AbstractFederationSender):
# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
+ # Filter out any destinations not present in the federation_domain_whitelist, if
+ # the whitelist exists. These destinations should not be sent to so let's not
+ # waste time or space keeping track of events destined for them.
+ destinations = [
+ d
+ for d in destinations
+ if self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
+ d
+ )
+ ]
await self.store.store_destination_rooms_entries(
destinations,
pdu.room_id,
@@ -734,7 +755,12 @@ class FederationSender(AbstractFederationSender):
)
for destination in destinations:
- self._get_per_destination_queue(destination).send_pdu(pdu)
+ queue = self._get_per_destination_queue(destination)
+ # We expect `queue` to not be None as we already filtered out
+ # non-whitelisted destinations above.
+ assert queue is not None
+
+ queue.send_pdu(pdu)
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
@@ -745,37 +771,48 @@ class FederationSender(AbstractFederationSender):
# Some background on the rate-limiting going on here.
#
- # It turns out that if we attempt to send out RRs as soon as we get them from
- # a client, then we end up trying to do several hundred Hz of federation
- # transactions. (The number of transactions scales as O(N^2) on the size of a
- # room, since in a large room we have both more RRs coming in, and more servers
- # to send them to.)
+ # It turns out that if we attempt to send out RRs as soon as we get them
+ # from a client, then we end up trying to do several hundred Hz of
+ # federation transactions. (The number of transactions scales as O(N^2)
+ # on the size of a room, since in a large room we have both more RRs
+ # coming in, and more servers to send them to.)
#
- # This leads to a lot of CPU load, and we end up getting behind. The solution
- # currently adopted is as follows:
+ # This leads to a lot of CPU load, and we end up getting behind. The
+ # solution currently adopted is to differentiate between receipts and
+ # destinations we should immediately send to, and those we can trickle
+ # the receipts to.
#
- # The first receipt in a given room is sent out immediately, at time T0. Any
- # further receipts are, in theory, batched up for N seconds, where N is calculated
- # based on the number of servers in the room to achieve a transaction frequency
- # of around 50Hz. So, for example, if there were 100 servers in the room, then
- # N would be 100 / 50Hz = 2 seconds.
+ # The current logic is to send receipts out immediately if:
+ # - the room is "small", i.e. there's only N servers to send receipts
+ # to, and so sending out the receipts immediately doesn't cause too
+ # much load; or
+ # - the receipt is for an event that happened recently, as users
+ # notice if receipts are delayed when they know other users are
+ # currently reading the room; or
+ # - the receipt is being sent to the server that sent the event, so
+ # that users see receipts for their own receipts quickly.
#
- # Then, after T+N, we flush out any receipts that have accumulated, and restart
- # the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
- # we stop the cycle and go back to the start.
+ # For destinations that we should delay sending the receipt to, we queue
+ # the receipts up to be sent in the next transaction, but don't trigger
+ # a new transaction to be sent. We then add the destination to the
+ # `DestinationWakeupQueue`, which will slowly iterate over each
+ # destination and trigger a new transaction to be sent.
#
- # However, in practice, it is often possible to flush out receipts earlier: in
- # particular, if we are sending a transaction to a given server anyway (for
- # example, because we have a PDU or a RR in another room to send), then we may
- # as well send out all of the pending RRs for that server. So it may be that
- # by the time we get to T+N, we don't actually have any RRs left to send out.
- # Nevertheless we continue to buffer up RRs for the room in question until we
- # reach the point that no RRs arrive between timer ticks.
+ # However, in practice, it is often possible to send out delayed
+ # receipts earlier: in particular, if we are sending a transaction to a
+ # given server anyway (for example, because we have a PDU or a RR in
+ # another room to send), then we may as well send out all of the pending
+ # RRs for that server. So it may be that by the time we get to waking up
+ # the destination, we don't actually have any RRs left to send out.
#
- # For even more background, see https://github.com/matrix-org/synapse/issues/4730.
+ # For even more background, see
+ # https://github.com/matrix-org/synapse/issues/4730.
room_id = receipt.room_id
+ # Local read receipts always have 1 event ID.
+ event_id = receipt.event_ids[0]
+
# Work out which remote servers should be poked and poke them.
domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
@@ -797,49 +834,55 @@ class FederationSender(AbstractFederationSender):
if not domains:
return
- queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)
+ # We now split which domains we want to wake up immediately vs which we
+ # want to delay waking up.
+ immediate_domains: StrCollection
+ delay_domains: StrCollection
- # if there is no flush yet scheduled, we will send out these receipts with
- # immediate flushes, and schedule the next flush for this room.
- if queues_pending_flush is not None:
- logger.debug("Queuing receipt for: %r", domains)
+ if len(domains) < 10:
+ # For "small" rooms send to all domains immediately
+ immediate_domains = domains
+ delay_domains = ()
else:
- logger.debug("Sending receipt to: %r", domains)
- self._schedule_rr_flush_for_room(room_id, len(domains))
+ metadata = await self.store.get_metadata_for_event(
+ receipt.room_id, event_id
+ )
+ assert metadata is not None
- for domain in domains:
- queue = self._get_per_destination_queue(domain)
- queue.queue_read_receipt(receipt)
+ sender_domain = get_domain_from_id(metadata.sender)
- # if there is already a RR flush pending for this room, then make sure this
- # destination is registered for the flush
- if queues_pending_flush is not None:
- queues_pending_flush.add(queue)
+ if self.clock.time_msec() - metadata.received_ts < 60_000:
+ # We always send receipts for recent messages immediately
+ immediate_domains = domains
+ delay_domains = ()
else:
- queue.flush_read_receipts_for_room(room_id)
-
- def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
- # that is going to cause approximately len(domains) transactions, so now back
- # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
- backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
-
- logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
- self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
- self._queues_awaiting_rr_flush_by_room[room_id] = set()
-
- def _flush_rrs_for_room(self, room_id: str) -> None:
- queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
- logger.debug("Flushing RRs in %s to %s", room_id, queues)
-
- if not queues:
- # no more RRs arrived for this room; we are done.
- return
+ # Otherwise, we delay waking up all destinations except for the
+ # sender's domain.
+ immediate_domains = []
+ delay_domains = []
+ for domain in domains:
+ if domain == sender_domain:
+ immediate_domains.append(domain)
+ else:
+ delay_domains.append(domain)
+
+ for domain in immediate_domains:
+ # Add to destination queue and wake the destination up
+ queue = self._get_per_destination_queue(domain)
+ if queue is None:
+ continue
+ queue.queue_read_receipt(receipt)
+ queue.attempt_new_transaction()
- # schedule the next flush
- self._schedule_rr_flush_for_room(room_id, len(queues))
+ for domain in delay_domains:
+ # Add to destination queue...
+ queue = self._get_per_destination_queue(domain)
+ if queue is None:
+ continue
+ queue.queue_read_receipt(receipt)
- for queue in queues:
- queue.flush_read_receipts_for_room(room_id)
+ # ... and schedule the destination to be woken up.
+ self._destination_wakeup_queue.add_to_queue(domain)
async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
@@ -871,9 +914,10 @@ class FederationSender(AbstractFederationSender):
if self.is_mine_server_name(destination):
continue
- self._get_per_destination_queue(destination).send_presence(
- states, start_loop=False
- )
+ queue = self._get_per_destination_queue(destination)
+ if queue is None:
+ continue
+ queue.send_presence(states, start_loop=False)
self._destination_wakeup_queue.add_to_queue(destination)
@@ -923,6 +967,8 @@ class FederationSender(AbstractFederationSender):
return
queue = self._get_per_destination_queue(edu.destination)
+ if queue is None:
+ return
if key:
queue.send_keyed_edu(edu, key)
else:
@@ -947,9 +993,15 @@ class FederationSender(AbstractFederationSender):
for destination in destinations:
if immediate:
- self._get_per_destination_queue(destination).attempt_new_transaction()
+ queue = self._get_per_destination_queue(destination)
+ if queue is None:
+ continue
+ queue.attempt_new_transaction()
else:
- self._get_per_destination_queue(destination).mark_new_data()
+ queue = self._get_per_destination_queue(destination)
+ if queue is None:
+ continue
+ queue.mark_new_data()
self._destination_wakeup_queue.add_to_queue(destination)
def wake_destination(self, destination: str) -> None:
@@ -968,7 +1020,9 @@ class FederationSender(AbstractFederationSender):
):
return
- self._get_per_destination_queue(destination).attempt_new_transaction()
+ queue = self._get_per_destination_queue(destination)
+ if queue is not None:
+ queue.attempt_new_transaction()
@staticmethod
def get_current_token() -> int:
@@ -1013,6 +1067,9 @@ class FederationSender(AbstractFederationSender):
d
for d in destinations_to_wake
if self._federation_shard_config.should_handle(self._instance_name, d)
+ and self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
+ d
+ )
]
for destination in destinations_to_wake:
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index d097e65ea7..b3f65e8237 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -156,7 +156,6 @@ class PerDestinationQueue:
# Each receipt can only have a single receipt per
# (room ID, receipt type, user ID, thread ID) tuple.
self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
- self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
@@ -258,15 +257,7 @@ class PerDestinationQueue:
}
)
- def flush_read_receipts_for_room(self, room_id: str) -> None:
- # If there are any pending receipts for this room then force-flush them
- # in a new transaction.
- for edu in self._pending_receipt_edus:
- if room_id in edu:
- self._rrs_pending_flush = True
- self.attempt_new_transaction()
- # No use in checking remaining EDUs if the room was found.
- break
+ self.mark_new_data()
def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -603,12 +594,9 @@ class PerDestinationQueue:
self._destination, last_successful_stream_ordering
)
- def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
+ def _get_receipt_edus(self, limit: int) -> Iterable[Edu]:
if not self._pending_receipt_edus:
return
- if not force_flush and not self._rrs_pending_flush:
- # not yet time for this lot
- return
# Send at most limit EDUs for receipts.
for content in self._pending_receipt_edus[:limit]:
@@ -747,7 +735,7 @@ class _TransactionQueueManager:
)
# Add read receipt EDUs.
- pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
+ pending_edus.extend(self.queue._get_receipt_edus(limit=5))
edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
# Next, prioritize to-device messages so that existing encryption channels
@@ -795,13 +783,6 @@ class _TransactionQueueManager:
if not self._pdus and not pending_edus:
return [], []
- # if we've decided to send a transaction anyway, and we have room, we
- # may as well send any pending RRs
- if edu_limit:
- pending_edus.extend(
- self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
- )
-
if self._pdus:
self._last_stream_ordering = self._pdus[
-1
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 206e91ed14..62bf96ce91 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -143,6 +143,33 @@ class TransportLayerClient:
destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
)
+ async def get_policy_recommendation_for_pdu(
+ self, destination: str, event: EventBase, timeout: Optional[int] = None
+ ) -> JsonDict:
+ """Requests the policy recommendation for the given pdu from the given policy server.
+
+ Args:
+ destination: The host name of the remote homeserver checking the event.
+ event: The event to check.
+ timeout: How long to try (in ms) the destination for before giving up.
+ None indicates no timeout.
+
+ Returns:
+ The full recommendation object from the remote server.
+ """
+ logger.debug(
+ "get_policy_recommendation_for_pdu dest=%s, event_id=%s",
+ destination,
+ event.event_id,
+ )
+ return await self.client.post_json(
+ destination=destination,
+ path=f"/_matrix/policy/unstable/org.matrix.msc4284/event/{event.event_id}/check",
+ data=event.get_pdu_json(),
+ ignore_backoff=True,
+ timeout=timeout,
+ )
+
async def backfill(
self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
) -> Optional[Union[JsonDict, list]]:
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 43102567db..174d02ab6b 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -20,9 +20,7 @@
#
#
import logging
-from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Type
-
-from typing_extensions import Literal
+from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional, Tuple, Type
from synapse.api.errors import FederationDeniedError, SynapseError
from synapse.federation.transport.server._base import (
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index 9094201da0..cba309635b 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -113,7 +113,7 @@ class Authenticator:
):
raise AuthenticationError(
HTTPStatus.UNAUTHORIZED,
- "Destination mismatch in auth header",
+ f"Destination mismatch in auth header, received: {destination!r}",
Codes.UNAUTHORIZED,
)
if (
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 20f87c885e..eb96ff27f9 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -24,6 +24,7 @@ from typing import (
TYPE_CHECKING,
Dict,
List,
+ Literal,
Mapping,
Optional,
Sequence,
@@ -32,8 +33,6 @@ from typing import (
Union,
)
-from typing_extensions import Literal
-
from synapse.api.constants import Direction, EduTypes
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
@@ -509,6 +508,9 @@ class FederationV2InviteServlet(BaseFederationServerServlet):
event = content["event"]
invite_room_state = content.get("invite_room_state", [])
+ if not isinstance(invite_room_state, list):
+ invite_room_state = []
+
# Synapse expects invite_room_state to be in unsigned, as it is in v1
# API
@@ -859,7 +861,6 @@ class FederationMediaThumbnailServlet(BaseFederationServerServlet):
request: SynapseRequest,
media_id: str,
) -> None:
-
width = parse_integer(request, "width", required=True)
height = parse_integer(request, "height", required=True)
method = parse_string(request, "method", "scale")
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index b2c8ba5887..3bb5f824b7 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -19,15 +19,17 @@
#
#
-""" Defines the JSON structure of the protocol units used by the server to
+"""Defines the JSON structure of the protocol units used by the server to
server protocol.
"""
import logging
-from typing import List, Optional
+from typing import List, Optional, Sequence
import attr
+from synapse.api.constants import CANONICALJSON_MAX_INT, CANONICALJSON_MIN_INT
+from synapse.events import EventBase
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@@ -104,8 +106,28 @@ class Transaction:
result = {
"origin": self.origin,
"origin_server_ts": self.origin_server_ts,
- "pdus": self.pdus,
+ "pdus": filter_pdus_for_valid_depth(self.pdus),
}
if self.edus:
result["edus"] = self.edus
return result
+
+
+def filter_pdus_for_valid_depth(pdus: Sequence[JsonDict]) -> List[JsonDict]:
+ filtered_pdus = []
+ for pdu in pdus:
+ # Drop PDUs that have a depth that is outside of the range allowed
+ # by canonical json.
+ if (
+ "depth" in pdu
+ and CANONICALJSON_MIN_INT <= pdu["depth"] <= CANONICALJSON_MAX_INT
+ ):
+ filtered_pdus.append(pdu)
+
+ return filtered_pdus
+
+
+def serialize_and_filter_pdus(
+ pdus: Sequence[EventBase], time_now: Optional[int] = None
+) -> List[JsonDict]:
+ return filter_pdus_for_valid_depth([pdu.get_pdu_json(time_now) for pdu in pdus])
|