diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index addc0bf000..896168c05c 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -230,6 +230,10 @@ def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventB
# origin, etc etc)
assert_params_in_dict(pdu_json, ("type", "depth"))
+ # Strip any unauthorized values from "unsigned" if they exist
+ if "unsigned" in pdu_json:
+ _strip_unsigned_values(pdu_json)
+
depth = pdu_json["depth"]
if not isinstance(depth, int):
raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON)
@@ -245,3 +249,24 @@ def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventB
event = make_event_from_dict(pdu_json, room_version)
return event
+
+
+def _strip_unsigned_values(pdu_dict: JsonDict) -> None:
+ """
+ Strip any unsigned values unless specifically allowed, as defined by the whitelist.
+
+ pdu: the json dict to strip values from. Note that the dict is mutated by this
+ function
+ """
+ unsigned = pdu_dict["unsigned"]
+
+ if not isinstance(unsigned, dict):
+ pdu_dict["unsigned"] = {}
+
+ if pdu_dict["type"] == "m.room.member":
+ whitelist = ["knock_room_state", "invite_room_state", "age"]
+ else:
+ whitelist = ["age"]
+
+ filtered_unsigned = {k: v for k, v in unsigned.items() if k in whitelist}
+ pdu_dict["unsigned"] = filtered_unsigned
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 6ea4edfc71..74f17aa4da 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -56,7 +56,6 @@ from synapse.api.room_versions import (
from synapse.events import EventBase, builder
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.transport.client import SendJoinResponse
-from synapse.logging.utils import log_function
from synapse.types import JsonDict, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@@ -119,7 +118,8 @@ class FederationClient(FederationBase):
# It is a map of (room ID, suggested-only) -> the response of
# get_room_hierarchy.
self._get_room_hierarchy_cache: ExpiringCache[
- Tuple[str, bool], Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]
+ Tuple[str, bool],
+ Tuple[JsonDict, Sequence[JsonDict], Sequence[JsonDict], Sequence[str]],
] = ExpiringCache(
cache_name="get_room_hierarchy_cache",
clock=self._clock,
@@ -144,7 +144,6 @@ class FederationClient(FederationBase):
if destination_dict:
self.pdu_destination_tried[event_id] = destination_dict
- @log_function
async def make_query(
self,
destination: str,
@@ -178,7 +177,6 @@ class FederationClient(FederationBase):
ignore_backoff=ignore_backoff,
)
- @log_function
async def query_client_keys(
self, destination: str, content: JsonDict, timeout: int
) -> JsonDict:
@@ -196,7 +194,6 @@ class FederationClient(FederationBase):
destination, content, timeout
)
- @log_function
async def query_user_devices(
self, destination: str, user_id: str, timeout: int = 30000
) -> JsonDict:
@@ -208,7 +205,6 @@ class FederationClient(FederationBase):
destination, user_id, timeout
)
- @log_function
async def claim_client_keys(
self, destination: str, content: JsonDict, timeout: int
) -> JsonDict:
@@ -1338,7 +1334,7 @@ class FederationClient(FederationBase):
destinations: Iterable[str],
room_id: str,
suggested_only: bool,
- ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]:
+ ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[JsonDict], Sequence[str]]:
"""
Call other servers to get a hierarchy of the given room.
@@ -1353,7 +1349,8 @@ class FederationClient(FederationBase):
Returns:
A tuple of:
- The room as a JSON dictionary.
+ The room as a JSON dictionary, without a "children_state" key.
+ A list of `m.space.child` state events.
A list of children rooms, as JSON dictionaries.
A list of inaccessible children room IDs.
@@ -1368,7 +1365,7 @@ class FederationClient(FederationBase):
async def send_request(
destination: str,
- ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]:
+ ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[JsonDict], Sequence[str]]:
try:
res = await self.transport_layer.get_room_hierarchy(
destination=destination,
@@ -1397,7 +1394,7 @@ class FederationClient(FederationBase):
raise InvalidResponseError("'room' must be a dict")
# Validate children_state of the room.
- children_state = room.get("children_state", [])
+ children_state = room.pop("children_state", [])
if not isinstance(children_state, Sequence):
raise InvalidResponseError("'room.children_state' must be a list")
if any(not isinstance(e, dict) for e in children_state):
@@ -1426,7 +1423,7 @@ class FederationClient(FederationBase):
"Invalid room ID in 'inaccessible_children' list"
)
- return room, children, inaccessible_children
+ return room, children_state, children, inaccessible_children
try:
result = await self._try_destination_list(
@@ -1474,8 +1471,6 @@ class FederationClient(FederationBase):
if event.room_id == room_id:
children_events.append(event.data)
children_room_ids.add(event.state_key)
- # And add them under the requested room.
- requested_room["children_state"] = children_events
# Find the children rooms.
children = []
@@ -1485,7 +1480,7 @@ class FederationClient(FederationBase):
# It isn't clear from the response whether some of the rooms are
# not accessible.
- result = (requested_room, children, ())
+ result = (requested_room, children_events, children, ())
# Cache the result to avoid fetching data over federation every time.
self._get_room_hierarchy_cache[(room_id, suggested_only)] = result
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index ee71f289c8..af9cb98f67 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -58,7 +58,6 @@ from synapse.logging.context import (
run_in_background,
)
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
-from synapse.logging.utils import log_function
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -859,7 +858,6 @@ class FederationServer(FederationBase):
res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]}
return 200, res
- @log_function
async def on_query_client_keys(
self, origin: str, content: Dict[str, str]
) -> Tuple[int, Dict[str, Any]]:
@@ -940,7 +938,6 @@ class FederationServer(FederationBase):
return {"events": [ev.get_pdu_json(time_now) for ev in missing_events]}
- @log_function
async def on_openid_userinfo(self, token: str) -> Optional[str]:
ts_now_ms = self._clock.time_msec()
return await self.store.get_user_id_for_open_id_token(token, ts_now_ms)
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 523ab1c51e..60e2e6cf01 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -23,7 +23,6 @@ import logging
from typing import Optional, Tuple
from synapse.federation.units import Transaction
-from synapse.logging.utils import log_function
from synapse.storage.databases.main import DataStore
from synapse.types import JsonDict
@@ -36,7 +35,6 @@ class TransactionActions:
def __init__(self, datastore: DataStore):
self.store = datastore
- @log_function
async def have_responded(
self, origin: str, transaction: Transaction
) -> Optional[Tuple[int, JsonDict]]:
@@ -53,7 +51,6 @@ class TransactionActions:
return await self.store.get_received_txn_response(transaction_id, origin)
- @log_function
async def set_response(
self, origin: str, transaction: Transaction, code: int, response: JsonDict
) -> None:
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 391b30fbb5..8152e80b88 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -607,18 +607,18 @@ class PerDestinationQueue:
self._pending_pdus = []
-@attr.s(slots=True)
+@attr.s(slots=True, auto_attribs=True)
class _TransactionQueueManager:
"""A helper async context manager for pulling stuff off the queues and
tracking what was last successfully sent, etc.
"""
- queue = attr.ib(type=PerDestinationQueue)
+ queue: PerDestinationQueue
- _device_stream_id = attr.ib(type=Optional[int], default=None)
- _device_list_id = attr.ib(type=Optional[int], default=None)
- _last_stream_ordering = attr.ib(type=Optional[int], default=None)
- _pdus = attr.ib(type=List[EventBase], factory=list)
+ _device_stream_id: Optional[int] = None
+ _device_list_id: Optional[int] = None
+ _last_stream_ordering: Optional[int] = None
+ _pdus: List[EventBase] = attr.Factory(list)
async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
# First we calculate the EDUs we want to send, if any.
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index ab935e5a7e..742ee57255 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -35,6 +35,7 @@ if TYPE_CHECKING:
import synapse.server
logger = logging.getLogger(__name__)
+issue_8631_logger = logging.getLogger("synapse.8631_debug")
last_pdu_ts_metric = Gauge(
"synapse_federation_last_sent_pdu_time",
@@ -124,6 +125,17 @@ class TransactionManager:
len(pdus),
len(edus),
)
+ if issue_8631_logger.isEnabledFor(logging.DEBUG):
+ DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
+ device_list_updates = [
+ edu.content for edu in edus if edu.edu_type in DEVICE_UPDATE_EDUS
+ ]
+ if device_list_updates:
+ issue_8631_logger.debug(
+ "about to send txn [%s] including device list updates: %s",
+ transaction.transaction_id,
+ device_list_updates,
+ )
# Actually send the transaction
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 9fc4c31c93..8782586cd6 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -44,7 +44,6 @@ from synapse.api.urls import (
from synapse.events import EventBase, make_event_from_dict
from synapse.federation.units import Transaction
from synapse.http.matrixfederationclient import ByteParser
-from synapse.logging.utils import log_function
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@@ -62,7 +61,6 @@ class TransportLayerClient:
self.server_name = hs.hostname
self.client = hs.get_federation_http_client()
- @log_function
async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
) -> JsonDict:
@@ -88,7 +86,6 @@ class TransportLayerClient:
try_trailing_slash_on_400=True,
)
- @log_function
async def get_event(
self, destination: str, event_id: str, timeout: Optional[int] = None
) -> JsonDict:
@@ -111,7 +108,6 @@ class TransportLayerClient:
destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
)
- @log_function
async def backfill(
self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
) -> Optional[JsonDict]:
@@ -149,7 +145,6 @@ class TransportLayerClient:
destination, path=path, args=args, try_trailing_slash_on_400=True
)
- @log_function
async def timestamp_to_event(
self, destination: str, room_id: str, timestamp: int, direction: str
) -> Union[JsonDict, List]:
@@ -185,7 +180,6 @@ class TransportLayerClient:
return remote_response
- @log_function
async def send_transaction(
self,
transaction: Transaction,
@@ -234,7 +228,6 @@ class TransportLayerClient:
try_trailing_slash_on_400=True,
)
- @log_function
async def make_query(
self,
destination: str,
@@ -254,7 +247,6 @@ class TransportLayerClient:
ignore_backoff=ignore_backoff,
)
- @log_function
async def make_membership_event(
self,
destination: str,
@@ -317,7 +309,6 @@ class TransportLayerClient:
ignore_backoff=ignore_backoff,
)
- @log_function
async def send_join_v1(
self,
room_version: RoomVersion,
@@ -336,7 +327,6 @@ class TransportLayerClient:
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
)
- @log_function
async def send_join_v2(
self,
room_version: RoomVersion,
@@ -355,7 +345,6 @@ class TransportLayerClient:
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
)
- @log_function
async def send_leave_v1(
self, destination: str, room_id: str, event_id: str, content: JsonDict
) -> Tuple[int, JsonDict]:
@@ -372,7 +361,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def send_leave_v2(
self, destination: str, room_id: str, event_id: str, content: JsonDict
) -> JsonDict:
@@ -389,7 +377,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def send_knock_v1(
self,
destination: str,
@@ -423,7 +410,6 @@ class TransportLayerClient:
destination=destination, path=path, data=content
)
- @log_function
async def send_invite_v1(
self, destination: str, room_id: str, event_id: str, content: JsonDict
) -> Tuple[int, JsonDict]:
@@ -433,7 +419,6 @@ class TransportLayerClient:
destination=destination, path=path, data=content, ignore_backoff=True
)
- @log_function
async def send_invite_v2(
self, destination: str, room_id: str, event_id: str, content: JsonDict
) -> JsonDict:
@@ -443,7 +428,6 @@ class TransportLayerClient:
destination=destination, path=path, data=content, ignore_backoff=True
)
- @log_function
async def get_public_rooms(
self,
remote_server: str,
@@ -516,7 +500,6 @@ class TransportLayerClient:
return response
- @log_function
async def exchange_third_party_invite(
self, destination: str, room_id: str, event_dict: JsonDict
) -> JsonDict:
@@ -526,7 +509,6 @@ class TransportLayerClient:
destination=destination, path=path, data=event_dict
)
- @log_function
async def get_event_auth(
self, destination: str, room_id: str, event_id: str
) -> JsonDict:
@@ -534,7 +516,6 @@ class TransportLayerClient:
return await self.client.get_json(destination=destination, path=path)
- @log_function
async def query_client_keys(
self, destination: str, query_content: JsonDict, timeout: int
) -> JsonDict:
@@ -576,7 +557,6 @@ class TransportLayerClient:
destination=destination, path=path, data=query_content, timeout=timeout
)
- @log_function
async def query_user_devices(
self, destination: str, user_id: str, timeout: int
) -> JsonDict:
@@ -616,7 +596,6 @@ class TransportLayerClient:
destination=destination, path=path, timeout=timeout
)
- @log_function
async def claim_client_keys(
self, destination: str, query_content: JsonDict, timeout: int
) -> JsonDict:
@@ -655,7 +634,6 @@ class TransportLayerClient:
destination=destination, path=path, data=query_content, timeout=timeout
)
- @log_function
async def get_missing_events(
self,
destination: str,
@@ -680,7 +658,6 @@ class TransportLayerClient:
timeout=timeout,
)
- @log_function
async def get_group_profile(
self, destination: str, group_id: str, requester_user_id: str
) -> JsonDict:
@@ -694,7 +671,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def update_group_profile(
self, destination: str, group_id: str, requester_user_id: str, content: JsonDict
) -> JsonDict:
@@ -716,7 +692,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def get_group_summary(
self, destination: str, group_id: str, requester_user_id: str
) -> JsonDict:
@@ -730,7 +705,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def get_rooms_in_group(
self, destination: str, group_id: str, requester_user_id: str
) -> JsonDict:
@@ -798,7 +772,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def get_users_in_group(
self, destination: str, group_id: str, requester_user_id: str
) -> JsonDict:
@@ -812,7 +785,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def get_invited_users_in_group(
self, destination: str, group_id: str, requester_user_id: str
) -> JsonDict:
@@ -826,7 +798,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def accept_group_invite(
self, destination: str, group_id: str, user_id: str, content: JsonDict
) -> JsonDict:
@@ -837,7 +808,6 @@ class TransportLayerClient:
destination=destination, path=path, data=content, ignore_backoff=True
)
- @log_function
def join_group(
self, destination: str, group_id: str, user_id: str, content: JsonDict
) -> Awaitable[JsonDict]:
@@ -848,7 +818,6 @@ class TransportLayerClient:
destination=destination, path=path, data=content, ignore_backoff=True
)
- @log_function
async def invite_to_group(
self,
destination: str,
@@ -868,7 +837,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def invite_to_group_notification(
self, destination: str, group_id: str, user_id: str, content: JsonDict
) -> JsonDict:
@@ -882,7 +850,6 @@ class TransportLayerClient:
destination=destination, path=path, data=content, ignore_backoff=True
)
- @log_function
async def remove_user_from_group(
self,
destination: str,
@@ -902,7 +869,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def remove_user_from_group_notification(
self, destination: str, group_id: str, user_id: str, content: JsonDict
) -> JsonDict:
@@ -916,7 +882,6 @@ class TransportLayerClient:
destination=destination, path=path, data=content, ignore_backoff=True
)
- @log_function
async def renew_group_attestation(
self, destination: str, group_id: str, user_id: str, content: JsonDict
) -> JsonDict:
@@ -930,7 +895,6 @@ class TransportLayerClient:
destination=destination, path=path, data=content, ignore_backoff=True
)
- @log_function
async def update_group_summary_room(
self,
destination: str,
@@ -959,7 +923,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def delete_group_summary_room(
self,
destination: str,
@@ -986,7 +949,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def get_group_categories(
self, destination: str, group_id: str, requester_user_id: str
) -> JsonDict:
@@ -1000,7 +962,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def get_group_category(
self, destination: str, group_id: str, requester_user_id: str, category_id: str
) -> JsonDict:
@@ -1014,7 +975,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def update_group_category(
self,
destination: str,
@@ -1034,7 +994,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def delete_group_category(
self, destination: str, group_id: str, requester_user_id: str, category_id: str
) -> JsonDict:
@@ -1048,7 +1007,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def get_group_roles(
self, destination: str, group_id: str, requester_user_id: str
) -> JsonDict:
@@ -1062,7 +1020,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def get_group_role(
self, destination: str, group_id: str, requester_user_id: str, role_id: str
) -> JsonDict:
@@ -1076,7 +1033,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def update_group_role(
self,
destination: str,
@@ -1096,7 +1052,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def delete_group_role(
self, destination: str, group_id: str, requester_user_id: str, role_id: str
) -> JsonDict:
@@ -1110,7 +1065,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def update_group_summary_user(
self,
destination: str,
@@ -1136,7 +1090,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def set_group_join_policy(
self, destination: str, group_id: str, requester_user_id: str, content: JsonDict
) -> JsonDict:
@@ -1151,7 +1104,6 @@ class TransportLayerClient:
ignore_backoff=True,
)
- @log_function
async def delete_group_summary_user(
self,
destination: str,
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 77bfd88ad0..beadfa422b 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -36,6 +36,7 @@ from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
logger = logging.getLogger(__name__)
+issue_8631_logger = logging.getLogger("synapse.8631_debug")
class BaseFederationServerServlet(BaseFederationServlet):
@@ -95,6 +96,20 @@ class FederationSendServlet(BaseFederationServerServlet):
len(transaction_data.get("edus", [])),
)
+ if issue_8631_logger.isEnabledFor(logging.DEBUG):
+ DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
+ device_list_updates = [
+ edu.content
+ for edu in transaction_data.get("edus", [])
+ if edu.edu_type in DEVICE_UPDATE_EDUS
+ ]
+ if device_list_updates:
+ issue_8631_logger.debug(
+ "received transaction [%s] including device list updates: %s",
+ transaction_id,
+ device_list_updates,
+ )
+
except Exception as e:
logger.exception(e)
return 400, {"error": "Invalid transaction"}
|