diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 2522bf78fc..6bd4742140 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Awaitable, Callable, Optional
from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
@@ -23,6 +23,7 @@ from synapse.crypto.keyring import Keyring
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event, validate_canonicaljson
from synapse.http.servlet import assert_params_in_dict
+from synapse.logging.opentracing import log_kv, trace
from synapse.types import JsonDict, get_domain_from_id
if TYPE_CHECKING:
@@ -55,8 +56,14 @@ class FederationBase:
self._clock = hs.get_clock()
self._storage_controllers = hs.get_storage_controllers()
+ @trace
async def _check_sigs_and_hash(
- self, room_version: RoomVersion, pdu: EventBase
+ self,
+ room_version: RoomVersion,
+ pdu: EventBase,
+ record_failure_callback: Optional[
+ Callable[[EventBase, str], Awaitable[None]]
+ ] = None,
) -> EventBase:
"""Checks that event is correctly signed by the sending server.
@@ -68,6 +75,11 @@ class FederationBase:
Args:
room_version: The room version of the PDU
pdu: the event to be checked
+ record_failure_callback: A callback to run whenever the given event
+ fails signature or hash checks. This includes exceptions
+ that would be normally be thrown/raised but also things like
+ checking for event tampering where we just return the redacted
+ event.
Returns:
* the original event if the checks pass
@@ -78,7 +90,12 @@ class FederationBase:
InvalidEventSignatureError if the signature check failed. Nothing
will be logged in this case.
"""
- await _check_sigs_on_pdu(self.keyring, room_version, pdu)
+ try:
+ await _check_sigs_on_pdu(self.keyring, room_version, pdu)
+ except InvalidEventSignatureError as exc:
+ if record_failure_callback:
+ await record_failure_callback(pdu, str(exc))
+ raise exc
if not check_event_content_hash(pdu):
# let's try to distinguish between failures because the event was
@@ -97,17 +114,40 @@ class FederationBase:
"Event %s seems to have been redacted; using our redacted copy",
pdu.event_id,
)
+ log_kv(
+ {
+ "message": "Event seems to have been redacted; using our redacted copy",
+ "event_id": pdu.event_id,
+ }
+ )
else:
logger.warning(
"Event %s content has been tampered, redacting",
pdu.event_id,
)
+ log_kv(
+ {
+ "message": "Event content has been tampered, redacting",
+ "event_id": pdu.event_id,
+ }
+ )
+ if record_failure_callback:
+ await record_failure_callback(
+ pdu, "Event content has been tampered with"
+ )
return redacted_event
spam_check = await self.spam_checker.check_event_for_spam(pdu)
if spam_check != self.spam_checker.NOT_SPAM:
logger.warning("Event contains spam, soft-failing %s", pdu.event_id)
+ log_kv(
+ {
+ "message": "Event contains spam, redacting (to save disk space) "
+ "as well as soft-failing (to stop using the event in prev_events)",
+ "event_id": pdu.event_id,
+ }
+ )
# we redact (to save disk space) as well as soft-failing (to stop
# using the event in prev_events).
redacted_event = prune_event(pdu)
@@ -117,6 +157,7 @@ class FederationBase:
return pdu
+@trace
async def _check_sigs_on_pdu(
keyring: Keyring, room_version: RoomVersion, pdu: EventBase
) -> None:
@@ -172,7 +213,7 @@ async def _check_sigs_on_pdu(
# event id's domain (normally only the case for joins/leaves), and add additional
# checks. Only do this if the room version has a concept of event ID domain
# (ie, the room version uses old-style non-hash event IDs).
- if room_version.event_format == EventFormatVersions.V1:
+ if room_version.event_format == EventFormatVersions.ROOM_V1_V2:
event_domain = get_domain_from_id(pdu.event_id)
if event_domain != sender_domain:
try:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 6a8d76529b..c4c0bc7315 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -61,6 +61,7 @@ from synapse.federation.federation_base import (
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.types import QueryParams
+from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@@ -79,6 +80,18 @@ PDU_RETRY_TIME_MS = 1 * 60 * 1000
T = TypeVar("T")
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class PulledPduInfo:
+ """
+ A result object that stores the PDU and info about it like which homeserver we
+ pulled it from (`pull_origin`)
+ """
+
+ pdu: EventBase
+ # Which homeserver we pulled the PDU from
+ pull_origin: str
+
+
class InvalidResponseError(RuntimeError):
"""Helper for _try_destination_list: indicates that the server returned a response
we couldn't parse
@@ -113,7 +126,9 @@ class FederationClient(FederationBase):
self.hostname = hs.hostname
self.signing_key = hs.signing_key
- self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache(
+ # Cache mapping `event_id` to a tuple of the event itself and the `pull_origin`
+ # (which server we pulled the event from)
+ self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache(
cache_name="get_pdu_cache",
clock=self._clock,
max_len=1000,
@@ -233,6 +248,8 @@ class FederationClient(FederationBase):
destination, content, timeout
)
+ @trace
+ @tag_args
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
) -> Optional[List[EventBase]]:
@@ -275,7 +292,7 @@ class FederationClient(FederationBase):
pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus]
# Check signatures and hash of pdus, removing any from the list that fail checks
- pdus[:] = await self._check_sigs_and_hash_and_fetch(
+ pdus[:] = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
dest, pdus, room_version=room_version
)
@@ -325,7 +342,17 @@ class FederationClient(FederationBase):
# Check signatures are correct.
try:
- signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
+
+ async def _record_failure_callback(
+ event: EventBase, cause: str
+ ) -> None:
+ await self.store.record_event_failed_pull_attempt(
+ event.room_id, event.event_id, cause
+ )
+
+ signed_pdu = await self._check_sigs_and_hash(
+ room_version, pdu, _record_failure_callback
+ )
except InvalidEventSignatureError as e:
errmsg = f"event id {pdu.event_id}: {e}"
logger.warning("%s", errmsg)
@@ -335,13 +362,15 @@ class FederationClient(FederationBase):
return None
+ @trace
+ @tag_args
async def get_pdu(
self,
- destinations: Iterable[str],
+ destinations: Collection[str],
event_id: str,
room_version: RoomVersion,
timeout: Optional[int] = None,
- ) -> Optional[EventBase]:
+ ) -> Optional[PulledPduInfo]:
"""Requests the PDU with given origin and ID from the remote home
servers.
@@ -356,11 +385,11 @@ class FederationClient(FederationBase):
moving to the next destination. None indicates no timeout.
Returns:
- The requested PDU, or None if we were unable to find it.
+ The requested PDU wrapped in `PulledPduInfo`, or None if we were unable to find it.
"""
logger.debug(
- "get_pdu: event_id=%s from destinations=%s", event_id, destinations
+ "get_pdu(event_id=%s): from destinations=%s", event_id, destinations
)
# TODO: Rate limit the number of times we try and get the same event.
@@ -369,19 +398,25 @@ class FederationClient(FederationBase):
# it gets persisted to the database), so we cache the results of the lookup.
# Note that this is separate to the regular get_event cache which caches
# events once they have been persisted.
- event = self._get_pdu_cache.get(event_id)
+ get_pdu_cache_entry = self._get_pdu_cache.get(event_id)
+ event = None
+ pull_origin = None
+ if get_pdu_cache_entry:
+ event, pull_origin = get_pdu_cache_entry
# If we don't see the event in the cache, go try to fetch it from the
# provided remote federated destinations
- if not event:
+ else:
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
+ # TODO: We can probably refactor this to use `_try_destination_list`
for destination in destinations:
now = self._clock.time_msec()
last_attempt = pdu_attempts.get(destination, 0)
if last_attempt + PDU_RETRY_TIME_MS > now:
logger.debug(
- "get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
+ "get_pdu(event_id=%s): skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
+ event_id,
destination,
last_attempt,
PDU_RETRY_TIME_MS,
@@ -396,43 +431,48 @@ class FederationClient(FederationBase):
room_version=room_version,
timeout=timeout,
)
+ pull_origin = destination
pdu_attempts[destination] = now
if event:
# Prime the cache
- self._get_pdu_cache[event.event_id] = event
+ self._get_pdu_cache[event.event_id] = (event, pull_origin)
# Now that we have an event, we can break out of this
# loop and stop asking other destinations.
break
+ except NotRetryingDestination as e:
+ logger.info("get_pdu(event_id=%s): %s", event_id, e)
+ continue
+ except FederationDeniedError:
+ logger.info(
+ "get_pdu(event_id=%s): Not attempting to fetch PDU from %s because the homeserver is not on our federation whitelist",
+ event_id,
+ destination,
+ )
+ continue
except SynapseError as e:
logger.info(
- "Failed to get PDU %s from %s because %s",
+ "get_pdu(event_id=%s): Failed to get PDU from %s because %s",
event_id,
destination,
e,
)
continue
- except NotRetryingDestination as e:
- logger.info(str(e))
- continue
- except FederationDeniedError as e:
- logger.info(str(e))
- continue
except Exception as e:
pdu_attempts[destination] = now
logger.info(
- "Failed to get PDU %s from %s because %s",
+ "get_pdu(event_id=%s): Failed to get PDU from %s because %s",
event_id,
destination,
e,
)
continue
- if not event:
+ if not event or not pull_origin:
return None
# `event` now refers to an object stored in `get_pdu_cache`. Our
@@ -444,8 +484,10 @@ class FederationClient(FederationBase):
event.room_version,
)
- return event_copy
+ return PulledPduInfo(event_copy, pull_origin)
+ @trace
+ @tag_args
async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
) -> Tuple[List[str], List[str]]:
@@ -465,6 +507,23 @@ class FederationClient(FederationBase):
state_event_ids = result["pdu_ids"]
auth_event_ids = result.get("auth_chain_ids", [])
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "state_event_ids",
+ str(state_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "state_event_ids.length",
+ str(len(state_event_ids)),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "auth_event_ids",
+ str(auth_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "auth_event_ids.length",
+ str(len(auth_event_ids)),
+ )
+
if not isinstance(state_event_ids, list) or not isinstance(
auth_event_ids, list
):
@@ -472,6 +531,8 @@ class FederationClient(FederationBase):
return state_event_ids, auth_event_ids
+ @trace
+ @tag_args
async def get_room_state(
self,
destination: str,
@@ -521,23 +582,28 @@ class FederationClient(FederationBase):
len(auth_event_map),
)
- valid_auth_events = await self._check_sigs_and_hash_and_fetch(
+ valid_auth_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, auth_event_map.values(), room_version
)
- valid_state_events = await self._check_sigs_and_hash_and_fetch(
- destination, state_event_map.values(), room_version
+ valid_state_events = (
+ await self._check_sigs_and_hash_for_pulled_events_and_fetch(
+ destination, state_event_map.values(), room_version
+ )
)
return valid_state_events, valid_auth_events
- async def _check_sigs_and_hash_and_fetch(
+ @trace
+ async def _check_sigs_and_hash_for_pulled_events_and_fetch(
self,
origin: str,
pdus: Collection[EventBase],
room_version: RoomVersion,
) -> List[EventBase]:
- """Checks the signatures and hashes of a list of events.
+ """
+ Checks the signatures and hashes of a list of pulled events we got from
+ federation and records any signature failures as failed pull attempts.
If a PDU fails its signature check then we check if we have it in
the database, and if not then request it from the sender's server (if that
@@ -560,17 +626,27 @@ class FederationClient(FederationBase):
Returns:
A list of PDUs that have valid signatures and hashes.
"""
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "pdus.length",
+ str(len(pdus)),
+ )
# We limit how many PDUs we check at once, as if we try to do hundreds
# of thousands of PDUs at once we see large memory spikes.
- valid_pdus = []
+ valid_pdus: List[EventBase] = []
+
+ async def _record_failure_callback(event: EventBase, cause: str) -> None:
+ await self.store.record_event_failed_pull_attempt(
+ event.room_id, event.event_id, cause
+ )
async def _execute(pdu: EventBase) -> None:
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
pdu=pdu,
origin=origin,
room_version=room_version,
+ record_failure_callback=_record_failure_callback,
)
if valid_pdu:
@@ -580,11 +656,16 @@ class FederationClient(FederationBase):
return valid_pdus
+ @trace
+ @tag_args
async def _check_sigs_and_hash_and_fetch_one(
self,
pdu: EventBase,
origin: str,
room_version: RoomVersion,
+ record_failure_callback: Optional[
+ Callable[[EventBase, str], Awaitable[None]]
+ ] = None,
) -> Optional[EventBase]:
"""Takes a PDU and checks its signatures and hashes.
@@ -601,6 +682,11 @@ class FederationClient(FederationBase):
origin
pdu
room_version
+ record_failure_callback: A callback to run whenever the given event
+ fails signature or hash checks. This includes exceptions
+ that would be normally be thrown/raised but also things like
+ checking for event tampering where we just return the redacted
+ event.
Returns:
The PDU (possibly redacted) if it has valid signatures and hashes.
@@ -608,29 +694,44 @@ class FederationClient(FederationBase):
"""
try:
- return await self._check_sigs_and_hash(room_version, pdu)
+ return await self._check_sigs_and_hash(
+ room_version, pdu, record_failure_callback
+ )
except InvalidEventSignatureError as e:
logger.warning(
"Signature on retrieved event %s was invalid (%s). "
- "Checking local store/orgin server",
+ "Checking local store/origin server",
pdu.event_id,
e,
)
+ log_kv(
+ {
+ "message": "Signature on retrieved event was invalid. "
+ "Checking local store/origin server",
+ "event_id": pdu.event_id,
+ "InvalidEventSignatureError": e,
+ }
+ )
# Check local db.
res = await self.store.get_event(
pdu.event_id, allow_rejected=True, allow_none=True
)
+ # If the PDU fails its signature check and we don't have it in our
+ # database, we then request it from sender's server (if that is not the
+ # same as `origin`).
pdu_origin = get_domain_from_id(pdu.sender)
if not res and pdu_origin != origin:
try:
- res = await self.get_pdu(
+ pulled_pdu_info = await self.get_pdu(
destinations=[pdu_origin],
event_id=pdu.event_id,
room_version=room_version,
timeout=10000,
)
+ if pulled_pdu_info is not None:
+ res = pulled_pdu_info.pdu
except SynapseError:
pass
@@ -650,7 +751,7 @@ class FederationClient(FederationBase):
auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]]
- signed_auth = await self._check_sigs_and_hash_and_fetch(
+ signed_auth = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, auth_chain, room_version=room_version
)
@@ -732,6 +833,7 @@ class FederationClient(FederationBase):
)
for destination in destinations:
+ # We don't want to ask our own server for information we don't have
if destination == self.server_name:
continue
@@ -740,9 +842,21 @@ class FederationClient(FederationBase):
except (
RequestSendFailed,
InvalidResponseError,
- NotRetryingDestination,
) as e:
logger.warning("Failed to %s via %s: %s", description, destination, e)
+ # Skip to the next homeserver in the list to try.
+ continue
+ except NotRetryingDestination as e:
+ logger.info("%s: %s", description, e)
+ continue
+ except FederationDeniedError:
+ logger.info(
+ "%s: Not attempting to %s from %s because the homeserver is not on our federation whitelist",
+ description,
+ description,
+ destination,
+ )
+ continue
except UnsupportedRoomVersionError:
raise
except HttpResponseException as e:
@@ -862,9 +976,6 @@ class FederationClient(FederationBase):
# The protoevent received over the JSON wire may not have all
# the required fields. Lets just gloss over that because
# there's some we never care about
- if "prev_state" not in pdu_dict:
- pdu_dict["prev_state"] = []
-
ev = builder.create_local_event_from_event_dict(
self._clock,
self.hostname,
@@ -1146,7 +1257,7 @@ class FederationClient(FederationBase):
# Otherwise, consider it a legitimate error and raise.
err = e.to_synapse_error()
if self._is_unknown_endpoint(e, err):
- if room_version.event_format != EventFormatVersions.V1:
+ if room_version.event_format != EventFormatVersions.ROOM_V1_V2:
raise SynapseError(
400,
"User's homeserver does not support this room version",
@@ -1223,7 +1334,7 @@ class FederationClient(FederationBase):
return resp[1]
async def send_knock(self, destinations: List[str], pdu: EventBase) -> JsonDict:
- """Attempts to send a knock event to given a list of servers. Iterates
+ """Attempts to send a knock event to a given list of servers. Iterates
through the list until one attempt succeeds.
Doing so will cause the remote server to add the event to the graph,
@@ -1360,7 +1471,7 @@ class FederationClient(FederationBase):
event_from_pdu_json(e, room_version) for e in content.get("events", [])
]
- signed_events = await self._check_sigs_and_hash_and_fetch(
+ signed_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, events, room_version=room_version
)
except HttpResponseException as e:
@@ -1538,6 +1649,54 @@ class FederationClient(FederationBase):
return result
async def timestamp_to_event(
+ self, *, destinations: List[str], room_id: str, timestamp: int, direction: str
+ ) -> Optional["TimestampToEventResponse"]:
+ """
+ Calls each remote federating server from `destinations` asking for their closest
+ event to the given timestamp in the given direction until we get a response.
+ Also validates the response to always return the expected keys or raises an
+ error.
+
+ Args:
+ destinations: The domains of homeservers to try fetching from
+ room_id: Room to fetch the event from
+ timestamp: The point in time (inclusive) we should navigate from in
+ the given direction to find the closest event.
+ direction: ["f"|"b"] to indicate whether we should navigate forward
+ or backward from the given timestamp to find the closest event.
+
+ Returns:
+ A parsed TimestampToEventResponse including the closest event_id
+ and origin_server_ts or None if no destination has a response.
+ """
+
+ async def _timestamp_to_event_from_destination(
+ destination: str,
+ ) -> TimestampToEventResponse:
+ return await self._timestamp_to_event_from_destination(
+ destination, room_id, timestamp, direction
+ )
+
+ try:
+ # Loop through each homeserver candidate until we get a succesful response
+ timestamp_to_event_response = await self._try_destination_list(
+ "timestamp_to_event",
+ destinations,
+ # TODO: The requested timestamp may lie in a part of the
+ # event graph that the remote server *also* didn't have,
+ # in which case they will have returned another event
+ # which may be nowhere near the requested timestamp. In
+ # the future, we may need to reconcile that gap and ask
+ # other homeservers, and/or extend `/timestamp_to_event`
+ # to return events on *both* sides of the timestamp to
+ # help reconcile the gap faster.
+ _timestamp_to_event_from_destination,
+ )
+ return timestamp_to_event_response
+ except SynapseError:
+ return None
+
+ async def _timestamp_to_event_from_destination(
self, destination: str, room_id: str, timestamp: int, direction: str
) -> "TimestampToEventResponse":
"""
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 1d60137411..bb20af6e91 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -61,7 +61,12 @@ from synapse.logging.context import (
nested_logging_context,
run_in_background,
)
-from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
+from synapse.logging.opentracing import (
+ log_kv,
+ start_active_span_from_edu,
+ tag_args,
+ trace,
+)
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -69,6 +74,8 @@ from synapse.replication.http.federation import (
)
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.lock import Lock
+from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.roommember import MemberSummary
from synapse.types import JsonDict, StateMap, get_domain_from_id
from synapse.util import json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
@@ -476,6 +483,14 @@ class FederationServer(FederationBase):
pdu_results[pdu.event_id] = await process_pdu(pdu)
async def process_pdu(pdu: EventBase) -> JsonDict:
+ """
+ Processes a pushed PDU sent to us via a `/send` transaction
+
+ Returns:
+ JsonDict representing a "PDU Processing Result" that will be bundled up
+ with the other processed PDU's in the `/send` transaction and sent back
+ to remote homeserver.
+ """
event_id = pdu.event_id
with nested_logging_context(event_id):
try:
@@ -525,13 +540,10 @@ class FederationServer(FederationBase):
async def on_room_state_request(
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, JsonDict]:
+ await self._event_auth_handler.assert_host_in_room(room_id, origin)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
- in_room = await self._event_auth_handler.check_host_in_room(room_id, origin)
- if not in_room:
- raise AuthError(403, "Host not in room.")
-
# we grab the linearizer to protect ourselves from servers which hammer
# us. In theory we might already have the response to this query
# in the cache so we could return it without waiting for the linearizer
@@ -547,19 +559,18 @@ class FederationServer(FederationBase):
return 200, resp
+ @trace
+ @tag_args
async def on_state_ids_request(
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, JsonDict]:
if not event_id:
raise NotImplementedError("Specify an event")
+ await self._event_auth_handler.assert_host_in_room(room_id, origin)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
- in_room = await self._event_auth_handler.check_host_in_room(room_id, origin)
- if not in_room:
- raise AuthError(403, "Host not in room.")
-
resp = await self._state_ids_resp_cache.wrap(
(room_id, event_id),
self._on_state_ids_request_compute,
@@ -569,6 +580,8 @@ class FederationServer(FederationBase):
return 200, resp
+ @trace
+ @tag_args
async def _on_state_ids_request_compute(
self, room_id: str, event_id: str
) -> JsonDict:
@@ -680,8 +693,9 @@ class FederationServer(FederationBase):
state_event_ids: Collection[str]
servers_in_room: Optional[Collection[str]]
if caller_supports_partial_state:
+ summary = await self.store.get_room_summary(room_id)
state_event_ids = _get_event_ids_for_partial_state_join(
- event, prev_state_ids
+ event, prev_state_ids, summary
)
servers_in_room = await self.state.get_hosts_in_room_at_events(
room_id, event_ids=event.prev_event_ids()
@@ -754,6 +768,17 @@ class FederationServer(FederationBase):
The partial knock event.
"""
origin_host, _ = parse_server_name(origin)
+
+ if await self.store.is_partial_state_room(room_id):
+ # Before we do anything: check if the room is partial-stated.
+ # Note that at the time this check was added, `on_make_knock_request` would
+ # block due to https://github.com/matrix-org/synapse/issues/12997.
+ raise SynapseError(
+ 404,
+ "Unable to handle /make_knock right now; this server is not fully joined.",
+ errcode=Codes.NOT_FOUND,
+ )
+
await self.check_server_matches_acl(origin_host, room_id)
room_version = await self.store.get_room_version(room_id)
@@ -810,7 +835,14 @@ class FederationServer(FederationBase):
context, self._room_prejoin_state_types
)
)
- return {"knock_state_events": stripped_room_state}
+ return {
+ "knock_room_state": stripped_room_state,
+ # Since v1.37, Synapse incorrectly used "knock_state_events" for this field.
+ # Thus, we also populate a 'knock_state_events' with the same content to
+ # support old instances.
+ # See https://github.com/matrix-org/synapse/issues/14088.
+ "knock_state_events": stripped_room_state,
+ }
async def _on_send_membership_event(
self, origin: str, content: JsonDict, membership_type: str, room_id: str
@@ -843,8 +875,25 @@ class FederationServer(FederationBase):
Codes.BAD_JSON,
)
+ # Note that get_room_version throws if the room does not exist here.
room_version = await self.store.get_room_version(room_id)
+ if await self.store.is_partial_state_room(room_id):
+ # If our server is still only partially joined, we can't give a complete
+ # response to /send_join, /send_knock or /send_leave.
+ # This is because we will not be able to provide the server list (for partial
+ # joins) or the full state (for full joins).
+ # Return a 404 as we would if we weren't in the room at all.
+ logger.info(
+ f"Rejecting /send_{membership_type} to %s because it's a partial state room",
+ room_id,
+ )
+ raise SynapseError(
+ 404,
+ f"Unable to handle /send_{membership_type} right now; this server is not fully joined.",
+ errcode=Codes.NOT_FOUND,
+ )
+
if membership_type == Membership.KNOCK and not room_version.msc2403_knocking:
raise SynapseError(
403,
@@ -918,6 +967,7 @@ class FederationServer(FederationBase):
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, Dict[str, Any]]:
async with self._server_linearizer.queue((origin, room_id)):
+ await self._event_auth_handler.assert_host_in_room(room_id, origin)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
@@ -1448,6 +1498,7 @@ class FederationHandlerRegistry:
def _get_event_ids_for_partial_state_join(
join_event: EventBase,
prev_state_ids: StateMap[str],
+ summary: Dict[str, MemberSummary],
) -> Collection[str]:
"""Calculate state to be retuned in a partial_state send_join
@@ -1474,8 +1525,19 @@ def _get_event_ids_for_partial_state_join(
if current_membership_event_id is not None:
state_event_ids.add(current_membership_event_id)
- # TODO: return a few more members:
- # - those with invites
- # - those that are kicked? / banned
+ name_id = prev_state_ids.get((EventTypes.Name, ""))
+ canonical_alias_id = prev_state_ids.get((EventTypes.CanonicalAlias, ""))
+ if not name_id and not canonical_alias_id:
+ # Also include the hero members of the room (for DM rooms without a title).
+ # To do this properly, we should select the correct subset of membership events
+ # from `prev_state_ids`. Instead, we are lazier and use the (cached)
+ # `get_room_summary` function, which is based on the current state of the room.
+ # This introduces races; we choose to ignore them because a) they should be rare
+ # and b) even if it's wrong, joining servers will get the full state eventually.
+ heroes = extract_heroes_from_room_summary(summary, join_event.state_key)
+ for hero in heroes:
+ membership_event_id = prev_state_ids.get((EventTypes.Member, hero))
+ if membership_event_id:
+ state_event_ids.add(membership_event_id)
return state_event_ids
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 94a65ac65f..fc1d8c88a7 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -62,12 +62,12 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
- "synapse_federation_client_sent_pdu_destinations:count",
+ "synapse_federation_client_sent_pdu_destinations_count",
"Number of PDUs queued for sending to one or more destinations",
)
sent_pdus_destination_dist_total = Counter(
- "synapse_federation_client_sent_pdu_destinations:total",
+ "synapse_federation_client_sent_pdu_destinations",
"Total number of PDUs queued for sending across all destinations",
)
@@ -353,21 +353,25 @@ class FederationSender(AbstractFederationSender):
last_token = await self.store.get_federation_out_pos("events")
(
next_token,
- events,
event_to_received_ts,
- ) = await self.store.get_all_new_events_stream(
+ ) = await self.store.get_all_new_event_ids_stream(
last_token, self._last_poked_id, limit=100
)
+ event_ids = event_to_received_ts.keys()
+ event_entries = await self.store.get_unredacted_events_from_cache_or_db(
+ event_ids
+ )
+
logger.debug(
"Handling %i -> %i: %i events to send (current id %i)",
last_token,
next_token,
- len(events),
+ len(event_entries),
self._last_poked_id,
)
- if not events and next_token >= self._last_poked_id:
+ if not event_entries and next_token >= self._last_poked_id:
logger.debug("All events processed")
break
@@ -430,7 +434,23 @@ class FederationSender(AbstractFederationSender):
# If there are no prev event IDs then the state is empty
# and so no remote servers in the room
destinations = set()
- else:
+
+ if destinations is None:
+ # During partial join we use the set of servers that we got
+ # when beginning the join. It's still possible that we send
+ # events to servers that left the room in the meantime, but
+ # we consider that an acceptable risk since it is only our own
+ # events that we leak and not other server's ones.
+ partial_state_destinations = (
+ await self.store.get_partial_state_servers_at_join(
+ event.room_id
+ )
+ )
+
+ if len(partial_state_destinations) > 0:
+ destinations = partial_state_destinations
+
+ if destinations is None:
# We check the external cache for the destinations, which is
# stored per state group.
@@ -441,6 +461,19 @@ class FederationSender(AbstractFederationSender):
destinations = await self._external_cache.get(
"get_joined_hosts", str(sg)
)
+ if destinations is None:
+ # Add logging to help track down #13444
+ logger.info(
+ "Unexpectedly did not have cached destinations for %s / %s",
+ sg,
+ event.event_id,
+ )
+ else:
+ # Add logging to help track down #13444
+ logger.info(
+ "Unexpectedly did not have cached prev group for %s",
+ event.event_id,
+ )
if destinations is None:
try:
@@ -495,8 +528,14 @@ class FederationSender(AbstractFederationSender):
await handle_event(event)
events_by_room: Dict[str, List[EventBase]] = {}
- for event in events:
- events_by_room.setdefault(event.room_id, []).append(event)
+
+ for event_id in event_ids:
+ # `event_entries` is unsorted, so we have to iterate over `event_ids`
+ # to ensure the events are in the right order
+ event_cache = event_entries.get(event_id)
+ if event_cache:
+ event = event_cache.event
+ events_by_room.setdefault(event.room_id, []).append(event)
await make_deferred_yieldable(
defer.gatherResults(
@@ -511,9 +550,9 @@ class FederationSender(AbstractFederationSender):
logger.debug("Successfully handled up to %i", next_token)
await self.store.update_federation_out_pos("events", next_token)
- if events:
+ if event_entries:
now = self.clock.time_msec()
- ts = event_to_received_ts[events[-1].event_id]
+ ts = max(t for t in event_to_received_ts.values() if t)
assert ts is not None
synapse.metrics.event_processing_lag.labels(
@@ -523,7 +562,7 @@ class FederationSender(AbstractFederationSender):
"federation_sender"
).set(ts)
- events_processed_counter.inc(len(events))
+ events_processed_counter.inc(len(event_entries))
event_processing_loop_room_count.labels("federation_sender").inc(
len(events_by_room)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 41d8b937af..3ae5e8634c 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -505,6 +505,7 @@ class PerDestinationQueue:
new_pdus = await filter_events_for_server(
self._storage_controllers,
self._destination,
+ self._server_name,
new_pdus,
redact=False,
)
@@ -646,29 +647,32 @@ class _TransactionQueueManager:
# We start by fetching device related EDUs, i.e device updates and to
# device messages. We have to keep 2 free slots for presence and rr_edus.
- limit = MAX_EDUS_PER_TRANSACTION - 2
-
- device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
- limit
- )
-
- if device_update_edus:
- self._device_list_id = dev_list_id
- else:
- self.queue._last_device_list_stream_id = dev_list_id
-
- limit -= len(device_update_edus)
+ device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
+ # We prioritize to-device messages so that existing encryption channels
+ # work. We also keep a few slots spare (by reducing the limit) so that
+ # we can still trickle out some device list updates.
(
to_device_edus,
device_stream_id,
- ) = await self.queue._get_to_device_message_edus(limit)
+ ) = await self.queue._get_to_device_message_edus(device_edu_limit - 10)
if to_device_edus:
self._device_stream_id = device_stream_id
else:
self.queue._last_device_stream_id = device_stream_id
+ device_edu_limit -= len(to_device_edus)
+
+ device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
+ device_edu_limit
+ )
+
+ if device_update_edus:
+ self._device_list_id = dev_list_id
+ else:
+ self.queue._last_device_list_stream_id = dev_list_id
+
pending_edus = device_update_edus + to_device_edus
# Now add the read receipt EDU.
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 32074b8ca6..a3cfc701cd 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -45,6 +45,7 @@ from synapse.federation.units import Transaction
from synapse.http.matrixfederationclient import ByteParser
from synapse.http.types import QueryParams
from synapse.types import JsonDict
+from synapse.util import ExceptionBundle
logger = logging.getLogger(__name__)
@@ -279,12 +280,11 @@ class TransportLayerClient:
Note that this does not append any events to any graphs.
Args:
- destination (str): address of remote homeserver
- room_id (str): room to join/leave
- user_id (str): user to be joined/left
- membership (str): one of join/leave
- params (dict[str, str|Iterable[str]]): Query parameters to include in the
- request.
+ destination: address of remote homeserver
+ room_id: room to join/leave
+ user_id: user to be joined/left
+ membership: one of join/leave
+ params: Query parameters to include in the request.
Returns:
Succeeds when we get a 2xx HTTP response. The result
@@ -926,8 +926,7 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
return len(data)
def finish(self) -> SendJoinResponse:
- for c in self._coros:
- c.close()
+ _close_coros(self._coros)
if self._response.event_dict:
self._response.event = make_event_from_dict(
@@ -970,6 +969,27 @@ class _StateParser(ByteParser[StateRequestResponse]):
return len(data)
def finish(self) -> StateRequestResponse:
- for c in self._coros:
- c.close()
+ _close_coros(self._coros)
return self._response
+
+
+def _close_coros(coros: Iterable[Generator[None, bytes, None]]) -> None:
+ """Close each of the given coroutines.
+
+ Always calls .close() on each coroutine, even if doing so raises an exception.
+ Any exceptions raised are aggregated into an ExceptionBundle.
+
+ :raises ExceptionBundle: if at least one coroutine fails to close.
+ """
+ exceptions = []
+ for c in coros:
+ try:
+ c.close()
+ except Exception as e:
+ exceptions.append(e)
+
+ if exceptions:
+ # raise from the first exception so that the traceback has slightly more context
+ raise ExceptionBundle(
+ f"There were {len(exceptions)} errors closing coroutines", exceptions
+ ) from exceptions[0]
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index bb0f8d6b7b..cdaf0d5de7 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Tupl
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
from synapse.api.urls import FEDERATION_V1_PREFIX
-from synapse.http.server import HttpServer, ServletCallback, is_method_cancellable
+from synapse.http.server import HttpServer, ServletCallback
from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.context import run_in_background
@@ -34,6 +34,7 @@ from synapse.logging.opentracing import (
whitelisted_homeserver,
)
from synapse.types import JsonDict
+from synapse.util.cancellation import is_function_cancellable
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import parse_and_validate_server_name
@@ -223,10 +224,10 @@ class BaseFederationServlet:
With arguments:
- origin (unicode|None): The authenticated server_name of the calling server,
+ origin (str|None): The authenticated server_name of the calling server,
unless REQUIRE_AUTH is set to False and authentication failed.
- content (unicode|None): decoded json body of the request. None if the
+ content (str|None): decoded json body of the request. None if the
request was a GET.
query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
@@ -375,7 +376,7 @@ class BaseFederationServlet:
if code is None:
continue
- if is_method_cancellable(code):
+ if is_function_cancellable(code):
# The wrapper added by `self._wrap` will inherit the cancellable flag,
# but the wrapper itself does not support cancellation yet.
# Once resolved, the cancellation tests in
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index f7884bfbe0..205fd16daa 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -489,7 +489,7 @@ class FederationV2InviteServlet(BaseFederationServerServlet):
room_version = content["room_version"]
event = content["event"]
- invite_room_state = content["invite_room_state"]
+ invite_room_state = content.get("invite_room_state", [])
# Synapse expects invite_room_state to be in unsigned, as it is in v1
# API
@@ -499,6 +499,11 @@ class FederationV2InviteServlet(BaseFederationServerServlet):
result = await self.handler.on_invite_request(
origin, event, room_version_id=room_version
)
+
+ # We only store invite_room_state for internal use, so remove it before
+ # returning the event to the remote homeserver.
+ result["event"].get("unsigned", {}).pop("invite_room_state", None)
+
return 200, result
@@ -549,8 +554,7 @@ class FederationClientKeysClaimServlet(BaseFederationServerServlet):
class FederationGetMissingEventsServlet(BaseFederationServerServlet):
- # TODO(paul): Why does this path alone end with "/?" optional?
- PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
+ PATH = "/get_missing_events/(?P<room_id>[^/]*)"
async def on_POST(
self,
|